From 8d167fd1c06729666a79487047dcb7e9528d689f Mon Sep 17 00:00:00 2001 From: Andrey Semochkin Date: Fri, 15 Jan 2021 18:26:23 +0300 Subject: [PATCH] fix audio del 4 byte on start fix webrtc add options ice and port, webrtc.MediaEngine{} --- format/mp4f/muxer.go | 3 --- format/rtspv2/client.go | 25 ++++++++++++++---- format/webrtcv3/adapter.go | 53 ++++++++++++++++++++++++++++---------- 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/format/mp4f/muxer.go b/format/mp4f/muxer.go index 0f7568d..9b24833 100644 --- a/format/mp4f/muxer.go +++ b/format/mp4f/muxer.go @@ -272,9 +272,6 @@ func (element *Muxer) WritePacket(pkt av.Packet, GOP bool) (bool, []byte, error) if stream.lastpkt != nil { ts = pkt.Time - stream.lastpkt.Time } - if stream.CodecData.Type().IsAudio() { - pkt.Data = pkt.Data[4:] - } got, buf, err := stream.writePacketV2(pkt, ts, 5) stream.lastpkt = &pkt if err != nil { diff --git a/format/rtspv2/client.go b/format/rtspv2/client.go index a7056f6..9597de4 100644 --- a/format/rtspv2/client.go +++ b/format/rtspv2/client.go @@ -522,7 +522,6 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { client.BufferRtpPacket.Truncate(0) client.BufferRtpPacket.Reset() } - nalRaw, _ := h264parser.SplitNALUs(content[offset:end]) var retmap []*av.Packet for _, nal := range nalRaw { @@ -559,6 +558,22 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { if isEnd { client.fuStarted = false naluTypef := client.BufferRtpPacket.Bytes()[0] & 0x1f + if naluTypef == 7 { + bufered, _ := h264parser.SplitNALUs(append([]byte{0, 0, 0, 1}, client.BufferRtpPacket.Bytes()...)) + for _, v := range bufered { + naluTypefs := v[0] & 0x1f + switch { + case naluTypefs == 5: + client.BufferRtpPacket.Reset() + client.BufferRtpPacket.Write(v) + naluTypef = 5 + case naluTypefs == 7: + client.CodecUpdateSPS(v) + case naluTypefs == 8: + client.CodecUpdatePPS(v) + } + } + } retmap = append(retmap, &av.Packet{ Data: append(binSize(client.BufferRtpPacket.Len()), client.BufferRtpPacket.Bytes()...), CompositionTime: time.Duration(1) * time.Millisecond, @@ -591,7 +606,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { duration = time.Duration(len(nal)) * time.Second / time.Duration(client.AudioTimeScale) client.AudioTimeLine += duration retmap = append(retmap, &av.Packet{ - Data: append(binSize(len(nal)), nal...), + Data: nal, CompositionTime: time.Duration(1) * time.Millisecond, Duration: duration, Idx: client.audioIDX, @@ -602,7 +617,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { duration = time.Duration(len(nal)) * time.Second / time.Duration(client.AudioTimeScale) client.AudioTimeLine += duration retmap = append(retmap, &av.Packet{ - Data: append(binSize(len(nal)), nal...), + Data: nal, CompositionTime: time.Duration(1) * time.Millisecond, Duration: duration, Idx: client.audioIDX, @@ -613,7 +628,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { duration = time.Duration(20) * time.Millisecond client.AudioTimeLine += duration retmap = append(retmap, &av.Packet{ - Data: append(binSize(len(nal)), nal...), + Data: nal, CompositionTime: time.Duration(1) * time.Millisecond, Duration: duration, Idx: client.audioIDX, @@ -638,7 +653,7 @@ func (client *RTSPClient) RTPDemuxer(payloadRAW *[]byte) ([]*av.Packet, bool) { duration = time.Duration((float32(1024)/float32(client.AudioTimeScale))*1000) * time.Millisecond client.AudioTimeLine += duration retmap = append(retmap, &av.Packet{ - Data: append(binSize(len(frame)), frame...), + Data: frame, CompositionTime: time.Duration(1) * time.Millisecond, Duration: duration, Idx: client.audioIDX, diff --git a/format/webrtcv3/adapter.go b/format/webrtcv3/adapter.go index 34c3e96..5a0ed30 100644 --- a/format/webrtcv3/adapter.go +++ b/format/webrtcv3/adapter.go @@ -7,6 +7,7 @@ import ( "log" "time" + "github.com/pion/interceptor" "github.com/pion/webrtc/v3" "github.com/deepch/vdk/av" @@ -29,19 +30,44 @@ type Muxer struct { pc *webrtc.PeerConnection ClientACK *time.Timer StreamACK *time.Timer + Options Options } type Stream struct { codec av.CodecData - ts time.Duration track *webrtc.TrackLocalStaticSample } +type Options struct { + ICEServers []string + PortMin uint16 + PortMax uint16 +} -func NewMuxer() *Muxer { - tmp := Muxer{ClientACK: time.NewTimer(time.Second * 20), StreamACK: time.NewTimer(time.Second * 20), streams: make(map[int8]*Stream)} +func NewMuxer(options Options) *Muxer { + tmp := Muxer{Options: options, ClientACK: time.NewTimer(time.Second * 20), StreamACK: time.NewTimer(time.Second * 20), streams: make(map[int8]*Stream)} go tmp.WaitCloser() return &tmp } - +func (element *Muxer) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) { + if len(element.Options.ICEServers) > 0 { + log.Println("Set ICEServers", element.Options.ICEServers) + configuration.ICEServers = append(configuration.ICEServers, webrtc.ICEServer{URLs: element.Options.ICEServers}) + } + m := &webrtc.MediaEngine{} + if err := m.RegisterDefaultCodecs(); err != nil { + return nil, err + } + i := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { + return nil, err + } + s := webrtc.SettingEngine{} + if element.Options.PortMin > 0 && element.Options.PortMax > 0 && element.Options.PortMax > element.Options.PortMin { + s.SetEphemeralUDPPortRange(element.Options.PortMin, element.Options.PortMax) + log.Println("Set UDP ports to", element.Options.PortMin, "..", element.Options.PortMax) + } + api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(s)) + return api.NewPeerConnection(configuration) +} func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, error) { var WriteHeaderSuccess bool if len(streams) == 0 { @@ -55,7 +81,7 @@ func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, Type: webrtc.SDPTypeOffer, SDP: string(sdpB), } - peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{}) + peerConnection, err := element.NewPeerConnection(webrtc.Configuration{}) if err != nil { return "", err } @@ -169,9 +195,6 @@ func (element *Muxer) WritePacket(pkt av.Packet) (err error) { } if tmp, ok := element.streams[pkt.Idx]; ok { element.StreamACK.Reset(10 * time.Second) - if tmp.ts == 0 { - tmp.ts = pkt.Time - } switch tmp.codec.Type() { case av.H264: codec := tmp.codec.(h264parser.CodecData) @@ -180,18 +203,20 @@ func (element *Muxer) WritePacket(pkt av.Packet) (err error) { } else { pkt.Data = pkt.Data[4:] } - case av.PCM_MULAW: - pkt.Data = pkt.Data[4:] case av.PCM_ALAW: - pkt.Data = pkt.Data[4:] case av.OPUS: - pkt.Data = pkt.Data[4:] + case av.PCM_ALAW: + case av.AAC: + //TODO: NEED ADD DECODER AND ENCODER + return ErrorCodecNotSupported + case av.PCM: + //TODO: NEED ADD ENCODER + return ErrorCodecNotSupported default: return ErrorCodecNotSupported } - err = tmp.track.WriteSample(media.Sample{Data: pkt.Data, Duration: pkt.Time - element.streams[pkt.Idx].ts}) + err = tmp.track.WriteSample(media.Sample{Data: pkt.Data, Duration: pkt.Duration}) if err == nil { - element.streams[pkt.Idx].ts = pkt.Time WritePacketSuccess = true } return err