10 Commits

Author SHA1 Message Date
Andrey Semochkin
eaab841cfb Update mp4io.go 2023-11-20 10:00:15 +03:00
deepch
9d21d056dd test 2023-08-25 14:35:34 +03:00
deepch
5a989a5c40 test 2023-05-25 13:08:57 +03:00
deepch
0b08aa7224 test 2023-05-25 13:07:13 +03:00
deepch
9e244f72bf test 2023-05-25 12:35:12 +03:00
deepch
8d4be0821c test 2023-05-25 12:00:15 +03:00
deepch
8958197548 fix ip camera bug 2023-04-02 17:58:56 +03:00
deepch
def88cb695 test transcode slow 2023-03-22 22:01:39 +03:00
deepch
175afae1c1 test transcode slow 2023-03-22 21:59:06 +03:00
deepch
c923e0010b fix rtmp handshake flashphoner 2023-03-13 19:26:29 +03:00
9 changed files with 150 additions and 33 deletions

View File

@@ -0,0 +1,64 @@
package main
import (
"context"
"github.com/deepch/vdk/format/rtspv2"
"github.com/deepch/vdk/format/ts"
"log"
"os/exec"
"time"
)
func main() {
RTSPClient, err := rtspv2.Dial(rtspv2.RTSPClientOptions{URL: "rtsp://url", DisableAudio: true, DialTimeout: 3 * time.Second, ReadWriteTimeout: 5 * time.Second, Debug: true, OutgoingProxy: false})
if err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := exec.CommandContext(ctx, "ffmpeg", "-flags", "low_delay", "-analyzeduration", "1", "-fflags", "-nobuffer", "-probesize", "1024k", "-f", "mpegts", "-i", "-", "-vcodec", "libx264", "-preset", "ultrafast", "-bf", "0", "-f", "mpegts", "-max_muxing_queue_size", "400", "-pes_payload_size", "0", "pipe:1")
inPipe, _ := cmd.StdinPipe()
outPipe, _ := cmd.StdoutPipe()
//cmd.Stderr = os.Stderr
mux := ts.NewMuxer(inPipe)
demuxer := ts.NewDemuxer(outPipe)
codec := RTSPClient.CodecData
mux.WriteHeader(codec)
go func() {
imNewCodec, err := demuxer.Streams()
log.Println("new codec data", imNewCodec, err)
for i, data := range imNewCodec {
log.Println(i, data)
}
for {
pkt, err := demuxer.ReadPacket()
if err != nil {
log.Panic(err)
}
log.Println("im new pkt ===>", pkt.Idx, pkt.Time)
}
}()
cmd.Start()
var start bool
for {
select {
case signals := <-RTSPClient.Signals:
switch signals {
case rtspv2.SignalCodecUpdate:
//?
case rtspv2.SignalStreamRTPStop:
return
}
case packetAV := <-RTSPClient.OutgoingPacketQueue:
if packetAV.IsKeyFrame {
start = true
}
if !start {
continue
}
if err = mux.WritePacket(*packetAV); err != nil {
return
}
}
}
}

View File

@@ -387,6 +387,10 @@ func ReadFileAtoms(r io.ReadSeeker) (atoms []Atom, err error) {
return return
} }
size := pio.U32BE(taghdr[0:]) size := pio.U32BE(taghdr[0:])
if size > 5242880 {
err = parseErr("len", 5242880, err)
return
}
tag := Tag(pio.U32BE(taghdr[4:])) tag := Tag(pio.U32BE(taghdr[4:]))
var atom Atom var atom Atom

View File

@@ -387,6 +387,12 @@ func ReadFileAtoms(r io.ReadSeeker) (atoms []Atom, err error) {
return return
} }
size := pio.U32BE(taghdr[0:]) size := pio.U32BE(taghdr[0:])
if size == 0 {
err = fmt.Errorf("bad hdr size")
return
}
tag := Tag(pio.U32BE(taghdr[4:])) tag := Tag(pio.U32BE(taghdr[4:]))
var atom Atom var atom Atom

View File

