vdk/format/webrtc/adapter.go

175 lines
4.3 KiB
Go
Raw Permalink Normal View History

2020-08-09 01:24:23 +08:00
package webrtc
import (
2020-08-11 09:21:33 +08:00
"bytes"
2020-08-09 01:24:23 +08:00
"encoding/base64"
"errors"
"fmt"
2020-08-09 01:29:34 +08:00
"math/rand"
2020-08-09 03:28:58 +08:00
"time"
"github.com/pion/webrtc/v2"
2020-08-09 01:29:34 +08:00
2020-08-09 01:24:23 +08:00
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/h264parser"
2020-08-09 03:28:58 +08:00
"github.com/pion/webrtc/v2/pkg/media"
2020-08-09 01:24:23 +08:00
)
2020-08-11 09:21:33 +08:00
var (
ErrorNotFound = errors.New("stream not found")
ErrorCodecNotSupported = errors.New("codec not supported")
ErrorClientOffline = errors.New("client offline")
Label = "track_"
)
2020-08-09 01:24:23 +08:00
type Muxer struct {
2020-08-11 09:21:33 +08:00
streams map[int8]*Stream
status webrtc.ICEConnectionState
stop bool
pc *webrtc.PeerConnection
pt *time.Timer
ps chan bool
2020-08-09 01:24:23 +08:00
}
type Stream struct {
codec av.CodecData
track *webrtc.Track
}
2020-08-09 01:29:34 +08:00
2020-08-09 01:24:23 +08:00
func NewMuxer() *Muxer {
2020-08-11 09:21:33 +08:00
tmp := Muxer{ps: make(chan bool, 100), pt: time.NewTimer(time.Second * 20), streams: make(map[int8]*Stream)}
go tmp.WaitCloser()
return &tmp
2020-08-09 01:24:23 +08:00
}
2020-08-09 01:29:34 +08:00
2020-08-11 09:21:33 +08:00
func (element *Muxer) WriteHeader(streams []av.CodecData, sdp64 string) (string, error) {
2020-08-09 01:24:23 +08:00
if len(streams) == 0 {
2020-08-11 09:21:33 +08:00
return "", ErrorNotFound
2020-08-09 01:24:23 +08:00
}
mediaEngine := webrtc.MediaEngine{}
sdpB, err := base64.StdEncoding.DecodeString(sdp64)
if err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
offer := webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(sdpB),
}
if err = mediaEngine.PopulateFromSDP(offer); err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine))
peerConnection, err := api.NewPeerConnection(webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
})
if err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
for i, i2 := range streams {
var track *webrtc.Track
if i2.Type().IsVideo() {
2020-08-11 09:21:33 +08:00
track, err = peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, i2.Type().String()), rand.Uint32(), "video", Label)
2020-08-09 01:24:23 +08:00
if err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
} else if i2.Type().IsAudio() {
2020-08-11 09:21:33 +08:00
track, err = peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeAudio, i2.Type().String()), rand.Uint32(), "audio", Label)
2020-08-09 01:24:23 +08:00
if err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
}
_, err = peerConnection.AddTransceiverFromTrack(track,
webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
},
)
if err != nil {
return "", err
}
2020-08-09 01:29:34 +08:00
_, err = peerConnection.AddTrack(track)
2020-08-09 01:24:23 +08:00
if err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
2020-08-11 09:21:33 +08:00
element.streams[int8(i)] = &Stream{track: track, codec: i2}
2020-08-09 01:24:23 +08:00
}
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
2020-08-11 09:21:33 +08:00
element.status = connectionState
if connectionState == webrtc.ICEConnectionStateDisconnected {
element.ps <- true
2020-08-09 01:24:23 +08:00
}
})
2020-08-11 09:21:33 +08:00
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
d.OnMessage(func(msg webrtc.DataChannelMessage) {
element.pt.Reset(5 * time.Second)
})
})
2020-08-09 01:24:23 +08:00
if err = peerConnection.SetRemoteDescription(offer); err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
if err = peerConnection.SetLocalDescription(answer); err != nil {
2020-08-09 01:29:34 +08:00
return "", err
2020-08-09 01:24:23 +08:00
}
2020-08-11 09:21:33 +08:00
element.pc = peerConnection
2020-08-09 01:29:34 +08:00
return base64.StdEncoding.EncodeToString([]byte(answer.SDP)), nil
2020-08-09 01:24:23 +08:00
}
2020-08-09 01:29:34 +08:00
2020-08-11 09:21:33 +08:00
func (element *Muxer) WritePacket(pkt av.Packet) (err error) {
if element.stop {
return ErrorClientOffline
}
if element.status != webrtc.ICEConnectionStateConnected {
return nil
}
if tmp, ok := element.streams[pkt.Idx]; ok {
2020-08-09 01:24:23 +08:00
switch tmp.codec.Type() {
case av.H264:
codec := tmp.codec.(h264parser.CodecData)
if pkt.IsKeyFrame {
2020-08-11 09:21:33 +08:00
pkt.Data = append([]byte{0, 0, 0, 1}, bytes.Join([][]byte{codec.SPS(), codec.PPS(), pkt.Data[4:]}, []byte{0, 0, 0, 1})...)
2020-08-09 01:24:23 +08:00
} else {
pkt.Data = pkt.Data[4:]
}
return tmp.track.WriteSample(media.Sample{Data: pkt.Data, Samples: 90000})
default:
2020-08-11 09:21:33 +08:00
return ErrorCodecNotSupported
}
}
return ErrorNotFound
}
func (element *Muxer) WaitCloser() {
select {
case <-element.ps:
element.stop = true
element.Close()
case <-element.pt.C:
element.stop = true
element.Close()
}
}
func (element *Muxer) Close() error {
if element.pc != nil {
err := element.pc.Close()
if err != nil {
return err
2020-08-09 01:24:23 +08:00
}
}
2020-08-11 09:21:33 +08:00
return nil
2020-08-09 01:24:23 +08:00
}
2020-08-09 01:29:34 +08:00
2020-08-09 01:24:23 +08:00
func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) uint8 {
for _, codec := range m.GetCodecsByKind(codecType) {
if codec.Name == codecName {
return codec.PayloadType
}
}
panic(fmt.Sprintf("Remote peer does not support %s", codecName))
}