mirror of
https://github.com/lkmio/lkm.git
synced 2025-10-04 23:02:43 +08:00
rtc支持单端口
This commit is contained in:
1
main.go
1
main.go
@@ -35,6 +35,7 @@ func init() {
|
|||||||
stream.SetDefaultConfig(config)
|
stream.SetDefaultConfig(config)
|
||||||
stream.AppConfig = *config
|
stream.AppConfig = *config
|
||||||
stream.InitHookUrl()
|
stream.InitHookUrl()
|
||||||
|
rtc.InitConfig()
|
||||||
|
|
||||||
//初始化日志
|
//初始化日志
|
||||||
log.InitLogger(zapcore.Level(stream.AppConfig.Log.Level), stream.AppConfig.Log.Name, stream.AppConfig.Log.MaxSize, stream.AppConfig.Log.MaxBackup, stream.AppConfig.Log.MaxAge, stream.AppConfig.Log.Compress)
|
log.InitLogger(zapcore.Level(stream.AppConfig.Log.Level), stream.AppConfig.Log.Name, stream.AppConfig.Log.MaxSize, stream.AppConfig.Log.MaxBackup, stream.AppConfig.Log.MaxAge, stream.AppConfig.Log.Compress)
|
||||||
|
2
rtc.html
2
rtc.html
@@ -32,7 +32,7 @@
|
|||||||
<body>
|
<body>
|
||||||
|
|
||||||
<div style="margin-top: 10px; margin-left: 10px;">
|
<div style="margin-top: 10px; margin-left: 10px;">
|
||||||
<input style="width: 100px;" id="url" type="text" value="live/rtc/_mystream"/>
|
<input style="width: 100px;" id="url" type="text" value="hls/mystream.rtc"/>
|
||||||
<button onclick="play()"> 播放</button>
|
<button onclick="play()"> 播放</button>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
@@ -1,35 +1,33 @@
|
|||||||
package rtc
|
package rtc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/lkmio/avformat/utils"
|
"github.com/lkmio/avformat/utils"
|
||||||
|
"github.com/lkmio/lkm/log"
|
||||||
"github.com/lkmio/lkm/stream"
|
"github.com/lkmio/lkm/stream"
|
||||||
|
"github.com/pion/interceptor"
|
||||||
"github.com/pion/webrtc/v3"
|
"github.com/pion/webrtc/v3"
|
||||||
|
"net"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
webrtcApi *webrtc.API
|
||||||
)
|
)
|
||||||
|
|
||||||
type transStream struct {
|
type transStream struct {
|
||||||
stream.BaseTransStream
|
stream.BaseTransStream
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTransStream() stream.TransStream {
|
|
||||||
t := &transStream{}
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
|
|
||||||
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
|
|
||||||
return NewTransStream(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *transStream) Input(packet utils.AVPacket) error {
|
func (t *transStream) Input(packet utils.AVPacket) error {
|
||||||
if utils.AVMediaTypeAudio == packet.MediaType() {
|
|
||||||
|
|
||||||
} else if utils.AVMediaTypeVideo == packet.MediaType() {
|
|
||||||
|
|
||||||
for _, iSink := range t.Sinks {
|
for _, iSink := range t.Sinks {
|
||||||
sink_ := iSink.(*sink)
|
sink_ := iSink.(*sink)
|
||||||
if sink_.state < webrtc.ICEConnectionStateConnected {
|
if sink_.state < webrtc.ICEConnectionStateConnected {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if utils.AVMediaTypeAudio == packet.MediaType() {
|
||||||
|
sink_.input(packet.Index(), packet.Data(), uint32(packet.Duration(1000)))
|
||||||
|
} else if utils.AVMediaTypeVideo == packet.MediaType() {
|
||||||
if packet.KeyFrame() {
|
if packet.KeyFrame() {
|
||||||
extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData()
|
extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData()
|
||||||
sink_.input(packet.Index(), extra, 0)
|
sink_.input(packet.Index(), extra, 0)
|
||||||
@@ -47,34 +45,56 @@ func (t *transStream) AddSink(sink_ stream.Sink) error {
|
|||||||
var videoTrack *webrtc.TrackLocalStaticSample
|
var videoTrack *webrtc.TrackLocalStaticSample
|
||||||
rtcSink := sink_.(*sink)
|
rtcSink := sink_.(*sink)
|
||||||
rtcSink.setTrackCount(len(t.Tracks))
|
rtcSink.setTrackCount(len(t.Tracks))
|
||||||
connection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
|
connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{})
|
||||||
|
|
||||||
connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
for index, track := range t.Tracks {
|
for index, track := range t.Tracks {
|
||||||
if utils.AVCodecIdH264 != track.CodecId() {
|
var mimeType string
|
||||||
|
var id string
|
||||||
|
if utils.AVCodecIdH264 == track.CodecId() {
|
||||||
|
mimeType = webrtc.MimeTypeH264
|
||||||
|
} else if utils.AVCodecIdH265 == track.CodecId() {
|
||||||
|
mimeType = webrtc.MimeTypeH265
|
||||||
|
} else if utils.AVCodecIdAV1 == track.CodecId() {
|
||||||
|
mimeType = webrtc.MimeTypeAV1
|
||||||
|
} else if utils.AVCodecIdVP8 == track.CodecId() {
|
||||||
|
mimeType = webrtc.MimeTypeVP8
|
||||||
|
} else if utils.AVCodecIdVP9 == track.CodecId() {
|
||||||
|
mimeType = webrtc.MimeTypeVP9
|
||||||
|
} else if utils.AVCodecIdOPUS == track.CodecId() {
|
||||||
|
mimeType = webrtc.MimeTypeOpus
|
||||||
|
} else if utils.AVCodecIdPCMALAW == track.CodecId() {
|
||||||
|
mimeType = webrtc.MimeTypePCMA
|
||||||
|
} else if utils.AVCodecIdPCMMULAW == track.CodecId() {
|
||||||
|
mimeType = webrtc.MimeTypePCMU
|
||||||
|
} else {
|
||||||
|
log.Sugar.Errorf("codec %d not compatible with webrtc", track.CodecId())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
|
if utils.AVMediaTypeAudio == track.Type() {
|
||||||
|
id = "audio"
|
||||||
|
} else {
|
||||||
|
id = "video"
|
||||||
|
}
|
||||||
|
|
||||||
|
videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
} else if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
|
||||||
|
|
||||||
if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
} else if _, err = connection.AddTrack(videoTrack); err != nil {
|
||||||
|
|
||||||
if _, err = connection.AddTrack(videoTrack); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
rtcSink.addTrack(index, videoTrack)
|
rtcSink.addTrack(index, videoTrack)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: rtcSink.offer}); err != nil {
|
if len(connection.GetTransceivers()) == 0 {
|
||||||
|
return fmt.Errorf("no track added")
|
||||||
|
} else if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: rtcSink.offer}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -85,8 +105,8 @@ func (t *transStream) AddSink(sink_ stream.Sink) error {
|
|||||||
} else if err = connection.SetLocalDescription(answer); err != nil {
|
} else if err = connection.SetLocalDescription(answer); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
<-complete
|
|
||||||
|
|
||||||
|
<-complete
|
||||||
connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
||||||
rtcSink.state = state
|
rtcSink.state = state
|
||||||
if webrtc.ICEConnectionStateDisconnected > state {
|
if webrtc.ICEConnectionStateDisconnected > state {
|
||||||
@@ -102,3 +122,44 @@ func (t *transStream) AddSink(sink_ stream.Sink) error {
|
|||||||
func (t *transStream) WriteHeader() error {
|
func (t *transStream) WriteHeader() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewTransStream() stream.TransStream {
|
||||||
|
t := &transStream{}
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func InitConfig() {
|
||||||
|
setting := webrtc.SettingEngine{}
|
||||||
|
var ips []string
|
||||||
|
ips = append(ips, stream.AppConfig.PublicIP)
|
||||||
|
|
||||||
|
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
|
||||||
|
IP: net.ParseIP(stream.AppConfig.ListenIP),
|
||||||
|
Port: stream.AppConfig.WebRtc.Port,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
//设置公网ip和监听端口
|
||||||
|
setting.SetICEUDPMux(webrtc.NewICEUDPMux(nil, udpListener))
|
||||||
|
setting.SetNAT1To1IPs(ips, webrtc.ICECandidateTypeHost)
|
||||||
|
|
||||||
|
//注册音视频编码器
|
||||||
|
m := &webrtc.MediaEngine{}
|
||||||
|
if err := m.RegisterDefaultCodecs(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
i := &interceptor.Registry{}
|
||||||
|
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(setting))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
|
||||||
|
return NewTransStream(), nil
|
||||||
|
}
|
||||||
|
@@ -69,6 +69,11 @@ type GB28181Config struct {
|
|||||||
Port []int `json:"port"`
|
Port []int `json:"port"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type WebRtcConfig struct {
|
||||||
|
TransportConfig
|
||||||
|
Port int `json:"port"`
|
||||||
|
}
|
||||||
|
|
||||||
func (g TransportConfig) IsEnableTCP() bool {
|
func (g TransportConfig) IsEnableTCP() bool {
|
||||||
return strings.Contains(g.Transport, "TCP")
|
return strings.Contains(g.Transport, "TCP")
|
||||||
}
|
}
|
||||||
@@ -244,6 +249,7 @@ type AppConfig_ struct {
|
|||||||
JT1078 JT1078Config
|
JT1078 JT1078Config
|
||||||
Rtsp RtspConfig
|
Rtsp RtspConfig
|
||||||
GB28181 GB28181Config
|
GB28181 GB28181Config
|
||||||
|
WebRtc WebRtcConfig
|
||||||
|
|
||||||
Hook HookConfig
|
Hook HookConfig
|
||||||
Record RecordConfig
|
Record RecordConfig
|
||||||
|
@@ -28,7 +28,7 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
|
|||||||
sink.Lock()
|
sink.Lock()
|
||||||
defer sink.UnLock()
|
defer sink.UnLock()
|
||||||
|
|
||||||
if SessionStateClose == sink.State() {
|
if SessionStateClosed == sink.State() {
|
||||||
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.Id())
|
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.Id())
|
||||||
return response, utils.HookStateFailure
|
return response, utils.HookStateFailure
|
||||||
} else {
|
} else {
|
||||||
|
@@ -72,7 +72,11 @@ type Sink interface {
|
|||||||
func GenerateSinkId(addr net.Addr) SinkId {
|
func GenerateSinkId(addr net.Addr) SinkId {
|
||||||
network := addr.Network()
|
network := addr.Network()
|
||||||
if "tcp" == network {
|
if "tcp" == network {
|
||||||
id := uint64(binary.BigEndian.Uint32(addr.(*net.TCPAddr).IP.To4()))
|
to4 := addr.(*net.TCPAddr).IP.To4()
|
||||||
|
if to4 == nil {
|
||||||
|
to4 = make([]byte, 4)
|
||||||
|
}
|
||||||
|
id := uint64(binary.BigEndian.Uint32(to4))
|
||||||
id <<= 32
|
id <<= 32
|
||||||
id |= uint64(addr.(*net.TCPAddr).Port << 16)
|
id |= uint64(addr.(*net.TCPAddr).Port << 16)
|
||||||
|
|
||||||
@@ -186,7 +190,9 @@ func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID {
|
|||||||
// 拉流断开连接,不需要考虑线程安全
|
// 拉流断开连接,不需要考虑线程安全
|
||||||
// 踢流走source管道删除,并且关闭Conn
|
// 踢流走source管道删除,并且关闭Conn
|
||||||
func (s *BaseSink) Close() {
|
func (s *BaseSink) Close() {
|
||||||
utils.Assert(SessionStateClose != s.State_)
|
if SessionStateClosed != s.State_ {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if s.Conn != nil {
|
if s.Conn != nil {
|
||||||
s.Conn.Close()
|
s.Conn.Close()
|
||||||
@@ -202,12 +208,12 @@ func (s *BaseSink) Close() {
|
|||||||
{
|
{
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.UnLock()
|
defer s.UnLock()
|
||||||
if s.State_ == SessionStateClose {
|
if s.State_ == SessionStateClosed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
state = s.State_
|
state = s.State_
|
||||||
s.State_ = SessionStateClose
|
s.State_ = SessionStateClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
if state == SessionStateTransferring {
|
if state == SessionStateTransferring {
|
||||||
|
@@ -50,7 +50,7 @@ const (
|
|||||||
SessionStateHandshakeDone = SessionState(4) //握手完成
|
SessionStateHandshakeDone = SessionState(4) //握手完成
|
||||||
SessionStateWait = SessionState(5) //位于等待队列中
|
SessionStateWait = SessionState(5) //位于等待队列中
|
||||||
SessionStateTransferring = SessionState(6) //推拉流中
|
SessionStateTransferring = SessionState(6) //推拉流中
|
||||||
SessionStateClose = SessionState(7) //关闭状态
|
SessionStateClosed = SessionState(7) //关闭状态
|
||||||
)
|
)
|
||||||
|
|
||||||
// Source 父类Source负责, 除解析流以外的所有事情
|
// Source 父类Source负责, 除解析流以外的所有事情
|
||||||
@@ -407,7 +407,7 @@ func (s *PublishSource) AddSink(sink Sink) bool {
|
|||||||
sink.Lock()
|
sink.Lock()
|
||||||
defer sink.UnLock()
|
defer sink.UnLock()
|
||||||
|
|
||||||
if SessionStateClose == sink.State() {
|
if SessionStateClosed == sink.State() {
|
||||||
log.Sugar.Warnf("AddSink失败, sink已经断开链接 %s", sink.PrintInfo())
|
log.Sugar.Warnf("AddSink失败, sink已经断开链接 %s", sink.PrintInfo())
|
||||||
} else {
|
} else {
|
||||||
transStream.AddSink(sink)
|
transStream.AddSink(sink)
|
||||||
@@ -519,7 +519,7 @@ func (s *PublishSource) doClose() {
|
|||||||
sink.Lock()
|
sink.Lock()
|
||||||
defer sink.UnLock()
|
defer sink.UnLock()
|
||||||
|
|
||||||
if SessionStateClose == sink.State() {
|
if SessionStateClosed == sink.State() {
|
||||||
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.PrintInfo())
|
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.PrintInfo())
|
||||||
} else {
|
} else {
|
||||||
sink.SetState(SessionStateWait)
|
sink.SetState(SessionStateWait)
|
||||||
@@ -527,7 +527,7 @@ func (s *PublishSource) doClose() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if SessionStateClose != sink.State() {
|
if SessionStateClosed != sink.State() {
|
||||||
sink.Flush()
|
sink.Flush()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
Reference in New Issue
Block a user