Files
plugin-webrtc/publisher.go
langhuihui 538e4e7033 fix: WHIP
2023-11-17 17:30:15 +08:00

112 lines
2.8 KiB
Go

package webrtc
import (
"sync/atomic"
"time"
"github.com/pion/rtcp"
. "github.com/pion/webrtc/v4"
"go.uber.org/zap"
. "m7s.live/engine/v4"
. "m7s.live/engine/v4/track"
)
type WebRTCPublisher struct {
Publisher
WebRTCIO
audioTrack atomic.Pointer[TrackRemote]
videoTrack atomic.Pointer[TrackRemote]
}
func (puber *WebRTCPublisher) OnEvent(event any) {
switch event.(type) {
case IPublisher:
puber.OnTrack(puber.onTrack)
}
puber.Publisher.OnEvent(event)
}
func (puber *WebRTCPublisher) onTrack(track *TrackRemote, receiver *RTPReceiver) {
puber.Info("onTrack", zap.String("kind", track.Kind().String()), zap.Uint8("payloadType", uint8(track.Codec().PayloadType)))
if codec := track.Codec(); track.Kind() == RTPCodecTypeAudio {
puber.audioTrack.Store(track)
if puber.AudioTrack == nil {
switch codec.PayloadType {
case 111:
puber.AudioTrack = NewOpus(puber.Stream)
case 8:
puber.AudioTrack = NewG711(puber.Stream, true)
case 0:
puber.AudioTrack = NewG711(puber.Stream, false)
default:
puber.AudioTrack = nil
puber.Config.PubAudio = false
return
}
}
for {
if puber.audioTrack.Load() != track {
return
}
rtpItem := puber.AudioTrack.GetRTPFromPool()
if i, _, err := track.Read(rtpItem.Value.Raw); err == nil {
rtpItem.Value.Unmarshal(rtpItem.Value.Raw[:i])
puber.AudioTrack.WriteRTP(rtpItem)
} else {
puber.Info("track stop", zap.String("kind", track.Kind().String()), zap.Error(err))
rtpItem.Recycle()
return
}
}
} else {
puber.videoTrack.Store(track)
if puber.VideoTrack == nil {
switch codec.PayloadType {
case 45:
puber.VideoTrack = NewAV1(puber.Stream, byte(codec.PayloadType))
default:
puber.VideoTrack = NewH264(puber.Stream, byte(codec.PayloadType))
}
}
go puber.writeRTCP(track)
for {
if puber.videoTrack.Load() != track {
return
}
rtpItem := puber.VideoTrack.GetRTPFromPool()
if i, _, err := track.Read(rtpItem.Value.Raw); err == nil {
rtpItem.Value.Unmarshal(rtpItem.Value.Raw[:i])
if rtpItem.Value.Extension {
for _, id := range rtpItem.Value.GetExtensionIDs() {
puber.Debug("extension", zap.Uint8("id", id), zap.Binary("value", rtpItem.Value.GetExtension(id)))
}
}
puber.VideoTrack.WriteRTP(rtpItem)
} else {
puber.Info("track stop", zap.String("kind", track.Kind().String()), zap.Error(err))
rtpItem.Recycle()
return
}
}
}
}
func (puber *WebRTCPublisher) writeRTCP(track *TrackRemote) {
ticker := time.NewTicker(webrtcConfig.PLI)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if puber.videoTrack.Load() != track {
return
}
if rtcpErr := puber.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil {
puber.Error("writeRTCP", zap.Error(rtcpErr))
return
}
case <-puber.Done():
return
}
}
}