feat: Added RTSP receiver reports

This commit is contained in:
Dave Taddei 2022-09-19 10:47:13 +01:00
parent 1a6f9470d6
commit ed4b056c08

View File

@ -106,6 +106,11 @@ type RTSPClientOptions struct {
InsecureSkipVerify bool InsecureSkipVerify bool
} }
type RTPMetadata struct {
SSRC uint32
SeqNumber int
}
func Dial(options RTSPClientOptions) (*RTSPClient, error) { func Dial(options RTSPClientOptions) (*RTSPClient, error) {
client := &RTSPClient{ client := &RTSPClient{
headers: make(map[string]string), headers: make(map[string]string),
@ -261,8 +266,8 @@ func (client *RTSPClient) startStream() {
header := make([]byte, 4) header := make([]byte, 4)
first := make([]byte, 2) first := make([]byte, 2)
failed := 0 failed := 0
var ssrc uint32 var metaData RTPMetadata
ssrc = 0 metaData.SSRC = 0
for { for {
err := client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) err := client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
if err != nil { if err != nil {
@ -276,8 +281,8 @@ func (client *RTSPClient) startStream() {
return return
} }
if ssrc != 0 { if metaData.SSRC != 0 {
client.sendRR(ssrc) client.sendRR(metaData)
} }
timer = time.Now() timer = time.Now()
} }
@ -329,8 +334,8 @@ func (client *RTSPClient) startStream() {
return return
} }
} }
pkt, got, _ssrc := client.RTPDemuxer(&content) pkt, got, _metaData := client.RTPDemuxer(&content)
ssrc = _ssrc metaData = _metaData
if !got { if !got {
continue continue
} }
@ -373,7 +378,8 @@ func (client *RTSPClient) startStream() {
failed = failed + 1 failed = failed + 1
client.Println("RTSP Client RTP Read DeSync: Failed frames", failed) client.Println("RTSP Client RTP Read DeSync: Failed frames", failed)
ssrc = 0 metaData.SSRC = 0
metaData.SeqNumber = 0
if failed > 500 { if failed > 500 {
return return
@ -382,8 +388,8 @@ func (client *RTSPClient) startStream() {
} }
} }
func (client *RTSPClient) sendRR(ssrc uint32) (err error) { func (client *RTSPClient) sendRR(metaData RTPMetadata) (err error) {
client.Println("Sending ReceiverReport", ssrc) client.Println("Sending ReceiverReport", metaData)
err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
if err != nil { if err != nil {
return return
@ -393,12 +399,12 @@ func (client *RTSPClient) sendRR(ssrc uint32) (err error) {
rr := rtcp.ReceiverReport{} rr := rtcp.ReceiverReport{}
recepReport := rtcp.ReceptionReport{} recepReport := rtcp.ReceptionReport{}
rr.SSRC = ssrc rr.SSRC = metaData.SSRC
recepReport.SSRC = ssrc recepReport.SSRC = metaData.SSRC
recepReport.FractionLost = 0 recepReport.FractionLost = 0
recepReport.TotalLost = 0 recepReport.TotalLost = 0
recepReport.LastSequenceNumber = 0 recepReport.LastSequenceNumber = uint32(metaData.SeqNumber)
recepReport.Jitter = 0 recepReport.Jitter = 0
recepReport.LastSenderReport = 0 recepReport.LastSenderReport = 0
recepReport.Delay = 0 recepReport.Delay = 0
@ -618,7 +624,7 @@ func stringInBetween(str string, start string, end string) (result string) {
return str return str
} }
func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, uint32) { func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, RTPMetadata) {
content := *payloadRAW content := *payloadRAW
firstByte := content[4] firstByte := content[4]
padding := (firstByte>>5)&1 == 1 padding := (firstByte>>5)&1 == 1
@ -628,6 +634,11 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui
timestamp := int64(binary.BigEndian.Uint32(content[8:12])) timestamp := int64(binary.BigEndian.Uint32(content[8:12]))
ssrc := uint32(binary.BigEndian.Uint32(content[12:16])) ssrc := uint32(binary.BigEndian.Uint32(content[12:16]))
var metaData RTPMetadata
metaData.SSRC = ssrc
metaData.SeqNumber = SequenceNumber
if isRTCPPacket(content) { if isRTCPPacket(content) {
client.Println("Processing RTCP packet") client.Println("Processing RTCP packet")
rtcpPacketType := content[5] rtcpPacketType := content[5]
@ -639,7 +650,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui
} else { } else {
client.Println("skipping RTCP packet") client.Println("skipping RTCP packet")
} }
return nil, false, 0 return nil, false, metaData
} }
offset := RTPHeaderSize offset := RTPHeaderSize
@ -649,7 +660,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui
offset += 4 * CSRCCnt offset += 4 * CSRCCnt
} }
if extension && len(content) < 4+offset+2+2 { if extension && len(content) < 4+offset+2+2 {
return nil, false, 0 return nil, false, metaData
} }
if extension && end-offset >= 4 { if extension && end-offset >= 4 {
extLen := 4 * int(binary.BigEndian.Uint16(content[4+offset+2:])) extLen := 4 * int(binary.BigEndian.Uint16(content[4+offset+2:]))
@ -689,7 +700,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui
} }
nalRaw, _ := h264parser.SplitNALUs(content[offset:end]) nalRaw, _ := h264parser.SplitNALUs(content[offset:end])
if len(nalRaw) == 0 || len(nalRaw[0]) == 0 { if len(nalRaw) == 0 || len(nalRaw[0]) == 0 {
return nil, false, 0 return nil, false, metaData
} }
var retmap []*av.Packet var retmap []*av.Packet
for _, nal := range nalRaw { for _, nal := range nalRaw {
@ -829,7 +840,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui
} }
if len(retmap) > 0 { if len(retmap) > 0 {
client.PreVideoTS = timestamp client.PreVideoTS = timestamp
return retmap, true, ssrc return retmap, true, metaData
} }
case client.audioID: case client.audioID:
if client.PreAudioTS == 0 { if client.PreAudioTS == 0 {
@ -903,12 +914,12 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool, ui
} }
if len(retmap) > 0 { if len(retmap) > 0 {
client.PreAudioTS = timestamp client.PreAudioTS = timestamp
return retmap, true, ssrc return retmap, true, metaData
} }
default: default:
//client.Println("Unsuported Intervaled data packet", int(content[1]), content[offset:end]) //client.Println("Unsuported Intervaled data packet", int(content[1]), content[offset:end])
} }
return nil, false, ssrc return nil, false, metaData
} }
func (client *RTSPClient) CodecUpdateSPS(val []byte) { func (client *RTSPClient) CodecUpdateSPS(val []byte) {