proxy rtp packet

This commit is contained in:
deepch 2021-09-25 17:37:55 +03:00
parent 8f395d64be
commit 24b5a0e410
2 changed files with 351 additions and 0 deletions

210
format/rtspv2/proxy.go Normal file
View File

@ -0,0 +1,210 @@
package rtspv2
import (
"errors"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/google/uuid"
)
var Debug bool
type ProxyConn struct {
URL *url.URL
netconn net.Conn
readbuf []byte
writebuf []byte
sdp []byte
playing bool
options bool
cseq int
session string
protocol int
}
type Proxy struct {
Addr string
HandleConn func(*ProxyConn)
HandleOptions func(*ProxyConn)
HandlePlay func(*ProxyConn)
}
func NewProxyConn(netconn net.Conn) *ProxyConn {
conn := &ProxyConn{}
conn.netconn = netconn
conn.writebuf = make([]byte, 4096)
conn.readbuf = make([]byte, 4096)
conn.session = uuid.New().String()
return conn
}
func (self *ProxyConn) Close() (err error) {
return nil
}
func (self *ProxyConn) WritePacket(pkt *[]byte) (err error) {
err = self.netconn.SetDeadline(time.Now().Add(time.Second * 5))
if err != nil {
return err
}
_, err = self.netconn.Write(*pkt)
if err != nil {
return err
}
return nil
}
func (self *ProxyConn) WriteHeader(sdp []byte) {
self.sdp = sdp
}
func (self *ProxyConn) NetConn() net.Conn {
return self.netconn
}
func (self *Proxy) ListenAndServe() (err error) {
addr := self.Addr
if addr == "" {
addr = ":554"
}
var tcpaddr *net.TCPAddr
if tcpaddr, err = net.ResolveTCPAddr("tcp", addr); err != nil {
err = fmt.Errorf("rtsp: ListenAndServe: %s", err)
return
}
var listener *net.TCPListener
if listener, err = net.ListenTCP("tcp", tcpaddr); err != nil {
return
}
if Debug {
fmt.Println("rtsp: server: listening on", addr)
}
for {
var netconn net.Conn
if netconn, err = listener.Accept(); err != nil {
return
}
if Debug {
fmt.Println("rtsp: server: accepted")
}
conn := NewProxyConn(netconn)
go func() {
err := self.handleConn(conn)
if Debug {
fmt.Println("rtsp: server: client closed err:", err)
}
//defer conn.Close()
}()
}
}
func (self *Proxy) handleConn(conn *ProxyConn) (err error) {
if self.HandleConn != nil {
self.HandleConn(conn)
} else {
for {
if err = conn.prepare(); err != nil {
return
}
if conn.options {
if self.HandleOptions != nil {
self.HandleOptions(conn)
}
}
if conn.playing {
if self.HandlePlay != nil {
self.HandlePlay(conn)
}
}
}
}
return
}
func (self *ProxyConn) prepare() error {
self.options = false
self.cseq++
err := self.netconn.SetDeadline(time.Now().Add(time.Second * 5))
if err != nil {
return err
}
n, err := self.netconn.Read(self.readbuf)
if err != nil {
return err
}
allStringsSlice := strings.Split(string(self.readbuf[:n]), "\r\n")
if len(allStringsSlice) == 0 {
return errors.New("no cmd")
}
fistStringsSlice := strings.Split(allStringsSlice[0], " ")
if len(fistStringsSlice) == 0 {
return errors.New("no fist cmd")
}
switch fistStringsSlice[0] {
case OPTIONS:
if len(fistStringsSlice) < 2 {
return errors.New("return bad OPTIONS")
}
if self.URL, err = url.Parse(fistStringsSlice[1]); err != nil {
return err
}
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nPublic: OPTIONS, DESCRIBE, SETUP, PLAY\r\nSession: " + self.session + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"))
if err != nil {
return err
}
self.options = true
case SETUP:
if strings.Contains(string(self.readbuf[:n]), "RTP/AVP/UDP") {
_, err := self.netconn.Write([]byte("RTSP/1.0 461 Unsupported transport\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\n\r\n"))
if err != nil {
return err
}
return nil
}
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1\r\n\r\n"))
if err != nil {
return err
}
case DESCRIBE:
buf := "RTSP/1.0 200 OK\r\nContent-Type: application/sdp\r\nSession: " + self.session + "\r\nContent-Length: " + strconv.Itoa(len(self.sdp)) + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"
_, err := self.netconn.Write([]byte(buf + string(self.sdp)))
if err != nil {
return err
}
case PLAY:
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nSession: " + self.session + ";timeout=60\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"))
if err != nil {
return err
}
self.playing = true
default:
return errors.New("metod not found")
}
return nil
}

