5 Commits

Author SHA1 Message Date
deepch
7a563b07e3 testing 2024-01-29 12:46:06 +03:00
deepch
a743575ac9 testing 2024-01-22 16:07:30 +03:00
deepch
e4035e4407 testing 2024-01-22 11:36:06 +03:00
deepch
4df258b899 testing 2024-01-10 18:02:17 +03:00
deepch
385d9cc93c testing 2024-01-08 18:08:19 +03:00
4 changed files with 127 additions and 53 deletions

View File

@@ -170,6 +170,20 @@ func (self *AVSync) check(i int) (start time.Duration, end time.Duration, correc
return return
} }
type CalcDuration struct {
LastTime map[int8]time.Duration
}
func (self *CalcDuration) ModifyPacket(pkt *av.Packet, streams []av.CodecData, videoidx int, audioidx int) (drop bool, err error) {
if tmp, ok := self.LastTime[pkt.Idx]; ok && tmp != 0 {
pkt.Duration = pkt.Time - self.LastTime[pkt.Idx]
} else if pkt.Time < 100*time.Millisecond {
pkt.Duration = pkt.Time
}
self.LastTime[pkt.Idx] = pkt.Time
return
}
// Make packets reading speed as same as walltime, effect like ffmpeg -re option. // Make packets reading speed as same as walltime, effect like ffmpeg -re option.
type Walltime struct { type Walltime struct {
firsttime time.Time firsttime time.Time

View File

@@ -3,23 +3,22 @@ package mp4
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"io"
"time"
"github.com/deepch/vdk/codec/h265parser"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/codec/h265parser"
"github.com/deepch/vdk/format/mp4/mp4io" "github.com/deepch/vdk/format/mp4/mp4io"
"github.com/deepch/vdk/utils/bits/pio" "github.com/deepch/vdk/utils/bits/pio"
"io"
"time"
) )
type Muxer struct { type Muxer struct {
w io.WriteSeeker w io.WriteSeeker
bufw *bufio.Writer bufw *bufio.Writer
wpos int64 wpos int64
streams []*Stream streams []*Stream
NegativeTsMakeZero bool
} }
func NewMuxer(w io.WriteSeeker) *Muxer { func NewMuxer(w io.WriteSeeker) *Muxer {
@@ -206,8 +205,12 @@ func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
func (self *Stream) writePacket(pkt av.Packet, rawdur time.Duration) (err error) { func (self *Stream) writePacket(pkt av.Packet, rawdur time.Duration) (err error) {
if rawdur < 0 { if rawdur < 0 {
err = fmt.Errorf("mp4: stream#%d time=%v < lasttime=%v", pkt.Idx, pkt.Time, self.lastpkt.Time) if self.muxer.NegativeTsMakeZero {
return rawdur = 0
} else {
err = fmt.Errorf("mp4: stream#%d time=%v < lasttime=%v", pkt.Idx, pkt.Time, self.lastpkt.Time)
return
}
} }
if _, err = self.muxer.bufw.Write(pkt.Data); err != nil { if _, err = self.muxer.bufw.Write(pkt.Data); err != nil {

View File

@@ -14,10 +14,11 @@ import (
) )
type Muxer struct { type Muxer struct {
w io.WriteSeeker w io.WriteSeeker
bufw *bufio.Writer bufw *bufio.Writer
wpos int64 wpos int64
streams []*Stream streams []*Stream
NegativeTsMakeZero bool
} }
func NewMuxer(w io.WriteSeeker) *Muxer { func NewMuxer(w io.WriteSeeker) *Muxer {
@@ -181,8 +182,12 @@ func (self *Muxer) WritePacket(pkt av.Packet) (err error) {
func (self *Stream) writePacket(pkt av.Packet, rawdur time.Duration) (err error) { func (self *Stream) writePacket(pkt av.Packet, rawdur time.Duration) (err error) {
if rawdur < 0 { if rawdur < 0 {
err = fmt.Errorf("mp4: stream#%d time=%v < lasttime=%v", pkt.Idx, pkt.Time, self.lastpkt.Time) if self.muxer.NegativeTsMakeZero {
return rawdur = 0
} else {
err = fmt.Errorf("mp4: stream#%d time=%v < lasttime=%v", pkt.Idx, pkt.Time, self.lastpkt.Time)
return
}
} }
if _, err = self.muxer.bufw.Write(pkt.Data); err != nil { if _, err = self.muxer.bufw.Write(pkt.Data); err != nil {

View File

@@ -4,11 +4,15 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"encoding/gob" "encoding/gob"
"errors"
"fmt" "fmt"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/mp4" "github.com/deepch/vdk/format/mp4"
"github.com/google/uuid"
"github.com/moby/sys/mountinfo"
"github.com/shirou/gopsutil/v3/disk"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@@ -16,7 +20,7 @@ import (
) )
var MIME = []byte{11, 22, 111, 222, 11, 22, 111, 222} var MIME = []byte{11, 22, 111, 222, 11, 22, 111, 222}
var listTag = []string{"{host_name}", "{stream_name}", "{channel_name}", "{stream_id}", "{channel_id}", "{start_year}", "{start_month}", "{start_day}", "{start_minute}", "{start_second}", "{start_millisecond}", "{start_unix_second}", "{start_unix_millisecond}", "{start_time}", "{start_pts}", "{end_year}", "{end_month}", "{end_day}", "{end_minute}", "{end_second}", "{end_millisecond}", "{start_unix_second}", "{start_unix_millisecond}", "{end_time}", "{end_pts}", "{duration_second}", "{duration_millisecond}"} var listTag = []string{"{server_id}", "{host_name}", "{host_name_short}", "{host_name_long}", "{stream_name}", "{channel_name}", "{stream_id}", "{channel_id}", "{start_year}", "{start_month}", "{start_day}", "{start_hour}", "{start_minute}", "{start_second}", "{start_millisecond}", "{start_unix_second}", "{start_unix_millisecond}", "{start_time}", "{start_pts}", "{end_year}", "{end_month}", "{end_day}", "{end_hour}", "{end_minute}", "{end_second}", "{end_millisecond}", "{start_unix_second}", "{start_unix_millisecond}", "{end_time}", "{end_pts}", "{duration_second}", "{duration_millisecond}"}
const ( const (
MP4 = "mp4" MP4 = "mp4"
@@ -24,19 +28,20 @@ const (
) )
type Muxer struct { type Muxer struct {
muxer *mp4.Muxer muxer *mp4.Muxer
format string format string
limit int limit int
d *os.File d *os.File
m *os.File m *os.File
dur time.Duration dur time.Duration
h int h int
gof *Gof gof *Gof
patch string patch string
start, end time.Time mpoint []string
pstart, pend time.Duration start, end time.Time
started bool pstart, pend time.Duration
hostName, streamName, channelName, streamID, channelID string started bool
serverID, streamName, channelName, streamID, channelID, hostname string
} }
type Gof struct { type Gof struct {
@@ -64,18 +69,21 @@ func init() {
} }
func NewMuxer(hostName, streamName, channelName, streamID, channelID, patch string, format string, limit int) (m *Muxer, err error) { func NewMuxer(serverID, streamName, channelName, streamID, channelID string, mpoint []string, patch, format string, limit int) (m *Muxer, err error) {
hostname, _ := os.Hostname()
m = &Muxer{ m = &Muxer{
mpoint: mpoint,
patch: patch, patch: patch,
h: -1, h: -1,
gof: &Gof{}, gof: &Gof{},
format: format, format: format,
limit: limit, limit: limit,
hostName: hostName, serverID: serverID,
streamName: streamName, streamName: streamName,
channelName: channelName, channelName: channelName,
streamID: streamID, streamID: streamID,
channelID: channelID, channelID: channelID,
hostname: hostname,
} }
return return
} }
@@ -83,7 +91,7 @@ func NewMuxer(hostName, streamName, channelName, streamID, channelID, patch stri
func (m *Muxer) WriteHeader(streams []av.CodecData) (err error) { func (m *Muxer) WriteHeader(streams []av.CodecData) (err error) {
m.gof.Streams = streams m.gof.Streams = streams
if m.format == MP4 { if m.format == MP4 {
m.OpenMP4() return m.OpenMP4()
} }
return return
@@ -111,11 +119,15 @@ func (m *Muxer) WritePacket(pkt av.Packet) (err error) {
func (m *Muxer) writePacketMP4(pkt av.Packet) (err error) { func (m *Muxer) writePacketMP4(pkt av.Packet) (err error) {
if pkt.IsKeyFrame && m.dur > time.Duration(m.limit)*time.Second { if pkt.IsKeyFrame && m.dur > time.Duration(m.limit)*time.Second {
m.pstart = pkt.Time m.pstart = pkt.Time
m.OpenMP4() if err = m.OpenMP4(); err != nil {
return
}
m.dur = 0 m.dur = 0
} }
m.dur += pkt.Duration m.dur += pkt.Duration
m.pend = pkt.Time m.pend = pkt.Time
return m.muxer.WritePacket(pkt) return m.muxer.WritePacket(pkt)
} }
@@ -186,10 +198,19 @@ func (m *Muxer) OpenNVR() (err error) {
func (m *Muxer) OpenMP4() (err error) { func (m *Muxer) OpenMP4() (err error) {
m.WriteTrailer() m.WriteTrailer()
m.start = time.Now().UTC() m.start = time.Now().UTC()
if m.d, err = os.CreateTemp("", "rtspvideo.*.mp4"); err != nil {
d, err := m.filePatch()
if err != nil {
return
}
if err = os.MkdirAll(filepath.Dir(d), 0755); err != nil {
return
}
if m.d, err = os.Create(filepath.Join(filepath.Dir(d), fmt.Sprintf("tmp_%s_%d.mp4", uuid.New(), time.Now().Unix()))); err != nil {
return return
} }
m.muxer = mp4.NewMuxer(m.d) m.muxer = mp4.NewMuxer(m.d)
m.muxer.NegativeTsMakeZero = true
if err = m.muxer.WriteHeader(m.gof.Streams); err != nil { if err = m.muxer.WriteHeader(m.gof.Streams); err != nil {
return return
} }
@@ -197,13 +218,40 @@ func (m *Muxer) OpenMP4() (err error) {
return return
} }
func (m *Muxer) filePatch() string { func (m *Muxer) filePatch() (string, error) {
ts := m.patch var (
mu = float64(100)
ui = -1
)
for i, i2 := range m.mpoint {
if m, err := mountinfo.Mounted(i2); err == nil && m {
if d, err := disk.Usage(i2); err == nil {
if d.UsedPercent < mu {
ui = i
mu = d.UsedPercent
}
}
}
}
if ui == -1 {
return "", errors.New("not mount ready")
}
ts := filepath.Join(m.mpoint[ui], m.patch)
m.end = time.Now().UTC() m.end = time.Now().UTC()
for _, s := range listTag { for _, s := range listTag {
switch s { switch s {
case "{server_id}":
ts = strings.Replace(ts, "{server_id}", m.serverID, -1)
case "{host_name}": case "{host_name}":
ts = strings.Replace(ts, "{host_name}", m.hostName, -1) ts = strings.Replace(ts, "{host_name}", m.hostname, -1)
case "{host_name_short}":
ts = strings.Replace(ts, "{host_name_short}", m.hostname, -1)
case "{host_name_long}":
ts = strings.Replace(ts, "{host_name_long}", m.hostname, -1)
case "{stream_name}": case "{stream_name}":
ts = strings.Replace(ts, "{stream_name}", m.streamName, -1) ts = strings.Replace(ts, "{stream_name}", m.streamName, -1)
case "{channel_name}": case "{channel_name}":
@@ -215,13 +263,15 @@ func (m *Muxer) filePatch() string {
case "{start_year}": case "{start_year}":
ts = strings.Replace(ts, "{start_year}", fmt.Sprintf("%d", m.start.Year()), -1) ts = strings.Replace(ts, "{start_year}", fmt.Sprintf("%d", m.start.Year()), -1)
case "{start_month}": case "{start_month}":
ts = strings.Replace(ts, "{start_month}", fmt.Sprintf("%02d", int(m.start.Month())), -1) ts = strings.Replace(ts, "{start_month}", fmt.Sprintf("%d", int(m.start.Month())), -1)
case "{start_day}": case "{start_day}":
ts = strings.Replace(ts, "{start_day}", fmt.Sprintf("%02d", m.start.Day()), -1) ts = strings.Replace(ts, "{start_day}", fmt.Sprintf("%d", m.start.Day()), -1)
case "{start_hour}":
ts = strings.Replace(ts, "{start_hour}", fmt.Sprintf("%d", m.start.Hour()), -1)
case "{start_minute}": case "{start_minute}":
ts = strings.Replace(ts, "{start_minute}", fmt.Sprintf("%02d", m.start.Minute()), -1) ts = strings.Replace(ts, "{start_minute}", fmt.Sprintf("%d", m.start.Minute()), -1)
case "{start_second}": case "{start_second}":
ts = strings.Replace(ts, "{start_second}", fmt.Sprintf("%02d", m.start.Second()), -1) ts = strings.Replace(ts, "{start_second}", fmt.Sprintf("%d", m.start.Second()), -1)
case "{start_millisecond}": case "{start_millisecond}":
ts = strings.Replace(ts, "{start_millisecond}", fmt.Sprintf("%d", m.start.Nanosecond()/1000/1000), -1) ts = strings.Replace(ts, "{start_millisecond}", fmt.Sprintf("%d", m.start.Nanosecond()/1000/1000), -1)
case "{start_unix_millisecond}": case "{start_unix_millisecond}":
@@ -235,13 +285,15 @@ func (m *Muxer) filePatch() string {
case "{end_year}": case "{end_year}":
ts = strings.Replace(ts, "{end_year}", fmt.Sprintf("%d", m.end.Year()), -1) ts = strings.Replace(ts, "{end_year}", fmt.Sprintf("%d", m.end.Year()), -1)
case "{end_month}": case "{end_month}":
ts = strings.Replace(ts, "{end_month}", fmt.Sprintf("%02d", int(m.end.Month())), -1) ts = strings.Replace(ts, "{end_month}", fmt.Sprintf("%d", int(m.end.Month())), -1)
case "{end_day}": case "{end_day}":
ts = strings.Replace(ts, "{end_day}", fmt.Sprintf("%02d", m.end.Day()), -1) ts = strings.Replace(ts, "{end_day}", fmt.Sprintf("%d", m.end.Day()), -1)
case "{end_hour}":
ts = strings.Replace(ts, "{end_hour}", fmt.Sprintf("%d", m.end.Hour()), -1)
case "{end_minute}": case "{end_minute}":
ts = strings.Replace(ts, "{end_minute}", fmt.Sprintf("%02d", m.end.Minute()), -1) ts = strings.Replace(ts, "{end_minute}", fmt.Sprintf("%d", m.end.Minute()), -1)
case "{end_second}": case "{end_second}":
ts = strings.Replace(ts, "{end_second}", fmt.Sprintf("%02d", m.end.Second()), -1) ts = strings.Replace(ts, "{end_second}", fmt.Sprintf("%d", m.end.Second()), -1)
case "{end_millisecond}": case "{end_millisecond}":
ts = strings.Replace(ts, "{end_millisecond}", fmt.Sprintf("%d", m.start.Nanosecond()/1000/1000), -1) ts = strings.Replace(ts, "{end_millisecond}", fmt.Sprintf("%d", m.start.Nanosecond()/1000/1000), -1)
case "{end_unix_millisecond}": case "{end_unix_millisecond}":
@@ -259,7 +311,7 @@ func (m *Muxer) filePatch() string {
} }
} }
return ts return ts, nil
} }
func (m *Muxer) WriteTrailer() (err error) { func (m *Muxer) WriteTrailer() (err error) {
@@ -271,12 +323,12 @@ func (m *Muxer) WriteTrailer() (err error) {
} }
if m.d != nil { if m.d != nil {
if m.format == MP4 { if m.format == MP4 {
p := m.filePatch() p, err := m.filePatch()
if err = os.MkdirAll(filepath.Dir(p), 0755); err != nil { if err != nil {
return return err
} }
if err = os.Rename(m.d.Name(), p); err != nil { if err = os.Rename(m.d.Name(), filepath.Join(filepath.Dir(m.d.Name()), filepath.Base(p))); err != nil {
return return err
} }
} }
err = m.d.Close() err = m.d.Close()