vdk/format/dvrip/client.go
kunmeng 0e65c9d1cc
Some checks failed
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (go) (push) Has been cancelled
将包名改为 git.r-2.top/kunmeng/vdk
2025-01-24 14:45:45 +08:00

503 lines
11 KiB
Go

package dvrip
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"html"
"net"
"net/url"
"strconv"
"strings"
"time"
"git.r-2.top/kunmeng/vdk/av"
"git.r-2.top/kunmeng/vdk/codec"
"git.r-2.top/kunmeng/vdk/codec/h264parser"
)
// Values delivered on Client.Signals.
const (
// SignalStreamStop is sent by Monitor when it returns.
SignalStreamStop = iota
// SignalCodecUpdate is sent by the CodecUpdate* methods after they have
// modified Client.CodecData.
SignalCodecUpdate
)
// Client holds the state of one connection to a DVRIP device.
type Client struct {
// conn is the TCP connection opened by Dial; all requests and media frames
// travel over it.
conn net.Conn
// login, password, host and stream are filled in by parseURL.
login string
password string
host string
stream string
// sequenceNumber is placed in every outgoing header; it is incremented by
// both send and recv.
sequenceNumber int32
// session is the session ID parsed from the login response.
session int32
// aliveInterval comes from the login response (in seconds) and controls how
// often Monitor sends a keep-alive.
aliveInterval time.Duration
// CodecData is the current list of stream codecs, maintained by the
// CodecUpdate* methods.
CodecData []av.CodecData
// OutgoingPacketQueue receives the media packets produced by Monitor.
OutgoingPacketQueue chan *av.Packet
// Signals receives SignalStreamStop and SignalCodecUpdate notifications.
Signals chan int
// options is the configuration passed to Dial.
options ClientOptions
// sps and pps are the most recently seen H.264 parameter sets.
sps []byte
pps []byte
}
// ClientOptions is the configuration accepted by Dial.
type ClientOptions struct {
// Debug is not read by any code visible in this file.
Debug bool
// URL identifies the device. parseURL takes the user name, password, host
// (port 34567 when none is given) and stream name (the path) from it.
URL string
// DialTimeout is the requested timeout for establishing the connection.
DialTimeout time.Duration
// ReadWriteTimeout is not read by any code visible in this file; send and
// recv use a fixed five second deadline.
ReadWriteTimeout time.Duration
// DisableAudio makes Monitor skip audio frames.
DisableAudio bool
}
// Dial connects to the device described by options.URL, logs in, sends the
// current local time to the device and starts Monitor in a new goroutine.
//
// The TCP connect timeout is options.DialTimeout, or two seconds when that
// field is zero or negative. If any step after the connection has been
// established fails, the connection is closed before the error is returned.
//
// The returned client delivers media on OutgoingPacketQueue (buffer 3000) and
// notifications on Signals (buffer 100).
func Dial(options ClientOptions) (*Client, error) {
	client := &Client{
		Signals:             make(chan int, 100),
		OutgoingPacketQueue: make(chan *av.Packet, 3000),
		options:             options,
	}
	// The URL may arrive HTML-escaped (e.g. "&amp;"), so unescape it first.
	if err := client.parseURL(html.UnescapeString(client.options.URL)); err != nil {
		return nil, err
	}

	dialTimeout := options.DialTimeout
	if dialTimeout <= 0 {
		dialTimeout = 2 * time.Second
	}
	conn, err := net.DialTimeout("tcp", client.host, dialTimeout)
	if err != nil {
		return nil, err
	}
	client.conn = conn

	// fail releases the socket so a failed handshake does not leak it.
	fail := func(err error) (*Client, error) {
		// The handshake error is the one worth reporting; a Close error on an
		// already broken connection adds nothing.
		_ = conn.Close()
		return nil, err
	}

	if err := conn.SetDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return fail(err)
	}
	if err := client.Login(); err != nil {
		return fail(err)
	}
	if err := client.SetTime(); err != nil {
		return fail(err)
	}

	go client.Monitor()
	return client, nil
}
// Close closes the underlying connection and returns the error reported by it.
func (client *Client) Close() error {
	return client.conn.Close()
}
// SetKeepAlive sends a KeepAlive request carrying the current session ID.
// It does not wait for a response.
func (client *Client) SetKeepAlive() error {
	body, err := json.Marshal(map[string]string{
		"Name": "KeepAlive",
		// Format the session as unsigned: with a signed int32, an ID that has
		// the high bit set would be rendered as "0x-…" instead of eight hex
		// digits.
		"SessionID": fmt.Sprintf("0x%08X", uint32(client.session)),
	})
	if err != nil {
		return err
	}
	return client.send(codeKeepAlive, body)
}
// Monitor claims and starts the stream named by client.stream and then reads
// media frames from the connection until any send, receive or decode step
// fails. Video NAL units and audio payloads are pushed to
// OutgoingPacketQueue; SPS/PPS units are handed to CodecUpdateSPS and
// CodecUpdatePPS. A keep-alive is sent whenever more than aliveInterval has
// elapsed since the previous one.
//
// SignalStreamStop is always sent on Signals when Monitor returns. Sends on
// the channels block when their buffers are full.
func (client *Client) Monitor() {
	defer func() {
		client.Signals <- SignalStreamStop
	}()

	streamParams := map[string]interface{}{
		"Channel":    0,
		"CombinMode": "NONE",
		"StreamType": client.stream,
		"TransMode":  "TCP",
	}
	if _, _, err := client.Command(codeOPMonitor, map[string]interface{}{
		"Action":    "Claim",
		"Parameter": streamParams,
	}); err != nil {
		return
	}

	payload, err := json.Marshal(map[string]interface{}{
		"Name": "OPMonitor",
		// Unsigned formatting keeps the ID at eight hex digits even when the
		// high bit of the session is set.
		"SessionID": fmt.Sprintf("0x%08X", uint32(client.session)),
		"OPMonitor": map[string]interface{}{
			"Action":    "Start",
			"Parameter": streamParams,
		},
	})
	if err != nil {
		return
	}
	// The "Start" request is sent with message ID 1410 and no reply is read
	// here; whatever arrives next is consumed by the frame loop below.
	if err = client.send(1410, payload); err != nil {
		return
	}

	// fps is remembered across iterations because 0x1FD frames do not carry
	// it; they reuse the value from the last 0x1FC/0x1FE frame.
	var fps int
	lastKeepAlive := time.Now()
	for {
		if time.Since(lastKeepAlive) > client.aliveInterval {
			if err := client.SetKeepAlive(); err != nil {
				return
			}
			lastKeepAlive = time.Now()
		}

		_, body, err := client.recv(false)
		if err != nil {
			return
		}
		buf := bytes.NewReader(body)

		// The first four bytes of the message body select the frame kind.
		var dataType uint32
		if err := binary.Read(buf, binary.BigEndian, &dataType); err != nil {
			return
		}

		switch dataType {
		case 0x1FC, 0x1FE:
			// Video frame with a full header. Width, Height and DateTime are
			// unused but must be present so the fields line up.
			var frame struct {
				Media    byte
				FPS      byte
				Width    byte
				Height   byte
				DateTime uint32
				Length   uint32
			}
			if err := binary.Read(buf, binary.LittleEndian, &frame); err != nil {
				return
			}
			fps = int(frame.FPS)
			data, err := client.readFramePayload(buf, frame.Length)
			if err != nil {
				return
			}
			if parseMediaType(dataType, frame.Media) == av.H264.String() {
				client.emitH264NALUs(data, fps)
			}

		case 0x1FD:
			// Video frame whose header carries only the payload length.
			var length uint32
			if err := binary.Read(buf, binary.LittleEndian, &length); err != nil {
				return
			}
			data, err := client.readFramePayload(buf, length)
			if err != nil {
				return
			}
			client.emitH264NALUs(data, fps)

		case 0x1FA, 0x1F9:
			if client.options.DisableAudio {
				continue
			}
			var frame struct {
				Media      byte
				SampleRate byte
				Length     uint16
			}
			if err := binary.Read(buf, binary.LittleEndian, &frame); err != nil {
				return
			}
			data, err := client.readFramePayload(buf, uint32(frame.Length))
			if err != nil {
				return
			}
			if parseMediaType(dataType, frame.Media) != av.PCM_ALAW.String() {
				continue
			}
			// Audio is only emitted once codec data exists, and an empty
			// payload is skipped because the duration below divides by its
			// length.
			if client.CodecData != nil && len(data) > 0 {
				if len(client.CodecData) == 1 {
					client.CodecUpdatePCMAlaw()
				}
				// Duration formula kept as is: 8000/len(payload) milliseconds.
				// NOTE(review): confirm this is the intended duration.
				client.OutgoingPacketQueue <- &av.Packet{
					Duration: time.Duration(8000/len(data)) * time.Millisecond,
					Idx:      1,
					Data:     data,
				}
			}

		case 0xFFD8FFE0:
			// Recognised but intentionally ignored.
		default:
			continue
		}
	}
}