@@ -1623,20 +1623,15 @@ func (self *Conn) handshakeServer() (err error) {
clitime := pio.U32BE(C1[0:4]) clitime := pio.U32BE(C1[0:4])
srvtime := clitime srvtime := clitime
srvver := uint32(0x0d0e0a0d) srvver := uint32(0x0d0e0a0d)
cliver := pio.U32BE(C1[4:8])
if cliver != 0 { var ok bool
var ok bool var digest []byte
var digest []byte if ok, digest = hsParse1(C1, hsClientPartialKey, hsServerFullKey); ok {
if ok, digest = hsParse1(C1, hsClientPartialKey, hsServerFullKey); !ok {
err = fmt.Errorf("rtmp: handshake server: C1 invalid")
return
}
hsCreate01(S0S1, srvtime, srvver, hsServerPartialKey) hsCreate01(S0S1, srvtime, srvver, hsServerPartialKey)
hsCreate2(S2, digest) hsCreate2(S2, digest)
} else { } else {
copy(S1, C1) copy(S1, C2)
copy(S2, C2) copy(S2, C1)
} }
if _, err = self.bufw.Write(S0S1S2); err != nil { if _, err = self.bufw.Write(S0S1S2); err != nil {

View File

@@ -9,7 +9,6 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io" "io"
//"log"
"net" "net"
"net/textproto" "net/textproto"
"net/url" "net/url"
@@ -147,7 +146,12 @@ func (self *Client) probe() (err error) {
} }
func (self *Client) prepare(stage int) (err error) { func (self *Client) prepare(stage int) (err error) {
var waitIdle int
for self.stage < stage { for self.stage < stage {
waitIdle++
if waitIdle > 20 {
return fmt.Errorf("codec not ready")
}
switch self.stage { switch self.stage {
case 0: case 0:
if err = self.Options(); err != nil { if err = self.Options(); err != nil {
@@ -695,7 +699,9 @@ func (self *Client) Describe() (streams []sdp.Media, err error) {
self.streams = []*Stream{} self.streams = []*Stream{}
for _, media := range medias { for _, media := range medias {
stream := &Stream{Sdp: media, client: self} stream := &Stream{Sdp: media, client: self}
stream.makeCodecData() if err = stream.makeCodecData(); err != nil && DebugRtsp {
fmt.Println("rtsp: makeCodecData error", err)
}
self.streams = append(self.streams, stream) self.streams = append(self.streams, stream)
streams = append(streams, media) streams = append(streams, media)
} }
@@ -767,7 +773,7 @@ func (self *Stream) timeScale() int {
func (self *Stream) makeCodecData() (err error) { func (self *Stream) makeCodecData() (err error) {
media := self.Sdp media := self.Sdp
if media.PayloadType >= 96 && media.PayloadType <= 127 { if (media.PayloadType >= 96 && media.PayloadType <= 127) || media.Type == av.H264 || media.Type == av.AAC {
switch media.Type { switch media.Type {
case av.H264: case av.H264:
for _, nalu := range media.SpropParameterSets { for _, nalu := range media.SpropParameterSets {

View File

@@ -150,9 +150,12 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, i2 := range client.mediaSDP { for _, i2 := range client.mediaSDP {
if (i2.AVType != VIDEO && i2.AVType != AUDIO) || (client.options.DisableAudio && i2.AVType == AUDIO) { if (i2.AVType != VIDEO && i2.AVType != AUDIO) || (client.options.DisableAudio && i2.AVType == AUDIO) {
//TODO check it
if strings.Contains(string(client.SDPRaw), "LaunchDigital") {
client.chTMP += 2
}
continue continue
} }
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(client.chTMP) + "-" + strconv.Itoa(client.chTMP+1)}, client.ControlTrack(i2.Control), false, false) err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(client.chTMP) + "-" + strconv.Itoa(client.chTMP+1)}, client.ControlTrack(i2.Control), false, false)
@@ -945,14 +948,14 @@ func (client *RTSPClient) CodecUpdateVPS(val []byte) {
} }
//Println mini logging functions // Println mini logging functions
func (client *RTSPClient) Println(v ...interface{}) { func (client *RTSPClient) Println(v ...interface{}) {
if client.options.Debug { if client.options.Debug {
log.Println(v) log.Println(v)
} }
} }
//binSize // binSize
func binSize(val int) []byte { func binSize(val int) []byte {
buf := make([]byte, 4) buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, uint32(val)) binary.BigEndian.PutUint32(buf, uint32(val))

View File

@@ -25,6 +25,7 @@ type ProxyConn struct {
cseq int cseq int
session string session string
protocol int protocol int
in int
} }
type Proxy struct { type Proxy struct {
@@ -40,7 +41,6 @@ func NewProxyConn(netconn net.Conn) *ProxyConn {
conn.writebuf = make([]byte, 4096) conn.writebuf = make([]byte, 4096)
conn.readbuf = make([]byte, 4096) conn.readbuf = make([]byte, 4096)
conn.session = uuid.New().String() conn.session = uuid.New().String()
conn.cseq = 1
return conn return conn
} }
@@ -156,6 +156,7 @@ func (self *ProxyConn) prepare() error {
return errors.New("no fist cmd") return errors.New("no fist cmd")
} }
cseq := strings.TrimSpace(stringInBetween(string(self.readbuf[:n]), "CSeq:", "\r\n"))
switch fistStringsSlice[0] { switch fistStringsSlice[0] {
case OPTIONS: case OPTIONS:
@@ -165,45 +166,46 @@ func (self *ProxyConn) prepare() error {
if self.URL, err = url.Parse(fistStringsSlice[1]); err != nil { if self.URL, err = url.Parse(fistStringsSlice[1]); err != nil {
return err return err
} }
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nPublic: OPTIONS, DESCRIBE, SETUP, PLAY\r\nSession: " + self.session + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n")) _, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nPublic: OPTIONS, DESCRIBE, SETUP, PLAY\r\nSession: " + self.session + "\r\nCSeq: " + cseq + "\r\n\r\n"))
if err != nil { if err != nil {
return err return err
} }
self.options = true self.options = true
case SETUP: case SETUP:
if strings.Contains(string(self.readbuf[:n]), "RTP/AVP/UDP") { if strings.Contains(string(self.readbuf[:n]), "RTP/AVP/UDP") {
_, err := self.netconn.Write([]byte("RTSP/1.0 461 Unsupported transport\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\n\r\n")) _, err := self.netconn.Write([]byte("RTSP/1.0 461 Unsupported transport\r\nCSeq: " + cseq + "\r\nSession: " + self.session + "\r\n\r\n"))
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1\r\n\r\n")) _, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nCSeq: " + cseq + "\r\nSession: " + self.session + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(self.in) + "-" + strconv.Itoa(self.in+1) + "\r\n\r\n"))
if err != nil { if err != nil {
return err return err
} }
self.in = self.in + 2
case DESCRIBE: case DESCRIBE:
buf := "RTSP/1.0 200 OK\r\nContent-Type: application/sdp\r\nSession: " + self.session + "\r\nContent-Length: " + strconv.Itoa(len(self.sdp)) + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n" buf := "RTSP/1.0 200 OK\r\nContent-Type: application/sdp\r\nSession: " + self.session + "\r\nContent-Length: " + strconv.Itoa(len(self.sdp)) + "\r\nCSeq: " + cseq + "\r\n\r\n"
_, err := self.netconn.Write([]byte(buf + string(self.sdp))) _, err := self.netconn.Write([]byte(buf + string(self.sdp)))
if err != nil { if err != nil {
return err return err
} }
case PLAY: case PLAY:
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nSession: " + self.session + ";timeout=60\r\nCSeq: " + cseq + "\r\n\r\n"))
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nSession: " + self.session + ";timeout=60\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"))
if err != nil { if err != nil {
return err return err
} }
self.playing = true self.playing = true
case TEARDOWN:
self.netconn.Close()
return errors.New("exit")
default: default:
return errors.New("metod not found") return errors.New("metod not found " + fistStringsSlice[0])
} }
return nil return nil

View File

@@ -2,6 +2,7 @@ package ts
import ( import (
"fmt" "fmt"
"github.com/deepch/vdk/codec/h265parser"
"io" "io"
"time" "time"
@@ -11,7 +12,7 @@ import (
"github.com/deepch/vdk/format/ts/tsio" "github.com/deepch/vdk/format/ts/tsio"
) )
var CodecTypes = []av.CodecType{av.H264, av.AAC} var CodecTypes = []av.CodecType{av.H264, av.H265, av.AAC}
type Muxer struct { type Muxer struct {
w io.Writer w io.Writer
@@ -123,6 +124,11 @@ func (self *Muxer) WritePATPMT() (err error) {
StreamType: tsio.ElementaryStreamTypeH264, StreamType: tsio.ElementaryStreamTypeH264,
ElementaryPID: stream.pid, ElementaryPID: stream.pid,
}) })
case av.H265:
elemStreams = append(elemStreams, tsio.ElementaryStreamInfo{
StreamType: tsio.ElementaryStreamTypeH265,
ElementaryPID: stream.pid,
})
} }
} }
@@ -210,6 +216,36 @@ func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time) n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time)
datav[0] = self.peshdr[:n] datav[0] = self.peshdr[:n]
if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil {
return
}
case av.H265:
codec := stream.CodecData.(h265parser.CodecData)
nalus := self.nalus[:0]
if pkt.IsKeyFrame {
nalus = append(nalus, codec.SPS())
nalus = append(nalus, codec.PPS())
nalus = append(nalus, codec.VPS())
}
pktnalus, _ := h265parser.SplitNALUs(pkt.Data)
for _, nalu := range pktnalus {
nalus = append(nalus, nalu)
}
datav := self.datav[:1]
for i, nalu := range nalus {
if i == 0 {
datav = append(datav, h265parser.AUDBytes)
} else {
datav = append(datav, h265parser.StartCodeBytes)
}
datav = append(datav, nalu)
}
n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time)
datav[0] = self.peshdr[:n]
if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil { if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil {
return return
} }

View File

@@ -36,6 +36,7 @@ const (
ElementaryStreamTypeH264 = 0x1B ElementaryStreamTypeH264 = 0x1B
ElementaryStreamTypeAdtsAAC = 0x0F ElementaryStreamTypeAdtsAAC = 0x0F
ElementaryStreamTypeAlignmentDescriptor = 0x06 ElementaryStreamTypeAlignmentDescriptor = 0x06
ElementaryStreamTypeH265 = 0x24
) )
type PATEntry struct { type PATEntry struct {
@@ -516,7 +517,7 @@ func NewTSWriter(pid uint16) *TSWriter {
return w return w
} }
//TSHeader func // TSHeader func
type TSHeader struct { type TSHeader struct {
PID uint PID uint
PCR uint64 PCR uint64
@@ -528,7 +529,7 @@ type TSHeader struct {
HeaderLength uint HeaderLength uint
} }
//WriteTSHeader func // WriteTSHeader func
func WriteTSHeader(w io.Writer, element TSHeader, dataLength int) (written int, err error) { func WriteTSHeader(w io.Writer, element TSHeader, dataLength int) (written int, err error) {
var flags, extFlags uint var flags, extFlags uint
@@ -688,13 +689,13 @@ func makeRepeatValBytes(val byte, n int) []byte {
return b return b
} }
//WriteRepeatVal func // WriteRepeatVal func
func WriteRepeatVal(w io.Writer, val byte, n int) (err error) { func WriteRepeatVal(w io.Writer, val byte, n int) (err error) {
_, err = w.Write(makeRepeatValBytes(val, n)) _, err = w.Write(makeRepeatValBytes(val, n))
return return
} }
//WriteUInt64 func // WriteUInt64 func
func WriteUInt64(w io.Writer, val uint64, n int) (err error) { func WriteUInt64(w io.Writer, val uint64, n int) (err error) {
var b [8]byte var b [8]byte
for i := n - 1; i >= 0; i-- { for i := n - 1; i >= 0; i-- {
@@ -707,12 +708,12 @@ func WriteUInt64(w io.Writer, val uint64, n int) (err error) {
return return
} }
//WriteUInt func // WriteUInt func
func WriteUInt(w io.Writer, val uint, n int) (err error) { func WriteUInt(w io.Writer, val uint, n int) (err error) {
return WriteUInt64(w, uint64(val), n) return WriteUInt64(w, uint64(val), n)
} }
//PCRToUInt func // PCRToUInt func
func PCRToUInt(pcr uint64) uint64 { func PCRToUInt(pcr uint64) uint64 {
base := pcr / 300 base := pcr / 300
ext := pcr % 300 ext := pcr % 300