From 175afae1c16912fa1e230149f8b5dc7e760d35d4 Mon Sep 17 00:00:00 2001 From: deepch Date: Wed, 22 Mar 2023 21:59:06 +0300 Subject: [PATCH] test transcode slow --- example/transcoder/main.go | 64 ++++++++++++++++++++++++++++++++++++++ format/mp4m/mp4io/mp4io.go | 8 ++++- format/ts/muxer.go | 38 +++++++++++++++++++++- format/ts/tsio/tsio.go | 13 ++++---- 4 files changed, 115 insertions(+), 8 deletions(-) create mode 100644 example/transcoder/main.go diff --git a/example/transcoder/main.go b/example/transcoder/main.go new file mode 100644 index 0000000..7cd156a --- /dev/null +++ b/example/transcoder/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "github.com/deepch/vdk/format/rtspv2" + "github.com/deepch/vdk/format/ts" + "log" + "os/exec" + "time" +) + +func main() { + RTSPClient, err := rtspv2.Dial(rtspv2.RTSPClientOptions{URL: "rtsp://url", DisableAudio: true, DialTimeout: 3 * time.Second, ReadWriteTimeout: 5 * time.Second, Debug: true, OutgoingProxy: false}) + if err != nil { + panic(err) + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cmd := exec.CommandContext(ctx, "ffmpeg", "-flags", "low_delay", "-analyzeduration", "1", "-fflags", "-nobuffer", "-probesize", "1024k", "-f", "mpegts", "-i", "-", "-vcodec", "libx264", "-preset", "ultrafast", "-bf", "0", "-f", "mpegts", "-max_muxing_queue_size", "400", "-pes_payload_size", "0", "pipe:1") + inPipe, _ := cmd.StdinPipe() + outPipe, _ := cmd.StdoutPipe() + //cmd.Stderr = os.Stderr + mux := ts.NewMuxer(inPipe) + demuxer := ts.NewDemuxer(outPipe) + codec := RTSPClient.CodecData + mux.WriteHeader(codec) + go func() { + imNewCodec, err := demuxer.Streams() + log.Println("new codec data", imNewCodec, err) + for i, data := range imNewCodec { + log.Println(i, data) + } + for { + pkt, err := demuxer.ReadPacket() + if err != nil { + log.Panic(err) + } + log.Println("im new pkt ===>", pkt.Idx, pkt.Time) + } + }() + cmd.Start() + var start bool + for { + select { + case signals := <-RTSPClient.Signals: + switch signals { + case rtspv2.SignalCodecUpdate: + //? + case rtspv2.SignalStreamRTPStop: + return + } + case packetAV := <-RTSPClient.OutgoingPacketQueue: + if packetAV.IsKeyFrame { + start = true + } + if !start { + continue + } + if err = mux.WritePacket(*packetAV); err != nil { + return + } + } + } +} diff --git a/format/mp4m/mp4io/mp4io.go b/format/mp4m/mp4io/mp4io.go index dfa99d5..5ce5ce8 100644 --- a/format/mp4m/mp4io/mp4io.go +++ b/format/mp4m/mp4io/mp4io.go @@ -376,7 +376,7 @@ func (self *ElemStreamDesc) parseDescHdr(b []byte, offset int) (n int, tag uint8 return } -func ReadFileAtoms(r io.ReadSeeker) (atoms []Atom, err error) { +func ReadFileAtoms(r io.ReadSeeker, name string) (atoms []Atom, err error) { for { offset, _ := r.Seek(0, 1) taghdr := make([]byte, 8) @@ -387,6 +387,12 @@ func ReadFileAtoms(r io.ReadSeeker) (atoms []Atom, err error) { return } size := pio.U32BE(taghdr[0:]) + + if size == 0 { + err = fmt.Errorf("bad hdr size") + return + } + tag := Tag(pio.U32BE(taghdr[4:])) var atom Atom diff --git a/format/ts/muxer.go b/format/ts/muxer.go index c33f1ff..53bc6a0 100644 --- a/format/ts/muxer.go +++ b/format/ts/muxer.go @@ -2,6 +2,7 @@ package ts import ( "fmt" + "github.com/deepch/vdk/codec/h265parser" "io" "time" @@ -11,7 +12,7 @@ import ( "github.com/deepch/vdk/format/ts/tsio" ) -var CodecTypes = []av.CodecType{av.H264, av.AAC} +var CodecTypes = []av.CodecType{av.H264, av.H265, av.AAC} type Muxer struct { w io.Writer @@ -123,6 +124,11 @@ func (self *Muxer) WritePATPMT() (err error) { StreamType: tsio.ElementaryStreamTypeH264, ElementaryPID: stream.pid, }) + case av.H265: + elemStreams = append(elemStreams, tsio.ElementaryStreamInfo{ + StreamType: tsio.ElementaryStreamTypeH265, + ElementaryPID: stream.pid, + }) } } @@ -210,6 +216,36 @@ func (self *Muxer) WritePacket(pkt av.Packet) (err error) { n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time) datav[0] = self.peshdr[:n] + if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil { + return + } + case av.H265: + codec := stream.CodecData.(h265parser.CodecData) + + nalus := self.nalus[:0] + if pkt.IsKeyFrame { + nalus = append(nalus, codec.SPS()) + nalus = append(nalus, codec.PPS()) + nalus = append(nalus, codec.VPS()) + } + pktnalus, _ := h265parser.SplitNALUs(pkt.Data) + for _, nalu := range pktnalus { + nalus = append(nalus, nalu) + } + + datav := self.datav[:1] + for i, nalu := range nalus { + if i == 0 { + datav = append(datav, h265parser.AUDBytes) + } else { + datav = append(datav, h265parser.StartCodeBytes) + } + datav = append(datav, nalu) + } + + n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time) + datav[0] = self.peshdr[:n] + if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil { return } diff --git a/format/ts/tsio/tsio.go b/format/ts/tsio/tsio.go index 3fc616e..362e385 100644 --- a/format/ts/tsio/tsio.go +++ b/format/ts/tsio/tsio.go @@ -36,6 +36,7 @@ const ( ElementaryStreamTypeH264 = 0x1B ElementaryStreamTypeAdtsAAC = 0x0F ElementaryStreamTypeAlignmentDescriptor = 0x06 + ElementaryStreamTypeH265 = 0x24 ) type PATEntry struct { @@ -516,7 +517,7 @@ func NewTSWriter(pid uint16) *TSWriter { return w } -//TSHeader func +// TSHeader func type TSHeader struct { PID uint PCR uint64 @@ -528,7 +529,7 @@ type TSHeader struct { HeaderLength uint } -//WriteTSHeader func +// WriteTSHeader func func WriteTSHeader(w io.Writer, element TSHeader, dataLength int) (written int, err error) { var flags, extFlags uint @@ -688,13 +689,13 @@ func makeRepeatValBytes(val byte, n int) []byte { return b } -//WriteRepeatVal func +// WriteRepeatVal func func WriteRepeatVal(w io.Writer, val byte, n int) (err error) { _, err = w.Write(makeRepeatValBytes(val, n)) return } -//WriteUInt64 func +// WriteUInt64 func func WriteUInt64(w io.Writer, val uint64, n int) (err error) { var b [8]byte for i := n - 1; i >= 0; i-- { @@ -707,12 +708,12 @@ func WriteUInt64(w io.Writer, val uint64, n int) (err error) { return } -//WriteUInt func +// WriteUInt func func WriteUInt(w io.Writer, val uint, n int) (err error) { return WriteUInt64(w, uint64(val), n) } -//PCRToUInt func +// PCRToUInt func func PCRToUInt(pcr uint64) uint64 { base := pcr / 300 ext := pcr % 300