fix closer bug client

This commit is contained in:
Andrey Semochkin 2021-01-06 01:56:00 +03:00
parent 1b7359bceb
commit 23ad40b14b

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/base64" "encoding/base64"
"errors" "errors"
"log"
"time" "time"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
@ -13,11 +14,29 @@ import (
"github.com/pion/webrtc/v3/pkg/media" "github.com/pion/webrtc/v3/pkg/media"
) )
const (
// MimeTypeH264 H264 MIME type.
MimeTypeH264 = "video/h264"
// MimeTypeOpus Opus MIME type
MimeTypeOpus = "audio/opus"
// MimeTypeVP8 VP8 MIME type
MimeTypeVP8 = "video/vp8"
// MimeTypeVP9 VP9 MIME type
MimeTypeVP9 = "video/vp9"
// MimeTypeG722 G722 MIME type
MimeTypeG722 = "audio/G722"
// MimeTypePCMU PCMU MIME type
MimeTypePCMU = "audio/PCMU"
// MimeTypePCMA PCMA MIME type
MimeTypePCMA = "audio/PCMA"
)
var ( var (
ErrorNotFound = errors.New("stream not found") ErrorNotFound = errors.New("WebRTC Stream Not Found")
ErrorCodecNotSupported = errors.New("codec not supported") ErrorCodecNotSupported = errors.New("WebRTC Codec Not Supported")
ErrorClientOffline = errors.New("client offline") ErrorClientOffline = errors.New("WebRTC Client Offline")
Label = "track_" ErrorNotTrackAvailable = errors.New("WebRTC Not Track Available")
ErrorIgnoreAudioTrack = errors.New("WebRTC Ignore Audio Track codec not supported WebRTC")
) )
type Muxer struct { type Muxer struct {
@ -25,8 +44,8 @@ type Muxer struct {
status webrtc.ICEConnectionState status webrtc.ICEConnectionState
stop bool stop bool
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
pt *time.Timer ClientACK *time.Timer
ps chan bool StreamACK *time.Timer
} }
type Stream struct { type Stream struct {
codec av.CodecData codec av.CodecData
@ -35,12 +54,13 @@ type Stream struct {
} }
func NewMuxer() *Muxer { func NewMuxer() *Muxer {
tmp := Muxer{ps: make(chan bool, 100), pt: time.NewTimer(time.Second * 20), streams: make(map[int8]*Stream)} tmp := Muxer{ClientACK: time.NewTimer(time.Second * 20), StreamACK: time.NewTimer(time.Second * 20), streams: make(map[int8]*Stream)}
go tmp.WaitCloser() go tmp.WaitCloser()
return &tmp return &tmp
} }
func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, error) { func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, error) {
var WriteHeaderSuccess bool
if len(streams) == 0 { if len(streams) == 0 {
return "", ErrorNotFound return "", ErrorNotFound
} }
@ -56,12 +76,18 @@ func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string,
if err != nil { if err != nil {
return "", err return "", err
} }
defer func() {
if !WriteHeaderSuccess {
err = element.Close()
if err != nil {
log.Println(err)
}
}
}()
for i, i2 := range streams { for i, i2 := range streams {
var track *webrtc.TrackLocalStaticSample var track *webrtc.TrackLocalStaticSample
if i2.Type().IsVideo() { if i2.Type().IsVideo() {
if i2.Type() != av.H264 { if i2.Type() == av.H264 {
return "", errors.New("Video Not h264 codec not supported")
}
track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{ track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{
MimeType: "video/h264", MimeType: "video/h264",
}, "pion-rtsp-video", "pion-rtsp-video") }, "pion-rtsp-video", "pion-rtsp-video")
@ -71,18 +97,18 @@ func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string,
if _, err = peerConnection.AddTrack(track); err != nil { if _, err = peerConnection.AddTrack(track); err != nil {
return "", err return "", err
} }
}
} else if i2.Type().IsAudio() { } else if i2.Type().IsAudio() {
AudioCodecString := "audio/PCMU" AudioCodecString := MimeTypePCMU
switch i2.Type() { switch i2.Type() {
case av.PCM_ALAW: case av.PCM_ALAW:
AudioCodecString = "audio/PCMA" AudioCodecString = MimeTypePCMA
case av.PCM_MULAW: case av.PCM_MULAW:
AudioCodecString = "audio/PCMU" AudioCodecString = MimeTypePCMU
default: default:
return "", errors.New("No Audio Codec Supported") log.Println(ErrorIgnoreAudioTrack)
continue continue
} }
//log.Fatalln(i2.Type())
track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{ track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{
MimeType: AudioCodecString, MimeType: AudioCodecString,
}, "pion-rtsp-audio", "pion-rtsp-audio") }, "pion-rtsp-audio", "pion-rtsp-audio")
@ -95,15 +121,18 @@ func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string,
} }
element.streams[int8(i)] = &Stream{track: track, codec: i2} element.streams[int8(i)] = &Stream{track: track, codec: i2}
} }
if len(element.streams) == 0 {
return "", ErrorNotTrackAvailable
}
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
element.status = connectionState element.status = connectionState
if connectionState == webrtc.ICEConnectionStateDisconnected { if connectionState == webrtc.ICEConnectionStateDisconnected {
element.ps <- true element.Close()
} }
}) })
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
d.OnMessage(func(msg webrtc.DataChannelMessage) { d.OnMessage(func(msg webrtc.DataChannelMessage) {
element.pt.Reset(5 * time.Second) element.ClientACK.Reset(5 * time.Second)
}) })
}) })
@ -119,18 +148,26 @@ func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string,
return "", err return "", err
} }
element.pc = peerConnection element.pc = peerConnection
waitT := time.NewTimer(time.Second * 20) waitT := time.NewTimer(time.Second * 10)
select { select {
case <-waitT.C: case <-waitT.C:
return "", errors.New("gatherCompletePromise wait") return "", errors.New("gatherCompletePromise wait")
case <-gatherCompletePromise: case <-gatherCompletePromise:
//Connected
} }
resp := peerConnection.LocalDescription() resp := peerConnection.LocalDescription()
WriteHeaderSuccess = true
return base64.StdEncoding.EncodeToString([]byte(resp.SDP)), nil return base64.StdEncoding.EncodeToString([]byte(resp.SDP)), nil
} }
func (element *Muxer) WritePacket(pkt av.Packet) (err error) { func (element *Muxer) WritePacket(pkt av.Packet) (err error) {
var WritePacketSuccess bool
defer func() {
if !WritePacketSuccess {
element.Close()
}
}()
if element.stop { if element.stop {
return ErrorClientOffline return ErrorClientOffline
} }
@ -138,6 +175,7 @@ func (element *Muxer) WritePacket(pkt av.Packet) (err error) {
return nil return nil
} }
if tmp, ok := element.streams[pkt.Idx]; ok { if tmp, ok := element.streams[pkt.Idx]; ok {
element.StreamACK.Reset(10 * time.Second)
if tmp.ts == 0 { if tmp.ts == 0 {
tmp.ts = pkt.Time tmp.ts = pkt.Time
} }
@ -149,33 +187,40 @@ func (element *Muxer) WritePacket(pkt av.Packet) (err error) {
} else { } else {
pkt.Data = pkt.Data[4:] pkt.Data = pkt.Data[4:]
} }
//log.Println("video", pkt.Time-tmp.ts)
case av.PCM_MULAW: case av.PCM_MULAW:
//log.Println("audio", pkt.Time-tmp.ts)
case av.PCM_ALAW: case av.PCM_ALAW:
//log.Println("audio", pkt.Time-tmp.ts)
default: default:
return ErrorCodecNotSupported return ErrorCodecNotSupported
} }
//log.Println(tmp.codec.Type(), pkt.Time-tmp.ts) err = tmp.track.WriteSample(media.Sample{Data: pkt.Data, Duration: pkt.Time - tmp.ts})
err := tmp.track.WriteSample(media.Sample{Data: pkt.Data, Duration: pkt.Time - tmp.ts}) if err == nil {
element.streams[pkt.Idx].ts = pkt.Time element.streams[pkt.Idx].ts = pkt.Time
return err WritePacketSuccess = true
}
return err
} else {
WritePacketSuccess = true
return nil
} }
return ErrorNotFound
} }
func (element *Muxer) WaitCloser() { func (element *Muxer) WaitCloser() {
waitT := time.NewTimer(time.Second * 10)
for {
select { select {
case <-element.ps: case <-waitT.C:
element.stop = true if element.stop {
return
}
waitT.Reset(time.Second * 10)
case <-element.StreamACK.C:
element.Close() element.Close()
case <-element.pt.C: case <-element.ClientACK.C:
element.stop = true
element.Close() element.Close()
} }
}
} }
func (element *Muxer) Close() error { func (element *Muxer) Close() error {
element.stop = true
if element.pc != nil { if element.pc != nil {
err := element.pc.Close() err := element.pc.Close()
if err != nil { if err != nil {