From 1a6f9470d6e4a89d1ef258399b02c127544ee96c Mon Sep 17 00:00:00 2001 From: Dave Taddei Date: Mon, 19 Sep 2022 10:20:15 +0100 Subject: [PATCH] feat: Added RTSP receiver reports --- format/rtspv2/client.go | 83 +++++++++++++++++++++++++++++++++-------- go.mod | 2 +- go.sum | 2 + 3 files changed, 71 insertions(+), 16 deletions(-) diff --git a/format/rtspv2/client.go b/format/rtspv2/client.go index 523656a..fb45e1a 100644 --- a/format/rtspv2/client.go +++ b/format/rtspv2/client.go @@ -25,6 +25,7 @@ import ( "github.com/honuworx/vdk/codec/h264parser" "github.com/honuworx/vdk/codec/h265parser" "github.com/honuworx/vdk/format/rtsp/sdp" + "github.com/pion/rtcp" ) const ( @@ -260,6 +261,8 @@ func (client *RTSPClient) startStream() { header := make([]byte, 4) first := make([]byte, 2) failed := 0 + var ssrc uint32 + ssrc = 0 for { err := client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) if err != nil { @@ -272,6 +275,10 @@ func (client *RTSPClient) startStream() { client.Println("RTSP Client RTP keep-alive", err) return } + + if ssrc != 0 { + client.sendRR(ssrc) + } timer = time.Now() } @@ -283,7 +290,6 @@ func (client *RTSPClient) startStream() { client.Println("RTSP Client RTP Read Header", err) return } - client.Println("Searching for 0x24 or 0x52 headers: ", first) } header[0] = first[0] @@ -295,9 +301,6 @@ func (client *RTSPClient) startStream() { return } - length := int32(binary.BigEndian.Uint16(header[2:])) - client.Println("Got frame header: ", header, length) - switch header[0] { case 0x24: failed = 0 @@ -326,7 +329,8 @@ func (client *RTSPClient) startStream() { return } } - pkt, got := client.RTPDemuxer(&content) + pkt, got, _ssrc := client.RTPDemuxer(&content) + ssrc = _ssrc if !got { continue } @@ -369,6 +373,8 @@ func (client *RTSPClient) startStream() { failed = failed + 1 client.Println("RTSP Client RTP Read DeSync: Failed frames", failed) + ssrc = 0 + if failed > 500 { return } @@ -376,6 +382,44 @@ func (client *RTSPClient) startStream() { } } +func (client *RTSPClient) sendRR(ssrc uint32) (err error) { + client.Println("Sending ReceiverReport", ssrc) + err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) + if err != nil { + return + } + client.seq++ + + rr := rtcp.ReceiverReport{} + recepReport := rtcp.ReceptionReport{} + + rr.SSRC = ssrc + + recepReport.SSRC = ssrc + recepReport.FractionLost = 0 + recepReport.TotalLost = 0 + recepReport.LastSequenceNumber = 0 + recepReport.Jitter = 0 + recepReport.LastSenderReport = 0 + recepReport.Delay = 0 + + rr.Reports = append(rr.Reports, recepReport) + + data, err := rr.Marshal() + + if err != nil { + return + } + + nb, err := client.connRW.Write(data) + + if err != nil || nb == 0 { + return + } + + return +} + func (client *RTSPClient) request(method string, customHeaders map[string]string, uri string, one bool, nores bool) (err error) { err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) if err != nil { @@ -574,19 +618,28 @@ func stringInBetween(str string, start string, end string) (result string) { return str } -func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { - +func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, uint32) { content := *payloadRAW firstByte := content[4] padding := (firstByte>>5)&1 == 1 extension := (firstByte>>4)&1 == 1 CSRCCnt := int(firstByte & 0x0f) SequenceNumber := int(binary.BigEndian.Uint16(content[6:8])) - timestamp := int64(binary.BigEndian.Uint32(content[8:16])) + timestamp := int64(binary.BigEndian.Uint32(content[8:12])) + ssrc := uint32(binary.BigEndian.Uint32(content[12:16])) if isRTCPPacket(content) { - client.Println("skipping RTCP packet") - return nil, false + client.Println("Processing RTCP packet") + rtcpPacketType := content[5] + if rtcpPacketType == RTCPSenderReport { + srr := rtcp.SenderReport{} + srr.Unmarshal(content) + + client.Println(srr) + } else { + client.Println("skipping RTCP packet") + } + return nil, false, 0 } offset := RTPHeaderSize @@ -596,7 +649,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { offset += 4 * CSRCCnt } if extension && len(content) < 4+offset+2+2 { - return nil, false + return nil, false, 0 } if extension && end-offset >= 4 { extLen := 4 * int(binary.BigEndian.Uint16(content[4+offset+2:])) @@ -636,7 +689,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { } nalRaw, _ := h264parser.SplitNALUs(content[offset:end]) if len(nalRaw) == 0 || len(nalRaw[0]) == 0 { - return nil, false + return nil, false, 0 } var retmap []*av.Packet for _, nal := range nalRaw { @@ -776,7 +829,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { } if len(retmap) > 0 { client.PreVideoTS = timestamp - return retmap, true + return retmap, true, ssrc } case client.audioID: if client.PreAudioTS == 0 { @@ -850,12 +903,12 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { } if len(retmap) > 0 { client.PreAudioTS = timestamp - return retmap, true + return retmap, true, ssrc } default: //client.Println("Unsuported Intervaled data packet", int(content[1]), content[offset:end]) } - return nil, false + return nil, false, ssrc } func (client *RTSPClient) CodecUpdateSPS(val []byte) { diff --git a/go.mod b/go.mod index 9f4df6b..c666bac 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/pion/mdns v0.0.5 // indirect github.com/pion/quic v0.1.1 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtcp v1.2.9 // indirect + github.com/pion/rtcp v1.2.10 // indirect github.com/pion/rtp v1.7.13 // indirect github.com/pion/sctp v1.8.2 // indirect github.com/pion/sdp/v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index d2d74e9..89d598c 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,8 @@ github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TB github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I= github.com/pion/rtcp v1.2.9 h1:1ujStwg++IOLIEoOiIQ2s+qBuJ1VN81KW+9pMPsif+U= github.com/pion/rtcp v1.2.9/go.mod h1:qVPhiCzAm4D/rxb6XzKeyZiQK69yJpbUDJSF7TgrqNo= +github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= +github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= github.com/pion/rtp v1.6.0/go.mod h1:QgfogHsMBVE/RFNno467U/KBqfUywEH+HK+0rtnwsdI= github.com/pion/rtp v1.7.13 h1:qcHwlmtiI50t1XivvoawdCGTP4Uiypzfrsap+bijcoA= github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=