Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eaab841cfb | ||
|
|
9d21d056dd | ||
|
|
5a989a5c40 | ||
|
|
0b08aa7224 | ||
|
|
9e244f72bf | ||
|
|
8d4be0821c | ||
|
|
8958197548 | ||
|
|
def88cb695 | ||
|
|
175afae1c1 | ||
|
|
c923e0010b |
64
example/transcoder/main.go
Normal file
64
example/transcoder/main.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/deepch/vdk/format/rtspv2"
|
||||
"github.com/deepch/vdk/format/ts"
|
||||
"log"
|
||||
"os/exec"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
RTSPClient, err := rtspv2.Dial(rtspv2.RTSPClientOptions{URL: "rtsp://url", DisableAudio: true, DialTimeout: 3 * time.Second, ReadWriteTimeout: 5 * time.Second, Debug: true, OutgoingProxy: false})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(ctx, "ffmpeg", "-flags", "low_delay", "-analyzeduration", "1", "-fflags", "-nobuffer", "-probesize", "1024k", "-f", "mpegts", "-i", "-", "-vcodec", "libx264", "-preset", "ultrafast", "-bf", "0", "-f", "mpegts", "-max_muxing_queue_size", "400", "-pes_payload_size", "0", "pipe:1")
|
||||
inPipe, _ := cmd.StdinPipe()
|
||||
outPipe, _ := cmd.StdoutPipe()
|
||||
//cmd.Stderr = os.Stderr
|
||||
mux := ts.NewMuxer(inPipe)
|
||||
demuxer := ts.NewDemuxer(outPipe)
|
||||
codec := RTSPClient.CodecData
|
||||
mux.WriteHeader(codec)
|
||||
go func() {
|
||||
imNewCodec, err := demuxer.Streams()
|
||||
log.Println("new codec data", imNewCodec, err)
|
||||
for i, data := range imNewCodec {
|
||||
log.Println(i, data)
|
||||
}
|
||||
for {
|
||||
pkt, err := demuxer.ReadPacket()
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
log.Println("im new pkt ===>", pkt.Idx, pkt.Time)
|
||||
}
|
||||
}()
|
||||
cmd.Start()
|
||||
var start bool
|
||||
for {
|
||||
select {
|
||||
case signals := <-RTSPClient.Signals:
|
||||
switch signals {
|
||||
case rtspv2.SignalCodecUpdate:
|
||||
//?
|
||||
case rtspv2.SignalStreamRTPStop:
|
||||
return
|
||||
}
|
||||
case packetAV := <-RTSPClient.OutgoingPacketQueue:
|
||||
if packetAV.IsKeyFrame {
|
||||
start = true
|
||||
}
|
||||
if !start {
|
||||
continue
|
||||
}
|
||||
if err = mux.WritePacket(*packetAV); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -387,6 +387,10 @@ func ReadFileAtoms(r io.ReadSeeker) (atoms []Atom, err error) {
|
||||
return
|
||||
}
|
||||
size := pio.U32BE(taghdr[0:])
|
||||
if size > 5242880 {
|
||||
err = parseErr("len", 5242880, err)
|
||||
return
|
||||
}
|
||||
tag := Tag(pio.U32BE(taghdr[4:]))
|
||||
|
||||
var atom Atom
|
||||
|
||||
@@ -387,6 +387,12 @@ func ReadFileAtoms(r io.ReadSeeker) (atoms []Atom, err error) {
|
||||
return
|
||||
}
|
||||
size := pio.U32BE(taghdr[0:])
|
||||
|
||||
if size == 0 {
|
||||
err = fmt.Errorf("bad hdr size")
|
||||
return
|
||||
}
|
||||
|
||||
tag := Tag(pio.U32BE(taghdr[4:]))
|
||||
|
||||
var atom Atom
|
||||
|
||||
@@ -1623,20 +1623,15 @@ func (self *Conn) handshakeServer() (err error) {
|
||||
clitime := pio.U32BE(C1[0:4])
|
||||
srvtime := clitime
|
||||
srvver := uint32(0x0d0e0a0d)
|
||||
cliver := pio.U32BE(C1[4:8])
|
||||
|
||||
if cliver != 0 {
|
||||
var ok bool
|
||||
var digest []byte
|
||||
if ok, digest = hsParse1(C1, hsClientPartialKey, hsServerFullKey); !ok {
|
||||
err = fmt.Errorf("rtmp: handshake server: C1 invalid")
|
||||
return
|
||||
}
|
||||
if ok, digest = hsParse1(C1, hsClientPartialKey, hsServerFullKey); ok {
|
||||
hsCreate01(S0S1, srvtime, srvver, hsServerPartialKey)
|
||||
hsCreate2(S2, digest)
|
||||
} else {
|
||||
copy(S1, C1)
|
||||
copy(S2, C2)
|
||||
copy(S1, C2)
|
||||
copy(S2, C1)
|
||||
}
|
||||
|
||||
if _, err = self.bufw.Write(S0S1S2); err != nil {
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
//"log"
|
||||
"net"
|
||||
"net/textproto"
|
||||
"net/url"
|
||||
@@ -147,7 +146,12 @@ func (self *Client) probe() (err error) {
|
||||
}
|
||||
|
||||
func (self *Client) prepare(stage int) (err error) {
|
||||
var waitIdle int
|
||||
for self.stage < stage {
|
||||
waitIdle++
|
||||
if waitIdle > 20 {
|
||||
return fmt.Errorf("codec not ready")
|
||||
}
|
||||
switch self.stage {
|
||||
case 0:
|
||||
if err = self.Options(); err != nil {
|
||||
@@ -695,7 +699,9 @@ func (self *Client) Describe() (streams []sdp.Media, err error) {
|
||||
self.streams = []*Stream{}
|
||||
for _, media := range medias {
|
||||
stream := &Stream{Sdp: media, client: self}
|
||||
stream.makeCodecData()
|
||||
if err = stream.makeCodecData(); err != nil && DebugRtsp {
|
||||
fmt.Println("rtsp: makeCodecData error", err)
|
||||
}
|
||||
self.streams = append(self.streams, stream)
|
||||
streams = append(streams, media)
|
||||
}
|
||||
@@ -767,7 +773,7 @@ func (self *Stream) timeScale() int {
|
||||
|
||||
func (self *Stream) makeCodecData() (err error) {
|
||||
media := self.Sdp
|
||||
if media.PayloadType >= 96 && media.PayloadType <= 127 {
|
||||
if (media.PayloadType >= 96 && media.PayloadType <= 127) || media.Type == av.H264 || media.Type == av.AAC {
|
||||
switch media.Type {
|
||||
case av.H264:
|
||||
for _, nalu := range media.SpropParameterSets {
|
||||
|
||||
@@ -150,9 +150,12 @@ func Dial(options RTSPClientOptions) (*RTSPClient, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, i2 := range client.mediaSDP {
|
||||
if (i2.AVType != VIDEO && i2.AVType != AUDIO) || (client.options.DisableAudio && i2.AVType == AUDIO) {
|
||||
//TODO check it
|
||||
if strings.Contains(string(client.SDPRaw), "LaunchDigital") {
|
||||
client.chTMP += 2
|
||||
}
|
||||
continue
|
||||
}
|
||||
err = client.request(SETUP, map[string]string{"Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(client.chTMP) + "-" + strconv.Itoa(client.chTMP+1)}, client.ControlTrack(i2.Control), false, false)
|
||||
|
||||
@@ -25,6 +25,7 @@ type ProxyConn struct {
|
||||
cseq int
|
||||
session string
|
||||
protocol int
|
||||
in int
|
||||
}
|
||||
|
||||
type Proxy struct {
|
||||
@@ -40,7 +41,6 @@ func NewProxyConn(netconn net.Conn) *ProxyConn {
|
||||
conn.writebuf = make([]byte, 4096)
|
||||
conn.readbuf = make([]byte, 4096)
|
||||
conn.session = uuid.New().String()
|
||||
conn.cseq = 1
|
||||
return conn
|
||||
}
|
||||
|
||||
@@ -156,6 +156,7 @@ func (self *ProxyConn) prepare() error {
|
||||
return errors.New("no fist cmd")
|
||||
}
|
||||
|
||||
cseq := strings.TrimSpace(stringInBetween(string(self.readbuf[:n]), "CSeq:", "\r\n"))
|
||||
switch fistStringsSlice[0] {
|
||||
case OPTIONS:
|
||||
|
||||
@@ -165,45 +166,46 @@ func (self *ProxyConn) prepare() error {
|
||||
if self.URL, err = url.Parse(fistStringsSlice[1]); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nPublic: OPTIONS, DESCRIBE, SETUP, PLAY\r\nSession: " + self.session + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"))
|
||||
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nPublic: OPTIONS, DESCRIBE, SETUP, PLAY\r\nSession: " + self.session + "\r\nCSeq: " + cseq + "\r\n\r\n"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
self.options = true
|
||||
|
||||
case SETUP:
|
||||
|
||||
if strings.Contains(string(self.readbuf[:n]), "RTP/AVP/UDP") {
|
||||
_, err := self.netconn.Write([]byte("RTSP/1.0 461 Unsupported transport\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\n\r\n"))
|
||||
_, err := self.netconn.Write([]byte("RTSP/1.0 461 Unsupported transport\r\nCSeq: " + cseq + "\r\nSession: " + self.session + "\r\n\r\n"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\nSession: " + self.session + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1\r\n\r\n"))
|
||||
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nCSeq: " + cseq + "\r\nSession: " + self.session + "\r\nTransport: RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(self.in) + "-" + strconv.Itoa(self.in+1) + "\r\n\r\n"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
self.in = self.in + 2
|
||||
case DESCRIBE:
|
||||
|
||||
buf := "RTSP/1.0 200 OK\r\nContent-Type: application/sdp\r\nSession: " + self.session + "\r\nContent-Length: " + strconv.Itoa(len(self.sdp)) + "\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"
|
||||
buf := "RTSP/1.0 200 OK\r\nContent-Type: application/sdp\r\nSession: " + self.session + "\r\nContent-Length: " + strconv.Itoa(len(self.sdp)) + "\r\nCSeq: " + cseq + "\r\n\r\n"
|
||||
_, err := self.netconn.Write([]byte(buf + string(self.sdp)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case PLAY:
|
||||
|
||||
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nSession: " + self.session + ";timeout=60\r\nCSeq: " + strconv.Itoa(self.cseq) + "\r\n\r\n"))
|
||||
_, err := self.netconn.Write([]byte("RTSP/1.0 200 OK\r\nSession: " + self.session + ";timeout=60\r\nCSeq: " + cseq + "\r\n\r\n"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
self.playing = true
|
||||
case TEARDOWN:
|
||||
self.netconn.Close()
|
||||
return errors.New("exit")
|
||||
|
||||
default:
|
||||
|
||||
return errors.New("metod not found")
|
||||
return errors.New("metod not found " + fistStringsSlice[0])
|
||||
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -2,6 +2,7 @@ package ts
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/deepch/vdk/codec/h265parser"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
@@ -11,7 +12,7 @@ import (
|
||||
"github.com/deepch/vdk/format/ts/tsio"
|
||||
)
|
||||
|
||||
var CodecTypes = []av.CodecType{av.H264, av.AAC}
|
||||
var CodecTypes = []av.CodecType{av.H264, av.H265, av.AAC}
|
||||
|
||||
type Muxer struct {
|
||||
w io.Writer
|
||||
@@ -123,6 +124,11 @@ func (self *Muxer) WritePATPMT() (err error) {
|
||||
StreamType: tsio.ElementaryStreamTypeH264,
|
||||
ElementaryPID: stream.pid,
|
||||
})
|
||||
case av.H265:
|
||||
elemStreams = append(elemStreams, tsio.ElementaryStreamInfo{
|
||||
StreamType: tsio.ElementaryStreamTypeH265,
|
||||
ElementaryPID: stream.pid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,6 +216,36 @@ func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
|
||||
n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time)
|
||||
datav[0] = self.peshdr[:n]
|
||||
|
||||
if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil {
|
||||
return
|
||||
}
|
||||
case av.H265:
|
||||
codec := stream.CodecData.(h265parser.CodecData)
|
||||
|
||||
nalus := self.nalus[:0]
|
||||
if pkt.IsKeyFrame {
|
||||
nalus = append(nalus, codec.SPS())
|
||||
nalus = append(nalus, codec.PPS())
|
||||
nalus = append(nalus, codec.VPS())
|
||||
}
|
||||
pktnalus, _ := h265parser.SplitNALUs(pkt.Data)
|
||||
for _, nalu := range pktnalus {
|
||||
nalus = append(nalus, nalu)
|
||||
}
|
||||
|
||||
datav := self.datav[:1]
|
||||
for i, nalu := range nalus {
|
||||
if i == 0 {
|
||||
datav = append(datav, h265parser.AUDBytes)
|
||||
} else {
|
||||
datav = append(datav, h265parser.StartCodeBytes)
|
||||
}
|
||||
datav = append(datav, nalu)
|
||||
}
|
||||
|
||||
n := tsio.FillPESHeader(self.peshdr, tsio.StreamIdH264, -1, pkt.Time+pkt.CompositionTime, pkt.Time)
|
||||
datav[0] = self.peshdr[:n]
|
||||
|
||||
if err = stream.tsw.WritePackets(self.w, datav, pkt.Time, pkt.IsKeyFrame, false); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ const (
|
||||
ElementaryStreamTypeH264 = 0x1B
|
||||
ElementaryStreamTypeAdtsAAC = 0x0F
|
||||
ElementaryStreamTypeAlignmentDescriptor = 0x06
|
||||
ElementaryStreamTypeH265 = 0x24
|
||||
)
|
||||
|
||||
type PATEntry struct {
|
||||
|
||||
Reference in New Issue
Block a user