add codec wait

This commit is contained in:
deepch
2021-08-29 13:34:06 +03:00
parent 54128f7cd2
commit 374dbda6f4
4 changed files with 204 additions and 151 deletions

View File

@@ -68,7 +68,6 @@ func (self *Server) handleConn(conn *Conn) (err error) {
if err = conn.prepare(stageCommandDone, 0); err != nil {
return
}
if conn.playing {
if self.HandlePlay != nil {
self.HandlePlay(conn)
@@ -136,51 +135,42 @@ const (
)
type Conn struct {
URL *url.URL
OnPlayOrPublish func(string, flvio.AMFMap) error
prober *flv.Prober
streams []av.CodecData
txbytes uint64
rxbytes uint64
bufr *bufio.Reader
bufw *bufio.Writer
ackn uint32
writebuf []byte
readbuf []byte
netconn net.Conn
txrxcount *txrxcount
writeMaxChunkSize int
readMaxChunkSize int
readAckSize uint32
readcsmap map[uint32]*chunkStream
chunkHeaderBuf []byte
chunkHeaderBufExt []byte
URL *url.URL
OnPlayOrPublish func(string, flvio.AMFMap) error
prober *flv.Prober
streams []av.CodecData
txbytes uint64
rxbytes uint64
bufr *bufio.Reader
bufw *bufio.Writer
ackn uint32
writebuf []byte
readbuf []byte
netconn net.Conn
txrxcount *txrxcount
writeMaxChunkSize int
readMaxChunkSize int
readAckSize uint32
readcsmap map[uint32]*chunkStream
isserver bool
publishing, playing bool
reading, writing bool
stage int
avmsgsid uint32
gotcommand bool
commandname string
commandtransid float64
commandobj flvio.AMFMap
commandparams []interface{}
gotmsg bool
timestamp uint32
msgdata []byte
msgtypeid uint8
datamsgvals []interface{}
avtag flvio.Tag
eventtype uint16
avmsgsid uint32
gotcommand bool
commandname string
commandtransid float64
commandobj flvio.AMFMap
commandparams []interface{}
gotmsg bool
timestamp uint32
msgdata []byte
msgtypeid uint8
datamsgvals []interface{}
avtag flvio.Tag
eventtype uint16
}
type txrxcount struct {
@@ -213,6 +203,8 @@ func NewConn(netconn net.Conn) *Conn {
conn.txrxcount = &txrxcount{ReadWriter: netconn}
conn.writebuf = make([]byte, 4096)
conn.readbuf = make([]byte, 4096)
conn.chunkHeaderBuf = make([]byte, 265)
conn.chunkHeaderBufExt = make([]byte, 12+4+4)
return conn
}
@@ -353,16 +345,13 @@ func createURL(tcurl, app, play string) (u *url.URL) {
var CodecTypes = flv.CodecTypes
func (self *Conn) writeBasicConf() (err error) {
// > SetChunkSize
if err = self.writeSetChunkSize(1024 * 128); err != nil {
if err = self.writeSetChunkSize(65536); err != nil {
return
}
// > WindowAckSize
if err = self.writeWindowAckSize(5000000); err != nil {
if err = self.writeWindowAckSize(2500000); err != nil {
return
}
// > SetPeerBandwidth
if err = self.writeSetPeerBandwidth(5000000, 2); err != nil {
if err = self.writeSetPeerBandwidth(10000000, 2); err != nil {
return
}
return
@@ -370,8 +359,6 @@ func (self *Conn) writeBasicConf() (err error) {
func (self *Conn) readConnect() (err error) {
var connectpath string
// < connect("app")
if err = self.pollCommand(); err != nil {
return
}
@@ -405,11 +392,11 @@ func (self *Conn) readConnect() (err error) {
return
}
// > _result("NetConnection.Connect.Success")
if err = self.writeCommandMsg(3, 0, "_result", self.commandtransid,
flvio.AMFMap{
"fmtVer": "FMS/3,0,1,123",
"capabilities": 31,
"mode": 1,
},
flvio.AMFMap{
"level": "status",
@@ -432,10 +419,8 @@ func (self *Conn) readConnect() (err error) {
if self.gotcommand {
switch self.commandname {
// < createStream
case "createStream":
self.avmsgsid = uint32(1)
// > _result(streamid)
if err = self.writeCommandMsg(3, 0, "_result", self.commandtransid, nil, self.avmsgsid); err != nil {
return
}
@@ -443,7 +428,6 @@ func (self *Conn) readConnect() (err error) {
return
}
// < publish("path")
case "publish":
if Debug {
fmt.Println("rtmp: < publish")
@@ -460,7 +444,6 @@ func (self *Conn) readConnect() (err error) {
cberr = self.OnPlayOrPublish(self.commandname, connectparams)
}
// > onStatus()
if err = self.writeCommandMsg(5, self.avmsgsid,
"onStatus", self.commandtransid, nil,
flvio.AMFMap{
@@ -486,7 +469,6 @@ func (self *Conn) readConnect() (err error) {
self.stage++
return
// < play("path")
case "play":
if Debug {
fmt.Println("rtmp: < play")
@@ -498,12 +480,10 @@ func (self *Conn) readConnect() (err error) {
}
playpath, _ := self.commandparams[0].(string)
// > streamBegin(streamid)
if err = self.writeStreamBegin(self.avmsgsid); err != nil {
return
}
// > onStatus()
if err = self.writeCommandMsg(5, self.avmsgsid,
"onStatus", self.commandtransid, nil,
flvio.AMFMap{
@@ -515,7 +495,6 @@ func (self *Conn) readConnect() (err error) {
return
}
// > |RtmpSampleAccess()
if err = self.writeDataMsg(5, self.avmsgsid,
"|RtmpSampleAccess", true, true,
); err != nil {
@@ -599,7 +578,6 @@ func (self *Conn) writeConnect(path string) (err error) {
return
}
// > connect("app")
if Debug {
fmt.Printf("rtmp: > connect('%s') host=%s\n", path, self.URL.Host)
}
@@ -627,7 +605,6 @@ func (self *Conn) writeConnect(path string) (err error) {
return
}
if self.gotcommand {
// < _result("NetConnection.Connect.Success")
if self.commandname == "_result" {
var ok bool
var errmsg string
@@ -664,7 +641,6 @@ func (self *Conn) connectPublish() (err error) {
transid := 2
// > createStream()
if Debug {
fmt.Printf("rtmp: > createStream()\n")
}
@@ -682,7 +658,6 @@ func (self *Conn) connectPublish() (err error) {
return
}
if self.gotcommand {
// < _result(avmsgsid) of createStream
if self.commandname == "_result" {
var ok bool
if ok, self.avmsgsid = self.checkCreateStreamResult(); !ok {
@@ -694,7 +669,6 @@ func (self *Conn) connectPublish() (err error) {
}
}
// > publish('app')
if Debug {
fmt.Printf("rtmp: > publish('%s')\n", publishpath)
}
@@ -720,7 +694,6 @@ func (self *Conn) connectPlay() (err error) {
return
}
// > createStream()
if Debug {
fmt.Printf("rtmp: > createStream()\n")
}
@@ -728,7 +701,6 @@ func (self *Conn) connectPlay() (err error) {
return
}
// > SetBufferLength 0,100ms
if err = self.writeSetBufferLength(0, 100); err != nil {
return
}
@@ -742,7 +714,6 @@ func (self *Conn) connectPlay() (err error) {
return
}
if self.gotcommand {
// < _result(avmsgsid) of createStream
if self.commandname == "_result" {
var ok bool
if ok, self.avmsgsid = self.checkCreateStreamResult(); !ok {
@@ -754,7 +725,6 @@ func (self *Conn) connectPlay() (err error) {
}
}
// > play('app')
if Debug {
fmt.Printf("rtmp: > play('%s')\n", playpath)
}
@@ -868,7 +838,6 @@ func (self *Conn) WritePacket(pkt av.Packet) (err error) {
if err = self.writeAVTag(tag, int32(timestamp)); err != nil {
return
}
return
}
@@ -889,13 +858,10 @@ func (self *Conn) WriteHeader(streams []av.CodecData) (err error) {
return
}
// > onMetaData()
if err = self.writeDataMsg(5, self.avmsgsid, "onMetaData", metadata); err != nil {
return
}
// > Videodata(decoder config)
// > Audiodata(decoder config)
for _, stream := range streams {
var ok bool
var tag flvio.Tag
@@ -983,6 +949,92 @@ func (self *Conn) writeAMF0Msg(msgtypeid uint8, csid, msgsid uint32, args ...int
_, err = self.bufw.Write(b[:n])
return
}
func (self *Conn) fillChunk3Header(b []byte, csid uint32, timestamp uint32) (n int) {
b[n] = (byte(csid) & 0x3f) | 0xC0
n++
if timestamp >= 0xffffff {
pio.PutU32BE(b[n:], timestamp)
n += 4
}
return
}
func (self *Conn) fillChunk0Header(b []byte, csid uint32, timestamp uint32, msgtypeid uint8, msgsid uint32, msgdatalen int) (n int) {
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | timestamp |message length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message length (cont) |message type id| msg stream id |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | message stream id (cont) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// Figure 9 Chunk Message Header Type 0
b[n] = byte(csid) & 0x3f
n++
if timestamp < 0xffffff {
pio.PutU24BE(b[n:], uint32(timestamp))
} else {
pio.PutU24BE(b[n:], uint32(0xffffff))
}
n += 3
pio.PutU24BE(b[n:], uint32(msgdatalen))
n += 3
b[n] = msgtypeid
n++
pio.PutU32LE(b[n:], msgsid)
n += 4
if timestamp >= 0xffffff {
pio.PutU32BE(b[n:], timestamp)
n += 4
}
if Debug {
fmt.Printf("rtmp: write chunk msgdatalen=%d msgsid=%d\n", msgdatalen, msgsid)
}
return
}
func (self *Conn) weiteAVTagtoChunk(csid uint32, timestamp uint32, msgtypeid uint8, msgsid uint32, msgdatalen int, tag flvio.Tag) (n int, err error) {
pos := 0
sn := 0
last := self.writeMaxChunkSize
hdrlen := tag.FillHeader(self.chunkHeaderBufExt)
tag.Data = append(self.chunkHeaderBufExt[:hdrlen], tag.Data...)
msgdatalen = len(tag.Data)
end := msgdatalen
for msgdatalen+hdrlen > 0 {
if pos == 0 {
n = self.fillChunk0Header(self.chunkHeaderBuf, csid, timestamp, msgtypeid, msgsid, msgdatalen)
n, err = self.bufw.Write(self.chunkHeaderBuf[:n])
if err != nil {
return
}
} else {
n := self.fillChunk3Header(self.chunkHeaderBuf, csid, timestamp)
_, err = self.bufw.Write(self.chunkHeaderBuf[:n])
}
if msgdatalen > self.writeMaxChunkSize {
if sn, err = self.bufw.Write(tag.Data[pos:last]); err != nil {
return
}
pos += sn
last += sn
msgdatalen -= sn
continue
}
if sn, err = self.bufw.Write(tag.Data[pos:end]); err != nil {
return
}
pos += sn
msgdatalen -= sn
return
}
return
}
func (self *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) {
var msgtypeid uint8
@@ -1000,28 +1052,8 @@ func (self *Conn) writeAVTag(tag flvio.Tag, ts int32) (err error) {
csid = 7
data = tag.Data
}
actualChunkHeaderLength := chunkHeaderLength
if uint32(ts) > FlvTimestampMax {
actualChunkHeaderLength += 4
}
b := self.tmpwbuf(actualChunkHeaderLength + flvio.MaxTagSubHeaderLength)
hdrlen := tag.FillHeader(b[actualChunkHeaderLength:])
self.fillChunkHeader(b, csid, ts, msgtypeid, self.avmsgsid, hdrlen+len(data))
n := hdrlen + actualChunkHeaderLength
if n+len(data) > self.writeMaxChunkSize {
if err = self.writeSetChunkSize(n + len(data)); err != nil {
return
}
}
if _, err = self.bufw.Write(b[:n]); err != nil {
return
}
_, err = self.bufw.Write(data)
return
_, err = self.weiteAVTagtoChunk(csid, uint32(ts), msgtypeid, self.avmsgsid, len(data), tag)
return err
}
func (self *Conn) writeStreamBegin(msgsid uint32) (err error) {
@@ -1529,20 +1561,12 @@ func (self *Conn) handshakeClient() (err error) {
C0C1C2 := random[:1536*2+1]
C0 := C0C1C2[:1]
//C1 := C0C1C2[1:1536+1]
C0C1 := C0C1C2[:1536+1]
C2 := C0C1C2[1536+1:]
S0S1S2 := random[1536*2+1:]
//S0 := S0S1S2[:1]
S1 := S0S1S2[1 : 1536+1]
//S0S1 := S0S1S2[:1536+1]
//S2 := S0S1S2[1536+1:]
C0[0] = 3
//hsCreate01(C0C1, hsClientFullKey)
// > C0C1
if _, err = self.bufw.Write(C0C1); err != nil {
return
}
@@ -1550,7 +1574,6 @@ func (self *Conn) handshakeClient() (err error) {
return
}
// < S0S1S2
if _, err = io.ReadFull(self.bufr, S0S1S2); err != nil {
return
}
@@ -1565,7 +1588,6 @@ func (self *Conn) handshakeClient() (err error) {
C2 = S1
}
// > C2
if _, err = self.bufw.Write(C2); err != nil {
return
}
@@ -1582,14 +1604,12 @@ func (self *Conn) handshakeServer() (err error) {
C1 := C0C1C2[1 : 1536+1]
C0C1 := C0C1C2[:1536+1]
C2 := C0C1C2[1536+1:]
S0S1S2 := random[1536*2+1:]
S0 := S0S1S2[:1]
S1 := S0S1S2[1 : 1536+1]
S0S1 := S0S1S2[:1536+1]
S2 := S0S1S2[1536+1:]
// < C0C1
if _, err = io.ReadFull(self.bufr, C0C1); err != nil {
return
}
@@ -1619,7 +1639,6 @@ func (self *Conn) handshakeServer() (err error) {
copy(S2, C2)
}
// > S0S1S2
if _, err = self.bufw.Write(S0S1S2); err != nil {
return
}
@@ -1627,7 +1646,6 @@ func (self *Conn) handshakeServer() (err error) {
return
}
// < C2
if _, err = io.ReadFull(self.bufr, C2); err != nil {
return
}

View File

@@ -86,6 +86,7 @@ type RTSPClient struct {
PreVideoTS int64
PreSequenceNumber int
FPS int
WaitCodec bool
}
type RTSPClientOptions struct {
@@ -153,6 +154,7 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
}
} else {
client.CodecData = append(client.CodecData, h264parser.CodecData{})
client.WaitCodec = true
}
client.FPS = i2.FPS
client.videoCodec = av.H264