diff --git a/README.md b/README.md index 6399436..b0e6bff 100644 --- a/README.md +++ b/README.md @@ -3,16 +3,26 @@ 该项目为m7s的引擎部分,该部分逻辑是流媒体服务器的核心转发逻辑。包含了一个插件的引入机制,其他功能均由插件实现 # 引擎的基本功能 -- 引擎初始化会加载配置文件,引入的插件会自动注册到引擎中 -- 配置文件中配置会被解析并覆盖插件的默认配置 -- 引擎提供配置热更新机制(具体热更新逻辑由插件实现) -- 读取插件的特殊方法,将其注册为可供HTTP访问的API接口 -- 具有发布功能的插件,可以将流注入到引擎中 -- 具有订阅功能的插件,可以从引擎中订阅到流 -- 引擎会将流中的数据放入RingBuffer中缓存,以便插件可以获取数据 -- 引擎提供了从远端拉流和以及向远端推流的基础框架 -- 引擎包了zap日志框架 -- 引擎提供事件总线机制,可以对所有插件广播事件 +- 提供插件机制,对插件的启动,配置解析,事件派发等进行统一管理 +- 提供H264、H265、AAC、G711格式的转发 +- 提供可复用的AVCC格式、RTP格式、AnnexB格式、ADTS格式等预封装机制 +- 提供多Track机制,支持大小流,加密流扩展 +- 提供DataTrack机制,可用于实现房间文字聊天等功能 +- 提供时间戳同步机制,限速机制 +- 提供RTP包乱序重排机制 +- 提供订阅者追帧跳帧机制 +- 提供发布订阅对外推拉的基础架构 +- 提供鉴权机制的底层架构支持 +- 提供内存复用机制 +- 提供发布者断线重连机制 +- 提供按需拉流机制 +- 提供HTTP服务端口公用机制 +- 提供HTTP API接口自动注册机制 +- 提供HTTP接口中间件机制 +- 提供结构化日志 +- 提供流信息统计和输出 +- 提供事件总线机制,可以对所有插件广播事件 +- 提供配置热更新机制 ## 引擎自带HTTP接口 - 获取某一个流的详情 `/api/stream?streamPath=xxx` - 终止某一个流 `/api/closeStream?streamPath=xxx` diff --git a/common/stream.go b/common/stream.go index 6eb62a3..2f9f7f9 100644 --- a/common/stream.go +++ b/common/stream.go @@ -19,4 +19,5 @@ type IStream interface { SetIDR(Track) GetPublisherConfig() *config.Publish GetStartTime() time.Time + GetType() string } diff --git a/publisher-ps.go b/publisher-ps.go index aac7d4e..60904da 100644 --- a/publisher-ps.go +++ b/publisher-ps.go @@ -1,7 +1,7 @@ package engine import ( - "github.com/pion/rtp/v2" + "github.com/pion/rtp" "github.com/yapingcat/gomedia/go-mpeg2" "go.uber.org/zap" "m7s.live/engine/v4/codec" @@ -48,17 +48,17 @@ func (p *PSPublisher) PushPS(rtp *rtp.Packet) { } else { item := p.pool.Get(len(rtp.Payload)) copy(item.Value, rtp.Payload) - for cacheItem := p.reorder.Push(rtp.SequenceNumber, &cacheItem{rtp.SequenceNumber, item}); cacheItem != nil; cacheItem = p.reorder.Pop() { - if cacheItem.Seq != p.lastSeq+1 { - p.Debug("drop", zap.Uint16("seq", cacheItem.Seq), zap.Uint16("lastSeq", p.lastSeq)) + for rtpPacket := p.reorder.Push(rtp.SequenceNumber, &cacheItem{rtp.SequenceNumber, item}); rtpPacket != nil; rtpPacket = p.reorder.Pop() { + if rtpPacket.Seq != p.lastSeq+1 { + p.Debug("drop", zap.Uint16("seq", rtpPacket.Seq), zap.Uint16("lastSeq", p.lastSeq)) p.Reset() if p.VideoTrack != nil { p.SetLostFlag() } } - p.Feed(cacheItem.Value) - p.lastSeq = cacheItem.Seq - cacheItem.Recycle() + p.Feed(rtpPacket.Value) + p.lastSeq = rtpPacket.Seq + rtpPacket.Recycle() } } } diff --git a/stream.go b/stream.go index 7a80f42..4afa417 100644 --- a/stream.go +++ b/stream.go @@ -218,6 +218,13 @@ type StreamSummay struct { BPS int } +func (s *Stream) GetType() string { + if s.Publisher == nil { + return "" + } + return s.Publisher.GetPublisher().Type +} + func (s *Stream) GetStartTime() time.Time { return s.StartTime } diff --git a/track/h265.go b/track/h265.go index de92920..29e5b70 100644 --- a/track/h265.go +++ b/track/h265.go @@ -29,7 +29,9 @@ func NewH265(stream IStream, stuff ...any) (vt *H265) { } func (vt *H265) WriteSliceBytes(slice []byte) { - switch t := codec.ParseH265NALUType(slice[0]); t { + t := codec.ParseH265NALUType(slice[0]) + // fmt.Println("H265 NALU Type:", t) + switch t { case codec.NAL_UNIT_VPS: vt.VPS = slice vt.ParamaterSets[0] = slice @@ -43,6 +45,9 @@ func (vt *H265) WriteSliceBytes(slice []byte) { extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.VPS, vt.SPS, vt.PPS) if err == nil { vt.WriteSequenceHead(extraData) + } else { + vt.Error("H265 BuildH265SeqHeaderFromVpsSpsPps", zap.Error(err)) + vt.Stream.Close() } case codec.NAL_UNIT_CODED_SLICE_BLA, diff --git a/util/ip.go b/util/ip.go new file mode 100644 index 0000000..a26a4aa --- /dev/null +++ b/util/ip.go @@ -0,0 +1,31 @@ +package util + +import "net" + +// IsLANAddr 检测 IP 地址字符串是否是内网地址 +func IsLANAddr(ip string) (bool, string) { + return IsLANIp(net.ParseIP(ip)) +} + +// IsLANIp 检测 IP 地址是否是内网地址 +// 通过直接对比ip段范围效率更高 +func IsLANIp(ip net.IP) (bool, string) { + if ip.IsLoopback() { + return true, "127.0.0.0/24" + } + + ip4 := ip.To4() + if ip4 == nil { + return false, "" + } + if ip4[0] == 10 { // 10.0.0.0/8 + return true, "10.0.0.0/8" + } else if ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31 { // 172.16.0.0/12 + return true, "172.16.0.0/12" + } else if ip4[0] == 169 && ip4[1] == 254 { // 169.254.0.0/16 + return true, "169.254.0.0/16" + } else if ip4[0] == 192 && ip4[1] == 168 { //192.168.0.0/16 + return true, "192.168.0.0/16" + } + return false, "" +}