From aa7284e097c420270b1f4f6162bb66dfdcd04ac9 Mon Sep 17 00:00:00 2001 From: Quanqi Gu Date: Thu, 14 Oct 2021 17:04:14 +0800 Subject: [PATCH] Add play pause method to rtsp --- av/av.go | 1 + format/rtspv2/client.go | 198 ++++++++++++++++++++++++++++++++-------- 2 files changed, 161 insertions(+), 38 deletions(-) diff --git a/av/av.go b/av/av.go index 232b2c8..d21cc79 100644 --- a/av/av.go +++ b/av/av.go @@ -242,6 +242,7 @@ type Packet struct { Time time.Duration // packet decode time Duration time.Duration //packet duration Data []byte // packet data + NaluType byte } // Raw audio frame. diff --git a/format/rtspv2/client.go b/format/rtspv2/client.go index f3cad8f..dc5a578 100644 --- a/format/rtspv2/client.go +++ b/format/rtspv2/client.go @@ -42,7 +42,10 @@ const ( DESCRIBE = "DESCRIBE" OPTIONS = "OPTIONS" PLAY = "PLAY" + PAUSE = "PAUSE" SETUP = "SETUP" + RECORD = "RECORD" + ANNOUNCE = "ANNOUNCE" TEARDOWN = "TEARDOWN" ) @@ -113,6 +116,7 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) { AudioTimeScale: 8000, } client.headers["User-Agent"] = "Lavf58.20.100" + Debug = options.Debug err := client.parseURL(html.UnescapeString(client.options.URL)) if err != nil { return nil, err @@ -127,43 +131,43 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) { } client.conn = conn client.connRW = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - err = client.request(OPTIONS, nil, client.pURL.String(), false, false) + err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false) if err != nil { return nil, err } - err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), false, false) + err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), nil, false, false) if err != nil { return nil, err } var ch int - for _, i2 := range client.mediaSDP { - if (i2.AVType != VIDEO && i2.AVType != AUDIO) || (client.options.DisableAudio && i2.AVType == AUDIO) { + for _, m := range client.mediaSDP { + if (m.AVType != VIDEO && m.AVType != AUDIO) || (client.options.DisableAudio && m.AVType == AUDIO) { continue } - err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(ch) + "-" + strconv.Itoa(ch+1)}, client.ControlTrack(i2.Control), false, false) + err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(ch) + "-" + strconv.Itoa(ch+1)}, client.ControlTrack(m.Control), nil, false, false) if err != nil { return nil, err } - if i2.AVType == VIDEO { - if i2.Type == av.H264 { - if len(i2.SpropParameterSets) > 1 { - if codecData, err := h264parser.NewCodecDataFromSPSAndPPS(i2.SpropParameterSets[0], i2.SpropParameterSets[1]); err == nil { - client.sps = i2.SpropParameterSets[0] - client.pps = i2.SpropParameterSets[1] + if m.AVType == VIDEO { + if m.Type == av.H264 { + if len(m.SpropParameterSets) > 1 { + if codecData, err := h264parser.NewCodecDataFromSPSAndPPS(m.SpropParameterSets[0], m.SpropParameterSets[1]); err == nil { + client.sps = m.SpropParameterSets[0] + client.pps = m.SpropParameterSets[1] client.CodecData = append(client.CodecData, codecData) } } else { client.CodecData = append(client.CodecData, h264parser.CodecData{}) client.WaitCodec = true } - client.FPS = i2.FPS + client.FPS = m.FPS client.videoCodec = av.H264 - } else if i2.Type == av.H265 { - if len(i2.SpropVPS) > 1 && len(i2.SpropSPS) > 1 && len(i2.SpropPPS) > 1 { - if codecData, err := h265parser.NewCodecDataFromVPSAndSPSAndPPS(i2.SpropVPS, i2.SpropSPS, i2.SpropPPS); err == nil { - client.vps = i2.SpropVPS - client.sps = i2.SpropSPS - client.pps = i2.SpropPPS + } else if m.Type == av.H265 { + if len(m.SpropVPS) > 1 && len(m.SpropSPS) > 1 && len(m.SpropPPS) > 1 { + if codecData, err := h265parser.NewCodecDataFromVPSAndSPSAndPPS(m.SpropVPS, m.SpropSPS, m.SpropPPS); err == nil { + client.vps = m.SpropVPS + client.sps = m.SpropSPS + client.pps = m.SpropPPS client.CodecData = append(client.CodecData, codecData) } } else { @@ -171,23 +175,23 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) { } client.videoCodec = av.H265 } else { - client.Println("SDP Video Codec Type Not Supported", i2.Type) + client.Println("SDP Video Codec Type Not Supported", m.Type) } client.videoIDX = int8(len(client.CodecData) - 1) client.videoID = ch } - if i2.AVType == AUDIO { + if m.AVType == AUDIO { client.audioID = ch var CodecData av.AudioCodecData - switch i2.Type { + switch m.Type { case av.AAC: - CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(i2.Config) + CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(m.Config) if err == nil { client.Println("Audio AAC bad config") } case av.OPUS: var cl av.ChannelLayout - switch i2.ChannelCount { + switch m.ChannelCount { case 1: cl = av.CH_MONO case 2: @@ -195,7 +199,7 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) { default: cl = av.CH_MONO } - CodecData = codec.NewOpusCodecData(i2.TimeScale, cl) + CodecData = codec.NewOpusCodecData(m.TimeScale, cl) case av.PCM_MULAW: CodecData = codec.NewPCMMulawCodecData() case av.PCM_ALAW: @@ -203,21 +207,21 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) { case av.PCM: CodecData = codec.NewPCMCodecData() default: - client.Println("Audio Codec", i2.Type, "not supported") + client.Println("Audio Codec", m.Type, "not supported") } if CodecData != nil { client.CodecData = append(client.CodecData, CodecData) client.audioIDX = int8(len(client.CodecData) - 1) client.audioCodec = CodecData.Type() - if i2.TimeScale != 0 { - client.AudioTimeScale = int64(i2.TimeScale) + if m.TimeScale != 0 { + client.AudioTimeScale = int64(m.TimeScale) } } } ch += 2 } - err = client.request(PLAY, nil, client.control, false, false) + err = client.request(PLAY, nil, client.control, nil, false, false) if err != nil { return nil, err } @@ -225,6 +229,72 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) { return client, nil } +func Publish(options RTSPClientOptions, sdp []byte) (*RTSPClient, error) { + client := &RTSPClient{ + headers: make(map[string]string), + Signals: make(chan int, 100), + OutgoingProxyQueue: make(chan *[]byte, 3000), + OutgoingPacketQueue: make(chan *av.Packet, 3000), + BufferRtpPacket: bytes.NewBuffer([]byte{}), + videoID: -1, + audioID: -2, + videoIDX: -1, + audioIDX: -2, + options: options, + AudioTimeScale: 8000, + } + client.headers["User-Agent"] = "Lavf58.20.100" + err := client.parseURL(html.UnescapeString(client.options.URL)) + if err != nil { + return nil, err + } + conn, err := net.DialTimeout("tcp", client.pURL.Host, client.options.DialTimeout) + if err != nil { + return nil, err + } + err = conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) + if err != nil { + return nil, err + } + client.conn = conn + client.connRW = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false) + if err != nil { + return nil, err + } + + err = client.request(ANNOUNCE, map[string]string{"Content-Type": "application/sdp", "Content-Length": fmt.Sprintf("%d", len(sdp))}, client.pURL.String(), sdp, false, false) + if err != nil { + return nil, err + } + + //err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false) + //if err != nil { + // return nil, err + //} + // + //err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), nil,false, false) + //if err != nil { + // return nil, err + //} + + // interleved + err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record"}, client.pURL.String(), nil, false, false) + if err != nil { + return nil, err + } + + err = client.request(RECORD, map[string]string{"Range": "npt=0.000-"}, client.pURL.String(), nil, false, false) + if err != nil { + return nil, err + } + return client, err +} + +func (client *RTSPClient) SendRTP(pack []byte) { + client.conn.Write(pack) +} + func (client *RTSPClient) ControlTrack(track string) string { if strings.Contains(track, "rtsp://") { return track @@ -250,7 +320,7 @@ func (client *RTSPClient) startStream() { return } if int(time.Now().Sub(timer).Seconds()) > 25 { - err := client.request(OPTIONS, map[string]string{"Require": "implicit-play"}, client.control, false, true) + err := client.request(OPTIONS, map[string]string{"Require": "implicit-play"}, client.control, nil, false, true) if err != nil { client.Println("RTSP Client RTP keep-alive", err) return @@ -273,10 +343,10 @@ func (client *RTSPClient) startStream() { return } content := make([]byte, length+4) - content[0] = header[0] - content[1] = header[1] - content[2] = header[2] - content[3] = header[3] + content[0] = header[0] // magic 0x24 + content[1] = header[1] // channel + content[2] = header[2] // length byte high + content[3] = header[3] // length byte low n, rerr := io.ReadFull(client.connRW, content[4:length+4]) if rerr != nil || n != int(length) { client.Println("RTSP Client RTP ReadFull", err) @@ -328,6 +398,7 @@ func (client *RTSPClient) startStream() { break } } + client.Println("RTSP RESPONSE<<< ", string(responseTmp)) default: client.Println("RTSP Client RTP Read DeSync") return @@ -335,7 +406,28 @@ func (client *RTSPClient) startStream() { } } -func (client *RTSPClient) request(method string, customHeaders map[string]string, uri string, one bool, nores bool) (err error) { +func (client *RTSPClient) Options() error { + err := client.request(OPTIONS, nil, client.pURL.String(), nil, false, true) + return err +} + +func (client *RTSPClient) Play(customHeaders map[string]string) error { + err := client.request(PLAY, customHeaders, client.pURL.String(), nil, false, true) + return err +} + +func (client *RTSPClient) Pause() error { + err := client.request(PAUSE, nil, client.pURL.String(), nil, false, true) + return err +} + +func (client *RTSPClient) Announce(sdp []byte) error { + err := client.request(ANNOUNCE, nil, client.pURL.String(), sdp, false, true) + return err +} + +// request nores 如果流已经在拉的话,nores必须为true,否则client.connRW.ReadLine() 和 startStream中会因为并发读取导致slice的数量不够,导致outbounds of slice panic. +func (client *RTSPClient) request(method string, customHeaders map[string]string, uri string, body []byte, one bool, nores bool) (err error) { err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout)) if err != nil { return @@ -356,6 +448,11 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)) } builder.WriteString(fmt.Sprintf("\r\n")) + if body != nil && len(body) > 0 { + builder.WriteString(string(body)) + builder.WriteString(fmt.Sprintf("\r\n")) + } + client.Println(builder.String()) s := builder.String() _, err = client.connRW.WriteString(s) @@ -397,6 +494,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string res[splits[0]] = splits[1] } } + client.Println(builder.String()) if val, ok := res["WWW-Authenticate"]; ok { if strings.Contains(val, "Digest") { client.realm = stringInBetween(val, "realm=\"", "\"") @@ -407,7 +505,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string client.clientBasic = true } if !one { - err = client.request(method, customHeaders, uri, true, false) + err = client.request(method, customHeaders, uri, nil, true, false) return } err = errors.New("RTSP Client Unauthorized 401") @@ -464,7 +562,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string func (client *RTSPClient) Close() { if client.conn != nil { client.conn.SetDeadline(time.Now().Add(time.Second)) - client.request(TEARDOWN, nil, client.control, false, true) + client.request(TEARDOWN, nil, client.control, nil, false, true) err := client.conn.Close() client.Println("RTSP Client Close", err) } @@ -516,11 +614,20 @@ func stringInBetween(str string, start string, end string) (result string) { func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { content := *payloadRAW firstByte := content[4] + version := (firstByte >> 6) & 0x03 padding := (firstByte>>5)&1 == 1 extension := (firstByte>>4)&1 == 1 CSRCCnt := int(firstByte & 0x0f) + marker := (content[5]>>7)&1 == 1 + payloadType := content[5] & 0x7f SequenceNumber := int(binary.BigEndian.Uint16(content[6:8])) timestamp := int64(binary.BigEndian.Uint32(content[8:12])) + SSRC := binary.BigEndian.Uint32(content[12:16]) + if Debug { + log.Printf("version: %d, padding: %v, extension: %v, csrccnt: %d, marker: %v, payload type: %d, sequence number: %d. timestamp: %d, ssrc: %d", + version, padding, extension, CSRCCnt, marker, payloadType, SequenceNumber, timestamp, SSRC) + } + offset := RTPHeaderSize end := len(content) @@ -531,8 +638,18 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { return nil, false } if extension && end-offset >= 4 { - extLen := 4 * int(binary.BigEndian.Uint16(content[4+offset+2:])) - offset += 4 + extWords := int(binary.BigEndian.Uint16(content[4+offset+2:])) + extLen := 4 * extWords + offset += 4 // this is profile(2 byte) + ext length(2 byte) + if Debug { + for i := 0; i < extWords; i++ { + ext := binary.BigEndian.Uint32(content[offset+4*i : offset+4*(i+1)]) + log.Printf("extension: %d: %d", i+1, ext) + if i == 1 { + log.Print("timestamp: ", time.Unix(int64(ext), 0).String()) + } + } + } if end-offset >= extLen { offset += extLen } @@ -573,6 +690,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { CompositionTime: time.Duration(1) * time.Millisecond, Idx: client.videoIDX, IsKeyFrame: false, + NaluType: naluType, Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond, Time: time.Duration(timestamp/90) * time.Millisecond, }) @@ -600,6 +718,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { CompositionTime: time.Duration(1) * time.Millisecond, Idx: client.videoIDX, IsKeyFrame: naluType == h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL, + NaluType: naluType, Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond, Time: time.Duration(timestamp/90) * time.Millisecond, }) @@ -612,12 +731,14 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { } else if client.videoCodec == av.H264 { naluType := nal[0] & 0x1f + //log.Printf("nalu type: %d", naluType) switch { case naluType >= 1 && naluType <= 5: retmap = append(retmap, &av.Packet{ Data: append(binSize(len(nal)), nal...), CompositionTime: time.Duration(1) * time.Millisecond, Idx: client.videoIDX, + NaluType: naluType, IsKeyFrame: naluType == 5, Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond, Time: time.Duration(timestamp/90) * time.Millisecond, @@ -666,6 +787,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond, Idx: client.videoIDX, IsKeyFrame: naluTypef == 5, + NaluType: naluTypef, Time: time.Duration(timestamp/90) * time.Millisecond, }) }