fix: single pc subscribe

This commit is contained in:
langhuihui
2023-07-10 18:58:52 +08:00
parent aa42ec6284
commit 261851f7f7
2 changed files with 174 additions and 130 deletions

View File

@@ -5,6 +5,7 @@ import (
. "github.com/pion/webrtc/v3" . "github.com/pion/webrtc/v3"
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4/codec"
) )
type Signal struct { type Signal struct {
@@ -15,6 +16,34 @@ type Signal struct {
StreamPath string `json:"streamPath"` StreamPath string `json:"streamPath"`
} }
type SignalStreamPath struct {
Type string `json:"type"`
StreamPath string `json:"streamPath"`
}
func NewRemoveSingal(streamPath string) string {
s := SignalStreamPath{
Type: "remove",
StreamPath: streamPath,
}
b, _ := json.Marshal(s)
return string(b)
}
type SignalSDP struct {
Type string `json:"type"`
SDP string `json:"sdp"`
}
func NewAnswerSingal(sdp string) string {
s := SignalSDP{
Type: "answer",
SDP: sdp,
}
b, _ := json.Marshal(s)
return string(b)
}
type WebRTCBatcher struct { type WebRTCBatcher struct {
PageSize int PageSize int
PageNum int PageNum int
@@ -55,9 +84,22 @@ func (suber *WebRTCBatcher) Start() (err error) {
return return
} }
func (suber *WebRTCBatcher) RemoveSubscribe(streamPath string) {
suber.signalChannel.SendText(NewRemoveSingal(streamPath))
}
func (suber *WebRTCBatcher) Answer() (err error) {
var answer string
if answer, err = suber.GetAnswer(); err == nil {
err = suber.signalChannel.SendText(NewAnswerSingal(answer))
}
if err != nil {
WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err))
}
return
}
func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
var s Signal var s Signal
var removeMap = map[string]string{"type": "remove", "streamPath": ""}
// var offer SessionDescription // var offer SessionDescription
if err := json.Unmarshal(msg.Data, &s); err != nil { if err := json.Unmarshal(msg.Data, &s); err != nil {
WebRTCPlugin.Error("Signal", zap.Error(err)) WebRTCPlugin.Error("Signal", zap.Error(err))
@@ -74,36 +116,32 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
if err = WebRTCPlugin.SubscribeExist(streamPath, sub); err == nil { if err = WebRTCPlugin.SubscribeExist(streamPath, sub); err == nil {
suber.subscribers = append(suber.subscribers, sub) suber.subscribers = append(suber.subscribers, sub)
go func(streamPath string) { go func(streamPath string) {
sub.PlayRTP() if sub.DC == nil {
if sub.audio.RTPSender != nil { sub.PlayRTP()
suber.RemoveTrack(sub.audio.RTPSender ) if sub.audio.RTPSender != nil {
suber.RemoveTrack(sub.audio.RTPSender)
}
if sub.video.RTPSender != nil {
suber.RemoveTrack(sub.video.RTPSender)
}
suber.RemoveSubscribe(streamPath)
} else {
sub.DC.OnOpen(func() {
sub.DC.Send(codec.FLVHeader)
go func() {
sub.PlayFLV()
sub.DC.Close()
suber.RemoveSubscribe(streamPath)
}()
})
} }
if sub.video.RTPSender != nil {
suber.RemoveTrack(sub.video.RTPSender)
}
if sub.DC != nil {
sub.DC.Close()
}
removeMap["streamPath"] = streamPath
b, _ := json.Marshal(removeMap)
suber.signalChannel.SendText(string(b))
}(streamPath) }(streamPath)
} else { } else {
removeMap["streamPath"] = streamPath WebRTCPlugin.Error("subscribe", zap.String("streamPath", streamPath), zap.Error(err))
b, _ := json.Marshal(removeMap) suber.RemoveSubscribe(streamPath)
suber.signalChannel.SendText(string(b))
} }
} }
var answer string err = suber.Answer()
if answer, err = suber.GetAnswer(); err == nil {
b, _ := json.Marshal(map[string]string{"type": "answer", "sdp": answer})
err = suber.signalChannel.SendText(string(b))
}
if err != nil {
WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err))
return
}
// if offer, err = suber.CreateOffer(nil); err == nil { // if offer, err = suber.CreateOffer(nil); err == nil {
// b, _ := json.Marshal(offer) // b, _ := json.Marshal(offer)
// err = suber.signalChannel.SendText(string(b)) // err = suber.signalChannel.SendText(string(b))
@@ -114,20 +152,13 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))
return return
} }
var answer string if err = suber.Answer(); err == nil {
if answer, err = suber.GetAnswer(); err == nil { switch s.Type {
b, _ := json.Marshal(map[string]string{"type": "answer", "sdp": answer}) case "publish":
err = suber.signalChannel.SendText(string(b)) WebRTCPlugin.Publish(s.StreamPath, suber)
} case "unpublish":
if err != nil { suber.Stop()
WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err)) }
return
}
switch s.Type {
case "publish":
WebRTCPlugin.Publish(s.StreamPath, suber)
case "unpublish":
suber.Stop()
} }
case "answer": case "answer":
if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil { if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil {

View File

@@ -43,23 +43,95 @@ func (suber *WebRTCSubscriber) createDataChannel() {
// suber.flvHeadCache = make([]byte, 15) // suber.flvHeadCache = make([]byte, 15)
} }
// func (suber *WebRTCSubscriber) sendAvByDatachannel(t byte, reader *track.AVRingReader) { // func (suber *WebRTCSubscriber) sendAvByDatachannel(t byte, reader *track.AVRingReader) {
// suber.flvHeadCache[0] = t // suber.flvHeadCache[0] = t
// frame := reader.Frame // frame := reader.Frame
// dataSize := uint32(frame.AVCC.ByteLength) // dataSize := uint32(frame.AVCC.ByteLength)
// result := net.Buffers{suber.flvHeadCache[:11]} // result := net.Buffers{suber.flvHeadCache[:11]}
// result = append(result, frame.AVCC.ToBuffers()...) // result = append(result, frame.AVCC.ToBuffers()...)
// ts := reader.AbsTime // ts := reader.AbsTime
// util.PutBE(suber.flvHeadCache[1:4], dataSize) // util.PutBE(suber.flvHeadCache[1:4], dataSize)
// util.PutBE(suber.flvHeadCache[4:7], ts) // util.PutBE(suber.flvHeadCache[4:7], ts)
// suber.flvHeadCache[7] = byte(ts >> 24) // suber.flvHeadCache[7] = byte(ts >> 24)
// result = append(result, util.PutBE(suber.flvHeadCache[11:15], dataSize+11)) // result = append(result, util.PutBE(suber.flvHeadCache[11:15], dataSize+11))
// for _, data := range util.SplitBuffers(result, 65535) { // for _, data := range util.SplitBuffers(result, 65535) {
// for _, d := range data { // for _, d := range data {
// suber.queueDCData(d) // suber.queueDCData(d)
// } // }
// } // }
// } // }
func (suber *WebRTCSubscriber) OnSubscribe() {
vm := make(map[codec.VideoCodecID]*track.Video)
am := make(map[codec.AudioCodecID]*track.Audio)
for _, track := range suber.videoTracks {
vm[track.CodecID] = track
}
for _, track := range suber.audioTracks {
am[track.CodecID] = track
}
if (vm[codec.CodecID_H264] != nil || vm[codec.CodecID_H265] == nil) && (am[codec.CodecID_PCMA] != nil || am[codec.CodecID_PCMU] != nil || am[codec.CodecID_AAC] == nil) {
video := vm[codec.CodecID_H264]
if video != nil {
suber.Subscriber.AddTrack(video)
pli := fmt.Sprintf("%x", video.SPS[1:4])
// pli := "42001f"
if !strings.Contains(suber.SDP, pli) {
list := reg_level.FindAllStringSubmatch(suber.SDP, -1)
if len(list) > 0 {
pli = list[0][1]
}
}
suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, video.Name, suber.Subscriber.Stream.Path)
if suber.video.TrackLocalStaticRTP != nil {
suber.video.RTPSender, _ = suber.PeerConnection.AddTrack(suber.video.TrackLocalStaticRTP)
go func() {
rtcpBuf := make([]byte, 1500)
for {
if n, _, rtcpErr := suber.video.Read(rtcpBuf); rtcpErr != nil {
return
} else {
if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil {
for _, pp := range p {
switch pp.(type) {
case *rtcp.PictureLossIndication:
// fmt.Println("PictureLossIndication")
}
}
}
}
}
}()
}
}
var audio *track.Audio
audioMimeType := MimeTypePCMA
if am[codec.CodecID_PCMA] != nil {
audio = am[codec.CodecID_PCMA]
} else if am[codec.CodecID_PCMU] != nil {
audioMimeType = MimeTypePCMU
audio = am[codec.CodecID_PCMU]
} else {
}
if audio != nil {
suber.Subscriber.AddTrack(audio)
suber.audio.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, audio.Name, suber.Subscriber.Stream.Path)
if suber.audio.TrackLocalStaticRTP != nil {
suber.audio.RTPSender, _ = suber.PeerConnection.AddTrack(suber.audio.TrackLocalStaticRTP)
}
}
} else {
suber.createDataChannel()
if len(suber.videoTracks) > 0 {
suber.Subscriber.AddTrack(suber.videoTracks[0])
}
if len(suber.audioTracks) > 0 {
suber.Subscriber.AddTrack(suber.audioTracks[0])
}
}
}
func (suber *WebRTCSubscriber) OnEvent(event any) { func (suber *WebRTCSubscriber) OnEvent(event any) {
switch v := event.(type) { switch v := event.(type) {
@@ -124,90 +196,29 @@ func (suber *WebRTCSubscriber) OnEvent(event any) {
suber.queueDCData(data...) suber.queueDCData(data...)
} }
case ISubscriber: case ISubscriber:
vm := make(map[codec.VideoCodecID]*track.Video) suber.OnSubscribe()
am := make(map[codec.AudioCodecID]*track.Audio) if suber.DC != nil {
for _, track := range suber.videoTracks {
vm[track.CodecID] = track
}
for _, track := range suber.audioTracks {
am[track.CodecID] = track
}
if (vm[codec.CodecID_H264] != nil || vm[codec.CodecID_H265] == nil) && (am[codec.CodecID_PCMA] != nil || am[codec.CodecID_PCMU] != nil || am[codec.CodecID_AAC] == nil) {
video := vm[codec.CodecID_H264]
if video != nil {
suber.Subscriber.AddTrack(video)
pli := fmt.Sprintf("%x", video.SPS[1:4])
// pli := "42001f"
if !strings.Contains(suber.SDP, pli) {
list := reg_level.FindAllStringSubmatch(suber.SDP, -1)
if len(list) > 0 {
pli = list[0][1]
}
}
suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, video.Name, suber.Subscriber.Stream.Path)
if suber.video.TrackLocalStaticRTP != nil {
suber.video.RTPSender, _ = suber.PeerConnection.AddTrack(suber.video.TrackLocalStaticRTP)
go func() {
rtcpBuf := make([]byte, 1500)
for {
if n, _, rtcpErr := suber.video.Read(rtcpBuf); rtcpErr != nil {
return
} else {
if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil {
for _, pp := range p {
switch pp.(type) {
case *rtcp.PictureLossIndication:
// fmt.Println("PictureLossIndication")
}
}
}
}
}
}()
}
}
var audio *track.Audio
audioMimeType := MimeTypePCMA
if am[codec.CodecID_PCMA] != nil {
audio = am[codec.CodecID_PCMA]
} else if am[codec.CodecID_PCMU] != nil {
audioMimeType = MimeTypePCMU
audio = am[codec.CodecID_PCMU]
} else {
}
if audio != nil {
suber.Subscriber.AddTrack(audio)
suber.audio.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, audio.Name, suber.Subscriber.Stream.Path)
if suber.audio.TrackLocalStaticRTP != nil {
suber.audio.RTPSender, _ = suber.PeerConnection.AddTrack(suber.audio.TrackLocalStaticRTP)
}
}
} else {
if vm[codec.CodecID_H265] != nil {
suber.Subscriber.AddTrack(vm[codec.CodecID_H265])
}
if am[codec.CodecID_AAC] != nil {
suber.Subscriber.AddTrack(am[codec.CodecID_AAC])
}
suber.DC.OnOpen(func() { suber.DC.OnOpen(func() {
suber.DC.Send(codec.FLVHeader) suber.DC.Send(codec.FLVHeader)
go suber.PlayFLV() go func() {
suber.PlayFLV()
suber.DC.Close()
suber.PeerConnection.Close()
}()
}) })
} }
suber.OnConnectionStateChange(func(pcs PeerConnectionState) { suber.OnConnectionStateChange(func(pcs PeerConnectionState) {
suber.Info("Connection State has changed:" + pcs.String()) suber.Info("Connection State has changed:" + pcs.String())
switch pcs { switch pcs {
case PeerConnectionStateConnected: case PeerConnectionStateConnected:
if suber.DC != nil { if suber.DC == nil {
// go suber.PlayFLV() go func() {
} else { suber.PlayRTP()
go suber.PlayRTP() suber.PeerConnection.Close()
}()
} }
case PeerConnectionStateDisconnected, PeerConnectionStateFailed: case PeerConnectionStateDisconnected, PeerConnectionStateFailed:
suber.Stop() suber.Stop()
suber.PeerConnection.Close()
} }
}) })
default: default:
@@ -217,11 +228,13 @@ func (suber *WebRTCSubscriber) OnEvent(event any) {
type WebRTCBatchSubscriber struct { type WebRTCBatchSubscriber struct {
WebRTCSubscriber WebRTCSubscriber
OnPlayDone func()
} }
func (suber *WebRTCBatchSubscriber) OnEvent(event any) { func (suber *WebRTCBatchSubscriber) OnEvent(event any) {
switch event.(type) { switch event.(type) {
case ISubscriber: case ISubscriber:
suber.OnSubscribe()
default: default:
suber.WebRTCSubscriber.OnEvent(event) suber.WebRTCSubscriber.OnEvent(event)
} }