10 Commits

Author SHA1 Message Date
Andrey Semochkin
eaab841cfb Update mp4io.go 2023-11-20 10:00:15 +03:00
deepch
9d21d056dd test 2023-08-25 14:35:34 +03:00
deepch
5a989a5c40 test 2023-05-25 13:08:57 +03:00
deepch
0b08aa7224 test 2023-05-25 13:07:13 +03:00
deepch
9e244f72bf test 2023-05-25 12:35:12 +03:00
deepch
8d4be0821c test 2023-05-25 12:00:15 +03:00
deepch
8958197548 fix ip camera bug 2023-04-02 17:58:56 +03:00
deepch
def88cb695 test transcode slow 2023-03-22 22:01:39 +03:00
deepch
175afae1c1 test transcode slow 2023-03-22 21:59:06 +03:00
deepch
c923e0010b fix rtmp handshake flashphoner 2023-03-13 19:26:29 +03:00
9 changed files with 150 additions and 33 deletions

View File

@@ -0,0 +1,64 @@
package main
import (
"context"
"github.com/deepch/vdk/format/rtspv2"
"github.com/deepch/vdk/format/ts"
"log"
"os/exec"
"time"
)
func main() {
RTSPClient, err := rtspv2.Dial(rtspv2.RTSPClientOptions{URL: "rtsp://url", DisableAudio: true, DialTimeout: 3 * time.Second, ReadWriteTimeout: 5 * time.Second, Debug: true, OutgoingProxy: false})
if err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := exec.CommandContext(ctx, "ffmpeg", "-flags", "low_delay", "-analyzeduration", "1", "-fflags", "-nobuffer", "-probesize", "1024k", "-f", "mpegts", "-i", "-", "-vcodec", "libx264", "-preset", "ultrafast", "-bf", "0", "-f", "mpegts", "-max_muxing_queue_size", "400", "-pes_payload_size", "0", "pipe:1")
inPipe, _ := cmd.StdinPipe()
outPipe, _ := cmd.StdoutPipe()
//cmd.Stderr = os.Stderr
mux := ts.NewMuxer(inPipe)
demuxer := ts.NewDemuxer(outPipe)
codec := RTSPClient.CodecData
mux.WriteHeader(codec)
go func() {
imNewCodec, err := demuxer.Streams()
log.Println("new codec data", imNewCodec, err)
for i, data := range imNewCodec {
log.Println(i, data)
}
for {
pkt, err := demuxer.ReadPacket()
if err != nil {
log.Panic(err)
}
log.Println("im new pkt ===>", pkt.Idx, pkt.Time)
}
}()
cmd.Start()
var start bool
for {
select {
case signals := <-RTSPClient.Signals:
switch signals {
case rtspv2.SignalCodecUpdate:
//?
case rtspv2.SignalStreamRTPStop:
return
}
case packetAV := <-RTSPClient.OutgoingPacketQueue:
if packetAV.IsKeyFrame {
start = true
}
if !start {
continue
}
if err = mux.WritePacket(*packetAV); err != nil {
return
}
}
}
}

View File

@@ -387,6 +387,10 @@ func ReadFileAtoms(r io.ReadSeeker) (atoms []Atom, err error) {
return
}
size := pio.U32BE(taghdr[0:])
if size > 5242880 {
err = parseErr("len", 5242880, err)
return
}
tag := Tag(pio.U32BE(taghdr[4:]))
var atom Atom

View File

@@ -387,6 +387,12 @@ func ReadFileAtoms(r io.ReadSeeker) (atoms []Atom, err error) {
return
}
size := pio.U32BE(taghdr[0:])
if size == 0 {
err = fmt.Errorf("bad hdr size")
return
}
tag := Tag(pio.U32BE(taghdr[4:]))
var atom Atom

View File

