vdk/format/webrtcv3/adapter.go

234 lines
5.8 KiB
Go
Raw Normal View History

2021-01-05 09:21:25 +08:00
package webrtc
import (
"bytes"
"encoding/base64"
"errors"
2021-01-06 06:56:00 +08:00
"log"
2021-01-05 09:21:25 +08:00
"time"
"github.com/pion/webrtc/v3"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/h264parser"
"github.com/pion/webrtc/v3/pkg/media"
)
2021-01-06 06:56:00 +08:00
const (
// MimeTypeH264 H264 MIME type.
MimeTypeH264 = "video/h264"
// MimeTypeOpus Opus MIME type
MimeTypeOpus = "audio/opus"
// MimeTypeVP8 VP8 MIME type
MimeTypeVP8 = "video/vp8"
// MimeTypeVP9 VP9 MIME type
MimeTypeVP9 = "video/vp9"
// MimeTypeG722 G722 MIME type
MimeTypeG722 = "audio/G722"
// MimeTypePCMU PCMU MIME type
MimeTypePCMU = "audio/PCMU"
// MimeTypePCMA PCMA MIME type
MimeTypePCMA = "audio/PCMA"
)
2021-01-05 09:21:25 +08:00
var (
2021-01-06 06:56:00 +08:00
ErrorNotFound = errors.New("WebRTC Stream Not Found")
ErrorCodecNotSupported = errors.New("WebRTC Codec Not Supported")
ErrorClientOffline = errors.New("WebRTC Client Offline")
ErrorNotTrackAvailable = errors.New("WebRTC Not Track Available")
2021-01-08 12:23:06 +08:00
ErrorIgnoreAudioTrack = errors.New("WebRTC Ignore Audio Track codec not supported WebRTC support only PCM_ALAW or PCM_MULAW")
2021-01-05 09:21:25 +08:00
)
type Muxer struct {
2021-01-06 06:56:00 +08:00
streams map[int8]*Stream
status webrtc.ICEConnectionState
stop bool
pc *webrtc.PeerConnection
ClientACK *time.Timer
StreamACK *time.Timer
2021-01-05 09:21:25 +08:00
}
type Stream struct {
codec av.CodecData
ts time.Duration
track *webrtc.TrackLocalStaticSample
}
func NewMuxer() *Muxer {
2021-01-06 06:56:00 +08:00
tmp := Muxer{ClientACK: time.NewTimer(time.Second * 20), StreamACK: time.NewTimer(time.Second * 20), streams: make(map[int8]*Stream)}
2021-01-05 09:21:25 +08:00
go tmp.WaitCloser()
return &tmp
}
func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, error) {
2021-01-06 06:56:00 +08:00
var WriteHeaderSuccess bool
2021-01-05 09:21:25 +08:00
if len(streams) == 0 {
return "", ErrorNotFound
}
sdpB, err := base64.StdEncoding.DecodeString(sdp64)
if err != nil {
return "", err
}
offer := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(sdpB),
}
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return "", err
}
2021-01-06 06:56:00 +08:00
defer func() {
if !WriteHeaderSuccess {
err = element.Close()
if err != nil {
log.Println(err)
}
}
}()
2021-01-05 09:21:25 +08:00
for i, i2 := range streams {
var track *webrtc.TrackLocalStaticSample
if i2.Type().IsVideo() {
2021-01-06 06:56:00 +08:00
if i2.Type() == av.H264 {
track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{
MimeType: "video/h264",
}, "pion-rtsp-video", "pion-rtsp-video")
if err != nil {
return "", err
}
if _, err = peerConnection.AddTrack(track); err != nil {
return "", err
}
2021-01-05 09:21:25 +08:00
}
} else if i2.Type().IsAudio() {
2021-01-06 06:56:00 +08:00
AudioCodecString := MimeTypePCMU
2021-01-05 09:21:25 +08:00
switch i2.Type() {
case av.PCM_ALAW:
2021-01-06 06:56:00 +08:00
AudioCodecString = MimeTypePCMA
2021-01-05 09:21:25 +08:00
case av.PCM_MULAW:
2021-01-06 06:56:00 +08:00
AudioCodecString = MimeTypePCMU
2021-01-05 09:21:25 +08:00
default:
2021-01-06 06:56:00 +08:00
log.Println(ErrorIgnoreAudioTrack)
2021-01-05 09:21:25 +08:00
continue
}
track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{
MimeType: AudioCodecString,
}, "pion-rtsp-audio", "pion-rtsp-audio")
if err != nil {
return "", err
}
if _, err = peerConnection.AddTrack(track); err != nil {
return "", err
}
}
element.streams[int8(i)] = &Stream{track: track, codec: i2}
}
2021-01-06 06:56:00 +08:00
if len(element.streams) == 0 {
return "", ErrorNotTrackAvailable
}
2021-01-05 09:21:25 +08:00
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
element.status = connectionState
if connectionState == webrtc.ICEConnectionStateDisconnected {
2021-01-06 06:56:00 +08:00
element.Close()
2021-01-05 09:21:25 +08:00
}
})
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
d.OnMessage(func(msg webrtc.DataChannelMessage) {
2021-01-06 06:56:00 +08:00
element.ClientACK.Reset(5 * time.Second)
2021-01-05 09:21:25 +08:00
})
})
if err = peerConnection.SetRemoteDescription(offer); err != nil {
return "", err
}
gatherCompletePromise := webrtc.GatheringCompletePromise(peerConnection)
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
return "", err
}
if err = peerConnection.SetLocalDescription(answer); err != nil {
return "", err
}
element.pc = peerConnection
2021-01-06 06:56:00 +08:00
waitT := time.NewTimer(time.Second * 10)
2021-01-05 09:21:25 +08:00
select {
case <-waitT.C:
return "", errors.New("gatherCompletePromise wait")
case <-gatherCompletePromise:
2021-01-06 06:56:00 +08:00
//Connected
2021-01-05 09:21:25 +08:00
}
resp := peerConnection.LocalDescription()
2021-01-06 06:56:00 +08:00
WriteHeaderSuccess = true
2021-01-05 09:21:25 +08:00
return base64.StdEncoding.EncodeToString([]byte(resp.SDP)), nil
}
func (element *Muxer) WritePacket(pkt av.Packet) (err error) {
2021-01-06 06:56:00 +08:00
var WritePacketSuccess bool
defer func() {
if !WritePacketSuccess {
element.Close()
}
}()
2021-01-05 09:21:25 +08:00
if element.stop {
return ErrorClientOffline
}
if element.status != webrtc.ICEConnectionStateConnected {
return nil
}
if tmp, ok := element.streams[pkt.Idx]; ok {
2021-01-06 06:56:00 +08:00
element.StreamACK.Reset(10 * time.Second)
2021-01-05 09:21:25 +08:00
if tmp.ts == 0 {
tmp.ts = pkt.Time
}
switch tmp.codec.Type() {
case av.H264:
codec := tmp.codec.(h264parser.CodecData)
if pkt.IsKeyFrame {
pkt.Data = append([]byte{0, 0, 0, 1}, bytes.Join([][]byte{codec.SPS(), codec.PPS(), pkt.Data[4:]}, []byte{0, 0, 0, 1})...)
} else {
pkt.Data = pkt.Data[4:]
}
case av.PCM_MULAW:
case av.PCM_ALAW:
default:
return ErrorCodecNotSupported
}
2021-01-08 12:23:06 +08:00
err = tmp.track.WriteSample(media.Sample{Data: pkt.Data, Duration: pkt.Time - element.streams[pkt.Idx].ts})
2021-01-06 06:56:00 +08:00
if err == nil {
element.streams[pkt.Idx].ts = pkt.Time
WritePacketSuccess = true
}
2021-01-05 09:21:25 +08:00
return err
2021-01-06 06:56:00 +08:00
} else {
WritePacketSuccess = true
return nil
2021-01-05 09:21:25 +08:00
}
}
func (element *Muxer) WaitCloser() {
2021-01-06 06:56:00 +08:00
waitT := time.NewTimer(time.Second * 10)
for {
select {
case <-waitT.C:
if element.stop {
return
}
waitT.Reset(time.Second * 10)
case <-element.StreamACK.C:
2021-01-08 12:23:06 +08:00
log.Println("Stream Not Send Video Close")
2021-01-06 06:56:00 +08:00
element.Close()
case <-element.ClientACK.C:
2021-01-08 12:23:06 +08:00
log.Println("Client Not Send ACK (probably the browser is minimized) or tab not active Close client")
2021-01-06 06:56:00 +08:00
element.Close()
}
2021-01-05 09:21:25 +08:00
}
}
func (element *Muxer) Close() error {
2021-01-06 06:56:00 +08:00
element.stop = true
2021-01-05 09:21:25 +08:00
if element.pc != nil {
err := element.pc.Close()
if err != nil {
return err
}
}
return nil
}