From ed4b056c087c007ccb76eab109a41a88f9e0c8c5 Mon Sep 17 00:00:00 2001 From: Dave Taddei Date: Mon, 19 Sep 2022 10:47:13 +0100 Subject: [PATCH] feat: Added RTSP receiver reports --- format/rtspv2/client.go | 49 +++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/format/rtspv2/client.go b/format/rtspv2/client.go index fb45e1a..dd57351 100644 --- a/format/rtspv2/client.go +++ b/format/rtspv2/client.go @@ -106,6 +106,11 @@ type RTSPClientOptions struct { InsecureSkipVerify bool } +type RTPMetadata struct { + SSRC uint32 + SeqNumber int +} + func Dial(options RTSPClientOptions) (*RTSPClient, error) { client := &RTSPClient{ headers: make(map[string]string), @@ -261,8 +266,8 @@ func (client *RTSPClient) startStream() { header := make([]byte, 4) first := make([]byte, 2) failed := 0 - var ssrc uint32 - ssrc = 0 + var metaData RTPMetadata + metaData.SSRC = 0 for { err := client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) if err != nil { @@ -276,8 +281,8 @@ func (client *RTSPClient) startStream() { return } - if ssrc != 0 { - client.sendRR(ssrc) + if metaData.SSRC != 0 { + client.sendRR(metaData) } timer = time.Now() } @@ -329,8 +334,8 @@ func (client *RTSPClient) startStream() { return } } - pkt, got, _ssrc := client.RTPDemuxer(&content) - ssrc = _ssrc + pkt, got, _metaData := client.RTPDemuxer(&content) + metaData = _metaData if !got { continue } @@ -373,7 +378,8 @@ func (client *RTSPClient) startStream() { failed = failed + 1 client.Println("RTSP Client RTP Read DeSync: Failed frames", failed) - ssrc = 0 + metaData.SSRC = 0 + metaData.SeqNumber = 0 if failed > 500 { return @@ -382,8 +388,8 @@ func (client *RTSPClient) startStream() { } } -func (client *RTSPClient) sendRR(ssrc uint32) (err error) { - client.Println("Sending ReceiverReport", ssrc) +func (client *RTSPClient) sendRR(metaData RTPMetadata) (err error) { + client.Println("Sending ReceiverReport", metaData) err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) if err != nil { return @@ -393,12 +399,12 @@ func (client *RTSPClient) sendRR(ssrc uint32) (err error) { rr := rtcp.ReceiverReport{} recepReport := rtcp.ReceptionReport{} - rr.SSRC = ssrc + rr.SSRC = metaData.SSRC - recepReport.SSRC = ssrc + recepReport.SSRC = metaData.SSRC recepReport.FractionLost = 0 recepReport.TotalLost = 0 - recepReport.LastSequenceNumber = 0 + recepReport.LastSequenceNumber = uint32(metaData.SeqNumber) recepReport.Jitter = 0 recepReport.LastSenderReport = 0 recepReport.Delay = 0 @@ -618,7 +624,7 @@ func stringInBetween(str string, start string, end string) (result string) { return str } -func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, uint32) { +func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, RTPMetadata) { content := *payloadRAW firstByte := content[4] padding := (firstByte>>5)&1 == 1 @@ -628,6 +634,11 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui timestamp := int64(binary.BigEndian.Uint32(content[8:12])) ssrc := uint32(binary.BigEndian.Uint32(content[12:16])) + var metaData RTPMetadata + + metaData.SSRC = ssrc + metaData.SeqNumber = SequenceNumber + if isRTCPPacket(content) { client.Println("Processing RTCP packet") rtcpPacketType := content[5] @@ -639,7 +650,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui } else { client.Println("skipping RTCP packet") } - return nil, false, 0 + return nil, false, metaData } offset := RTPHeaderSize @@ -649,7 +660,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui offset += 4 * CSRCCnt } if extension && len(content) < 4+offset+2+2 { - return nil, false, 0 + return nil, false, metaData } if extension && end-offset >= 4 { extLen := 4 * int(binary.BigEndian.Uint16(content[4+offset+2:])) @@ -689,7 +700,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui } nalRaw, _ := h264parser.SplitNALUs(content[offset:end]) if len(nalRaw) == 0 || len(nalRaw[0]) == 0 { - return nil, false, 0 + return nil, false, metaData } var retmap []*av.Packet for _, nal := range nalRaw { @@ -829,7 +840,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui } if len(retmap) > 0 { client.PreVideoTS = timestamp - return retmap, true, ssrc + return retmap, true, metaData } case client.audioID: if client.PreAudioTS == 0 { @@ -903,12 +914,12 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui } if len(retmap) > 0 { client.PreAudioTS = timestamp - return retmap, true, ssrc + return retmap, true, metaData } default: //client.Println("Unsuported Intervaled data packet", int(content[1]), content[offset:end]) } - return nil, false, ssrc + return nil, false, metaData } func (client *RTSPClient) CodecUpdateSPS(val []byte) {