@@ -1623,20 +1623,15 @@ func (self *Conn) handshakeServer() (err error) {
clitime := pio.U32BE(C1[0:4])
srvtime := clitime
srvver := uint32(0x0d0e0a0d)
cliver := pio.U32BE(C1[4:8])
if cliver != 0 {
var ok bool
var digest []byte
if ok, digest = hsParse1(C1, hsClientPartialKey, hsServerFullKey); !ok {
err = fmt.Errorf("rtmp: handshake server: C1 invalid")
return
}
var ok bool
var digest []byte
if ok, digest = hsParse1(C1, hsClientPartialKey, hsServerFullKey); ok {
hsCreate01(S0S1, srvtime, srvver, hsServerPartialKey)
hsCreate2(S2, digest)
} else {
copy(S1, C1)
copy(S2, C2)
copy(S1, C2)
copy(S2, C1)
}
if _, err = self.bufw.Write(S0S1S2); err != nil {

View File

@@ -9,7 +9,6 @@ import (
"encoding/hex"
"fmt"
"io"
//"log"
"net"
"net/textproto"
"net/url"
@@ -147,7 +146,12 @@ func (self *Client) probe() (err error) {
}
func (self *Client) prepare(stage int) (err error) {
var waitIdle int
for self.stage < stage {
waitIdle++
if waitIdle > 20 {
return fmt.Errorf("codec not ready")
}
switch self.stage {
case 0:
if err = self.Options(); err != nil {
@@ -695,7 +699,9 @@ func (self *Client) Describe() (streams []sdp.Media, err error) {
self.streams = []*Stream{}
for _, media := range medias {
stream := &Stream{Sdp: media, client: self}
stream.makeCodecData()
if err = stream.makeCodecData(); err != nil && DebugRtsp {
fmt.Println("rtsp: makeCodecData error", err)
}
self.streams = append(self.streams, stream)
streams = append(streams, media)
}
@@ -767,7 +773,7 @@ func (self *Stream) timeScale() int {
func (self *Stream) makeCodecData() (err error) {
media := self.Sdp
if media.PayloadType >= 96 && media.PayloadType <= 127 {
if (media.PayloadType >= 96 && media.PayloadType <= 127) || media.Type == av.H264 || media.Type == av.AAC {
switch media.Type {
case av.H264:
for _, nalu := range media.SpropParameterSets {

View File

@@ -150,9 +150,12 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
if err != nil {
return nil, err
}
for _, i2 := range client.mediaSDP {
if (i2.AVType != VIDEO && i2.AVType != AUDIO) || (client.options.DisableAudio && i2.AVType == AUDIO) {
//TODO check it
if strings.Contains(string(client.SDPRaw), "LaunchDigital") {
client.chTMP += 2
}
continue
}
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(client.chTMP) + "-" + strconv.Itoa(client.chTMP+1)}, client.ControlTrack(i2.Control), false, false)
@@ -945,14 +948,14 @@ func (client *RTSPClient) CodecUpdateVPS(val []byte) {
}
//Println mini logging functions
// Println mini logging functions
func (client *RTSPClient) Println(v ...interface{}) {
if client.options.Debug {
log.Println(v)
}
}
//binSize
// binSize
func binSize(val int) []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, uint32(val))

View File

@@ -25,6 +25,7 @@ type ProxyConn struct {
cseq int
session string
protocol int
in int
}
type Proxy struct {
@@ -40,7 +41,6 @@ func NewProxyConn(netconn net.Conn) *ProxyConn {
conn.writebuf = make([]byte, 4096)
conn.readbuf = make([]byte, 4096)
conn.session = uuid.New().String()
conn.cseq = 1
return conn
}
@@ -156,6 +156,7 @@ func (self *ProxyConn) prepare() error {
return errors.New("no fist cmd")
}
cseq := strings.TrimSpace(stringInBetween(string(self.readbuf[:n]), "CSeq:", "\r\n"))
switch fistStringsSlice[0] {
case OPTIONS:
@@ -165,45 +166,46 @@ func (self *ProxyConn) prepare() error {
if self.URL, err = url.Parse(fistStringsSlice[1]); err != nil {
return err
}
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nPublic: OPTIONS, DESCRIBE, SETUP, PLAY\r\nSession: " + self.session + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"))
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nPublic: OPTIONS, DESCRIBE, SETUP, PLAY\r\nSession: " + self.session + "\r\nCSeq: " + cseq + "\r\n\r\n"))
if err != nil {
return err
}
self.options = true
case SETUP:
if strings.Contains(string(self.readbuf[:n]), "RTP/AVP/UDP") {
_, err := self.netconn.Write([]byte("RTSP/1.0 461 Unsupported transport\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\n\r\n"))
_, err := self.netconn.Write([]byte("RTSP/1.0 461 Unsupported transport\r\nCSeq: " + cseq + "\r\nSession: " + self.session + "\r\n\r\n"))
if err != nil {
return err
}
return nil
}
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1\r\n\r\n"))
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nCSeq: " + cseq + "\r\nSession: " + self.session + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(self.in) + "-" + strconv.Itoa(self.in+1) + "\r\n\r\n"))
if err != nil {
return err
}
self.in = self.in + 2
case DESCRIBE:
buf := "RTSP/1.0 200 OK\r\nContent-Type: application/sdp\r\nSession: " + self.session + "\r\nContent-Length: " + strconv.Itoa(len(self.sdp)) + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"
buf := "RTSP/1.0 200 OK\r\nContent-Type: application/sdp\r\nSession: " + self.session + "\r\nContent-Length: " + strconv.Itoa(len(self.sdp)) + "\r\nCSeq: " + cseq + "\r\n\r\n"
_, err := self.netconn.Write([]byte(buf + string(self.sdp)))
if err != nil {
return err
}
case PLAY:
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nSession: " + self.session + ";timeout=60\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"))
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nSession: " + self.session + ";timeout=60\r\nCSeq: " + cseq + "\r\n\r\n"))
if err != nil {
return err
}
self.playing = true
case TEARDOWN:
self.netconn.Close()
return errors.New("exit")
default:
return errors.New("metod not found")
return errors.New("metod not found " + fistStringsSlice[0])
}
return nil

View File

@@ -2,6 +2,7 @@ package ts
import (
"fmt"
"github.com/deepch/vdk/codec/h265parser"
"io"
"time"
@@ -11,7 +12,7 @@ import (
"github.com/deepch/vdk/format/ts/tsio"
)
var CodecTypes = []av.CodecType{av.H264, av.AAC}
var CodecTypes = []av.CodecType{av.H264, av.H265, av.AAC}
type Muxer struct {
w io.Writer
@@ -123,6 +124,11 @@ func (self *Muxer) WritePATPMT() (err error) {
StreamType: tsio.ElementaryStreamTypeH264,
ElementaryPID: stream.pid,
})
case av.H265:
elemStreams = append(elemStreams, tsio.ElementaryStreamInfo{
StreamType: tsio.ElementaryStreamTypeH265,
ElementaryPID: stream.pid,
})
}
}
@@ -210,6 +216,36 @@ func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time)
datav[0] = self.peshdr[:n]
if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil {
return
}
case av.H265:
codec := stream.CodecData.(h265parser.CodecData)
nalus := self.nalus[:0]
if pkt.IsKeyFrame {
nalus = append(nalus, codec.SPS())
nalus = append(nalus, codec.PPS())
nalus = append(nalus, codec.VPS())
}
pktnalus, _ := h265parser.SplitNALUs(pkt.Data)
for _, nalu := range pktnalus {
nalus = append(nalus, nalu)
}
datav := self.datav[:1]
for i, nalu := range nalus {
if i == 0 {
datav = append(datav, h265parser.AUDBytes)
} else {
datav = append(datav, h265parser.StartCodeBytes)
}
datav = append(datav, nalu)
}
n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time)
datav[0] = self.peshdr[:n]
if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil {
return
}

View File

@@ -36,6 +36,7 @@ const (
ElementaryStreamTypeH264 = 0x1B
ElementaryStreamTypeAdtsAAC = 0x0F
ElementaryStreamTypeAlignmentDescriptor = 0x06
ElementaryStreamTypeH265 = 0x24
)
type PATEntry struct {
@@ -516,7 +517,7 @@ func NewTSWriter(pid uint16) *TSWriter {
return w
}
//TSHeader func
// TSHeader func
type TSHeader struct {
PID uint
PCR uint64
@@ -528,7 +529,7 @@ type TSHeader struct {
HeaderLength uint
}
//WriteTSHeader func
// WriteTSHeader func
func WriteTSHeader(w io.Writer, element TSHeader, dataLength int) (written int, err error) {
var flags, extFlags uint
@@ -688,13 +689,13 @@ func makeRepeatValBytes(val byte, n int) []byte {
return b
}
//WriteRepeatVal func
// WriteRepeatVal func
func WriteRepeatVal(w io.Writer, val byte, n int) (err error) {
_, err = w.Write(makeRepeatValBytes(val, n))
return
}
//WriteUInt64 func
// WriteUInt64 func
func WriteUInt64(w io.Writer, val uint64, n int) (err error) {
var b [8]byte
for i := n - 1; i >= 0; i-- {
@@ -707,12 +708,12 @@ func WriteUInt64(w io.Writer, val uint64, n int) (err error) {
return
}
//WriteUInt func
// WriteUInt func
func WriteUInt(w io.Writer, val uint, n int) (err error) {
return WriteUInt64(w, uint64(val), n)
}
//PCRToUInt func
// PCRToUInt func
func PCRToUInt(pcr uint64) uint64 {
base := pcr / 300
ext := pcr % 300