feat:support receive stream via UDP (#326)

Co-authored-by: yjx <yjx>
This commit is contained in:
yangjinxing123
2025-09-08 08:53:44 +08:00
committed by langhuihui
parent 2311931432
commit b5c339de6b
5 changed files with 233 additions and 46 deletions

View File

@@ -12,20 +12,20 @@ srt:
listenaddr: :6000
passphrase: foobarfoobar
gb28181:
enable: false # 是否启用GB28181协议
enable: false # 是否启用GB28181协议
autoinvite: false #建议使用false开启后会自动邀请设备推流
mediaip: 192.168.1.21 #流媒体收流IP,外网情况下使用公网IP,内网情况下使用网卡IP,不要用127.0.0.1
sipip: 192.168.1.21 #SIP通讯IP,不管公网还是内网都使用本机网卡IP,不要用127.0.0.1
mediaip: 192.168.1.21 #流媒体收流IP,外网情况下使用公网IP,内网情况下使用网卡IP,不要用127.0.0.1
sipip: 192.168.1.21 #SIP通讯IP,不管公网还是内网都使用本机网卡IP,不要用127.0.0.1
sip:
listenaddr:
- udp::5060
- udp::5060
onsub:
pull:
^\d{20}/\d{20}$: $0
^gb_\d+/(.+)$: $1
# .* : $0
# .* : $0
platforms:
- enable: false #是否启用平台
- enable: false #是否启用平台
name: "测试平台" #平台名称
servergbid: "34020000002000000002" #上级平台GBID
servergbdomain: "3402000000" #上级平台GB域
@@ -89,13 +89,10 @@ hls:
# onpub:
# transform:
# .* : 5s x 3
#rtsp:
# pull:
# live/test: rtsp://admin:1qaz2wsx3EDC@giroro.tpddns.cn:1554/Streaming/Channels/101
# live/test: rtsp://admin:1qaz2wsx3EDC@localhost:8554/live/test
s3:
auto: true # 启用自动上传
enable: false
auto: true # 启用自动上传
deleteAfterUpload: false # 上传后保留本地文件
endpoint: "storage-dev.xiding.tech"
accessKeyId: "xidinguser"
@@ -105,30 +102,38 @@ s3:
forcePathStyle: true
useSSL: true
rtsp:
# pull:
# live/test: rtsp://admin:1qaz2wsx3EDC@58.212.158.30/Streaming/Channels/101
# live/test: rtsp://admin:1qaz2wsx3EDC@localhost:8554/live/test
webrtc:
publish:
pubaudio: false
port: udp:9000-9100
snap:
enable: false
onpub:
transform:
.+:
output:
- watermark:
text: "abcd" # 水印文字内容
fontpath: /Users/dexter/Library/Fonts/MapleMono-NF-CN-Medium.ttf # 水印字体文件路径
fontcolor: "rgba(255,165,0,1)" # 水印字体颜色支持rgba格式
fontsize: 36 # 水印字体大小
offsetx: 0 # 水印位置X偏移
offsety: 0 # 水印位置Y偏移
timeinterval: 1s # 截图时间间隔
savepath: "snaps" # 截图保存路径
iframeinterval: 3 # 间隔多少帧截图
querytimedelta: 3 # 查询截图时允许的最大时间差(秒)
- watermark:
text: "abcd" # 水印文字内容
fontpath: /Users/dexter/Library/Fonts/MapleMono-NF-CN-Medium.ttf # 水印字体文件路径
fontcolor: "rgba(255,165,0,1)" # 水印字体颜色支持rgba格式
fontsize: 36 # 水印字体大小
offsetx: 0 # 水印位置X偏移
offsety: 0 # 水印位置Y偏移
timeinterval: 1s # 截图时间间隔
savepath: "snaps" # 截图保存路径
iframeinterval: 3 # 间隔多少帧截图
querytimedelta: 3 # 查询截图时允许的最大时间差(秒)
onvif:
enable: false
discoverinterval: 3 # 发现设备的间隔单位秒默认30秒建议比rtsp插件的重连间隔大点
autopull: true
autoadd: true
interfaces: # 设备发现指定网卡以及该网卡对应IP段的全局默认账号密码支持多网卡
- interfacename: 以太网 # 网卡名称 或者"以太网" "eth0"等使用ipconfig 或者 ifconfig 查看网卡名称
- interfacename: 以太网 # 网卡名称 或者"以太网" "eth0"等使用ipconfig 或者 ifconfig 查看网卡名称
username: admin # onvif 账号
password: admin # onvif 密码
# - interfacename: WLAN 2 # 网卡2
@@ -140,4 +145,4 @@ onvif:
# password: '123'
# - ip: 192.168.1.2
# username: admin
# password: '456'
# password: '456'

View File

@@ -136,18 +136,34 @@ func (d *Dialog) Start() (err error) {
d.pullCtx.GoToStepConst(StepSIPPrepare)
//defer d.gb.dialogs.Remove(d)
if d.gb.tcpPort > 0 {
d.MediaPort = d.gb.tcpPort
} else {
if d.gb.MediaPort.Valid() {
select {
case d.MediaPort = <-d.gb.tcpPorts:
default:
d.pullCtx.Fail("no available tcp port")
return fmt.Errorf("no available tcp port")
}
if d.StreamMode == mrtp.StreamModeTCPPassive {
if d.gb.tcpPort > 0 {
d.MediaPort = d.gb.tcpPort
} else {
d.MediaPort = d.gb.MediaPort[0]
if d.gb.MediaPort.Valid() {
select {
case d.MediaPort = <-d.gb.tcpPorts:
default:
d.pullCtx.Fail("no available tcp port")
return fmt.Errorf("no available tcp port")
}
} else {
d.MediaPort = d.gb.MediaPort[0]
}
}
} else if d.StreamMode == mrtp.StreamModeUDP {
if d.gb.udpPort > 0 {
d.MediaPort = d.gb.udpPort
} else {
if d.gb.MediaPort.Valid() {
select {
case d.MediaPort = <-d.gb.udpPorts:
default:
return fmt.Errorf("no available udp port")
}
} else {
d.MediaPort = d.gb.MediaPort[0]
}
}
}
@@ -214,7 +230,9 @@ func (d *Dialog) Start() (err error) {
"a=connection:new",
)
case mrtp.StreamModeUDP:
/* 支持udp收流 yjx
return errors.New("do not support udp mode")
*/
default:
sdpInfo = append(sdpInfo,
"a=setup:passive",
@@ -354,9 +372,10 @@ func (d *Dialog) Run() (err error) {
var pub mrtp.PSReceiver
pub.Publisher = d.pullCtx.Publisher
if d.StreamMode == mrtp.StreamModeTCPActive {
switch d.StreamMode {
case mrtp.StreamModeTCPActive:
pub.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
} else {
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort > 0 {
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
if d.gb.netListener != nil {
@@ -370,6 +389,18 @@ func (d *Dialog) Run() (err error) {
pub.SSRC = d.SSRC
}
pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
case mrtp.StreamModeUDP:
if d.gb.udpPort > 0 {
d.Info("into single port mode, use gb.udpPort", d.gb.udpPort)
if d.gb.netUDPListener != nil {
d.Info("use gb.netUDPListener", d.gb.netUDPListener.LocalAddr())
} else {
d.Info("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort))
d.gb.netUDPListener, _ = util.ListenUDP(fmt.Sprintf(":%d", d.gb.udpPort), 1024*1024*4)
}
}
pub.SSRC = d.SSRC
pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
}
pub.StreamMode = d.StreamMode
return d.RunTask(&pub)
@@ -380,9 +411,16 @@ func (d *Dialog) GetKey() string {
}
func (d *Dialog) Dispose() {
if d.gb.tcpPort == 0 {
// 如果没有设置tcp端口则将MediaPort设置为0表示不再使用
d.gb.tcpPorts <- d.MediaPort
if d.StreamMode == mrtp.StreamModeUDP {
if d.gb.udpPort == 0 { //多端口
// 如果没有设置udp端口则将MediaPort设置为0表示不再使用
d.gb.udpPorts <- d.MediaPort
}
} else {
if d.gb.tcpPort == 0 {
// 如果没有设置tcp端口则将MediaPort设置为0表示不再使用
d.gb.tcpPorts <- d.MediaPort
}
}
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId)
if d.session != nil {

View File

@@ -17,6 +17,7 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/pion/rtp"
"github.com/rs/zerolog"
m7s "m7s.live/v5"
"m7s.live/v5/pkg/config"
@@ -65,6 +66,9 @@ type GB28181Plugin struct {
Platforms []*gb28181.PlatformModel
channels util.Collection[string, *Channel]
netListener net.Listener
udpPorts chan uint16
udpPort uint16
netUDPListener *net.UDPConn
}
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
@@ -89,6 +93,14 @@ func (gb *GB28181Plugin) Dispose() {
gb.Info("netListener closed")
}
}
if gb.netUDPListener != nil {
err := gb.netUDPListener.Close()
if err != nil {
gb.Error("Close netUDPListener error", "error", err)
} else {
gb.Info("netUDPListener closed")
}
}
}
func init() {
@@ -173,15 +185,40 @@ func (gb *GB28181Plugin) Start() (err error) {
if gb.MediaPort.Valid() {
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
gb.udpPorts = make(chan uint16, gb.MediaPort.Size())
if gb.MediaPort.Size() == 0 {
gb.tcpPort = gb.MediaPort[0]
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
//support udp
{
gb.udpPort = gb.MediaPort[0]
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
if err != nil {
gb.Error("start listen", "err", err)
return errors.New("start udp listen, err" + err.Error())
}
go gb.ReadUdpInsinglePort()
}
} else if gb.MediaPort.Size() == 1 {
gb.tcpPort = gb.MediaPort[0] + 1
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
//support udp
{
gb.udpPort = gb.MediaPort[0] + 1
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
if err != nil {
gb.Error("start listen", "err", err)
return errors.New("start udp listen, err" + err.Error())
}
go gb.ReadUdpInsinglePort()
}
} else {
for i := range gb.MediaPort.Size() {
gb.tcpPorts <- gb.MediaPort[0] + i
gb.udpPorts <- gb.MediaPort[0] + i
}
}
} else {
@@ -1009,3 +1046,19 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) {
return
}
}
func (gb *GB28181Plugin) ReadUdpInsinglePort() (err error) {
buffer := make(util.Buffer, 1024*1024)
var rtpPacket rtp.Packet
for {
n, _, err := gb.netUDPListener.ReadFromUDP(buffer)
if err != nil {
return err
}
ps := buffer[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
continue
}
}
}

View File

@@ -15,19 +15,33 @@ type IRTPReader interface {
type RTPUDPReader struct {
io.Reader
buf [MTUSize]byte
RTPReorder[*rtp.Packet]
UdpCacheSize int
}
func NewRTPUDPReader(r io.Reader) *RTPUDPReader {
return &RTPUDPReader{Reader: r}
}
func (r *RTPUDPReader) Read(packet *rtp.Packet) (err error) {
n, err := r.Reader.Read(r.buf[:])
if err != nil {
return err
func (r *RTPUDPReader) Read(packet *rtp.Packet) error {
for {
ordered := r.Pop()
if ordered != nil {
*packet = *ordered
return nil
}
var buf [MTUSize]byte
var pack rtp.Packet
n, err := r.Reader.Read(buf[:])
if err != nil {
return err
}
err = pack.Unmarshal(buf[:n])
if err != nil {
return err
}
r.Push(pack.SequenceNumber, &pack)
}
return packet.Unmarshal(r.buf[:n])
}
type RTPTCPReader struct {

77
plugin/rtp/pkg/reorder.go Normal file
View File

@@ -0,0 +1,77 @@
package rtp
var RTPReorderBufferLen uint16 = 50
// RTPReorder RTP包乱序重排
type RTPReorder[T comparable] struct {
lastSeq uint16 //最新收到的rtp包序号
queue []T // 缓存队列,0号元素位置代表lastReq+1永远保持为空
Total uint32 // 总共收到的包数量
Drop uint32 // 丢弃的包数量
}
func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) {
p.Total++
// 初始化
if len(p.queue) == 0 {
p.lastSeq = seq
p.queue = make([]T, RTPReorderBufferLen)
return v
}
if seq < p.lastSeq && p.lastSeq-seq < 0x8000 {
// 旧的包直接丢弃
p.Drop++
return
}
delta := seq - p.lastSeq
if delta == 0 {
// 重复包
p.Drop++
return
}
if delta == 1 {
// 正常顺序,无需缓存
p.lastSeq = seq
p.pop()
return v
}
if RTPReorderBufferLen < delta {
//超过缓存最大范围,无法挽回,只能造成丢包(序号断裂)
for {
p.lastSeq++
delta--
head := p.pop()
// 可以放得进去了
if delta == RTPReorderBufferLen {
p.queue[RTPReorderBufferLen-1] = v
p.queue[0] = result
return head
} else if head != result {
p.Drop++
}
}
} else {
// 出现后面的包先到达,缓存起来
p.queue[delta-1] = v
return
}
}
func (p *RTPReorder[T]) pop() (result T) {
copy(p.queue, p.queue[1:]) //整体数据向前移动一位保持0号元素代表lastSeq+1
p.queue[RTPReorderBufferLen-1] = result
return p.queue[0]
}
// Pop 从缓存中取出一个包需要连续调用直到返回nil
func (p *RTPReorder[T]) Pop() (result T) {
if len(p.queue) == 0 {
return
}
if next := p.queue[0]; next != result {
result = next
p.lastSeq++
p.pop()
}
return
}