diff --git a/format/webrtcv3/adapter.go b/format/webrtcv3/adapter.go index 1c1d387..91d4c6f 100644 --- a/format/webrtcv3/adapter.go +++ b/format/webrtcv3/adapter.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/base64" "errors" + "log" "time" "github.com/pion/webrtc/v3" @@ -13,20 +14,38 @@ import ( "github.com/pion/webrtc/v3/pkg/media" ) +const ( + // MimeTypeH264 H264 MIME type. + MimeTypeH264 = "video/h264" + // MimeTypeOpus Opus MIME type + MimeTypeOpus = "audio/opus" + // MimeTypeVP8 VP8 MIME type + MimeTypeVP8 = "video/vp8" + // MimeTypeVP9 VP9 MIME type + MimeTypeVP9 = "video/vp9" + // MimeTypeG722 G722 MIME type + MimeTypeG722 = "audio/G722" + // MimeTypePCMU PCMU MIME type + MimeTypePCMU = "audio/PCMU" + // MimeTypePCMA PCMA MIME type + MimeTypePCMA = "audio/PCMA" +) + var ( - ErrorNotFound = errors.New("stream not found") - ErrorCodecNotSupported = errors.New("codec not supported") - ErrorClientOffline = errors.New("client offline") - Label = "track_" + ErrorNotFound = errors.New("WebRTC Stream Not Found") + ErrorCodecNotSupported = errors.New("WebRTC Codec Not Supported") + ErrorClientOffline = errors.New("WebRTC Client Offline") + ErrorNotTrackAvailable = errors.New("WebRTC Not Track Available") + ErrorIgnoreAudioTrack = errors.New("WebRTC Ignore Audio Track codec not supported WebRTC") ) type Muxer struct { - streams map[int8]*Stream - status webrtc.ICEConnectionState - stop bool - pc *webrtc.PeerConnection - pt *time.Timer - ps chan bool + streams map[int8]*Stream + status webrtc.ICEConnectionState + stop bool + pc *webrtc.PeerConnection + ClientACK *time.Timer + StreamACK *time.Timer } type Stream struct { codec av.CodecData @@ -35,12 +54,13 @@ type Stream struct { } func NewMuxer() *Muxer { - tmp := Muxer{ps: make(chan bool, 100), pt: time.NewTimer(time.Second * 20), streams: make(map[int8]*Stream)} + tmp := Muxer{ClientACK: time.NewTimer(time.Second * 20), StreamACK: time.NewTimer(time.Second * 20), streams: make(map[int8]*Stream)} go tmp.WaitCloser() return &tmp } func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, error) { + var WriteHeaderSuccess bool if len(streams) == 0 { return "", ErrorNotFound } @@ -56,33 +76,39 @@ func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, if err != nil { return "", err } + defer func() { + if !WriteHeaderSuccess { + err = element.Close() + if err != nil { + log.Println(err) + } + } + }() for i, i2 := range streams { var track *webrtc.TrackLocalStaticSample if i2.Type().IsVideo() { - if i2.Type() != av.H264 { - return "", errors.New("Video Not h264 codec not supported") - } - track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{ - MimeType: "video/h264", - }, "pion-rtsp-video", "pion-rtsp-video") - if err != nil { - return "", err - } - if _, err = peerConnection.AddTrack(track); err != nil { - return "", err + if i2.Type() == av.H264 { + track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{ + MimeType: "video/h264", + }, "pion-rtsp-video", "pion-rtsp-video") + if err != nil { + return "", err + } + if _, err = peerConnection.AddTrack(track); err != nil { + return "", err + } } } else if i2.Type().IsAudio() { - AudioCodecString := "audio/PCMU" + AudioCodecString := MimeTypePCMU switch i2.Type() { case av.PCM_ALAW: - AudioCodecString = "audio/PCMA" + AudioCodecString = MimeTypePCMA case av.PCM_MULAW: - AudioCodecString = "audio/PCMU" + AudioCodecString = MimeTypePCMU default: - return "", errors.New("No Audio Codec Supported") + log.Println(ErrorIgnoreAudioTrack) continue } - //log.Fatalln(i2.Type()) track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{ MimeType: AudioCodecString, }, "pion-rtsp-audio", "pion-rtsp-audio") @@ -95,15 +121,18 @@ func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, } element.streams[int8(i)] = &Stream{track: track, codec: i2} } + if len(element.streams) == 0 { + return "", ErrorNotTrackAvailable + } peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { element.status = connectionState if connectionState == webrtc.ICEConnectionStateDisconnected { - element.ps <- true + element.Close() } }) peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { d.OnMessage(func(msg webrtc.DataChannelMessage) { - element.pt.Reset(5 * time.Second) + element.ClientACK.Reset(5 * time.Second) }) }) @@ -119,18 +148,26 @@ func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, return "", err } element.pc = peerConnection - waitT := time.NewTimer(time.Second * 20) + waitT := time.NewTimer(time.Second * 10) select { case <-waitT.C: return "", errors.New("gatherCompletePromise wait") case <-gatherCompletePromise: + //Connected } resp := peerConnection.LocalDescription() + WriteHeaderSuccess = true return base64.StdEncoding.EncodeToString([]byte(resp.SDP)), nil } func (element *Muxer) WritePacket(pkt av.Packet) (err error) { + var WritePacketSuccess bool + defer func() { + if !WritePacketSuccess { + element.Close() + } + }() if element.stop { return ErrorClientOffline } @@ -138,6 +175,7 @@ func (element *Muxer) WritePacket(pkt av.Packet) (err error) { return nil } if tmp, ok := element.streams[pkt.Idx]; ok { + element.StreamACK.Reset(10 * time.Second) if tmp.ts == 0 { tmp.ts = pkt.Time } @@ -149,33 +187,40 @@ func (element *Muxer) WritePacket(pkt av.Packet) (err error) { } else { pkt.Data = pkt.Data[4:] } - //log.Println("video", pkt.Time-tmp.ts) case av.PCM_MULAW: - //log.Println("audio", pkt.Time-tmp.ts) case av.PCM_ALAW: - //log.Println("audio", pkt.Time-tmp.ts) default: return ErrorCodecNotSupported } - //log.Println(tmp.codec.Type(), pkt.Time-tmp.ts) - err := tmp.track.WriteSample(media.Sample{Data: pkt.Data, Duration: pkt.Time - tmp.ts}) - element.streams[pkt.Idx].ts = pkt.Time + err = tmp.track.WriteSample(media.Sample{Data: pkt.Data, Duration: pkt.Time - tmp.ts}) + if err == nil { + element.streams[pkt.Idx].ts = pkt.Time + WritePacketSuccess = true + } return err + } else { + WritePacketSuccess = true + return nil } - return ErrorNotFound - } func (element *Muxer) WaitCloser() { - select { - case <-element.ps: - element.stop = true - element.Close() - case <-element.pt.C: - element.stop = true - element.Close() + waitT := time.NewTimer(time.Second * 10) + for { + select { + case <-waitT.C: + if element.stop { + return + } + waitT.Reset(time.Second * 10) + case <-element.StreamACK.C: + element.Close() + case <-element.ClientACK.C: + element.Close() + } } } func (element *Muxer) Close() error { + element.stop = true if element.pc != nil { err := element.pc.Close() if err != nil {