// readFramePayload returns the frame payload: whatever is left in buf plus,
// when length exceeds that, the missing bytes fetched with recvSize.
func (client *Client) readFramePayload(buf *bytes.Reader, length uint32) ([]byte, error) {
	var packet bytes.Buffer
	// Capture the count first; WriteTo drains buf.
	buffered := uint32(buf.Len())
	if _, err := buf.WriteTo(&packet); err != nil {
		return nil, err
	}
	if length > buffered {
		if _, err := client.recvSize(&packet, length-buffered); err != nil {
			return nil, err
		}
	}
	return packet.Bytes(), nil
}

// emitH264NALUs splits data into NAL units, queues slice units (types 1-5) as
// video packets and forwards SPS (7) and PPS (8) units to the codec updaters.
// Slice units are dropped while fps is zero because the packet duration is
// derived from it.
func (client *Client) emitH264NALUs(data []byte, fps int) {
	// The second return value is not needed here.
	nalus, _ := h264parser.SplitNALUs(data)
	for _, nalu := range nalus {
		if len(nalu) == 0 {
			// Nothing to classify; indexing nalu[0] would panic.
			continue
		}
		naluType := nalu[0] & 0x1f
		switch {
		case naluType >= 1 && naluType <= 5:
			if fps != 0 {
				client.OutgoingPacketQueue <- &av.Packet{
					Duration:   time.Duration(1000/fps) * time.Millisecond,
					Idx:        0,
					IsKeyFrame: naluType == 5,
					Data:       append(binSize(len(nalu)), nalu...),
				}
			}
		case naluType == 7:
			client.CodecUpdateSPS(nalu)
		case naluType == 8:
			client.CodecUpdatePPS(nalu)
		}
	}
}
// SetTime sends the current local time, formatted as "2006-01-02 15:04:05",
// to the device with the codeOPTimeSetting command.
func (client *Client) SetTime() error {
	now := time.Now().Format("2006-01-02 15:04:05")
	if _, _, err := client.Command(codeOPTimeSetting, now); err != nil {
		return err
	}
	return nil
}
// Login sends the login request built from client.login and the hashed
// password, then decodes the response. On success it stores the keep-alive
// interval (seconds in the response) and the session ID on the client.
//
// An error is returned when sending, receiving or decoding fails, when the
// response status is neither statusOK nor statusUpgradeSuccessful, or when the
// session ID cannot be parsed as a 32-bit unsigned integer.
func (client *Client) Login() error {
	request := map[string]string{
		"EncryptType": "MD5",
		"LoginType":   "DVRIP-WEB",
		"PassWord":    sofiaHash(client.password),
		"UserName":    client.login,
	}
	encoded, err := json.Marshal(request)
	if err != nil {
		return err
	}
	if err = client.send(codeLogin, encoded); err != nil {
		return err
	}

	_, raw, err := client.recv(true)
	if err != nil {
		return err
	}
	var reply LoginResp
	if err = json.Unmarshal(raw, &reply); err != nil {
		return err
	}

	status := statusCode(reply.Ret)
	if !(status == statusOK || status == statusUpgradeSuccessful) {
		return fmt.Errorf("unexpected status code: %v - %v", reply.Ret, statusCodes[status])
	}

	// The interval is stored before the session ID is parsed, so it is set
	// even when parsing fails.
	client.aliveInterval = time.Duration(reply.AliveInterval) * time.Second

	// Base 0 lets ParseUint accept the "0x…" form.
	id, err := strconv.ParseUint(reply.SessionID, 0, 32)
	if err != nil {
		return err
	}
	client.session = int32(id)
	return nil
}
// Command sends a request for the given command code and waits for one
// response message.
//
// The request body is a JSON object with "Name" set to requestCodes[command],
// "SessionID" set to the current session, and data stored under the command
// name. It returns the response header and body as produced by recv(true).
func (client *Client) Command(command requestCode, data interface{}) (*Payload, []byte, error) {
	name := requestCodes[command]
	params, err := json.Marshal(map[string]interface{}{
		"Name": name,
		// Format the session as unsigned: a signed int32 with the high bit set
		// would be rendered as "0x-…" rather than eight hex digits.
		"SessionID": fmt.Sprintf("0x%08X", uint32(client.session)),
		name:        data,
	})
	if err != nil {
		return nil, nil, err
	}
	if err = client.send(command, params); err != nil {
		return nil, nil, err
	}
	return client.recv(true)
}
// send writes one message to the connection: a little-endian Payload header,
// the body bytes and the magicEnd trailer, all in a single Write. The header's
// BodyLength is len(data)+2. A five second deadline is set on the connection
// before writing, and sequenceNumber is incremented after a successful write.
func (client *Client) send(msgID requestCode, data []byte) error {
	header := Payload{
		Head:           255,
		Version:        0,
		Session:        client.session,
		SequenceNumber: client.sequenceNumber,
		MsgID:          int16(msgID),
		BodyLength:     int32(len(data)) + 2,
	}

	var out bytes.Buffer
	if err := binary.Write(&out, binary.LittleEndian, header); err != nil {
		return err
	}
	if err := client.conn.SetDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return err
	}
	if err := binary.Write(&out, binary.LittleEndian, data); err != nil {
		return err
	}
	if err := binary.Write(&out, binary.LittleEndian, magicEnd); err != nil {
		return err
	}
	if _, err := client.conn.Write(out.Bytes()); err != nil {
		return err
	}

	client.sequenceNumber++
	return nil
}
// recvSize keeps receiving messages and appending their bodies to buffer until
// exactly size bytes have been appended. At least one message is always read.
// It fails with "invalid read size" when the received bodies overshoot size.
// The returned byte slice is always nil; the data is delivered through buffer.
func (client *Client) recvSize(buffer *bytes.Buffer, size uint32) ([]byte, error) {
	var received uint32
	for {
		_, chunk, err := client.recv(false)
		if err != nil {
			return nil, err
		}
		buffer.Write(chunk)
		received += uint32(len(chunk))

		switch {
		case received > size:
			return nil, fmt.Errorf("invalid read size")
		case received == size:
			return nil, nil
		}
	}
}
// recv reads one message from the connection: a 20-byte header decoded
// little-endian into a Payload, followed by BodyLength bytes of body.
//
// A five second read deadline is set first. sequenceNumber is incremented once
// the header has been decoded. A BodyLength that is not in the range
// 1..99999 is rejected. When text is true and the body is longer than two
// bytes and ends in {10, 0}, that terminator is stripped.
func (client *Client) recv(text bool) (*Payload, []byte, error) {
	if err := client.conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return nil, nil, err
	}

	// A single conn.Read may legally return fewer than 20 bytes on a stream
	// socket, which would desynchronise the framing. binary.Read on a byte
	// slice fills the whole slice or returns an error.
	header := make([]byte, 20)
	if err := binary.Read(client.conn, binary.LittleEndian, header); err != nil {
		return nil, nil, err
	}

	var p Payload
	if err := binary.Read(bytes.NewReader(header), binary.LittleEndian, &p); err != nil {
		return nil, nil, err
	}
	client.sequenceNumber++

	if p.BodyLength <= 0 || p.BodyLength >= 100000 {
		return nil, nil, fmt.Errorf("invalid bodylength: %v", p.BodyLength)
	}

	// Passing the slice itself (not a pointer to it) uses binary.Read's
	// byte-slice fast path while still reading exactly BodyLength bytes.
	body := make([]byte, p.BodyLength)
	if err := binary.Read(client.conn, binary.LittleEndian, body); err != nil {
		return nil, nil, err
	}

	if text && len(body) > 2 && bytes.Equal(body[len(body)-2:], []byte{10, 0}) {
		body = body[:len(body)-2]
	}
	return &p, body, nil
}
// parseURL extracts the connection settings from rawURL and stores them on the
// client: user name and password (each defaulting to "admin" when empty), the
// host (with port 34567 appended when the URL has no port) and the stream
// name, which is the escaped path with leading and trailing slashes removed.
func (client *Client) parseURL(rawURL string) error {
	parsed, err := url.Parse(rawURL)
	if err != nil {
		return err
	}

	// Username and Password are safe to call when the URL has no user info.
	user := parsed.User.Username()
	pass, _ := parsed.User.Password()
	if user == "" {
		user = "admin"
	}
	if pass == "" {
		pass = "admin"
	}

	host := parsed.Host
	if parsed.Port() == "" {
		host = fmt.Sprintf("%s:34567", host)
	}

	client.login = user
	client.password = pass
	client.host = host
	client.stream = strings.Trim(parsed.EscapedPath(), "/")
	return nil
}
// CodecUpdateSPS records val as the current SPS. When it differs from the
// stored one and a PPS is already known, new H.264 codec data is built from
// both; it replaces every video entry in CodecData (or becomes the first entry
// when CodecData is empty) and SignalCodecUpdate is sent on Signals.
// Nothing is signalled when the codec data cannot be built.
func (client *Client) CodecUpdateSPS(val []byte) {
	if bytes.Equal(val, client.sps) {
		return
	}
	client.sps = val
	if len(client.pps) == 0 {
		return
	}

	video, err := h264parser.NewCodecDataFromSPSAndPPS(val, client.pps)
	if err != nil {
		return
	}

	if len(client.CodecData) == 0 {
		client.CodecData = append(client.CodecData, video)
	} else {
		for idx := range client.CodecData {
			if client.CodecData[idx].Type().IsVideo() {
				client.CodecData[idx] = video
			}
		}
	}
	client.Signals <- SignalCodecUpdate
}
// CodecUpdatePPS records val as the current PPS. When it differs from the
// stored one and an SPS is already known, new H.264 codec data is built from
// both; it replaces every video entry in CodecData (or becomes the first entry
// when CodecData is empty) and SignalCodecUpdate is sent on Signals.
// Nothing is signalled when the codec data cannot be built.
func (client *Client) CodecUpdatePPS(val []byte) {
	if bytes.Equal(val, client.pps) {
		return
	}
	client.pps = val
	if len(client.sps) == 0 {
		return
	}

	video, err := h264parser.NewCodecDataFromSPSAndPPS(client.sps, val)
	if err != nil {
		return
	}

	if len(client.CodecData) == 0 {
		client.CodecData = append(client.CodecData, video)
	} else {
		for idx := range client.CodecData {
			if client.CodecData[idx].Type().IsVideo() {
				client.CodecData[idx] = video
			}
		}
	}
	client.Signals <- SignalCodecUpdate
}
// CodecUpdatePCMAlaw appends PCM A-law codec data to CodecData and sends
// SignalCodecUpdate on Signals.
func (client *Client) CodecUpdatePCMAlaw() {
	client.CodecData = append(client.CodecData, codec.NewPCMAlawCodecData())
	client.Signals <- SignalCodecUpdate
}