Stream增加一个GetType方法用于获取发布者类型

增加了一个获取本地ip的util方法
pushPS采用rtp v1版
This commit is contained in:
dexter
2023-03-13 12:50:58 +08:00
parent e92fd1edfd
commit 6e874008e4
6 changed files with 72 additions and 18 deletions

View File

@@ -3,16 +3,26 @@
该项目为m7s的引擎部分该部分逻辑是流媒体服务器的核心转发逻辑。包含了一个插件的引入机制其他功能均由插件实现
# 引擎的基本功能
- 引擎初始化会加载配置文件,引入的插件会自动注册到引擎中
- 配置文件中配置会被解析并覆盖插件的默认配置
- 引擎提供配置热更新机制(具体热更新逻辑由插件实现)
- 读取插件的特殊方法将其注册为可供HTTP访问的API接口
- 具有发布功能的插件,可以将流注入到引擎中
- 具有订阅功能的插件,可以从引擎中订阅到流
- 引擎会将流中的数据放入RingBuffer中缓存以便插件可以获取数据
- 引擎提供了从远端拉流和以及向远端推流的基础框架
- 引擎包了zap日志框架
- 引擎提供事件总线机制,可以对所有插件广播事件
- 提供插件机制,对插件的启动,配置解析,事件派发等进行统一管理
- 提供H264、H265、AAC、G711格式的转发
- 提供可复用的AVCC格式、RTP格式、AnnexB格式、ADTS格式等预封装机制
- 提供多Track机制支持大小流加密流扩展
- 提供DataTrack机制可用于实现房间文字聊天等功能
- 提供时间戳同步机制,限速机制
- 提供RTP包乱序重排机制
- 提供订阅者追帧跳帧机制
- 提供发布订阅对外推拉的基础架构
- 提供鉴权机制的底层架构支持
- 提供内存复用机制
- 提供发布者断线重连机制
- 提供按需拉流机制
- 提供HTTP服务端口公用机制
- 提供HTTP API接口自动注册机制
- 提供HTTP接口中间件机制
- 提供结构化日志
- 提供流信息统计和输出
- 提供事件总线机制,可以对所有插件广播事件
- 提供配置热更新机制
## 引擎自带HTTP接口
- 获取某一个流的详情 `/api/stream?streamPath=xxx`
- 终止某一个流 `/api/closeStream?streamPath=xxx`

View File

@@ -19,4 +19,5 @@ type IStream interface {
SetIDR(Track)
GetPublisherConfig() *config.Publish
GetStartTime() time.Time
GetType() string
}

View File

@@ -1,7 +1,7 @@
package engine
import (
"github.com/pion/rtp/v2"
"github.com/pion/rtp"
"github.com/yapingcat/gomedia/go-mpeg2"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
@@ -48,17 +48,17 @@ func (p *PSPublisher) PushPS(rtp *rtp.Packet) {
} else {
item := p.pool.Get(len(rtp.Payload))
copy(item.Value, rtp.Payload)
for cacheItem := p.reorder.Push(rtp.SequenceNumber, &cacheItem{rtp.SequenceNumber, item}); cacheItem != nil; cacheItem = p.reorder.Pop() {
if cacheItem.Seq != p.lastSeq+1 {
p.Debug("drop", zap.Uint16("seq", cacheItem.Seq), zap.Uint16("lastSeq", p.lastSeq))
for rtpPacket := p.reorder.Push(rtp.SequenceNumber, &cacheItem{rtp.SequenceNumber, item}); rtpPacket != nil; rtpPacket = p.reorder.Pop() {
if rtpPacket.Seq != p.lastSeq+1 {
p.Debug("drop", zap.Uint16("seq", rtpPacket.Seq), zap.Uint16("lastSeq", p.lastSeq))
p.Reset()
if p.VideoTrack != nil {
p.SetLostFlag()
}
}
p.Feed(cacheItem.Value)
p.lastSeq = cacheItem.Seq
cacheItem.Recycle()
p.Feed(rtpPacket.Value)
p.lastSeq = rtpPacket.Seq
rtpPacket.Recycle()
}
}
}

View File

@@ -218,6 +218,13 @@ type StreamSummay struct {
BPS int
}
func (s *Stream) GetType() string {
if s.Publisher == nil {
return ""
}
return s.Publisher.GetPublisher().Type
}
func (s *Stream) GetStartTime() time.Time {
return s.StartTime
}

View File

@@ -29,7 +29,9 @@ func NewH265(stream IStream, stuff ...any) (vt *H265) {
}
func (vt *H265) WriteSliceBytes(slice []byte) {
switch t := codec.ParseH265NALUType(slice[0]); t {
t := codec.ParseH265NALUType(slice[0])
// fmt.Println("H265 NALU Type:", t)
switch t {
case codec.NAL_UNIT_VPS:
vt.VPS = slice
vt.ParamaterSets[0] = slice
@@ -43,6 +45,9 @@ func (vt *H265) WriteSliceBytes(slice []byte) {
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.VPS, vt.SPS, vt.PPS)
if err == nil {
vt.WriteSequenceHead(extraData)
} else {
vt.Error("H265 BuildH265SeqHeaderFromVpsSpsPps", zap.Error(err))
vt.Stream.Close()
}
case
codec.NAL_UNIT_CODED_SLICE_BLA,

31
util/ip.go Normal file
View File

@@ -0,0 +1,31 @@
package util
import "net"
// IsLANAddr 检测 IP 地址字符串是否是内网地址
func IsLANAddr(ip string) (bool, string) {
return IsLANIp(net.ParseIP(ip))
}
// IsLANIp 检测 IP 地址是否是内网地址
// 通过直接对比ip段范围效率更高
func IsLANIp(ip net.IP) (bool, string) {
if ip.IsLoopback() {
return true, "127.0.0.0/24"
}
ip4 := ip.To4()
if ip4 == nil {
return false, ""
}
if ip4[0] == 10 { // 10.0.0.0/8
return true, "10.0.0.0/8"
} else if ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31 { // 172.16.0.0/12
return true, "172.16.0.0/12"
} else if ip4[0] == 169 && ip4[1] == 254 { // 169.254.0.0/16
return true, "169.254.0.0/16"
} else if ip4[0] == 192 && ip4[1] == 168 { //192.168.0.0/16
return true, "192.168.0.0/16"
}
return false, ""
}