diff --git a/main.go b/main.go index cdeb636..1231fd7 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ func init() { stream.SetDefaultConfig(config) stream.AppConfig = *config stream.InitHookUrl() + rtc.InitConfig() //初始化日志 log.InitLogger(zapcore.Level(stream.AppConfig.Log.Level), stream.AppConfig.Log.Name, stream.AppConfig.Log.MaxSize, stream.AppConfig.Log.MaxBackup, stream.AppConfig.Log.MaxAge, stream.AppConfig.Log.Compress) diff --git a/rtc.html b/rtc.html index 67d6621..6aafeb1 100644 --- a/rtc.html +++ b/rtc.html @@ -32,7 +32,7 @@
- +
diff --git a/rtc/rtc_stream.go b/rtc/rtc_stream.go index 858e1ba..5e0c95e 100644 --- a/rtc/rtc_stream.go +++ b/rtc/rtc_stream.go @@ -1,35 +1,33 @@ package rtc import ( + "fmt" "github.com/lkmio/avformat/utils" + "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" + "github.com/pion/interceptor" "github.com/pion/webrtc/v3" + "net" +) + +var ( + webrtcApi *webrtc.API ) type transStream struct { stream.BaseTransStream } -func NewTransStream() stream.TransStream { - t := &transStream{} - return t -} - -func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) { - return NewTransStream(), nil -} - func (t *transStream) Input(packet utils.AVPacket) error { - if utils.AVMediaTypeAudio == packet.MediaType() { - - } else if utils.AVMediaTypeVideo == packet.MediaType() { - - for _, iSink := range t.Sinks { - sink_ := iSink.(*sink) - if sink_.state < webrtc.ICEConnectionStateConnected { - continue - } + for _, iSink := range t.Sinks { + sink_ := iSink.(*sink) + if sink_.state < webrtc.ICEConnectionStateConnected { + continue + } + if utils.AVMediaTypeAudio == packet.MediaType() { + sink_.input(packet.Index(), packet.Data(), uint32(packet.Duration(1000))) + } else if utils.AVMediaTypeVideo == packet.MediaType() { if packet.KeyFrame() { extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData() sink_.input(packet.Index(), extra, 0) @@ -47,34 +45,56 @@ func (t *transStream) AddSink(sink_ stream.Sink) error { var videoTrack *webrtc.TrackLocalStaticSample rtcSink := sink_.(*sink) rtcSink.setTrackCount(len(t.Tracks)) - connection, err := webrtc.NewPeerConnection(webrtc.Configuration{}) - + connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{}) connection.OnICECandidate(func(candidate *webrtc.ICECandidate) { }) for index, track := range t.Tracks { - if utils.AVCodecIdH264 != track.CodecId() { + var mimeType string + var id string + if utils.AVCodecIdH264 == track.CodecId() { + mimeType = webrtc.MimeTypeH264 + } else if utils.AVCodecIdH265 == track.CodecId() { + mimeType = webrtc.MimeTypeH265 + } else if utils.AVCodecIdAV1 == track.CodecId() { + mimeType = webrtc.MimeTypeAV1 + } else if utils.AVCodecIdVP8 == track.CodecId() { + mimeType = webrtc.MimeTypeVP8 + } else if utils.AVCodecIdVP9 == track.CodecId() { + mimeType = webrtc.MimeTypeVP9 + } else if utils.AVCodecIdOPUS == track.CodecId() { + mimeType = webrtc.MimeTypeOpus + } else if utils.AVCodecIdPCMALAW == track.CodecId() { + mimeType = webrtc.MimeTypePCMA + } else if utils.AVCodecIdPCMMULAW == track.CodecId() { + mimeType = webrtc.MimeTypePCMU + } else { + log.Sugar.Errorf("codec %d not compatible with webrtc", track.CodecId()) continue } - videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") + if utils.AVMediaTypeAudio == track.Type() { + id = "audio" + } else { + id = "video" + } + + videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion") if err != nil { panic(err) - } - - if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil { + } else if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil { return err - } - - if _, err = connection.AddTrack(videoTrack); err != nil { + } else if _, err = connection.AddTrack(videoTrack); err != nil { return err } rtcSink.addTrack(index, videoTrack) } - if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: rtcSink.offer}); err != nil { + if len(connection.GetTransceivers()) == 0 { + return fmt.Errorf("no track added") + } else if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: rtcSink.offer}); err != nil { return err } @@ -85,8 +105,8 @@ func (t *transStream) AddSink(sink_ stream.Sink) error { } else if err = connection.SetLocalDescription(answer); err != nil { return err } - <-complete + <-complete connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { rtcSink.state = state if webrtc.ICEConnectionStateDisconnected > state { @@ -102,3 +122,44 @@ func (t *transStream) AddSink(sink_ stream.Sink) error { func (t *transStream) WriteHeader() error { return nil } + +func NewTransStream() stream.TransStream { + t := &transStream{} + return t +} + +func InitConfig() { + setting := webrtc.SettingEngine{} + var ips []string + ips = append(ips, stream.AppConfig.PublicIP) + + udpListener, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: net.ParseIP(stream.AppConfig.ListenIP), + Port: stream.AppConfig.WebRtc.Port, + }) + + if err != nil { + panic(err) + } + + //设置公网ip和监听端口 + setting.SetICEUDPMux(webrtc.NewICEUDPMux(nil, udpListener)) + setting.SetNAT1To1IPs(ips, webrtc.ICECandidateTypeHost) + + //注册音视频编码器 + m := &webrtc.MediaEngine{} + if err := m.RegisterDefaultCodecs(); err != nil { + panic(err) + } + + i := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { + panic(err) + } + + webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(setting)) +} + +func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) { + return NewTransStream(), nil +} diff --git a/stream/config.go b/stream/config.go index 34f9d74..70660a0 100644 --- a/stream/config.go +++ b/stream/config.go @@ -69,6 +69,11 @@ type GB28181Config struct { Port []int `json:"port"` } +type WebRtcConfig struct { + TransportConfig + Port int `json:"port"` +} + func (g TransportConfig) IsEnableTCP() bool { return strings.Contains(g.Transport, "TCP") } @@ -244,6 +249,7 @@ type AppConfig_ struct { JT1078 JT1078Config Rtsp RtspConfig GB28181 GB28181Config + WebRtc WebRtcConfig Hook HookConfig Record RecordConfig diff --git a/stream/hook_sink.go b/stream/hook_sink.go index 62bba15..fe7b13e 100644 --- a/stream/hook_sink.go +++ b/stream/hook_sink.go @@ -28,7 +28,7 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) { sink.Lock() defer sink.UnLock() - if SessionStateClose == sink.State() { + if SessionStateClosed == sink.State() { log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.Id()) return response, utils.HookStateFailure } else { diff --git a/stream/sink.go b/stream/sink.go index 8a41626..8a70ca3 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -72,7 +72,11 @@ type Sink interface { func GenerateSinkId(addr net.Addr) SinkId { network := addr.Network() if "tcp" == network { - id := uint64(binary.BigEndian.Uint32(addr.(*net.TCPAddr).IP.To4())) + to4 := addr.(*net.TCPAddr).IP.To4() + if to4 == nil { + to4 = make([]byte, 4) + } + id := uint64(binary.BigEndian.Uint32(to4)) id <<= 32 id |= uint64(addr.(*net.TCPAddr).Port << 16) @@ -186,7 +190,9 @@ func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID { // 拉流断开连接,不需要考虑线程安全 // 踢流走source管道删除,并且关闭Conn func (s *BaseSink) Close() { - utils.Assert(SessionStateClose != s.State_) + if SessionStateClosed != s.State_ { + return + } if s.Conn != nil { s.Conn.Close() @@ -202,12 +208,12 @@ func (s *BaseSink) Close() { { s.Lock() defer s.UnLock() - if s.State_ == SessionStateClose { + if s.State_ == SessionStateClosed { return } state = s.State_ - s.State_ = SessionStateClose + s.State_ = SessionStateClosed } if state == SessionStateTransferring { diff --git a/stream/source.go b/stream/source.go index 2b35c33..d1253c6 100644 --- a/stream/source.go +++ b/stream/source.go @@ -50,7 +50,7 @@ const ( SessionStateHandshakeDone = SessionState(4) //握手完成 SessionStateWait = SessionState(5) //位于等待队列中 SessionStateTransferring = SessionState(6) //推拉流中 - SessionStateClose = SessionState(7) //关闭状态 + SessionStateClosed = SessionState(7) //关闭状态 ) // Source 父类Source负责, 除解析流以外的所有事情 @@ -407,7 +407,7 @@ func (s *PublishSource) AddSink(sink Sink) bool { sink.Lock() defer sink.UnLock() - if SessionStateClose == sink.State() { + if SessionStateClosed == sink.State() { log.Sugar.Warnf("AddSink失败, sink已经断开链接 %s", sink.PrintInfo()) } else { transStream.AddSink(sink) @@ -519,7 +519,7 @@ func (s *PublishSource) doClose() { sink.Lock() defer sink.UnLock() - if SessionStateClose == sink.State() { + if SessionStateClosed == sink.State() { log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.PrintInfo()) } else { sink.SetState(SessionStateWait) @@ -527,7 +527,7 @@ func (s *PublishSource) doClose() { } } - if SessionStateClose != sink.State() { + if SessionStateClosed != sink.State() { sink.Flush() } })