Merge pull request #3 from aginetwork7/feature/clean-queue
Feature clean queue and add seek playback
This commit is contained in:
commit
feb71f62ab
25
av/av.go
25
av/av.go
@ -253,8 +253,9 @@ type Packet struct {
|
|||||||
Idx int8 // stream index in container format
|
Idx int8 // stream index in container format
|
||||||
CompositionTime time.Duration // packet presentation time minus decode time for H264 B-Frame
|
CompositionTime time.Duration // packet presentation time minus decode time for H264 B-Frame
|
||||||
Time time.Duration // packet decode time
|
Time time.Duration // packet decode time
|
||||||
Duration time.Duration //packet duration
|
Duration time.Duration // packet duration
|
||||||
Data []byte // packet data
|
Data []byte // packet data
|
||||||
|
RealTimestamp int64 // packet real timestamp (ms)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Raw audio frame.
|
// Raw audio frame.
|
||||||
@ -266,33 +267,33 @@ type AudioFrame struct {
|
|||||||
Data [][]byte // data array for planar format len(Data) > 1
|
Data [][]byte // data array for planar format len(Data) > 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self AudioFrame) Duration() time.Duration {
|
func (af AudioFrame) Duration() time.Duration {
|
||||||
return time.Second * time.Duration(self.SampleCount) / time.Duration(self.SampleRate)
|
return time.Second * time.Duration(af.SampleCount) / time.Duration(af.SampleRate)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check this audio frame has same format as other audio frame.
|
// Check this audio frame has same format as other audio frame.
|
||||||
func (self AudioFrame) HasSameFormat(other AudioFrame) bool {
|
func (af AudioFrame) HasSameFormat(other AudioFrame) bool {
|
||||||
if self.SampleRate != other.SampleRate {
|
if af.SampleRate != other.SampleRate {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if self.ChannelLayout != other.ChannelLayout {
|
if af.ChannelLayout != other.ChannelLayout {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if self.SampleFormat != other.SampleFormat {
|
if af.SampleFormat != other.SampleFormat {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Split sample audio sample from this frame.
|
// Split sample audio sample from this frame.
|
||||||
func (self AudioFrame) Slice(start int, end int) (out AudioFrame) {
|
func (af AudioFrame) Slice(start int, end int) (out AudioFrame) {
|
||||||
if start > end {
|
if start > end {
|
||||||
panic(fmt.Sprintf("av: AudioFrame split failed start=%d end=%d invalid", start, end))
|
panic(fmt.Sprintf("av: AudioFrame split failed start=%d end=%d invalid", start, end))
|
||||||
}
|
}
|
||||||
out = self
|
out = af
|
||||||
out.Data = append([][]byte(nil), out.Data...)
|
out.Data = append([][]byte(nil), out.Data...)
|
||||||
out.SampleCount = end - start
|
out.SampleCount = end - start
|
||||||
size := self.SampleFormat.BytesPerSample()
|
size := af.SampleFormat.BytesPerSample()
|
||||||
for i := range out.Data {
|
for i := range out.Data {
|
||||||
out.Data[i] = out.Data[i][start*size : end*size]
|
out.Data[i] = out.Data[i][start*size : end*size]
|
||||||
}
|
}
|
||||||
@ -300,8 +301,8 @@ func (self AudioFrame) Slice(start int, end int) (out AudioFrame) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Concat two audio frames.
|
// Concat two audio frames.
|
||||||
func (self AudioFrame) Concat(in AudioFrame) (out AudioFrame) {
|
func (af AudioFrame) Concat(in AudioFrame) (out AudioFrame) {
|
||||||
out = self
|
out = af
|
||||||
out.Data = append([][]byte(nil), out.Data...)
|
out.Data = append([][]byte(nil), out.Data...)
|
||||||
out.SampleCount += in.SampleCount
|
out.SampleCount += in.SampleCount
|
||||||
for i := range out.Data {
|
for i := range out.Data {
|
||||||
|
@ -106,6 +106,20 @@ func (self *Queue) WritePacket(pkt av.Packet) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *Queue) Clean() {
|
||||||
|
self.lock.Lock()
|
||||||
|
for self.buf.Count > 1 {
|
||||||
|
pkt := self.buf.Pop()
|
||||||
|
if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame {
|
||||||
|
self.curgopcount--
|
||||||
|
}
|
||||||
|
if self.curgopcount < self.maxgopcount {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
type QueueCursor struct {
|
type QueueCursor struct {
|
||||||
que *Queue
|
que *Queue
|
||||||
pos pktque.BufPos
|
pos pktque.BufPos
|
||||||
|
@ -85,6 +85,8 @@ func Parse(content string) (sess Session, medias []Media) {
|
|||||||
media.Rtpmap, _ = strconv.Atoi(val)
|
media.Rtpmap, _ = strconv.Atoi(val)
|
||||||
case "x-framerate":
|
case "x-framerate":
|
||||||
media.FPS, _ = strconv.Atoi(val)
|
media.FPS, _ = strconv.Atoi(val)
|
||||||
|
case "framerate":
|
||||||
|
media.FPS, _ = strconv.Atoi(val)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
keyval = strings.Split(field, "/")
|
keyval = strings.Split(field, "/")
|
||||||
|
@ -97,7 +97,12 @@ type RTSPClient struct {
|
|||||||
sequenceNumber int
|
sequenceNumber int
|
||||||
end int
|
end int
|
||||||
offset int
|
offset int
|
||||||
|
realVideoTs int64
|
||||||
|
keyFrameRealVideoTs int64
|
||||||
|
keyFrameIterateDrua int64
|
||||||
|
preDuration time.Duration
|
||||||
status string
|
status string
|
||||||
|
lastPausedTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type RTSPClientOptions struct {
|
type RTSPClientOptions struct {
|
||||||
@ -266,7 +271,6 @@ func (client *RTSPClient) startStream() {
|
|||||||
timer := time.Now()
|
timer := time.Now()
|
||||||
oneb := make([]byte, 1)
|
oneb := make([]byte, 1)
|
||||||
header := make([]byte, 4)
|
header := make([]byte, 4)
|
||||||
var fixed bool
|
|
||||||
for {
|
for {
|
||||||
err := client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
|
err := client.conn.SetDeadline(time.Now().Add(client.options.ReadWriteTimeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -281,18 +285,19 @@ func (client *RTSPClient) startStream() {
|
|||||||
}
|
}
|
||||||
timer = time.Now()
|
timer = time.Now()
|
||||||
}
|
}
|
||||||
if client.status == PAUSE {
|
|
||||||
// client.Println("RTSP Client PAUSE")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !fixed {
|
|
||||||
nb, err := io.ReadFull(client.connRW, header)
|
nb, err := io.ReadFull(client.connRW, header)
|
||||||
if err != nil || nb != 4 {
|
if err != nil || nb != 4 {
|
||||||
|
if client.status == PAUSE && strings.Contains(err.Error(), "timeout") {
|
||||||
|
if time.Since(client.lastPausedTime) >= time.Second*300 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
continue
|
||||||
|
}
|
||||||
client.Println("RTSP Client RTP Read Header", err)
|
client.Println("RTSP Client RTP Read Header", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
|
||||||
fixed = false
|
|
||||||
switch header[0] {
|
switch header[0] {
|
||||||
case 0x24:
|
case 0x24:
|
||||||
length := int32(binary.BigEndian.Uint16(header[2:]))
|
length := int32(binary.BigEndian.Uint16(header[2:]))
|
||||||
@ -514,6 +519,7 @@ func (client *RTSPClient) Pause() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
client.status = PAUSE
|
client.status = PAUSE
|
||||||
|
client.lastPausedTime = time.Now()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -531,6 +537,7 @@ func (client *RTSPClient) Seek(customHeaders map[string]string, target int64) er
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
client.status = PLAY
|
client.status = PLAY
|
||||||
|
client.PreVideoTS = 0
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,7 +2,6 @@ package rtspv2
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
|
||||||
"math"
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -27,6 +26,15 @@ func (client *RTSPClient) containsPayloadType(pt int) bool {
|
|||||||
return exist
|
return exist
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (client *RTSPClient) durationFromSDP() time.Duration {
|
||||||
|
for _, sdp := range client.mediaSDP {
|
||||||
|
if sdp.AVType == VIDEO && sdp.FPS != 0 {
|
||||||
|
return time.Duration((int(1000) / sdp.FPS) * int(time.Millisecond))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
||||||
content := *payloadRAW
|
content := *payloadRAW
|
||||||
firstByte := content[4]
|
firstByte := content[4]
|
||||||
@ -36,18 +44,18 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
|||||||
payloadType := int(content[5] & 0x7f)
|
payloadType := int(content[5] & 0x7f)
|
||||||
sequenceNumber := int(binary.BigEndian.Uint16(content[6:8]))
|
sequenceNumber := int(binary.BigEndian.Uint16(content[6:8]))
|
||||||
timestamp := int64(binary.BigEndian.Uint32(content[8:12]))
|
timestamp := int64(binary.BigEndian.Uint32(content[8:12]))
|
||||||
|
// SSRC := binary.BigEndian.Uint32(content[12:16])
|
||||||
if isRTCPPacket(content) {
|
if isRTCPPacket(content) {
|
||||||
client.Println("skipping RTCP packet")
|
client.Println("skipping RTCP packet")
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
if !client.containsPayloadType(payloadType) {
|
if !client.containsPayloadType(payloadType) {
|
||||||
client.Println(fmt.Sprintf("skipping RTP packet, paytload type: %v", payloadType))
|
// client.Println(fmt.Sprintf("skipping RTP packet, paytload type: %v", payloadType))
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// client.Println(fmt.Sprintf("padding: %v, extension: %v, csrccnt: %d, sequence number: %d.payload type: %d, timestamp: %d",
|
// client.Println(fmt.Sprintf("padding: %v, extension: %v, csrccnt: %d, sequence number: %d.payload type: %d, timestamp: %d", padding, extension, CSRCCnt, sequenceNumber, payloadType, timestamp))
|
||||||
// padding, extension, CSRCCnt, sequenceNumber, payloadType, timestamp))
|
|
||||||
client.offset = RTPHeaderSize
|
client.offset = RTPHeaderSize
|
||||||
client.sequenceNumber = sequenceNumber
|
client.sequenceNumber = sequenceNumber
|
||||||
client.timestamp = timestamp
|
client.timestamp = timestamp
|
||||||
@ -58,13 +66,18 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) {
|
|||||||
if extension && len(content) < 4+client.offset+2+2 {
|
if extension && len(content) < 4+client.offset+2+2 {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
var realTimestamp int64
|
||||||
if extension && client.end-client.offset >= 4 {
|
if extension && client.end-client.offset >= 4 {
|
||||||
extLen := 4 * int(binary.BigEndian.Uint16(content[4+client.offset+2:]))
|
extWords := int(binary.BigEndian.Uint16(content[4+client.offset+2:]))
|
||||||
client.offset += 4
|
extLen := 4 * extWords
|
||||||
|
client.offset += 4 // this is profile(2 byte) + ext length(2 byte)
|
||||||
|
realTimestamp = int64(binary.BigEndian.Uint32(content[client.offset+4 : client.offset+4*2]))
|
||||||
if client.end-client.offset >= extLen {
|
if client.end-client.offset >= extLen {
|
||||||
client.offset += extLen
|
client.offset += extLen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
client.realVideoTs = realTimestamp
|
||||||
|
|
||||||
if padding && client.end-client.offset > 0 {
|
if padding && client.end-client.offset > 0 {
|
||||||
paddingLen := int(content[client.end-1])
|
paddingLen := int(content[client.end-1])
|
||||||
if client.end-client.offset >= paddingLen {
|
if client.end-client.offset >= paddingLen {
|
||||||
@ -286,12 +299,31 @@ func (client *RTSPClient) appendAudioPacket(retmap []*av.Packet, nal []byte, dur
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (client *RTSPClient) appendVideoPacket(retmap []*av.Packet, nal []byte, isKeyFrame bool) []*av.Packet {
|
func (client *RTSPClient) appendVideoPacket(retmap []*av.Packet, nal []byte, isKeyFrame bool) []*av.Packet {
|
||||||
|
duration := time.Duration(float32(client.timestamp-client.PreVideoTS)/TimeBaseFactor) * time.Millisecond
|
||||||
|
sdpDuration := client.durationFromSDP()
|
||||||
|
if sdpDuration != 0 && duration > sdpDuration {
|
||||||
|
duration = sdpDuration
|
||||||
|
}
|
||||||
|
if duration == 0 {
|
||||||
|
duration = client.preDuration
|
||||||
|
}
|
||||||
|
client.preDuration = duration
|
||||||
|
if isKeyFrame {
|
||||||
|
client.keyFrameRealVideoTs = client.realVideoTs
|
||||||
|
client.keyFrameIterateDrua = 0
|
||||||
|
} else {
|
||||||
|
client.keyFrameIterateDrua += duration.Milliseconds()
|
||||||
|
}
|
||||||
|
realVideoMs := client.keyFrameRealVideoTs * 1000
|
||||||
|
|
||||||
|
realVideoMs += client.keyFrameIterateDrua
|
||||||
return append(retmap, &av.Packet{
|
return append(retmap, &av.Packet{
|
||||||
Data: append(binSize(len(nal)), nal...),
|
Data: append(binSize(len(nal)), nal...),
|
||||||
CompositionTime: time.Duration(TimeDelay) * time.Millisecond,
|
CompositionTime: time.Duration(TimeDelay) * time.Millisecond,
|
||||||
Idx: client.videoIDX,
|
Idx: client.videoIDX,
|
||||||
IsKeyFrame: isKeyFrame,
|
IsKeyFrame: isKeyFrame,
|
||||||
Duration: time.Duration(float32(client.timestamp-client.PreVideoTS)/TimeBaseFactor) * time.Millisecond,
|
Duration: duration,
|
||||||
Time: time.Duration(client.timestamp/TimeBaseFactor) * time.Millisecond,
|
Time: time.Duration(client.timestamp/TimeBaseFactor) * time.Millisecond,
|
||||||
|
RealTimestamp: realVideoMs,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user