Add play pause method to rtsp

This commit is contained in:
Quanqi Gu 2021-10-14 17:04:14 +08:00
parent a77315fe6e
commit aa7284e097
2 changed files with 161 additions and 38 deletions

View File

@ -242,6 +242,7 @@ type Packet struct {
Time time.Duration // packet decode time
Duration time.Duration //packet duration
Data []byte // packet data
NaluType byte
}
// Raw audio frame.

View File

@ -42,7 +42,10 @@ const (
DESCRIBE = "DESCRIBE"
OPTIONS = "OPTIONS"
PLAY = "PLAY"
PAUSE = "PAUSE"
SETUP = "SETUP"
RECORD = "RECORD"
ANNOUNCE = "ANNOUNCE"
TEARDOWN = "TEARDOWN"
)
@ -113,6 +116,7 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
AudioTimeScale: 8000,
}
client.headers["User-Agent"] = "Lavf58.20.100"
Debug = options.Debug
err := client.parseURL(html.UnescapeString(client.options.URL))
if err != nil {
return nil, err
@ -127,43 +131,43 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
}
client.conn = conn
client.connRW = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
err = client.request(OPTIONS, nil, client.pURL.String(), false, false)
err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false)
if err != nil {
return nil, err
}
err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), false, false)
err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), nil, false, false)
if err != nil {
return nil, err
}
var ch int
for _, i2 := range client.mediaSDP {
if (i2.AVType != VIDEO && i2.AVType != AUDIO) || (client.options.DisableAudio && i2.AVType == AUDIO) {
for _, m := range client.mediaSDP {
if (m.AVType != VIDEO && m.AVType != AUDIO) || (client.options.DisableAudio && m.AVType == AUDIO) {
continue
}
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(ch) + "-" + strconv.Itoa(ch+1)}, client.ControlTrack(i2.Control), false, false)
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(ch) + "-" + strconv.Itoa(ch+1)}, client.ControlTrack(m.Control), nil, false, false)
if err != nil {
return nil, err
}
if i2.AVType == VIDEO {
if i2.Type == av.H264 {
if len(i2.SpropParameterSets) > 1 {
if codecData, err := h264parser.NewCodecDataFromSPSAndPPS(i2.SpropParameterSets[0], i2.SpropParameterSets[1]); err == nil {
client.sps = i2.SpropParameterSets[0]
client.pps = i2.SpropParameterSets[1]
if m.AVType == VIDEO {
if m.Type == av.H264 {
if len(m.SpropParameterSets) > 1 {
if codecData, err := h264parser.NewCodecDataFromSPSAndPPS(m.SpropParameterSets[0], m.SpropParameterSets[1]); err == nil {
client.sps = m.SpropParameterSets[0]
client.pps = m.SpropParameterSets[1]
client.CodecData = append(client.CodecData, codecData)
}
} else {
client.CodecData = append(client.CodecData, h264parser.CodecData{})
client.WaitCodec = true
}
client.FPS = i2.FPS
client.FPS = m.FPS
client.videoCodec = av.H264
} else if i2.Type == av.H265 {
if len(i2.SpropVPS) > 1 && len(i2.SpropSPS) > 1 && len(i2.SpropPPS) > 1 {
if codecData, err := h265parser.NewCodecDataFromVPSAndSPSAndPPS(i2.SpropVPS, i2.SpropSPS, i2.SpropPPS); err == nil {
client.vps = i2.SpropVPS
client.sps = i2.SpropSPS
client.pps = i2.SpropPPS
} else if m.Type == av.H265 {
if len(m.SpropVPS) > 1 && len(m.SpropSPS) > 1 && len(m.SpropPPS) > 1 {
if codecData, err := h265parser.NewCodecDataFromVPSAndSPSAndPPS(m.SpropVPS, m.SpropSPS, m.SpropPPS); err == nil {
client.vps = m.SpropVPS
client.sps = m.SpropSPS
client.pps = m.SpropPPS
client.CodecData = append(client.CodecData, codecData)
}
} else {
@ -171,23 +175,23 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
}
client.videoCodec = av.H265
} else {
client.Println("SDP Video Codec Type Not Supported", i2.Type)
client.Println("SDP Video Codec Type Not Supported", m.Type)
}
client.videoIDX = int8(len(client.CodecData) - 1)
client.videoID = ch
}
if i2.AVType == AUDIO {
if m.AVType == AUDIO {
client.audioID = ch
var CodecData av.AudioCodecData
switch i2.Type {
switch m.Type {
case av.AAC:
CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(i2.Config)
CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(m.Config)
if err == nil {
client.Println("Audio AAC bad config")
}
case av.OPUS:
var cl av.ChannelLayout
switch i2.ChannelCount {
switch m.ChannelCount {
case 1:
cl = av.CH_MONO
case 2:
@ -195,7 +199,7 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
default:
cl = av.CH_MONO
}
CodecData = codec.NewOpusCodecData(i2.TimeScale, cl)
CodecData = codec.NewOpusCodecData(m.TimeScale, cl)
case av.PCM_MULAW:
CodecData = codec.NewPCMMulawCodecData()
case av.PCM_ALAW:
@ -203,21 +207,21 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
case av.PCM:
CodecData = codec.NewPCMCodecData()
default:
client.Println("Audio Codec", i2.Type, "not supported")
client.Println("Audio Codec", m.Type, "not supported")
}
if CodecData != nil {
client.CodecData = append(client.CodecData, CodecData)
client.audioIDX = int8(len(client.CodecData) - 1)
client.audioCodec = CodecData.Type()
if i2.TimeScale != 0 {
client.AudioTimeScale = int64(i2.TimeScale)
if m.TimeScale != 0 {
client.AudioTimeScale = int64(m.TimeScale)
}
}
}
ch += 2
}
err = client.request(PLAY, nil, client.control, false, false)
err = client.request(PLAY, nil, client.control, nil, false, false)
if err != nil {
return nil, err
}
@ -225,6 +229,72 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
return client, nil
}
func Publish(options RTSPClientOptions, sdp []byte) (*RTSPClient, error) {
client := &RTSPClient{
headers: make(map[string]string),
Signals: make(chan int, 100),
OutgoingProxyQueue: make(chan *[]byte, 3000),
OutgoingPacketQueue: make(chan *av.Packet, 3000),
BufferRtpPacket: bytes.NewBuffer([]byte{}),
videoID: -1,
audioID: -2,
videoIDX: -1,
audioIDX: -2,
options: options,
AudioTimeScale: 8000,
}
client.headers["User-Agent"] = "Lavf58.20.100"
err := client.parseURL(html.UnescapeString(client.options.URL))
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", client.pURL.Host, client.options.DialTimeout)
if err != nil {
return nil, err
}
err = conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
if err != nil {
return nil, err
}
client.conn = conn
client.connRW = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false)
if err != nil {
return nil, err
}
err = client.request(ANNOUNCE, map[string]string{"Content-Type": "application/sdp", "Content-Length": fmt.Sprintf("%d", len(sdp))}, client.pURL.String(), sdp, false, false)
if err != nil {
return nil, err
}
//err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false)
//if err != nil {
// return nil, err
//}
//
//err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), nil,false, false)
//if err != nil {
// return nil, err
//}
// interleved
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record"}, client.pURL.String(), nil, false, false)
if err != nil {
return nil, err
}
err = client.request(RECORD, map[string]string{"Range": "npt=0.000-"}, client.pURL.String(), nil, false, false)
if err != nil {
return nil, err
}
return client, err
}
func (client *RTSPClient) SendRTP(pack []byte) {
client.conn.Write(pack)
}
func (client *RTSPClient) ControlTrack(track string) string {
if strings.Contains(track, "rtsp://") {
return track
@ -250,7 +320,7 @@ func (client *RTSPClient) startStream() {
return
}
if int(time.Now().Sub(timer).Seconds()) > 25 {
err := client.request(OPTIONS, map[string]string{"Require": "implicit-play"}, client.control, false, true)
err := client.request(OPTIONS, map[string]string{"Require": "implicit-play"}, client.control, nil, false, true)
if err != nil {
client.Println("RTSP Client RTP keep-alive", err)
return
@ -273,10 +343,10 @@ func (client *RTSPClient) startStream() {
return
}
content := make([]byte, length+4)
content[0] = header[0]
content[1] = header[1]
content[2] = header[2]
content[3] = header[3]
content[0] = header[0] // magic 0x24
content[1] = header[1] // channel
content[2] = header[2] // length byte high
content[3] = header[3] // length byte low
n, rerr := io.ReadFull(client.connRW, content[4:length+4])
if rerr != nil || n != int(length) {
client.Println("RTSP Client RTP ReadFull", err)
@ -328,6 +398,7 @@ func (client *RTSPClient) startStream() {
break
}
}
client.Println("RTSP RESPONSE<<< ", string(responseTmp))
default:
client.Println("RTSP Client RTP Read DeSync")
return
@ -335,7 +406,28 @@ func (client *RTSPClient) startStream() {
}
}
func (client *RTSPClient) request(method string, customHeaders map[string]string, uri string, one bool, nores bool) (err error) {
func (client *RTSPClient) Options() error {
err := client.request(OPTIONS, nil, client.pURL.String(), nil, false, true)
return err
}
func (client *RTSPClient) Play(customHeaders map[string]string) error {
err := client.request(PLAY, customHeaders, client.pURL.String(), nil, false, true)
return err
}
func (client *RTSPClient) Pause() error {
err := client.request(PAUSE, nil, client.pURL.String(), nil, false, true)
return err
}
func (client *RTSPClient) Announce(sdp []byte) error {
err := client.request(ANNOUNCE, nil, client.pURL.String(), sdp, false, true)
return err
}
// request nores 如果流已经在拉的话nores必须为true否则client.connRW.ReadLine() 和 startStream中会因为并发读取导致slice的数量不够导致outbounds of slice panic.
func (client *RTSPClient) request(method string, customHeaders map[string]string, uri string, body []byte, one bool, nores bool) (err error) {
err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
if err != nil {
return
@ -356,6 +448,11 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string
builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
}
builder.WriteString(fmt.Sprintf("\r\n"))
if body != nil && len(body) > 0 {
builder.WriteString(string(body))
builder.WriteString(fmt.Sprintf("\r\n"))
}
client.Println(builder.String())
s := builder.String()
_, err = client.connRW.WriteString(s)
@ -397,6 +494,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string
res[splits[0]] = splits[1]
}
}
client.Println(builder.String())
if val, ok := res["WWW-Authenticate"]; ok {
if strings.Contains(val, "Digest") {
client.realm = stringInBetween(val, "realm=\"", "\"")
@ -407,7 +505,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string
client.clientBasic = true
}
if !one {
err = client.request(method, customHeaders, uri, true, false)
err = client.request(method, customHeaders, uri, nil, true, false)
return
}
err = errors.New("RTSP Client Unauthorized 401")
@ -464,7 +562,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string
func (client *RTSPClient) Close() {
if client.conn != nil {
client.conn.SetDeadline(time.Now().Add(time.Second))
client.request(TEARDOWN, nil, client.control, false, true)
client.request(TEARDOWN, nil, client.control, nil, false, true)
err := client.conn.Close()
client.Println("RTSP Client Close", err)
}
@ -516,11 +614,20 @@ func stringInBetween(str string, start string, end string) (result string) {
func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
content := *payloadRAW
firstByte := content[4]
version := (firstByte >> 6) & 0x03
padding := (firstByte>>5)&1 == 1
extension := (firstByte>>4)&1 == 1
CSRCCnt := int(firstByte & 0x0f)
marker := (content[5]>>7)&1 == 1
payloadType := content[5] & 0x7f
SequenceNumber := int(binary.BigEndian.Uint16(content[6:8]))
timestamp := int64(binary.BigEndian.Uint32(content[8:12]))
SSRC := binary.BigEndian.Uint32(content[12:16])
if Debug {
log.Printf("version: %d, padding: %v, extension: %v, csrccnt: %d, marker: %v, payload type: %d, sequence number: %d. timestamp: %d, ssrc: %d",
version, padding, extension, CSRCCnt, marker, payloadType, SequenceNumber, timestamp, SSRC)
}
offset := RTPHeaderSize
end := len(content)
@ -531,8 +638,18 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
return nil, false
}
if extension && end-offset >= 4 {
extLen := 4 * int(binary.BigEndian.Uint16(content[4+offset+2:]))
offset += 4
extWords := int(binary.BigEndian.Uint16(content[4+offset+2:]))
extLen := 4 * extWords
offset += 4 // this is profile(2 byte) + ext length(2 byte)
if Debug {
for i := 0; i < extWords; i++ {
ext := binary.BigEndian.Uint32(content[offset+4*i : offset+4*(i+1)])
log.Printf("extension: %d: %d", i+1, ext)
if i == 1 {
log.Print("timestamp: ", time.Unix(int64(ext), 0).String())
}
}
}
if end-offset >= extLen {
offset += extLen
}
@ -573,6 +690,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
CompositionTime: time.Duration(1) * time.Millisecond,
Idx: client.videoIDX,
IsKeyFrame: false,
NaluType: naluType,
Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
Time: time.Duration(timestamp/90) * time.Millisecond,
})
@ -600,6 +718,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
CompositionTime: time.Duration(1) * time.Millisecond,
Idx: client.videoIDX,
IsKeyFrame: naluType == h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
NaluType: naluType,
Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
Time: time.Duration(timestamp/90) * time.Millisecond,
})
@ -612,12 +731,14 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
} else if client.videoCodec == av.H264 {
naluType := nal[0] & 0x1f
//log.Printf("nalu type: %d", naluType)
switch {
case naluType >= 1 && naluType <= 5:
retmap = append(retmap, &av.Packet{
Data: append(binSize(len(nal)), nal...),
CompositionTime: time.Duration(1) * time.Millisecond,
Idx: client.videoIDX,
NaluType: naluType,
IsKeyFrame: naluType == 5,
Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
Time: time.Duration(timestamp/90) * time.Millisecond,
@ -666,6 +787,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
Idx: client.videoIDX,
IsKeyFrame: naluTypef == 5,
NaluType: naluTypef,
Time: time.Duration(timestamp/90) * time.Millisecond,
})
}