From 24b5a0e4108c85cdbbf6cf81f9b4147d2e06d6d7 Mon Sep 17 00:00:00 2001 From: deepch Date: Sat, 25 Sep 2021 17:37:55 +0300 Subject: [PATCH] proxy rtp packet --- format/rtspv2/proxy.go | 210 ++++++++++++++++++++++++++++++++++++++++ format/rtspv2/server.go | 141 +++++++++++++++++++++++++++ 2 files changed, 351 insertions(+) create mode 100644 format/rtspv2/proxy.go create mode 100644 format/rtspv2/server.go diff --git a/format/rtspv2/proxy.go b/format/rtspv2/proxy.go new file mode 100644 index 0000000..26fe5d3 --- /dev/null +++ b/format/rtspv2/proxy.go @@ -0,0 +1,210 @@ +package rtspv2 + +import ( + "errors" + "fmt" + "net" + "net/url" + "strconv" + "strings" + "time" + + "github.com/google/uuid" +) + +var Debug bool + +type ProxyConn struct { + URL *url.URL + netconn net.Conn + readbuf []byte + writebuf []byte + sdp []byte + playing bool + options bool + cseq int + session string + protocol int +} + +type Proxy struct { + Addr string + HandleConn func(*ProxyConn) + HandleOptions func(*ProxyConn) + HandlePlay func(*ProxyConn) +} + +func NewProxyConn(netconn net.Conn) *ProxyConn { + conn := &ProxyConn{} + conn.netconn = netconn + conn.writebuf = make([]byte, 4096) + conn.readbuf = make([]byte, 4096) + conn.session = uuid.New().String() + return conn +} + +func (self *ProxyConn) Close() (err error) { + return nil +} + +func (self *ProxyConn) WritePacket(pkt *[]byte) (err error) { + err = self.netconn.SetDeadline(time.Now().Add(time.Second * 5)) + if err != nil { + return err + } + _, err = self.netconn.Write(*pkt) + if err != nil { + return err + } + return nil +} + +func (self *ProxyConn) WriteHeader(sdp []byte) { + self.sdp = sdp +} + +func (self *ProxyConn) NetConn() net.Conn { + return self.netconn +} + +func (self *Proxy) ListenAndServe() (err error) { + addr := self.Addr + if addr == "" { + addr = ":554" + } + var tcpaddr *net.TCPAddr + if tcpaddr, err = net.ResolveTCPAddr("tcp", addr); err != nil { + err = fmt.Errorf("rtsp: ListenAndServe: %s", err) + return + } + + var listener *net.TCPListener + if listener, err = net.ListenTCP("tcp", tcpaddr); err != nil { + return + } + + if Debug { + fmt.Println("rtsp: server: listening on", addr) + } + + for { + var netconn net.Conn + if netconn, err = listener.Accept(); err != nil { + return + } + + if Debug { + fmt.Println("rtsp: server: accepted") + } + conn := NewProxyConn(netconn) + go func() { + err := self.handleConn(conn) + if Debug { + fmt.Println("rtsp: server: client closed err:", err) + } + //defer conn.Close() + }() + } +} + +func (self *Proxy) handleConn(conn *ProxyConn) (err error) { + if self.HandleConn != nil { + self.HandleConn(conn) + } else { + for { + if err = conn.prepare(); err != nil { + return + } + if conn.options { + if self.HandleOptions != nil { + self.HandleOptions(conn) + } + } + if conn.playing { + if self.HandlePlay != nil { + self.HandlePlay(conn) + } + } + } + } + + return +} + +func (self *ProxyConn) prepare() error { + + self.options = false + self.cseq++ + err := self.netconn.SetDeadline(time.Now().Add(time.Second * 5)) + if err != nil { + return err + } + + n, err := self.netconn.Read(self.readbuf) + if err != nil { + return err + } + + allStringsSlice := strings.Split(string(self.readbuf[:n]), "\r\n") + if len(allStringsSlice) == 0 { + return errors.New("no cmd") + } + + fistStringsSlice := strings.Split(allStringsSlice[0], " ") + + if len(fistStringsSlice) == 0 { + return errors.New("no fist cmd") + } + + switch fistStringsSlice[0] { + case OPTIONS: + + if len(fistStringsSlice) < 2 { + return errors.New("return bad OPTIONS") + } + if self.URL, err = url.Parse(fistStringsSlice[1]); err != nil { + return err + } + _, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nPublic: OPTIONS, DESCRIBE, SETUP, PLAY\r\nSession: " + self.session + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n")) + if err != nil { + return err + } + self.options = true + + case SETUP: + + if strings.Contains(string(self.readbuf[:n]), "RTP/AVP/UDP") { + _, err := self.netconn.Write([]byte("RTSP/1.0 461 Unsupported transport\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\n\r\n")) + if err != nil { + return err + } + return nil + } + _, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1\r\n\r\n")) + if err != nil { + return err + } + + case DESCRIBE: + + buf := "RTSP/1.0 200 OK\r\nContent-Type: application/sdp\r\nSession: " + self.session + "\r\nContent-Length: " + strconv.Itoa(len(self.sdp)) + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n" + _, err := self.netconn.Write([]byte(buf + string(self.sdp))) + if err != nil { + return err + } + + case PLAY: + + _, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nSession: " + self.session + ";timeout=60\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n")) + if err != nil { + return err + } + self.playing = true + + default: + + return errors.New("metod not found") + + } + return nil +} diff --git a/format/rtspv2/server.go b/format/rtspv2/server.go new file mode 100644 index 0000000..1f1807a --- /dev/null +++ b/format/rtspv2/server.go @@ -0,0 +1,141 @@ +package rtspv2 + +import ( + "fmt" + "math/rand" + "net" + "net/url" + "time" + + "github.com/deepch/vdk/av" +) + +const ( + StartCodePS = 0x000001ba + StartCodeSYS = 0x000001bb + StartCodeMAP = 0x000001bc + StartCodeVideo = 0x000001e0 + StartCodeAudio = 0x000001c0 + MEPGProgramEndCode = 0x000001b9 +) +const ( + StreamIDVideo = 0xe0 + StreamIDAudio = 0xc0 +) + +const ( + UDPTransfer int = 0 + TCPTransferActive int = 1 + TCPTransferPassive int = 2 + LocalCache int = 3 +) + +// +const ( + StreamTypeH264 = 0x1b + StreamTypeH265 = 0x24 + StreamTypeAAC = 0x90 +) + +type encPSPacket struct { + crc32 uint64 +} + +type Conn struct { + URL *url.URL + netconn net.Conn + readbuf []byte + writebuf []byte + playing bool + psEnc *encPSPacket + cseq int + ssrc uint32 + protocol int +} + +type Server struct { + Addr string + HandleDescribe func(*Conn) + HandleOptions func(*Conn) + HandleSetup func(*Conn) + HandlePlay func(*Conn) + HandleConn func(*Conn) +} + +func NewConn(netconn net.Conn) *Conn { + conn := &Conn{} + conn.netconn = netconn + conn.writebuf = make([]byte, 4096) + conn.readbuf = make([]byte, 4096) + conn.ssrc = rand.Uint32() + conn.protocol = TCPTransferPassive + return conn +} + +func (self *Conn) Close() (err error) { + return nil +} + +func (self *Conn) WritePacket(pkt *av.Packet) (err error) { + return nil +} + +func timeToTs(tm time.Duration, timeScale int64) int64 { + return int64(tm * time.Duration(timeScale) / time.Second) +} + +func (self *Conn) WriteHeader(codec []av.CodecData) (err error) { + return nil +} + +func (self *Conn) NetConn() net.Conn { + return self.netconn +} + +func (self *Server) ListenAndServe() (err error) { + addr := self.Addr + if addr == "" { + addr = ":554" + } + var tcpaddr *net.TCPAddr + if tcpaddr, err = net.ResolveTCPAddr("tcp", addr); err != nil { + err = fmt.Errorf("rtsp: ListenAndServe: %s", err) + return + } + + var listener *net.TCPListener + if listener, err = net.ListenTCP("tcp", tcpaddr); err != nil { + return + } + + if Debug { + fmt.Println("rtsp: server: listening on", addr) + } + + for { + var netconn net.Conn + if netconn, err = listener.Accept(); err != nil { + return + } + + if Debug { + fmt.Println("rtsp: server: accepted") + } + conn := NewConn(netconn) + go func() { + err := self.handleConn(conn) + if Debug { + fmt.Println("rtsp: server: client closed err:", err) + } + //defer conn.Close() + }() + } +} + +func (self *Server) handleConn(conn *Conn) (err error) { + return +} + +func (self *Conn) prepare() error { + return nil +}