141
format/rtspv2/server.go Normal file
View File

@ -0,0 +1,141 @@
package rtspv2
import (
"fmt"
"math/rand"
"net"
"net/url"
"time"
"github.com/deepch/vdk/av"
)
const (
StartCodePS = 0x000001ba
StartCodeSYS = 0x000001bb
StartCodeMAP = 0x000001bc
StartCodeVideo = 0x000001e0
StartCodeAudio = 0x000001c0
MEPGProgramEndCode = 0x000001b9
)
const (
StreamIDVideo = 0xe0
StreamIDAudio = 0xc0
)
const (
UDPTransfer int = 0
TCPTransferActive int = 1
TCPTransferPassive int = 2
LocalCache int = 3
)
//
const (
StreamTypeH264 = 0x1b
StreamTypeH265 = 0x24
StreamTypeAAC = 0x90
)
type encPSPacket struct {
crc32 uint64
}
type Conn struct {
URL *url.URL
netconn net.Conn
readbuf []byte
writebuf []byte
playing bool
psEnc *encPSPacket
cseq int
ssrc uint32
protocol int
}
type Server struct {
Addr string
HandleDescribe func(*Conn)
HandleOptions func(*Conn)
HandleSetup func(*Conn)
HandlePlay func(*Conn)
HandleConn func(*Conn)
}
func NewConn(netconn net.Conn) *Conn {
conn := &Conn{}
conn.netconn = netconn
conn.writebuf = make([]byte, 4096)
conn.readbuf = make([]byte, 4096)
conn.ssrc = rand.Uint32()
conn.protocol = TCPTransferPassive
return conn
}
func (self *Conn) Close() (err error) {
return nil
}
func (self *Conn) WritePacket(pkt *av.Packet) (err error) {
return nil
}
func timeToTs(tm time.Duration, timeScale int64) int64 {
return int64(tm * time.Duration(timeScale) / time.Second)
}
func (self *Conn) WriteHeader(codec []av.CodecData) (err error) {
return nil
}
func (self *Conn) NetConn() net.Conn {
return self.netconn
}
func (self *Server) ListenAndServe() (err error) {
addr := self.Addr
if addr == "" {
addr = ":554"
}
var tcpaddr *net.TCPAddr
if tcpaddr, err = net.ResolveTCPAddr("tcp", addr); err != nil {
err = fmt.Errorf("rtsp: ListenAndServe: %s", err)
return
}
var listener *net.TCPListener
if listener, err = net.ListenTCP("tcp", tcpaddr); err != nil {
return
}
if Debug {
fmt.Println("rtsp: server: listening on", addr)
}
for {
var netconn net.Conn
if netconn, err = listener.Accept(); err != nil {
return
}
if Debug {
fmt.Println("rtsp: server: accepted")
}
conn := NewConn(netconn)
go func() {
err := self.handleConn(conn)
if Debug {
fmt.Println("rtsp: server: client closed err:", err)
}
//defer conn.Close()
}()
}
}
func (self *Server) handleConn(conn *Conn) (err error) {
return
}
func (self *Conn) prepare() error {
return nil
}