From 261851f7f76938b0ba4d4be7645d322e6f0b8d95 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Mon, 10 Jul 2023 18:58:52 +0800 Subject: [PATCH] fix: single pc subscribe --- batcher.go | 111 ++++++++++++++++++----------- subscriber.go | 193 +++++++++++++++++++++++++++----------------------- 2 files changed, 174 insertions(+), 130 deletions(-) diff --git a/batcher.go b/batcher.go index d7239ca..75fcf0a 100644 --- a/batcher.go +++ b/batcher.go @@ -5,6 +5,7 @@ import ( . "github.com/pion/webrtc/v3" "go.uber.org/zap" + "m7s.live/engine/v4/codec" ) type Signal struct { @@ -15,6 +16,34 @@ type Signal struct { StreamPath string `json:"streamPath"` } +type SignalStreamPath struct { + Type string `json:"type"` + StreamPath string `json:"streamPath"` +} + +func NewRemoveSingal(streamPath string) string { + s := SignalStreamPath{ + Type: "remove", + StreamPath: streamPath, + } + b, _ := json.Marshal(s) + return string(b) +} + +type SignalSDP struct { + Type string `json:"type"` + SDP string `json:"sdp"` +} + +func NewAnswerSingal(sdp string) string { + s := SignalSDP{ + Type: "answer", + SDP: sdp, + } + b, _ := json.Marshal(s) + return string(b) +} + type WebRTCBatcher struct { PageSize int PageNum int @@ -55,9 +84,22 @@ func (suber *WebRTCBatcher) Start() (err error) { return } +func (suber *WebRTCBatcher) RemoveSubscribe(streamPath string) { + suber.signalChannel.SendText(NewRemoveSingal(streamPath)) +} +func (suber *WebRTCBatcher) Answer() (err error) { + var answer string + if answer, err = suber.GetAnswer(); err == nil { + err = suber.signalChannel.SendText(NewAnswerSingal(answer)) + } + if err != nil { + WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err)) + } + return +} + func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { var s Signal - var removeMap = map[string]string{"type": "remove", "streamPath": ""} // var offer SessionDescription if err := json.Unmarshal(msg.Data, &s); err != nil { WebRTCPlugin.Error("Signal", zap.Error(err)) @@ -74,36 +116,32 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { if err = WebRTCPlugin.SubscribeExist(streamPath, sub); err == nil { suber.subscribers = append(suber.subscribers, sub) go func(streamPath string) { - sub.PlayRTP() - if sub.audio.RTPSender != nil { - suber.RemoveTrack(sub.audio.RTPSender ) + if sub.DC == nil { + sub.PlayRTP() + if sub.audio.RTPSender != nil { + suber.RemoveTrack(sub.audio.RTPSender) + } + if sub.video.RTPSender != nil { + suber.RemoveTrack(sub.video.RTPSender) + } + suber.RemoveSubscribe(streamPath) + } else { + sub.DC.OnOpen(func() { + sub.DC.Send(codec.FLVHeader) + go func() { + sub.PlayFLV() + sub.DC.Close() + suber.RemoveSubscribe(streamPath) + }() + }) } - if sub.video.RTPSender != nil { - suber.RemoveTrack(sub.video.RTPSender) - } - if sub.DC != nil { - sub.DC.Close() - } - removeMap["streamPath"] = streamPath - b, _ := json.Marshal(removeMap) - suber.signalChannel.SendText(string(b)) }(streamPath) } else { - removeMap["streamPath"] = streamPath - b, _ := json.Marshal(removeMap) - suber.signalChannel.SendText(string(b)) + WebRTCPlugin.Error("subscribe", zap.String("streamPath", streamPath), zap.Error(err)) + suber.RemoveSubscribe(streamPath) } } - var answer string - if answer, err = suber.GetAnswer(); err == nil { - b, _ := json.Marshal(map[string]string{"type": "answer", "sdp": answer}) - err = suber.signalChannel.SendText(string(b)) - } - if err != nil { - WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err)) - return - } - + err = suber.Answer() // if offer, err = suber.CreateOffer(nil); err == nil { // b, _ := json.Marshal(offer) // err = suber.signalChannel.SendText(string(b)) @@ -114,20 +152,13 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) return } - var answer string - if answer, err = suber.GetAnswer(); err == nil { - b, _ := json.Marshal(map[string]string{"type": "answer", "sdp": answer}) - err = suber.signalChannel.SendText(string(b)) - } - if err != nil { - WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err)) - return - } - switch s.Type { - case "publish": - WebRTCPlugin.Publish(s.StreamPath, suber) - case "unpublish": - suber.Stop() + if err = suber.Answer(); err == nil { + switch s.Type { + case "publish": + WebRTCPlugin.Publish(s.StreamPath, suber) + case "unpublish": + suber.Stop() + } } case "answer": if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil { diff --git a/subscriber.go b/subscriber.go index 7c7d089..555c8b9 100644 --- a/subscriber.go +++ b/subscriber.go @@ -43,23 +43,95 @@ func (suber *WebRTCSubscriber) createDataChannel() { // suber.flvHeadCache = make([]byte, 15) } -// func (suber *WebRTCSubscriber) sendAvByDatachannel(t byte, reader *track.AVRingReader) { -// suber.flvHeadCache[0] = t -// frame := reader.Frame -// dataSize := uint32(frame.AVCC.ByteLength) -// result := net.Buffers{suber.flvHeadCache[:11]} -// result = append(result, frame.AVCC.ToBuffers()...) -// ts := reader.AbsTime -// util.PutBE(suber.flvHeadCache[1:4], dataSize) -// util.PutBE(suber.flvHeadCache[4:7], ts) -// suber.flvHeadCache[7] = byte(ts >> 24) -// result = append(result, util.PutBE(suber.flvHeadCache[11:15], dataSize+11)) -// for _, data := range util.SplitBuffers(result, 65535) { -// for _, d := range data { -// suber.queueDCData(d) -// } -// } -// } +// func (suber *WebRTCSubscriber) sendAvByDatachannel(t byte, reader *track.AVRingReader) { +// suber.flvHeadCache[0] = t +// frame := reader.Frame +// dataSize := uint32(frame.AVCC.ByteLength) +// result := net.Buffers{suber.flvHeadCache[:11]} +// result = append(result, frame.AVCC.ToBuffers()...) +// ts := reader.AbsTime +// util.PutBE(suber.flvHeadCache[1:4], dataSize) +// util.PutBE(suber.flvHeadCache[4:7], ts) +// suber.flvHeadCache[7] = byte(ts >> 24) +// result = append(result, util.PutBE(suber.flvHeadCache[11:15], dataSize+11)) +// for _, data := range util.SplitBuffers(result, 65535) { +// for _, d := range data { +// suber.queueDCData(d) +// } +// } +// } +func (suber *WebRTCSubscriber) OnSubscribe() { + vm := make(map[codec.VideoCodecID]*track.Video) + am := make(map[codec.AudioCodecID]*track.Audio) + for _, track := range suber.videoTracks { + vm[track.CodecID] = track + } + for _, track := range suber.audioTracks { + am[track.CodecID] = track + } + if (vm[codec.CodecID_H264] != nil || vm[codec.CodecID_H265] == nil) && (am[codec.CodecID_PCMA] != nil || am[codec.CodecID_PCMU] != nil || am[codec.CodecID_AAC] == nil) { + video := vm[codec.CodecID_H264] + if video != nil { + suber.Subscriber.AddTrack(video) + pli := fmt.Sprintf("%x", video.SPS[1:4]) + // pli := "42001f" + if !strings.Contains(suber.SDP, pli) { + list := reg_level.FindAllStringSubmatch(suber.SDP, -1) + if len(list) > 0 { + pli = list[0][1] + } + } + suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, video.Name, suber.Subscriber.Stream.Path) + if suber.video.TrackLocalStaticRTP != nil { + suber.video.RTPSender, _ = suber.PeerConnection.AddTrack(suber.video.TrackLocalStaticRTP) + go func() { + rtcpBuf := make([]byte, 1500) + for { + if n, _, rtcpErr := suber.video.Read(rtcpBuf); rtcpErr != nil { + + return + } else { + if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil { + for _, pp := range p { + switch pp.(type) { + case *rtcp.PictureLossIndication: + // fmt.Println("PictureLossIndication") + } + } + } + } + } + }() + } + } + var audio *track.Audio + audioMimeType := MimeTypePCMA + if am[codec.CodecID_PCMA] != nil { + audio = am[codec.CodecID_PCMA] + } else if am[codec.CodecID_PCMU] != nil { + audioMimeType = MimeTypePCMU + audio = am[codec.CodecID_PCMU] + } else { + + } + if audio != nil { + suber.Subscriber.AddTrack(audio) + suber.audio.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, audio.Name, suber.Subscriber.Stream.Path) + if suber.audio.TrackLocalStaticRTP != nil { + suber.audio.RTPSender, _ = suber.PeerConnection.AddTrack(suber.audio.TrackLocalStaticRTP) + } + } + } else { + suber.createDataChannel() + if len(suber.videoTracks) > 0 { + suber.Subscriber.AddTrack(suber.videoTracks[0]) + } + if len(suber.audioTracks) > 0 { + suber.Subscriber.AddTrack(suber.audioTracks[0]) + } + + } +} func (suber *WebRTCSubscriber) OnEvent(event any) { switch v := event.(type) { @@ -124,90 +196,29 @@ func (suber *WebRTCSubscriber) OnEvent(event any) { suber.queueDCData(data...) } case ISubscriber: - vm := make(map[codec.VideoCodecID]*track.Video) - am := make(map[codec.AudioCodecID]*track.Audio) - for _, track := range suber.videoTracks { - vm[track.CodecID] = track - } - for _, track := range suber.audioTracks { - am[track.CodecID] = track - } - if (vm[codec.CodecID_H264] != nil || vm[codec.CodecID_H265] == nil) && (am[codec.CodecID_PCMA] != nil || am[codec.CodecID_PCMU] != nil || am[codec.CodecID_AAC] == nil) { - video := vm[codec.CodecID_H264] - if video != nil { - suber.Subscriber.AddTrack(video) - pli := fmt.Sprintf("%x", video.SPS[1:4]) - // pli := "42001f" - if !strings.Contains(suber.SDP, pli) { - list := reg_level.FindAllStringSubmatch(suber.SDP, -1) - if len(list) > 0 { - pli = list[0][1] - } - } - suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, video.Name, suber.Subscriber.Stream.Path) - if suber.video.TrackLocalStaticRTP != nil { - suber.video.RTPSender, _ = suber.PeerConnection.AddTrack(suber.video.TrackLocalStaticRTP) - go func() { - rtcpBuf := make([]byte, 1500) - for { - if n, _, rtcpErr := suber.video.Read(rtcpBuf); rtcpErr != nil { - - return - } else { - if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil { - for _, pp := range p { - switch pp.(type) { - case *rtcp.PictureLossIndication: - // fmt.Println("PictureLossIndication") - } - } - } - } - } - }() - } - } - var audio *track.Audio - audioMimeType := MimeTypePCMA - if am[codec.CodecID_PCMA] != nil { - audio = am[codec.CodecID_PCMA] - } else if am[codec.CodecID_PCMU] != nil { - audioMimeType = MimeTypePCMU - audio = am[codec.CodecID_PCMU] - } else { - - } - if audio != nil { - suber.Subscriber.AddTrack(audio) - suber.audio.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, audio.Name, suber.Subscriber.Stream.Path) - if suber.audio.TrackLocalStaticRTP != nil { - suber.audio.RTPSender, _ = suber.PeerConnection.AddTrack(suber.audio.TrackLocalStaticRTP) - } - } - } else { - if vm[codec.CodecID_H265] != nil { - suber.Subscriber.AddTrack(vm[codec.CodecID_H265]) - } - if am[codec.CodecID_AAC] != nil { - suber.Subscriber.AddTrack(am[codec.CodecID_AAC]) - } + suber.OnSubscribe() + if suber.DC != nil { suber.DC.OnOpen(func() { suber.DC.Send(codec.FLVHeader) - go suber.PlayFLV() + go func() { + suber.PlayFLV() + suber.DC.Close() + suber.PeerConnection.Close() + }() }) } suber.OnConnectionStateChange(func(pcs PeerConnectionState) { suber.Info("Connection State has changed:" + pcs.String()) switch pcs { case PeerConnectionStateConnected: - if suber.DC != nil { - // go suber.PlayFLV() - } else { - go suber.PlayRTP() + if suber.DC == nil { + go func() { + suber.PlayRTP() + suber.PeerConnection.Close() + }() } case PeerConnectionStateDisconnected, PeerConnectionStateFailed: suber.Stop() - suber.PeerConnection.Close() } }) default: @@ -217,11 +228,13 @@ func (suber *WebRTCSubscriber) OnEvent(event any) { type WebRTCBatchSubscriber struct { WebRTCSubscriber + OnPlayDone func() } func (suber *WebRTCBatchSubscriber) OnEvent(event any) { switch event.(type) { case ISubscriber: + suber.OnSubscribe() default: suber.WebRTCSubscriber.OnEvent(event) }