diff --git a/av/av.go b/av/av.go index 8f30975..d59a344 100644 --- a/av/av.go +++ b/av/av.go @@ -191,7 +191,7 @@ const avCodecTypeMagic = 233333 // CodecData is some important bytes for initializing audio/video decoder, // can be converted to VideoCodecData or AudioCodecData using: // -// codecdata.(AudioCodecData) or codecdata.(VideoCodecData) +// codecdata.(AudioCodecData) or codecdata.(VideoCodecData) // // for H264, CodecData is AVCDecoderConfigure bytes, includes SPS/PPS. type CodecData interface { @@ -253,8 +253,9 @@ type Packet struct { Idx int8 // stream index in container format CompositionTime time.Duration // packet presentation time minus decode time for H264 B-Frame Time time.Duration // packet decode time - Duration time.Duration //packet duration + Duration time.Duration // packet duration Data []byte // packet data + RealTimestamp int64 // packet real timestamp (ms) } // Raw audio frame. @@ -266,33 +267,33 @@ type AudioFrame struct { Data [][]byte // data array for planar format len(Data) > 1 } -func (self AudioFrame) Duration() time.Duration { - return time.Second * time.Duration(self.SampleCount) / time.Duration(self.SampleRate) +func (af AudioFrame) Duration() time.Duration { + return time.Second * time.Duration(af.SampleCount) / time.Duration(af.SampleRate) } // Check this audio frame has same format as other audio frame. -func (self AudioFrame) HasSameFormat(other AudioFrame) bool { - if self.SampleRate != other.SampleRate { +func (af AudioFrame) HasSameFormat(other AudioFrame) bool { + if af.SampleRate != other.SampleRate { return false } - if self.ChannelLayout != other.ChannelLayout { + if af.ChannelLayout != other.ChannelLayout { return false } - if self.SampleFormat != other.SampleFormat { + if af.SampleFormat != other.SampleFormat { return false } return true } // Split sample audio sample from this frame. -func (self AudioFrame) Slice(start int, end int) (out AudioFrame) { +func (af AudioFrame) Slice(start int, end int) (out AudioFrame) { if start > end { panic(fmt.Sprintf("av: AudioFrame split failed start=%d end=%d invalid", start, end)) } - out = self + out = af out.Data = append([][]byte(nil), out.Data...) out.SampleCount = end - start - size := self.SampleFormat.BytesPerSample() + size := af.SampleFormat.BytesPerSample() for i := range out.Data { out.Data[i] = out.Data[i][start*size : end*size] } @@ -300,8 +301,8 @@ func (self AudioFrame) Slice(start int, end int) (out AudioFrame) { } // Concat two audio frames. -func (self AudioFrame) Concat(in AudioFrame) (out AudioFrame) { - out = self +func (af AudioFrame) Concat(in AudioFrame) (out AudioFrame) { + out = af out.Data = append([][]byte(nil), out.Data...) out.SampleCount += in.SampleCount for i := range out.Data { diff --git a/format/rtsp/sdp/parser.go b/format/rtsp/sdp/parser.go index ea5ef51..12876a3 100644 --- a/format/rtsp/sdp/parser.go +++ b/format/rtsp/sdp/parser.go @@ -85,6 +85,8 @@ func Parse(content string) (sess Session, medias []Media) { media.Rtpmap, _ = strconv.Atoi(val) case "x-framerate": media.FPS, _ = strconv.Atoi(val) + case "framerate": + media.FPS, _ = strconv.Atoi(val) } } keyval = strings.Split(field, "/") diff --git a/format/rtspv2/client.go b/format/rtspv2/client.go index 87246aa..128f704 100644 --- a/format/rtspv2/client.go +++ b/format/rtspv2/client.go @@ -97,7 +97,12 @@ type RTSPClient struct { sequenceNumber int end int offset int + realVideoTs int64 + keyFrameRealVideoTs int64 + keyFrameIterateDrua int64 + preDuration time.Duration status string + lastPausedTime time.Time } type RTSPClientOptions struct { @@ -114,8 +119,8 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) { client := &RTSPClient{ headers: make(map[string]string), Signals: make(chan int, 100), - OutgoingProxyQueue: make(chan *[]byte, 3000), - OutgoingPacketQueue: make(chan *av.Packet, 3000), + OutgoingProxyQueue: make(chan *[]byte, 300), + OutgoingPacketQueue: make(chan *av.Packet, 300), BufferRtpPacket: bytes.NewBuffer([]byte{}), videoID: -1, audioID: -2, @@ -266,7 +271,6 @@ func (client *RTSPClient) startStream() { timer := time.Now() oneb := make([]byte, 1) header := make([]byte, 4) - var fixed bool for { err := client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) if err != nil { @@ -281,18 +285,19 @@ func (client *RTSPClient) startStream() { } timer = time.Now() } - if client.status == PAUSE { - // client.Println("RTSP Client PAUSE") - continue - } - if !fixed { - nb, err := io.ReadFull(client.connRW, header) - if err != nil || nb != 4 { - client.Println("RTSP Client RTP Read Header", err) - return + + nb, err := io.ReadFull(client.connRW, header) + if err != nil || nb != 4 { + if client.status == PAUSE && strings.Contains(err.Error(), "timeout") { + if time.Since(client.lastPausedTime) >= time.Second*300 { + return + } + time.Sleep(100 * time.Millisecond) + continue } + client.Println("RTSP Client RTP Read Header", err) + return } - fixed = false switch header[0] { case 0x24: length := int32(binary.BigEndian.Uint16(header[2:])) @@ -326,7 +331,7 @@ func (client *RTSPClient) startStream() { } for _, i2 := range pkt { - if len(client.OutgoingPacketQueue) > 2000 { + if len(client.OutgoingPacketQueue) > 200 { client.Println("RTSP Client OutgoingPacket Chanel Full") return } @@ -514,6 +519,7 @@ func (client *RTSPClient) Pause() error { return err } client.status = PAUSE + client.lastPausedTime = time.Now() return nil } @@ -531,6 +537,7 @@ func (client *RTSPClient) Seek(customHeaders map[string]string, target int64) er return err } client.status = PLAY + client.PreVideoTS = 0 return nil } diff --git a/format/rtspv2/demuxer.go b/format/rtspv2/demuxer.go index ac61184..a4092d5 100644 --- a/format/rtspv2/demuxer.go +++ b/format/rtspv2/demuxer.go @@ -2,7 +2,6 @@ package rtspv2 import ( "encoding/binary" - "fmt" "math" "time" @@ -27,6 +26,15 @@ func (client *RTSPClient) containsPayloadType(pt int) bool { return exist } +func (client *RTSPClient) durationFromSDP() time.Duration { + for _, sdp := range client.mediaSDP { + if sdp.AVType == VIDEO && sdp.FPS != 0 { + return time.Duration((int(1000) / sdp.FPS) * int(time.Millisecond)) + } + } + return 0 +} + func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { content := *payloadRAW firstByte := content[4] @@ -36,18 +44,18 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { payloadType := int(content[5] & 0x7f) sequenceNumber := int(binary.BigEndian.Uint16(content[6:8])) timestamp := int64(binary.BigEndian.Uint32(content[8:12])) + // SSRC := binary.BigEndian.Uint32(content[12:16]) if isRTCPPacket(content) { client.Println("skipping RTCP packet") return nil, false } if !client.containsPayloadType(payloadType) { - client.Println(fmt.Sprintf("skipping RTP packet, paytload type: %v", payloadType)) + // client.Println(fmt.Sprintf("skipping RTP packet, paytload type: %v", payloadType)) return nil, false } - // client.Println(fmt.Sprintf("padding: %v, extension: %v, csrccnt: %d, sequence number: %d.payload type: %d, timestamp: %d", - // padding, extension, CSRCCnt, sequenceNumber, payloadType, timestamp)) + // client.Println(fmt.Sprintf("padding: %v, extension: %v, csrccnt: %d, sequence number: %d.payload type: %d, timestamp: %d", padding, extension, CSRCCnt, sequenceNumber, payloadType, timestamp)) client.offset = RTPHeaderSize client.sequenceNumber = sequenceNumber client.timestamp = timestamp @@ -58,13 +66,18 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { if extension && len(content) < 4+client.offset+2+2 { return nil, false } + var realTimestamp int64 if extension && client.end-client.offset >= 4 { - extLen := 4 * int(binary.BigEndian.Uint16(content[4+client.offset+2:])) - client.offset += 4 + extWords := int(binary.BigEndian.Uint16(content[4+client.offset+2:])) + extLen := 4 * extWords + client.offset += 4 // this is profile(2 byte) + ext length(2 byte) + realTimestamp = int64(binary.BigEndian.Uint32(content[client.offset+4 : client.offset+4*2])) if client.end-client.offset >= extLen { client.offset += extLen } } + client.realVideoTs = realTimestamp + if padding && client.end-client.offset > 0 { paddingLen := int(content[client.end-1]) if client.end-client.offset >= paddingLen { @@ -286,12 +299,31 @@ func (client *RTSPClient) appendAudioPacket(retmap []*av.Packet, nal []byte, dur } func (client *RTSPClient) appendVideoPacket(retmap []*av.Packet, nal []byte, isKeyFrame bool) []*av.Packet { + duration := time.Duration(float32(client.timestamp-client.PreVideoTS)/TimeBaseFactor) * time.Millisecond + sdpDuration := client.durationFromSDP() + if sdpDuration != 0 && duration > sdpDuration { + duration = sdpDuration + } + if duration == 0 { + duration = client.preDuration + } + client.preDuration = duration + if isKeyFrame { + client.keyFrameRealVideoTs = client.realVideoTs + client.keyFrameIterateDrua = 0 + } else { + client.keyFrameIterateDrua += duration.Milliseconds() + } + realVideoMs := client.keyFrameRealVideoTs * 1000 + + realVideoMs += client.keyFrameIterateDrua return append(retmap, &av.Packet{ Data: append(binSize(len(nal)), nal...), CompositionTime: time.Duration(TimeDelay) * time.Millisecond, Idx: client.videoIDX, IsKeyFrame: isKeyFrame, - Duration: time.Duration(float32(client.timestamp-client.PreVideoTS)/TimeBaseFactor) * time.Millisecond, + Duration: duration, Time: time.Duration(client.timestamp/TimeBaseFactor) * time.Millisecond, + RealTimestamp: realVideoMs, }) }