9 Commits

Author SHA1 Message Date
Andrey Semochkin
4b060bc442 fix dig 2024-02-08 01:00:12 +07:00
Andrey Semochkin
9f075c6682 fix hostShort / hostLong bug 2024-02-07 20:44:30 +07:00
Andrey Semochkin
0e06666006 fix listTag bug 2024-02-07 20:33:12 +07:00
deepch
b84c19f719 testing 2024-02-02 02:48:16 +03:00
deepch
7a563b07e3 testing 2024-01-29 12:46:06 +03:00
deepch
a743575ac9 testing 2024-01-22 16:07:30 +03:00
deepch
e4035e4407 testing 2024-01-22 11:36:06 +03:00
deepch
4df258b899 testing 2024-01-10 18:02:17 +03:00
deepch
385d9cc93c testing 2024-01-08 18:08:19 +03:00
9 changed files with 211 additions and 58 deletions

View File

@@ -170,6 +170,20 @@ func (self *AVSync) check(i int) (start time.Duration, end time.Duration, correc
return return
} }
type CalcDuration struct {
LastTime map[int8]time.Duration
}
func (self *CalcDuration) ModifyPacket(pkt *av.Packet, streams []av.CodecData, videoidx int, audioidx int) (drop bool, err error) {
if tmp, ok := self.LastTime[pkt.Idx]; ok && tmp != 0 {
pkt.Duration = pkt.Time - self.LastTime[pkt.Idx]
} else if pkt.Time < 100*time.Millisecond {
pkt.Duration = pkt.Time
}
self.LastTime[pkt.Idx] = pkt.Time
return
}
// Make packets reading speed as same as walltime, effect like ffmpeg -re option. // Make packets reading speed as same as walltime, effect like ffmpeg -re option.
type Walltime struct { type Walltime struct {
firsttime time.Time firsttime time.Time

25
example/test/main.go Normal file
View File

@@ -0,0 +1,25 @@
package main
import (
"github.com/deepch/vdk/format/ts"
"log"
"os"
)
func main() {
f, _ := os.Open("edb9708f29b24ba9b175808d6b9df9c6541e25766d4a40209a8f903948b72f3f.ts")
m := ts.NewDemuxer(f)
var i int
for {
p, err := m.ReadPacket()
if err != nil {
return
}
if p.IsKeyFrame {
i = 0
}
log.Println(i, p.Time, p.Data[4:10], len(p.Data))
i++
}
}

View File

@@ -3,16 +3,14 @@ package mp4
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"io"
"time"
"github.com/deepch/vdk/codec/h265parser"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/codec/h265parser"
"github.com/deepch/vdk/format/mp4/mp4io" "github.com/deepch/vdk/format/mp4/mp4io"
"github.com/deepch/vdk/utils/bits/pio" "github.com/deepch/vdk/utils/bits/pio"
"io"
"time"
) )
type Muxer struct { type Muxer struct {
@@ -20,6 +18,7 @@ type Muxer struct {
bufw *bufio.Writer bufw *bufio.Writer
wpos int64 wpos int64
streams []*Stream streams []*Stream
NegativeTsMakeZero bool
} }
func NewMuxer(w io.WriteSeeker) *Muxer { func NewMuxer(w io.WriteSeeker) *Muxer {
@@ -206,9 +205,13 @@ func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
func (self *Stream) writePacket(pkt av.Packet, rawdur time.Duration) (err error) { func (self *Stream) writePacket(pkt av.Packet, rawdur time.Duration) (err error) {
if rawdur < 0 { if rawdur < 0 {
if self.muxer.NegativeTsMakeZero {
rawdur = 0
} else {
err = fmt.Errorf("mp4: stream#%d time=%v < lasttime=%v", pkt.Idx, pkt.Time, self.lastpkt.Time) err = fmt.Errorf("mp4: stream#%d time=%v < lasttime=%v", pkt.Idx, pkt.Time, self.lastpkt.Time)
return return
} }
}
if _, err = self.muxer.bufw.Write(pkt.Data); err != nil { if _, err = self.muxer.bufw.Write(pkt.Data); err != nil {
return return

View File

@@ -3,6 +3,7 @@ package mp4f
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"log"
"os" "os"
"time" "time"
@@ -238,14 +239,15 @@ func (self *Muxer) WriteTrailer() (err error) {
return return
} }
func (element *Muxer) WriteHeader(streams []av.CodecData) (err error) { func (element *Muxer) WriteHeader(streams []av.CodecData) error {
element.streams = []*Stream{} element.streams = []*Stream{}
for _, stream := range streams { for _, stream := range streams {
if err = element.newStream(stream); err != nil { if err := element.newStream(stream); err != nil {
return log.Println("WriteHeader", err)
} }
} }
return
return nil
} }
func (element *Muxer) GetInit(streams []av.CodecData) (string, []byte) { func (element *Muxer) GetInit(streams []av.CodecData) (string, []byte) {
@@ -285,6 +287,9 @@ func (element *Muxer) GetInit(streams []av.CodecData) (string, []byte) {
} }
func (element *Muxer) WritePacket(pkt av.Packet, GOP bool) (bool, []byte, error) { func (element *Muxer) WritePacket(pkt av.Packet, GOP bool) (bool, []byte, error) {
if pkt.Idx+1 > int8(len(element.streams)) {
return false, nil, nil
}
stream := element.streams[pkt.Idx] stream := element.streams[pkt.Idx]
if GOP { if GOP {
ts := time.Duration(0) ts := time.Duration(0)

View File

@@ -18,6 +18,7 @@ type Muxer struct {
bufw *bufio.Writer bufw *bufio.Writer
wpos int64 wpos int64
streams []*Stream streams []*Stream
NegativeTsMakeZero bool
} }
func NewMuxer(w io.WriteSeeker) *Muxer { func NewMuxer(w io.WriteSeeker) *Muxer {
@@ -181,9 +182,13 @@ func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
func (self *Stream) writePacket(pkt av.Packet, rawdur time.Duration) (err error) { func (self *Stream) writePacket(pkt av.Packet, rawdur time.Duration) (err error) {
if rawdur < 0 { if rawdur < 0 {
if self.muxer.NegativeTsMakeZero {
rawdur = 0
} else {
err = fmt.Errorf("mp4: stream#%d time=%v < lasttime=%v", pkt.Idx, pkt.Time, self.lastpkt.Time) err = fmt.Errorf("mp4: stream#%d time=%v < lasttime=%v", pkt.Idx, pkt.Time, self.lastpkt.Time)
return return
} }
}
if _, err = self.muxer.bufw.Write(pkt.Data); err != nil { if _, err = self.muxer.bufw.Write(pkt.Data); err != nil {
return return

View File

@@ -4,11 +4,15 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"encoding/gob" "encoding/gob"
"errors"
"fmt" "fmt"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/mp4" "github.com/deepch/vdk/format/mp4"
"github.com/google/uuid"
"github.com/moby/sys/mountinfo"
"github.com/shirou/gopsutil/v3/disk"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@@ -16,7 +20,13 @@ import (
) )
var MIME = []byte{11, 22, 111, 222, 11, 22, 111, 222} var MIME = []byte{11, 22, 111, 222, 11, 22, 111, 222}
var listTag = []string{"{host_name}", "{stream_name}", "{channel_name}", "{stream_id}", "{channel_id}", "{start_year}", "{start_month}", "{start_day}", "{start_minute}", "{start_second}", "{start_millisecond}", "{start_unix_second}", "{start_unix_millisecond}", "{start_time}", "{start_pts}", "{end_year}", "{end_month}", "{end_day}", "{end_minute}", "{end_second}", "{end_millisecond}", "{start_unix_second}", "{start_unix_millisecond}", "{end_time}", "{end_pts}", "{duration_second}", "{duration_millisecond}"}
var listTag = []string{"{server_id}", "{host_name}", "{host_name_short}", "{host_name_long}",
"{stream_name}", "{channel_name}", "{stream_id}", "{channel_id}",
"{start_year}", "{start_month}", "{start_day}", "{start_hour}", "{start_minute}", "{start_second}",
"{start_millisecond}", "{start_unix_second}", "{start_unix_millisecond}", "{start_time}", "{start_pts}",
"{end_year}", "{end_month}", "{end_day}", "{end_hour}", "{end_minute}", "{end_second}",
"{end_millisecond}", "{end_unix_millisecond}", "{end_unix_second}", "{end_time}", "{end_pts}", "{duration_second}", "{duration_millisecond}"}
const ( const (
MP4 = "mp4" MP4 = "mp4"
@@ -33,10 +43,11 @@ type Muxer struct {
h int h int
gof *Gof gof *Gof
patch string patch string
mpoint []string
start, end time.Time start, end time.Time
pstart, pend time.Duration pstart, pend time.Duration
started bool started bool
hostName, streamName, channelName, streamID, channelID string serverID, streamName, channelName, streamID, channelID, hostLong, hostShort string
} }
type Gof struct { type Gof struct {
@@ -64,18 +75,26 @@ func init() {
} }
func NewMuxer(hostName, streamName, channelName, streamID, channelID, patch string, format string, limit int) (m *Muxer, err error) { func NewMuxer(serverID, streamName, channelName, streamID, channelID string, mpoint []string, patch, format string, limit int) (m *Muxer, err error) {
hostLong, _ := os.Hostname()
var hostShort string
if p, _, ok := strings.Cut(hostLong, "."); ok {
hostShort = p
}
m = &Muxer{ m = &Muxer{
mpoint: mpoint,
patch: patch, patch: patch,
h: -1, h: -1,
gof: &Gof{}, gof: &Gof{},
format: format, format: format,
limit: limit, limit: limit,
hostName: hostName, serverID: serverID,
streamName: streamName, streamName: streamName,
channelName: channelName, channelName: channelName,
streamID: streamID, streamID: streamID,
channelID: channelID, channelID: channelID,
hostLong: hostLong,
hostShort: hostShort,
} }
return return
} }
@@ -83,7 +102,7 @@ func NewMuxer(hostName, streamName, channelName, streamID, channelID, patch stri
func (m *Muxer) WriteHeader(streams []av.CodecData) (err error) { func (m *Muxer) WriteHeader(streams []av.CodecData) (err error) {
m.gof.Streams = streams m.gof.Streams = streams
if m.format == MP4 { if m.format == MP4 {
m.OpenMP4() return m.OpenMP4()
} }
return return
@@ -111,11 +130,15 @@ func (m *Muxer) WritePacket(pkt av.Packet) (err error) {
func (m *Muxer) writePacketMP4(pkt av.Packet) (err error) { func (m *Muxer) writePacketMP4(pkt av.Packet) (err error) {
if pkt.IsKeyFrame && m.dur > time.Duration(m.limit)*time.Second { if pkt.IsKeyFrame && m.dur > time.Duration(m.limit)*time.Second {
m.pstart = pkt.Time m.pstart = pkt.Time
m.OpenMP4() if err = m.OpenMP4(); err != nil {
return
}
m.dur = 0 m.dur = 0
} }
m.dur += pkt.Duration m.dur += pkt.Duration
m.pend = pkt.Time m.pend = pkt.Time
return m.muxer.WritePacket(pkt) return m.muxer.WritePacket(pkt)
} }
@@ -186,10 +209,19 @@ func (m *Muxer) OpenNVR() (err error) {
func (m *Muxer) OpenMP4() (err error) { func (m *Muxer) OpenMP4() (err error) {
m.WriteTrailer() m.WriteTrailer()
m.start = time.Now().UTC() m.start = time.Now().UTC()
if m.d, err = os.CreateTemp("", "rtspvideo.*.mp4"); err != nil {
d, err := m.filePatch()
if err != nil {
return
}
if err = os.MkdirAll(filepath.Dir(d), 0755); err != nil {
return
}
if m.d, err = os.Create(filepath.Join(filepath.Dir(d), fmt.Sprintf("tmp_%s_%d.mp4", uuid.New(), time.Now().Unix()))); err != nil {
return return
} }
m.muxer = mp4.NewMuxer(m.d) m.muxer = mp4.NewMuxer(m.d)
m.muxer.NegativeTsMakeZero = true
if err = m.muxer.WriteHeader(m.gof.Streams); err != nil { if err = m.muxer.WriteHeader(m.gof.Streams); err != nil {
return return
} }
@@ -197,13 +229,40 @@ func (m *Muxer) OpenMP4() (err error) {
return return
} }
func (m *Muxer) filePatch() string { func (m *Muxer) filePatch() (string, error) {
ts := m.patch var (
mu = float64(100)
ui = -1
)
for i, i2 := range m.mpoint {
if m, err := mountinfo.Mounted(i2); err == nil && m {
if d, err := disk.Usage(i2); err == nil {
if d.UsedPercent < mu {
ui = i
mu = d.UsedPercent
}
}
}
}
if ui == -1 {
return "", errors.New("not mount ready")
}
ts := filepath.Join(m.mpoint[ui], m.patch)
m.end = time.Now().UTC() m.end = time.Now().UTC()
for _, s := range listTag { for _, s := range listTag {
switch s { switch s {
case "{server_id}":
ts = strings.Replace(ts, "{server_id}", m.serverID, -1)
case "{host_name}": case "{host_name}":
ts = strings.Replace(ts, "{host_name}", m.hostName, -1) ts = strings.Replace(ts, "{host_name}", m.hostLong, -1)
case "{host_name_short}":
ts = strings.Replace(ts, "{host_name_short}", m.hostShort, -1)
case "{host_name_long}":
ts = strings.Replace(ts, "{host_name_long}", m.hostLong, -1)
case "{stream_name}": case "{stream_name}":
ts = strings.Replace(ts, "{stream_name}", m.streamName, -1) ts = strings.Replace(ts, "{stream_name}", m.streamName, -1)
case "{channel_name}": case "{channel_name}":
@@ -218,6 +277,8 @@ func (m *Muxer) filePatch() string {
ts = strings.Replace(ts, "{start_month}", fmt.Sprintf("%02d", int(m.start.Month())), -1) ts = strings.Replace(ts, "{start_month}", fmt.Sprintf("%02d", int(m.start.Month())), -1)
case "{start_day}": case "{start_day}":
ts = strings.Replace(ts, "{start_day}", fmt.Sprintf("%02d", m.start.Day()), -1) ts = strings.Replace(ts, "{start_day}", fmt.Sprintf("%02d", m.start.Day()), -1)
case "{start_hour}":
ts = strings.Replace(ts, "{start_hour}", fmt.Sprintf("%02d", m.start.Hour()), -1)
case "{start_minute}": case "{start_minute}":
ts = strings.Replace(ts, "{start_minute}", fmt.Sprintf("%02d", m.start.Minute()), -1) ts = strings.Replace(ts, "{start_minute}", fmt.Sprintf("%02d", m.start.Minute()), -1)
case "{start_second}": case "{start_second}":
@@ -225,9 +286,9 @@ func (m *Muxer) filePatch() string {
case "{start_millisecond}": case "{start_millisecond}":
ts = strings.Replace(ts, "{start_millisecond}", fmt.Sprintf("%d", m.start.Nanosecond()/1000/1000), -1) ts = strings.Replace(ts, "{start_millisecond}", fmt.Sprintf("%d", m.start.Nanosecond()/1000/1000), -1)
case "{start_unix_millisecond}": case "{start_unix_millisecond}":
ts = strings.Replace(ts, "{start_unix_millisecond}", fmt.Sprintf("%d", m.end.UnixMilli()), -1) ts = strings.Replace(ts, "{start_unix_millisecond}", fmt.Sprintf("%d", m.start.UnixMilli()), -1)
case "{start_unix_second}": case "{start_unix_second}":
ts = strings.Replace(ts, "{start_unix_second}", fmt.Sprintf("%d", m.end.Unix()), -1) ts = strings.Replace(ts, "{start_unix_second}", fmt.Sprintf("%d", m.start.Unix()), -1)
case "{start_time}": case "{start_time}":
ts = strings.Replace(ts, "{start_time}", fmt.Sprintf("%s", m.start.Format("2006-01-02T15:04:05-0700")), -1) ts = strings.Replace(ts, "{start_time}", fmt.Sprintf("%s", m.start.Format("2006-01-02T15:04:05-0700")), -1)
case "{start_pts}": case "{start_pts}":
@@ -238,12 +299,14 @@ func (m *Muxer) filePatch() string {
ts = strings.Replace(ts, "{end_month}", fmt.Sprintf("%02d", int(m.end.Month())), -1) ts = strings.Replace(ts, "{end_month}", fmt.Sprintf("%02d", int(m.end.Month())), -1)
case "{end_day}": case "{end_day}":
ts = strings.Replace(ts, "{end_day}", fmt.Sprintf("%02d", m.end.Day()), -1) ts = strings.Replace(ts, "{end_day}", fmt.Sprintf("%02d", m.end.Day()), -1)
case "{end_hour}":
ts = strings.Replace(ts, "{end_hour}", fmt.Sprintf("%02d", m.end.Hour()), -1)
case "{end_minute}": case "{end_minute}":
ts = strings.Replace(ts, "{end_minute}", fmt.Sprintf("%02d", m.end.Minute()), -1) ts = strings.Replace(ts, "{end_minute}", fmt.Sprintf("%02d", m.end.Minute()), -1)
case "{end_second}": case "{end_second}":
ts = strings.Replace(ts, "{end_second}", fmt.Sprintf("%02d", m.end.Second()), -1) ts = strings.Replace(ts, "{end_second}", fmt.Sprintf("%02d", m.end.Second()), -1)
case "{end_millisecond}": case "{end_millisecond}":
ts = strings.Replace(ts, "{end_millisecond}", fmt.Sprintf("%d", m.start.Nanosecond()/1000/1000), -1) ts = strings.Replace(ts, "{end_millisecond}", fmt.Sprintf("%d", m.end.Nanosecond()/1000/1000), -1)
case "{end_unix_millisecond}": case "{end_unix_millisecond}":
ts = strings.Replace(ts, "{end_unix_millisecond}", fmt.Sprintf("%d", m.end.UnixMilli()), -1) ts = strings.Replace(ts, "{end_unix_millisecond}", fmt.Sprintf("%d", m.end.UnixMilli()), -1)
case "{end_unix_second}": case "{end_unix_second}":
@@ -259,7 +322,7 @@ func (m *Muxer) filePatch() string {
} }
} }
return ts return ts, nil
} }
func (m *Muxer) WriteTrailer() (err error) { func (m *Muxer) WriteTrailer() (err error) {
@@ -271,12 +334,12 @@ func (m *Muxer) WriteTrailer() (err error) {
} }
if m.d != nil { if m.d != nil {
if m.format == MP4 { if m.format == MP4 {
p := m.filePatch() p, err := m.filePatch()
if err = os.MkdirAll(filepath.Dir(p), 0755); err != nil { if err != nil {
return return err
} }
if err = os.Rename(m.d.Name(), p); err != nil { if err = os.Rename(m.d.Name(), filepath.Join(filepath.Dir(m.d.Name()), filepath.Base(p))); err != nil {
return return err
} }
} }
err = m.d.Close() err = m.d.Close()

View File

@@ -42,6 +42,7 @@ const (
type Client struct { type Client struct {
DebugRtsp bool DebugRtsp bool
DebugRtp bool DebugRtp bool
DisableAudio bool
Headers []string Headers []string
SkipErrRtpBlock bool SkipErrRtpBlock bool
@@ -1076,12 +1077,44 @@ func (self *Stream) handleRtpPacket(packet []byte) (err error) {
err = fmt.Errorf("rtp: packet too short") err = fmt.Errorf("rtp: packet too short")
return return
} }
payloadOffset := 12 + int(packet[0]&0xf)*4
timestamp := binary.BigEndian.Uint32(packet[4:8])
/*
Test offset
*/
Padding := (packet[0]>>5)&1 == 1
Extension := (packet[0]>>4)&1 == 1
CSRCCnt := int(packet[0] & 0x0f)
RTPHeaderSize := 12
payloadOffset := RTPHeaderSize
end := len(packet)
if end-payloadOffset >= 4*CSRCCnt {
payloadOffset += 4 * CSRCCnt
}
if Extension && end-payloadOffset >= 4 {
extLen := 4 * int(binary.BigEndian.Uint16(packet[payloadOffset+2:]))
payloadOffset += 4
if end-payloadOffset >= extLen {
payloadOffset += extLen
}
}
if Padding && end-payloadOffset > 0 {
paddingLen := int(packet[end-1])
if end-payloadOffset >= paddingLen {
end -= paddingLen
}
}
if payloadOffset > len(packet) { if payloadOffset > len(packet) {
err = fmt.Errorf("rtp: packet too short") err = fmt.Errorf("rtp: packet too short")
return return
} }
timestamp := binary.BigEndian.Uint32(packet[4:8])
payload := packet[payloadOffset:] payload := packet[payloadOffset:]
/* /*

1
format/rtsp/server.go Normal file
View File

@@ -0,0 +1 @@
package rtsp

View File

@@ -284,7 +284,11 @@ func (self *Stream) payloadEnd() (n int, err error) {
b := make([]byte, 4+len(nalu)) b := make([]byte, 4+len(nalu))
pio.PutU32BE(b[0:4], uint32(len(nalu))) pio.PutU32BE(b[0:4], uint32(len(nalu)))
copy(b[4:], nalu) copy(b[4:], nalu)
self.addPacket(b, time.Duration(0), (1000*time.Millisecond)/time.Duration(self.fps)) fps := self.fps
if self.fps == 0 {
fps = 25
}
self.addPacket(b, time.Duration(0), (1000*time.Millisecond)/time.Duration(fps))
n++ n++
} }
} }