Merge pull request #1 from quanqigu/feature/rtsp
Add publish, play, pause and announce method to rtsp
This commit is contained in:
commit
405d5d2638
1
av/av.go
1
av/av.go
@ -254,6 +254,7 @@ type Packet struct {
|
||||
Time time.Duration // packet decode time
|
||||
Duration time.Duration //packet duration
|
||||
Data []byte // packet data
|
||||
NaluType byte
|
||||
}
|
||||
|
||||
// Raw audio frame.
|
||||
|
@ -42,7 +42,10 @@ const (
|
||||
DESCRIBE = "DESCRIBE"
|
||||
OPTIONS = "OPTIONS"
|
||||
PLAY = "PLAY"
|
||||
PAUSE = "PAUSE"
|
||||
SETUP = "SETUP"
|
||||
RECORD = "RECORD"
|
||||
ANNOUNCE = "ANNOUNCE"
|
||||
TEARDOWN = "TEARDOWN"
|
||||
)
|
||||
|
||||
@ -113,6 +116,7 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
|
||||
AudioTimeScale: 8000,
|
||||
}
|
||||
client.headers["User-Agent"] = "Lavf58.20.100"
|
||||
Debug = options.Debug
|
||||
err := client.parseURL(html.UnescapeString(client.options.URL))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -127,43 +131,43 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
|
||||
}
|
||||
client.conn = conn
|
||||
client.connRW = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
err = client.request(OPTIONS, nil, client.pURL.String(), false, false)
|
||||
err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), false, false)
|
||||
err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), nil, false, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ch int
|
||||
for _, i2 := range client.mediaSDP {
|
||||
if (i2.AVType != VIDEO && i2.AVType != AUDIO) || (client.options.DisableAudio && i2.AVType == AUDIO) {
|
||||
for _, m := range client.mediaSDP {
|
||||
if (m.AVType != VIDEO && m.AVType != AUDIO) || (client.options.DisableAudio && m.AVType == AUDIO) {
|
||||
continue
|
||||
}
|
||||
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(ch) + "-" + strconv.Itoa(ch+1)}, client.ControlTrack(i2.Control), false, false)
|
||||
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(ch) + "-" + strconv.Itoa(ch+1)}, client.ControlTrack(m.Control), nil, false, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i2.AVType == VIDEO {
|
||||
if i2.Type == av.H264 {
|
||||
if len(i2.SpropParameterSets) > 1 {
|
||||
if codecData, err := h264parser.NewCodecDataFromSPSAndPPS(i2.SpropParameterSets[0], i2.SpropParameterSets[1]); err == nil {
|
||||
client.sps = i2.SpropParameterSets[0]
|
||||
client.pps = i2.SpropParameterSets[1]
|
||||
if m.AVType == VIDEO {
|
||||
if m.Type == av.H264 {
|
||||
if len(m.SpropParameterSets) > 1 {
|
||||
if codecData, err := h264parser.NewCodecDataFromSPSAndPPS(m.SpropParameterSets[0], m.SpropParameterSets[1]); err == nil {
|
||||
client.sps = m.SpropParameterSets[0]
|
||||
client.pps = m.SpropParameterSets[1]
|
||||
client.CodecData = append(client.CodecData, codecData)
|
||||
}
|
||||
} else {
|
||||
client.CodecData = append(client.CodecData, h264parser.CodecData{})
|
||||
client.WaitCodec = true
|
||||
}
|
||||
client.FPS = i2.FPS
|
||||
client.FPS = m.FPS
|
||||
client.videoCodec = av.H264
|
||||
} else if i2.Type == av.H265 {
|
||||
if len(i2.SpropVPS) > 1 && len(i2.SpropSPS) > 1 && len(i2.SpropPPS) > 1 {
|
||||
if codecData, err := h265parser.NewCodecDataFromVPSAndSPSAndPPS(i2.SpropVPS, i2.SpropSPS, i2.SpropPPS); err == nil {
|
||||
client.vps = i2.SpropVPS
|
||||
client.sps = i2.SpropSPS
|
||||
client.pps = i2.SpropPPS
|
||||
} else if m.Type == av.H265 {
|
||||
if len(m.SpropVPS) > 1 && len(m.SpropSPS) > 1 && len(m.SpropPPS) > 1 {
|
||||
if codecData, err := h265parser.NewCodecDataFromVPSAndSPSAndPPS(m.SpropVPS, m.SpropSPS, m.SpropPPS); err == nil {
|
||||
client.vps = m.SpropVPS
|
||||
client.sps = m.SpropSPS
|
||||
client.pps = m.SpropPPS
|
||||
client.CodecData = append(client.CodecData, codecData)
|
||||
}
|
||||
} else {
|
||||
@ -175,23 +179,23 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
|
||||
// client.WaitCodec = true
|
||||
// client.videoCodec = av.H264
|
||||
} else {
|
||||
client.Println("SDP Video Codec Type Not Supported", i2.Type)
|
||||
client.Println("SDP Video Codec Type Not Supported", m.Type)
|
||||
}
|
||||
client.videoIDX = int8(len(client.CodecData) - 1)
|
||||
client.videoID = ch
|
||||
}
|
||||
if i2.AVType == AUDIO {
|
||||
if m.AVType == AUDIO {
|
||||
client.audioID = ch
|
||||
var CodecData av.AudioCodecData
|
||||
switch i2.Type {
|
||||
switch m.Type {
|
||||
case av.AAC:
|
||||
CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(i2.Config)
|
||||
CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(m.Config)
|
||||
if err == nil {
|
||||
client.Println("Audio AAC bad config")
|
||||
}
|
||||
case av.OPUS:
|
||||
var cl av.ChannelLayout
|
||||
switch i2.ChannelCount {
|
||||
switch m.ChannelCount {
|
||||
case 1:
|
||||
cl = av.CH_MONO
|
||||
case 2:
|
||||
@ -199,7 +203,7 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
|
||||
default:
|
||||
cl = av.CH_MONO
|
||||
}
|
||||
CodecData = codec.NewOpusCodecData(i2.TimeScale, cl)
|
||||
CodecData = codec.NewOpusCodecData(m.TimeScale, cl)
|
||||
case av.PCM_MULAW:
|
||||
CodecData = codec.NewPCMMulawCodecData()
|
||||
case av.PCM_ALAW:
|
||||
@ -207,21 +211,21 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
|
||||
case av.PCM:
|
||||
CodecData = codec.NewPCMCodecData()
|
||||
default:
|
||||
client.Println("Audio Codec", i2.Type, "not supported")
|
||||
client.Println("Audio Codec", m.Type, "not supported")
|
||||
}
|
||||
if CodecData != nil {
|
||||
client.CodecData = append(client.CodecData, CodecData)
|
||||
client.audioIDX = int8(len(client.CodecData) - 1)
|
||||
client.audioCodec = CodecData.Type()
|
||||
if i2.TimeScale != 0 {
|
||||
client.AudioTimeScale = int64(i2.TimeScale)
|
||||
if m.TimeScale != 0 {
|
||||
client.AudioTimeScale = int64(m.TimeScale)
|
||||
}
|
||||
}
|
||||
}
|
||||
ch += 2
|
||||
}
|
||||
//test := map[string]string{"Scale": "1.000000", "Speed": "1.000000", "Range": "clock=20210929T210000Z-20210929T211000Z"}
|
||||
err = client.request(PLAY, nil, client.control, false, false)
|
||||
// test := map[string]string{"Scale": "1.000000", "Speed": "1.000000", "Range": "clock=20210929T210000Z-20210929T211000Z"}
|
||||
err = client.request(PLAY, nil, client.control, nil, false, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -229,6 +233,72 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func Publish(options RTSPClientOptions, sdp []byte) (*RTSPClient, error) {
|
||||
client := &RTSPClient{
|
||||
headers: make(map[string]string),
|
||||
Signals: make(chan int, 100),
|
||||
OutgoingProxyQueue: make(chan *[]byte, 3000),
|
||||
OutgoingPacketQueue: make(chan *av.Packet, 3000),
|
||||
BufferRtpPacket: bytes.NewBuffer([]byte{}),
|
||||
videoID: -1,
|
||||
audioID: -2,
|
||||
videoIDX: -1,
|
||||
audioIDX: -2,
|
||||
options: options,
|
||||
AudioTimeScale: 8000,
|
||||
}
|
||||
client.headers["User-Agent"] = "Lavf58.20.100"
|
||||
err := client.parseURL(html.UnescapeString(client.options.URL))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := net.DialTimeout("tcp", client.pURL.Host, client.options.DialTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client.conn = conn
|
||||
client.connRW = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = client.request(ANNOUNCE, map[string]string{"Content-Type": "application/sdp", "Content-Length": fmt.Sprintf("%d", len(sdp))}, client.pURL.String(), sdp, false, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//err = client.request(OPTIONS, nil, client.pURL.String(), nil, false, false)
|
||||
//if err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
//
|
||||
//err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), nil,false, false)
|
||||
//if err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
|
||||
// interleved
|
||||
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=0-1;mode=record"}, client.pURL.String(), nil, false, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = client.request(RECORD, map[string]string{"Range": "npt=0.000-"}, client.pURL.String(), nil, false, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client, err
|
||||
}
|
||||
|
||||
func (client *RTSPClient) SendRTP(pack []byte) {
|
||||
client.conn.Write(pack)
|
||||
}
|
||||
|
||||
func (client *RTSPClient) ControlTrack(track string) string {
|
||||
if strings.Contains(track, "rtsp://") {
|
||||
return track
|
||||
@ -254,7 +324,7 @@ func (client *RTSPClient) startStream() {
|
||||
return
|
||||
}
|
||||
if int(time.Now().Sub(timer).Seconds()) > 25 {
|
||||
err := client.request(OPTIONS, map[string]string{"Require": "implicit-play"}, client.control, false, true)
|
||||
err := client.request(OPTIONS, map[string]string{"Require": "implicit-play"}, client.control, nil, false, true)
|
||||
if err != nil {
|
||||
client.Println("RTSP Client RTP keep-alive", err)
|
||||
return
|
||||
@ -277,10 +347,10 @@ func (client *RTSPClient) startStream() {
|
||||
return
|
||||
}
|
||||
content := make([]byte, length+4)
|
||||
content[0] = header[0]
|
||||
content[1] = header[1]
|
||||
content[2] = header[2]
|
||||
content[3] = header[3]
|
||||
content[0] = header[0] // magic 0x24
|
||||
content[1] = header[1] // channel
|
||||
content[2] = header[2] // length byte high
|
||||
content[3] = header[3] // length byte low
|
||||
n, rerr := io.ReadFull(client.connRW, content[4:length+4])
|
||||
if rerr != nil || n != int(length) {
|
||||
client.Println("RTSP Client RTP ReadFull", err)
|
||||
@ -332,6 +402,7 @@ func (client *RTSPClient) startStream() {
|
||||
break
|
||||
}
|
||||
}
|
||||
client.Println("RTSP RESPONSE<<< ", string(responseTmp))
|
||||
default:
|
||||
client.Println("RTSP Client RTP Read DeSync")
|
||||
return
|
||||
@ -339,7 +410,28 @@ func (client *RTSPClient) startStream() {
|
||||
}
|
||||
}
|
||||
|
||||
func (client *RTSPClient) request(method string, customHeaders map[string]string, uri string, one bool, nores bool) (err error) {
|
||||
func (client *RTSPClient) Options() error {
|
||||
err := client.request(OPTIONS, nil, client.pURL.String(), nil, false, true)
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *RTSPClient) Play(customHeaders map[string]string) error {
|
||||
err := client.request(PLAY, customHeaders, client.pURL.String(), nil, false, true)
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *RTSPClient) Pause() error {
|
||||
err := client.request(PAUSE, nil, client.pURL.String(), nil, false, true)
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *RTSPClient) Announce(sdp []byte) error {
|
||||
err := client.request(ANNOUNCE, nil, client.pURL.String(), sdp, false, true)
|
||||
return err
|
||||
}
|
||||
|
||||
// request nores 如果流已经在拉的话,nores必须为true,否则client.connRW.ReadLine() 和 startStream中会因为并发读取导致slice的数量不够,导致outbounds of slice panic.
|
||||
func (client *RTSPClient) request(method string, customHeaders map[string]string, uri string, body []byte, one bool, nores bool) (err error) {
|
||||
err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
|
||||
if err != nil {
|
||||
return
|
||||
@ -360,6 +452,11 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string
|
||||
builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
|
||||
}
|
||||
builder.WriteString(fmt.Sprintf("\r\n"))
|
||||
if body != nil && len(body) > 0 {
|
||||
builder.WriteString(string(body))
|
||||
builder.WriteString(fmt.Sprintf("\r\n"))
|
||||
}
|
||||
|
||||
client.Println(builder.String())
|
||||
s := builder.String()
|
||||
_, err = client.connRW.WriteString(s)
|
||||
@ -401,6 +498,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string
|
||||
res[splits[0]] = splits[1]
|
||||
}
|
||||
}
|
||||
client.Println(builder.String())
|
||||
if val, ok := res["WWW-Authenticate"]; ok {
|
||||
if strings.Contains(val, "Digest") {
|
||||
client.realm = stringInBetween(val, "realm=\"", "\"")
|
||||
@ -411,7 +509,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string
|
||||
client.clientBasic = true
|
||||
}
|
||||
if !one {
|
||||
err = client.request(method, customHeaders, uri, true, false)
|
||||
err = client.request(method, customHeaders, uri, nil, true, false)
|
||||
return
|
||||
}
|
||||
err = errors.New("RTSP Client Unauthorized 401")
|
||||
@ -468,7 +566,7 @@ func (client *RTSPClient) request(method string, customHeaders map[string]string
|
||||
func (client *RTSPClient) Close() {
|
||||
if client.conn != nil {
|
||||
client.conn.SetDeadline(time.Now().Add(time.Second))
|
||||
client.request(TEARDOWN, nil, client.control, false, true)
|
||||
client.request(TEARDOWN, nil, client.control, nil, false, true)
|
||||
err := client.conn.Close()
|
||||
client.Println("RTSP Client Close", err)
|
||||
}
|
||||
@ -520,11 +618,20 @@ func stringInBetween(str string, start string, end string) (result string) {
|
||||
func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
||||
content := *payloadRAW
|
||||
firstByte := content[4]
|
||||
version := (firstByte >> 6) & 0x03
|
||||
padding := (firstByte>>5)&1 == 1
|
||||
extension := (firstByte>>4)&1 == 1
|
||||
CSRCCnt := int(firstByte & 0x0f)
|
||||
marker := (content[5]>>7)&1 == 1
|
||||
payloadType := content[5] & 0x7f
|
||||
SequenceNumber := int(binary.BigEndian.Uint16(content[6:8]))
|
||||
timestamp := int64(binary.BigEndian.Uint32(content[8:12]))
|
||||
SSRC := binary.BigEndian.Uint32(content[12:16])
|
||||
if Debug {
|
||||
log.Printf("version: %d, padding: %v, extension: %v, csrccnt: %d, marker: %v, payload type: %d, sequence number: %d. timestamp: %d, ssrc: %d",
|
||||
version, padding, extension, CSRCCnt, marker, payloadType, SequenceNumber, timestamp, SSRC)
|
||||
}
|
||||
|
||||
offset := RTPHeaderSize
|
||||
|
||||
end := len(content)
|
||||
@ -535,8 +642,18 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
||||
return nil, false
|
||||
}
|
||||
if extension && end-offset >= 4 {
|
||||
extLen := 4 * int(binary.BigEndian.Uint16(content[4+offset+2:]))
|
||||
offset += 4
|
||||
extWords := int(binary.BigEndian.Uint16(content[4+offset+2:]))
|
||||
extLen := 4 * extWords
|
||||
offset += 4 // this is profile(2 byte) + ext length(2 byte)
|
||||
if Debug {
|
||||
for i := 0; i < extWords; i++ {
|
||||
ext := binary.BigEndian.Uint32(content[offset+4*i : offset+4*(i+1)])
|
||||
log.Printf("extension: %d: %d", i+1, ext)
|
||||
if i == 1 {
|
||||
log.Print("timestamp: ", time.Unix(int64(ext), 0).String())
|
||||
}
|
||||
}
|
||||
}
|
||||
if end-offset >= extLen {
|
||||
offset += extLen
|
||||
}
|
||||
@ -577,6 +694,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
||||
CompositionTime: time.Duration(1) * time.Millisecond,
|
||||
Idx: client.videoIDX,
|
||||
IsKeyFrame: false,
|
||||
NaluType: naluType,
|
||||
Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
|
||||
Time: time.Duration(timestamp/90) * time.Millisecond,
|
||||
})
|
||||
@ -604,6 +722,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
||||
CompositionTime: time.Duration(1) * time.Millisecond,
|
||||
Idx: client.videoIDX,
|
||||
IsKeyFrame: naluType == h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
|
||||
NaluType: naluType,
|
||||
Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
|
||||
Time: time.Duration(timestamp/90) * time.Millisecond,
|
||||
})
|
||||
@ -616,12 +735,14 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
||||
|
||||
} else if client.videoCodec == av.H264 {
|
||||
naluType := nal[0] & 0x1f
|
||||
//log.Printf("nalu type: %d", naluType)
|
||||
switch {
|
||||
case naluType >= 1 && naluType <= 5:
|
||||
retmap = append(retmap, &av.Packet{
|
||||
Data: append(binSize(len(nal)), nal...),
|
||||
CompositionTime: time.Duration(1) * time.Millisecond,
|
||||
Idx: client.videoIDX,
|
||||
NaluType: naluType,
|
||||
IsKeyFrame: naluType == 5,
|
||||
Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
|
||||
Time: time.Duration(timestamp/90) * time.Millisecond,
|
||||
@ -670,6 +791,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
||||
Duration: time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
|
||||
Idx: client.videoIDX,
|
||||
IsKeyFrame: naluTypef == 5,
|
||||
NaluType: naluTypef,
|
||||
Time: time.Duration(timestamp/90) * time.Millisecond,
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user