package rtc import ( "fmt" "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" "github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4/pkg/media" "io" "time" ) type Sink struct { stream.BaseSink offer string answer string peer *webrtc.PeerConnection tracks []*webrtc.TrackLocalStaticSample state webrtc.ICEConnectionState cb func(sdp string) } func (s *Sink) StartStreaming(transStream stream.TransStream) error { if s.peer != nil { return nil } // 创建PeerConnection var remoteTrack *webrtc.TrackLocalStaticSample s.tracks = make([]*webrtc.TrackLocalStaticSample, transStream.TrackSize()) connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{}) connection.OnICECandidate(func(candidate *webrtc.ICECandidate) { }) tracks := transStream.GetTracks() for index, track := range tracks { var id string codecId := track.Stream.CodecID mimeType, ok := SupportedCodecs[codecId] if !ok { log.Sugar.Errorf("unsupported codec: %s", codecId) continue } if utils.AVMediaTypeAudio == track.Stream.MediaType { id = "audio" } else { id = "video" } remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType.(string)}, id, "pion") if err != nil { return err } transceiver, err := connection.AddTransceiverFromTrack(remoteTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}) if err != nil { return err } // pion需要在外部读取rtcp包,才会处理nack包丢包重传. https://github.com/pion/interceptor/blob/e1874104865b23ba465ecc505f959daf156cc2a5/pkg/nack/responder_interceptor.go#L82 go func() { for { _, _, err := transceiver.Sender().ReadRTCP() if err == nil { continue } else if io.EOF == err || io.ErrClosedPipe == err { break } else { println(err.Error()) } } }() s.tracks[index] = remoteTrack } if len(connection.GetTransceivers()) == 0 { return fmt.Errorf("no track added") } else if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: s.offer}); err != nil { return err } complete := webrtc.GatheringCompletePromise(connection) answer, err := connection.CreateAnswer(nil) if err != nil { return err } else if err = connection.SetLocalDescription(answer); err != nil { return err } // offer的sdp, 应答给http请求 if s.cb != nil { log.Sugar.Infof("answer: %s", connection.LocalDescription().SDP) s.cb(connection.LocalDescription().SDP) } <-complete connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { s.state = state log.Sugar.Infof("ice state: %v sink: %s source: %s", state.String(), stream.SinkID2String(s.GetID()), s.SourceID) if state > webrtc.ICEConnectionStateDisconnected { log.Sugar.Errorf("webrtc peer断开连接 sink: %s source: %s", stream.SinkID2String(s.GetID()), s.SourceID) s.Close() } }) s.peer = connection return nil } func (s *Sink) Close() { s.BaseSink.Close() if s.peer != nil { s.peer.Close() s.peer = nil } } func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error { if s.tracks[index] == nil { return nil } for _, bytes := range data { err := s.tracks[index].WriteSample(media.Sample{ Data: bytes.Get(), Duration: time.Duration(ts) * time.Millisecond, }) if err != nil { return err } } return nil } func NewSink(id stream.SinkID, sourceId string, offer string, cb func(sdp string)) stream.Sink { return &Sink{stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamRtc, TCPStreaming: false}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb} }