vdk/format/rtspv2/client.go
2024-03-23 17:47:59 +08:00

724 lines
20 KiB
Go

package rtspv2
import (
"bufio"
"bytes"
"crypto/md5"
"crypto/tls"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"html"
"io"
"log"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec"
"github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/codec/h265parser"
"github.com/deepch/vdk/format/rtsp/sdp"
)
const (
SignalStreamRTPStop = iota
SignalCodecUpdate
)
const (
VIDEO = "video"
AUDIO = "audio"
)
const (
RTPHeaderSize = 12
RTCPSenderReport = 200
RTCPReceiverReport = 201
)
const (
DESCRIBE = "DESCRIBE"
OPTIONS = "OPTIONS"
PLAY = "PLAY"
PAUSE = "PAUSE"
SETUP = "SETUP"
TEARDOWN = "TEARDOWN"
)
type RTSPClient struct {
control string
seq int
session string
realm string
nonce string
username string
password string
startVideoTS int64
startAudioTS int64
videoID int
audioID int
videoIDX int8
audioIDX int8
mediaSDP []sdp.Media
SDPRaw []byte
conn net.Conn
connRW *bufio.ReadWriter
pURL *url.URL
headers map[string]string
Signals chan int
OutgoingProxyQueue chan *[]byte
OutgoingPacketQueue chan *av.Packet
clientDigest bool
clientBasic bool
fuStarted bool
options RTSPClientOptions
BufferRtpPacket *bytes.Buffer
vps []byte
sps []byte
pps []byte
CodecData []av.CodecData
AudioTimeLine time.Duration
AudioTimeScale int64
audioCodec av.CodecType
videoCodec av.CodecType
PreAudioTS int64
PreVideoTS int64
PreSequenceNumber int
FPS int
WaitCodec bool
chTMP int
timestamp int64
sequenceNumber int
end int
offset int
realVideoTs int64
keyFrameRealVideoTs int64
keyFrameIterateDrua int64
preDuration time.Duration
status string
lastPausedTime time.Time
}
type RTSPClientOptions struct {
Debug bool
URL string
DialTimeout time.Duration
ReadWriteTimeout time.Duration
DisableAudio bool
OutgoingProxy bool
InsecureSkipVerify bool
}
func Dial(options RTSPClientOptions) (*RTSPClient, error) {
client := &RTSPClient{
headers: make(map[string]string),
Signals: make(chan int, 100),
OutgoingProxyQueue: make(chan *[]byte, 300),
OutgoingPacketQueue: make(chan *av.Packet, 300),
BufferRtpPacket: bytes.NewBuffer([]byte{}),
videoID: -1,
audioID: -2,
videoIDX: -1,
audioIDX: -2,
options: options,
AudioTimeScale: 8000,
}
client.headers["User-Agent"] = "Lavf58.76.100"
err := client.parseURL(html.UnescapeString(client.options.URL))
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", client.pURL.Host, client.options.DialTimeout)
if err != nil {
return nil, err
}
err = conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
if err != nil {
return nil, err
}
if client.pURL.Scheme == "rtsps" {
tlsConn := tls.Client(conn, &tls.Config{InsecureSkipVerify: options.InsecureSkipVerify, ServerName: client.pURL.Hostname()})
err = tlsConn.Handshake()
if err != nil {
return nil, err
}
conn = tlsConn
}
client.conn = conn
client.connRW = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
err = client.request(OPTIONS, nil, client.pURL.String(), false, false)
if err != nil {
return nil, err
}
err = client.request(DESCRIBE, map[string]string{"Accept": "application/sdp"}, client.pURL.String(), false, false)
if err != nil {
return nil, err
}
for _, i2 := range client.mediaSDP {
if (i2.AVType != VIDEO && i2.AVType != AUDIO) || (client.options.DisableAudio && i2.AVType == AUDIO) {
//TODO check it
if strings.Contains(string(client.SDPRaw), "LaunchDigital") {
client.chTMP += 2
}
continue
}
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(client.chTMP) + "-" + strconv.Itoa(client.chTMP+1)}, client.ControlTrack(i2.Control), false, false)
if err != nil {
return nil, err
}
if i2.AVType == VIDEO {
if i2.Type == av.H264 {
if len(i2.SpropParameterSets) > 1 {
if codecData, err := h264parser.NewCodecDataFromSPSAndPPS(i2.SpropParameterSets[0], i2.SpropParameterSets[1]); err == nil {
client.sps = i2.SpropParameterSets[0]
client.pps = i2.SpropParameterSets[1]
client.CodecData = append(client.CodecData, codecData)
}
} else {
client.CodecData = append(client.CodecData, h264parser.CodecData{})
client.WaitCodec = true
}
client.FPS = i2.FPS
client.videoCodec = av.H264
} else if i2.Type == av.H265 {
if len(i2.SpropVPS) > 1 && len(i2.SpropSPS) > 1 && len(i2.SpropPPS) > 1 {
if codecData, err := h265parser.NewCodecDataFromVPSAndSPSAndPPS(i2.SpropVPS, i2.SpropSPS, i2.SpropPPS); err == nil {
client.vps = i2.SpropVPS
client.sps = i2.SpropSPS
client.pps = i2.SpropPPS
client.CodecData = append(client.CodecData, codecData)
}
} else {
client.CodecData = append(client.CodecData, h265parser.CodecData{})
}
client.videoCodec = av.H265
} else {
client.Println("SDP Video Codec Type Not Supported", i2.Type)
}
client.videoIDX = int8(len(client.CodecData) - 1)
client.videoID = client.chTMP
}
if i2.AVType == AUDIO {
client.audioID = client.chTMP
var CodecData av.AudioCodecData
switch i2.Type {
case av.AAC:
CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(i2.Config)
if err == nil {
client.Println("Audio AAC bad config")
}
case av.OPUS:
var cl av.ChannelLayout
switch i2.ChannelCount {
case 1:
cl = av.CH_MONO
case 2:
cl = av.CH_STEREO
default:
cl = av.CH_MONO
}
CodecData = codec.NewOpusCodecData(i2.TimeScale, cl)
case av.PCM_MULAW:
CodecData = codec.NewPCMMulawCodecData()
case av.PCM_ALAW:
CodecData = codec.NewPCMAlawCodecData()
case av.PCM:
CodecData = codec.NewPCMCodecData()
default:
client.Println("Audio Codec", i2.Type, "not supported")
}
if CodecData != nil {
client.CodecData = append(client.CodecData, CodecData)
client.audioIDX = int8(len(client.CodecData) - 1)
client.audioCodec = CodecData.Type()
if i2.TimeScale != 0 {
client.AudioTimeScale = int64(i2.TimeScale)
}
}
}
client.chTMP += 2
}
customHeaders := map[string]string{"Range": "npt=0.00-"}
err = client.request(PLAY, customHeaders, client.control, false, false)
if err != nil {
return nil, err
}
go client.startStream()
return client, nil
}
func (client *RTSPClient) ControlTrack(track string) string {
if strings.Contains(track, "rtsp://") {
return track
}
if !strings.HasSuffix(client.control, "/") {
track = "/" + track
}
return client.control + track
}
func (client *RTSPClient) startStream() {
defer func() {
client.Signals <- SignalStreamRTPStop
}()
timer := time.Now()
oneb := make([]byte, 1)
header := make([]byte, 4)
for {
err := client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
if err != nil {
client.Println("RTSP Client RTP SetDeadline", err)
return
}
if int(time.Since(timer).Seconds()) > 25 {
err := client.request(OPTIONS, map[string]string{"Require": "implicit-play"}, client.control, false, true)
if err != nil {
client.Println("RTSP Client RTP keep-alive", err)
return
}
timer = time.Now()
}
nb, err := io.ReadFull(client.connRW, header)
if err != nil || nb != 4 {
if client.status == PAUSE && strings.Contains(err.Error(), "timeout") {
if time.Since(client.lastPausedTime) >= time.Second*300 {
return
}
time.Sleep(100 * time.Millisecond)
continue
}
client.Println("RTSP Client RTP Read Header", err)
return
}
switch header[0] {
case 0x24:
length := int32(binary.BigEndian.Uint16(header[2:]))
if length > 65535 || length < 12 {
client.Println("RTSP Client RTP Incorrect Packet Size")
return
}
content := make([]byte, length+4)
content[0] = header[0]
content[1] = header[1]
content[2] = header[2]
content[3] = header[3]
n, rerr := io.ReadFull(client.connRW, content[4:length+4])
if rerr != nil || n != int(length) {
client.Println("RTSP Client RTP ReadFull", err)
return
}
//atomic.AddInt64(&client.Bitrate, int64(length+4))
if client.options.OutgoingProxy {
if len(client.OutgoingProxyQueue) < 2000 {
client.OutgoingProxyQueue <- &content
} else {
client.Println("RTSP Client OutgoingProxy Chanel Full")
return
}
}
pkt, got := client.RTPDemuxer(&content)
if !got {
continue
}
for _, i2 := range pkt {
if len(client.OutgoingPacketQueue) > 200 {
client.Println("RTSP Client OutgoingPacket Chanel Full")
return
}
client.OutgoingPacketQueue <- i2
}
case 0x52:
var responseTmp []byte
for {
n, rerr := io.ReadFull(client.connRW, oneb)
if rerr != nil || n != 1 {
client.Println("RTSP Client RTP Read Keep-Alive Header", rerr)
return
}
responseTmp = append(responseTmp, oneb...)
if (len(responseTmp) > 4 && bytes.Equal(responseTmp[len(responseTmp)-4:], []byte("\r\n\r\n"))) || len(responseTmp) > 768 {
if strings.Contains(string(responseTmp), "Content-Length:") {
si, err := strconv.Atoi(stringInBetween(string(responseTmp), "Content-Length: ", "\r\n"))
if err != nil {
client.Println("RTSP Client RTP Read Keep-Alive Content-Length", err)
return
}
cont := make([]byte, si)
_, err = io.ReadFull(client.connRW, cont)
if err != nil {
client.Println("RTSP Client RTP Read Keep-Alive ReadFull", err)
return
}
}
break
}
}
default:
client.Println("RTSP Client RTP Read DeSync")
return
}
}
}
func (client *RTSPClient) request(method string, customHeaders map[string]string, uri string, one bool, nores bool) (err error) {
err = client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
if err != nil {
return
}
client.seq++
builder := bytes.Buffer{}
builder.WriteString(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, uri))
builder.WriteString(fmt.Sprintf("CSeq: %d\r\n", client.seq))
if client.clientDigest {
builder.WriteString(fmt.Sprintf("Authorization: %s\r\n", client.createDigest(method, uri)))
}
for k, v := range customHeaders {
builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
}
for k, v := range client.headers {
builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
}
builder.WriteString("\r\n")
client.Println(builder.String())
s := builder.String()
_, err = client.connRW.WriteString(s)
if err != nil {
return
}
err = client.connRW.Flush()
if err != nil {
return
}
builder.Reset()
if !nores {
var isPrefix bool
var line []byte
var contentLen int
res := make(map[string]string)
for {
line, isPrefix, err = client.connRW.ReadLine()
if err != nil {
return
}
if strings.Contains(string(line), "RTSP/1.0") && (!strings.Contains(string(line), "200") && !strings.Contains(string(line), "401")) {
time.Sleep(1 * time.Second)
err = errors.New("Camera send status" + string(line))
return
}
builder.Write(line)
if !isPrefix {
builder.WriteString("\r\n")
}
if len(line) == 0 {
break
}
splits := strings.SplitN(string(line), ":", 2)
if len(splits) == 2 {
if splits[0] == "Content-length" {
splits[0] = "Content-Length"
}
res[splits[0]] = splits[1]
}
}
if val, ok := res["WWW-Authenticate"]; ok {
if strings.Contains(val, "Digest") {
client.realm = stringInBetween(val, "realm=\"", "\"")
client.nonce = stringInBetween(val, "nonce=\"", "\"")
client.clientDigest = true
} else if strings.Contains(val, "Basic") {
client.headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(client.username+":"+client.password))
client.clientBasic = true
}
if !one {
err = client.request(method, customHeaders, uri, true, false)
return
}
err = errors.New("RTSP Client Unauthorized 401")
return
}
if val, ok := res["Session"]; ok {
splits2 := strings.Split(val, ";")
client.session = strings.TrimSpace(splits2[0])
client.headers["Session"] = strings.TrimSpace(splits2[0])
}
if val, ok := res["Content-Base"]; ok {
client.control = strings.TrimSpace(val)
}
if val, ok := res["RTP-Info"]; ok {
splits := strings.Split(val, ",")
for _, v := range splits {
splits2 := strings.Split(v, ";")
for _, vs := range splits2 {
if strings.Contains(vs, "rtptime") {
splits3 := strings.Split(vs, "=")
if len(splits3) == 2 {
if client.startVideoTS == 0 {
ts, _ := strconv.Atoi(strings.TrimSpace(splits3[1]))
client.startVideoTS = int64(ts)
} else {
ts, _ := strconv.Atoi(strings.TrimSpace(splits3[1]))
client.startAudioTS = int64(ts)
}
}
}
}
}
}
if method == DESCRIBE {
if val, ok := res["Content-Length"]; ok {
contentLen, err = strconv.Atoi(strings.TrimSpace(val))
if err != nil {
return
}
client.SDPRaw = make([]byte, contentLen)
_, err = io.ReadFull(client.connRW, client.SDPRaw)
if err != nil {
return
}
builder.Write(client.SDPRaw)
_, client.mediaSDP = sdp.Parse(string(client.SDPRaw))
}
}
if method == SETUP {
//deep := stringInBetween(builder.String(), "interleaved=", ";")
if val, ok := res["Transport"]; ok {
splits2 := strings.Split(val, ";")
for _, vs := range splits2 {
if strings.Contains(vs, "interleaved") {
splits3 := strings.Split(vs, "=")
if len(splits3) == 2 {
splits4 := strings.Split(splits3[1], "-")
if len(splits4) == 2 {
if val, err := strconv.Atoi(splits4[0]); err == nil {
client.chTMP = val
}
}
}
}
}
}
}
client.Println(builder.String())
}
return
}
func (client *RTSPClient) Pause() error {
if err := client.request(PAUSE, nil, client.pURL.String(), false, true); err != nil {
return err
}
client.status = PAUSE
client.lastPausedTime = time.Now()
return nil
}
func (client *RTSPClient) Play(customHeaders map[string]string) error {
if err := client.request(PLAY, customHeaders, client.pURL.String(), false, true); err != nil {
return err
}
client.status = PLAY
return nil
}
func (client *RTSPClient) Seek(customHeaders map[string]string, target int64) error {
customHeaders["Range"] = fmt.Sprintf("npt=%d.00-", target)
if err := client.request(PLAY, customHeaders, client.pURL.String(), false, true); err != nil {
return err
}
client.status = PLAY
client.PreVideoTS = 0
return nil
}
func (client *RTSPClient) Close() {
if client.conn != nil {
client.conn.SetDeadline(time.Now().Add(time.Second))
client.request(TEARDOWN, nil, client.control, false, true)
err := client.conn.Close()
client.Println("RTSP Client Close", err)
}
}
func (client *RTSPClient) parseURL(rawURL string) error {
l, err := url.Parse(rawURL)
if err != nil {
return err
}
username := l.User.Username()
password, _ := l.User.Password()
l.User = nil
if l.Port() == "" {
l.Host = fmt.Sprintf("%s:%s", l.Host, "554")
}
if l.Scheme != "rtsp" && l.Scheme != "rtsps" {
l.Scheme = "rtsp"
}
client.pURL = l
client.username = username
client.password = password
client.control = l.String()
return nil
}
func (client *RTSPClient) createDigest(method string, uri string) string {
md5UserRealmPwd := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", client.username, client.realm, client.password))))
md5MethodURL := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s", method, uri))))
response := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", md5UserRealmPwd, client.nonce, md5MethodURL))))
Authorization := fmt.Sprintf("Digest username=\"%s\", realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"", client.username, client.realm, client.nonce, uri, response)
return Authorization
}
func stringInBetween(str string, start string, end string) (result string) {
s := strings.Index(str, start)
if s == -1 {
return
}
str = str[s+len(start):]
e := strings.Index(str, end)
if e == -1 {
return
}
str = str[:e]
return str
}
func (client *RTSPClient) CodecUpdateSPS(val []byte) {
if client.videoCodec != av.H264 && client.videoCodec != av.H265 {
return
}
if bytes.Equal(val, client.sps) {
return
}
client.sps = val
if (client.videoCodec == av.H264 && len(client.pps) == 0) || (client.videoCodec == av.H265 && (len(client.vps) == 0 || len(client.pps) == 0)) {
return
}
var codecData av.VideoCodecData
var err error
switch client.videoCodec {
case av.H264:
client.Println("Codec Update SPS", val)
codecData, err = h264parser.NewCodecDataFromSPSAndPPS(val, client.pps)
if err != nil {
client.Println("Parse Codec Data Error", err)
return
}
case av.H265:
codecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(client.vps, val, client.pps)
if err != nil {
client.Println("Parse Codec Data Error", err)
return
}
}
if len(client.CodecData) > 0 {
for i, i2 := range client.CodecData {
if i2.Type().IsVideo() {
client.CodecData[i] = codecData
}
}
} else {
client.CodecData = append(client.CodecData, codecData)
}
client.Signals <- SignalCodecUpdate
}
func (client *RTSPClient) CodecUpdatePPS(val []byte) {
if client.videoCodec != av.H264 && client.videoCodec != av.H265 {
return
}
if bytes.Equal(val, client.pps) {
return
}
client.pps = val
if (client.videoCodec == av.H264 && len(client.sps) == 0) || (client.videoCodec == av.H265 && (len(client.vps) == 0 || len(client.sps) == 0)) {
return
}
var codecData av.VideoCodecData
var err error
switch client.videoCodec {
case av.H264:
client.Println("Codec Update PPS", val)
codecData, err = h264parser.NewCodecDataFromSPSAndPPS(client.sps, val)
if err != nil {
client.Println("Parse Codec Data Error", err)
return
}
case av.H265:
codecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(client.vps, client.sps, val)
if err != nil {
client.Println("Parse Codec Data Error", err)
return
}
}
if len(client.CodecData) > 0 {
for i, i2 := range client.CodecData {
if i2.Type().IsVideo() {
client.CodecData[i] = codecData
}
}
} else {
client.CodecData = append(client.CodecData, codecData)
}
client.Signals <- SignalCodecUpdate
}
func (client *RTSPClient) CodecUpdateVPS(val []byte) {
if client.videoCodec != av.H265 {
return
}
if bytes.Equal(val, client.vps) {
return
}
client.vps = val
if len(client.sps) == 0 || len(client.pps) == 0 {
return
}
codecData, err := h265parser.NewCodecDataFromVPSAndSPSAndPPS(val, client.sps, client.pps)
if err != nil {
client.Println("Parse Codec Data Error", err)
return
}
if len(client.CodecData) > 0 {
for i, i2 := range client.CodecData {
if i2.Type().IsVideo() {
client.CodecData[i] = codecData
}
}
} else {
client.CodecData = append(client.CodecData, codecData)
}
client.Signals <- SignalCodecUpdate
}
// Println mini logging functions
func (client *RTSPClient) Println(v ...interface{}) {
if client.options.Debug {
log.Println(v)
}
}
// binSize
func binSize(val int) []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, uint32(val))
return buf
}
func isRTCPPacket(content []byte) bool {
rtcpPacketType := content[5]
return rtcpPacketType == RTCPSenderReport || rtcpPacketType == RTCPReceiverReport
}