fix: gb single port

This commit is contained in:
langhuihui
2025-09-11 01:22:00 +08:00
parent 7b38bd0500
commit ea512e1dd9
6 changed files with 249 additions and 107 deletions

View File

@@ -161,3 +161,49 @@ func (c *Collection[K, T]) Clear() {
c.m = nil
c.Length = 0
}
// LoadOrStore 返回键的现有值(如果存在),否则存储并返回给定的值。
// loaded 结果表示是否找到了值,如果为 true 则表示找到了现有值false 表示存储了新值。
func (c *Collection[K, T]) LoadOrStore(item T) (actual T, loaded bool) {
key := item.GetKey()
if c.L != nil {
c.L.Lock()
defer c.L.Unlock()
}
// 先尝试获取现有值
if c.m != nil {
actual, loaded = c.m[key]
} else {
for _, v := range c.Items {
if v.GetKey() == key {
actual = v
loaded = true
break
}
}
}
// 如果没有找到现有值,则存储新值
if !loaded {
c.Items = append(c.Items, item)
if c.Length > 100 || c.m != nil {
if c.m == nil {
c.m = make(map[K]T)
for _, v := range c.Items {
c.m[v.GetKey()] = v
}
}
c.m[key] = item
}
c.Length++
actual = item
// 触发添加监听器
for _, listener := range c.addListeners {
listener(item)
}
}
return actual, loaded
}

View File

@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"net/url"
"strconv"
"strings"
@@ -136,7 +135,8 @@ func (d *Dialog) Start() (err error) {
d.pullCtx.GoToStepConst(StepSIPPrepare)
//defer d.gb.dialogs.Remove(d)
if d.StreamMode == mrtp.StreamModeTCPPassive {
switch d.StreamMode {
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort > 0 {
d.MediaPort = d.gb.tcpPort
} else {
@@ -151,7 +151,7 @@ func (d *Dialog) Start() (err error) {
d.MediaPort = d.gb.MediaPort[0]
}
}
} else if d.StreamMode == mrtp.StreamModeUDP {
case mrtp.StreamModeUDP:
if d.gb.udpPort > 0 {
d.MediaPort = d.gb.udpPort
} else {
@@ -367,10 +367,6 @@ func (d *Dialog) Run() (err error) {
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
}
err = d.session.Ack(d.gb)
if err != nil {
d.gb.Error("ack session err", err)
}
// 移动到流数据接收步骤
d.pullCtx.GoToStepConst(pkg.StepStreaming)
@@ -383,31 +379,41 @@ func (d *Dialog) Run() (err error) {
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort > 0 {
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
if d.gb.netListener != nil {
d.Info("use gb.netListener", d.gb.netListener.Addr())
pub.Listener = d.gb.netListener
} else {
d.Info("listen tcp4", fmt.Sprintf(":%d", d.gb.tcpPort))
pub.Listener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", d.gb.tcpPort))
d.gb.netListener = pub.Listener
reader := &gb28181.SinglePortReader{
SSRC: d.SSRC,
Mouth: make(chan []byte, 1),
}
pub.SSRC = d.SSRC
reader, _ = d.gb.singlePorts.LoadOrStore(reader)
pub.SinglePort = reader
d.OnStop(func() {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
}
pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
case mrtp.StreamModeUDP:
if d.gb.udpPort > 0 {
d.Info("into single port mode, use gb.udpPort", d.gb.udpPort)
if d.gb.netUDPListener != nil {
d.Info("use gb.netUDPListener", d.gb.netUDPListener.LocalAddr())
} else {
d.Info("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort))
d.gb.netUDPListener, _ = util.ListenUDP(fmt.Sprintf(":%d", d.gb.udpPort), 1024*1024*4)
reader := &gb28181.SinglePortReader{
SSRC: d.SSRC,
Mouth: make(chan []byte, 100),
}
reader, _ = d.gb.singlePorts.LoadOrStore(reader)
pub.SinglePort = reader
d.OnStop(func() {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
}
pub.SSRC = d.SSRC
pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
}
pub.StreamMode = d.StreamMode
err = d.session.Ack(d.gb)
if err != nil {
d.gb.Error("ack session err", err)
}
return d.RunTask(&pub)
}

View File

@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"regexp"
@@ -18,7 +17,6 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/pion/rtp"
m7s "m7s.live/v5"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
@@ -54,7 +52,7 @@ type GB28181Plugin struct {
ua *sipgo.UserAgent
server *sipgo.Server
devices task.WorkCollection[string, *Device]
dialogs task.WorkCollection[string, *Dialog]
dialogs util.Collection[string, *Dialog]
forwardDialogs util.Collection[uint32, *ForwardDialog]
platforms task.WorkCollection[string, *Platform]
tcpPorts chan uint16
@@ -65,10 +63,9 @@ type GB28181Plugin struct {
deviceRegisterManager task.WorkCollection[string, *DeviceRegisterQueueTask]
Platforms []*gb28181.PlatformModel
channels util.Collection[string, *Channel]
netListener net.Listener
udpPorts chan uint16
udpPort uint16
netUDPListener *net.UDPConn
singlePorts util.Collection[uint32, *gb28181.SinglePortReader]
}
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
@@ -83,26 +80,6 @@ var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
NewPullProxy: NewPullProxy,
})
func (gb *GB28181Plugin) Dispose() {
if gb.netListener != nil {
gb.Info("gb28181 plugin dispose")
err := gb.netListener.Close()
if err != nil {
gb.Error("Close netListener error", "error", err)
} else {
gb.Info("netListener closed")
}
}
if gb.netUDPListener != nil {
err := gb.netUDPListener.Close()
if err != nil {
gb.Error("Close netUDPListener error", "error", err)
} else {
gb.Info("netUDPListener closed")
}
}
}
func init() {
sip.SIPDebug = true
}
@@ -176,9 +153,10 @@ func (gb *GB28181Plugin) Start() (err error) {
gb.AddTask(&catalogHandlerQueueTask)
gb.AddTask(&gb.devices)
gb.AddTask(&gb.platforms)
gb.AddTask(&gb.dialogs)
gb.AddTask(&gb.deviceRegisterManager)
gb.dialogs.L = new(sync.RWMutex)
gb.forwardDialogs.L = new(sync.RWMutex)
gb.singlePorts.L = new(sync.RWMutex)
gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua
gb.server.OnMessage(gb.OnMessage)
gb.server.OnRegister(gb.OnRegister)
@@ -188,39 +166,21 @@ func (gb *GB28181Plugin) Start() (err error) {
gb.server.OnNotify(gb.OnNotify)
if gb.MediaPort.Valid() {
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
gb.udpPorts = make(chan uint16, gb.MediaPort.Size())
gb.SetDescription("media port", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
if gb.MediaPort.Size() == 0 {
gb.tcpPort = gb.MediaPort[0]
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
//support udp
{
gb.AddTask(&gb28181.SinglePortTCP{
Port: gb.tcpPort,
Collection: &gb.singlePorts,
})
gb.udpPort = gb.MediaPort[0]
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
if err != nil {
gb.Error("start listen", "err", err)
return errors.New("start udp listen, err" + err.Error())
}
go gb.ReadUdpInsinglePort()
}
} else if gb.MediaPort.Size() == 1 {
gb.tcpPort = gb.MediaPort[0] + 1
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
//support udp
{
gb.udpPort = gb.MediaPort[0] + 1
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
if err != nil {
gb.Error("start listen", "err", err)
return errors.New("start udp listen, err" + err.Error())
}
go gb.ReadUdpInsinglePort()
}
gb.AddTask(&gb28181.SinglePortUDP{
Port: gb.udpPort,
Collection: &gb.singlePorts,
})
} else {
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
gb.udpPorts = make(chan uint16, gb.MediaPort.Size())
for i := range gb.MediaPort.Size() {
gb.tcpPorts <- gb.MediaPort[0] + i
gb.udpPorts <- gb.MediaPort[0] + i
@@ -1058,19 +1018,3 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) {
return
}
}
func (gb *GB28181Plugin) ReadUdpInsinglePort() (err error) {
buffer := make(util.Buffer, 1024*1024)
var rtpPacket rtp.Packet
for {
n, _, err := gb.netUDPListener.ReadFromUDP(buffer)
if err != nil {
return err
}
ps := buffer[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
continue
}
}
}

View File

@@ -0,0 +1,139 @@
package gb28181
import (
"fmt"
"io"
"net"
"github.com/pion/rtp"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
)
type SinglePortReader struct {
SSRC uint32
io.ReadCloser
buffered util.Buffer
Mouth chan []byte
}
func (s *SinglePortReader) GetKey() uint32 {
return s.SSRC
}
func (s *SinglePortReader) Read(buf []byte) (n int, err error) {
if s.buffered.Len() > 0 {
return s.buffered.Read(buf)
}
if s.ReadCloser != nil {
return s.ReadCloser.Read(buf)
}
s.buffered = <-s.Mouth
return s.buffered.Read(buf)
}
func (s *SinglePortReader) Close() error {
if s.ReadCloser != nil {
return s.ReadCloser.Close()
}
return nil
}
type SinglePortUDP struct {
task.Task
Port uint16
conn *net.UDPConn
*util.Collection[uint32, *SinglePortReader]
}
type SinglePortTCP struct {
task.Task
Port uint16
net.Listener
*util.Collection[uint32, *SinglePortReader]
}
func (s *SinglePortUDP) Start() (err error) {
addr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf(":%d", s.Port))
if err != nil {
return err
}
s.conn, err = net.ListenUDP("udp4", addr)
if err == nil {
s.OnStop(func() {
s.conn.Close()
})
}
return
}
func (s *SinglePortUDP) Go() (err error) {
buffer := make([]byte, 2048) // 足够大的缓冲区来接收UDP包
for {
n, _, err := s.conn.ReadFromUDP(buffer)
if err != nil {
return err
}
var packet rtp.Packet
err = packet.Unmarshal(buffer[:n])
if err != nil {
continue // 忽略无法解析的包
}
r, _ := s.LoadOrStore(&SinglePortReader{
SSRC: packet.SSRC,
Mouth: make(chan []byte, 100),
})
// 创建一个新的缓冲区,包含当前接收到的数据
packetBytes := make([]byte, n)
copy(packetBytes, buffer[:n])
select {
case r.Mouth <- packetBytes:
default:
// 如果通道已满,则忽略该包
}
}
}
func (s *SinglePortTCP) Start() (err error) {
s.Listener, err = net.Listen("tcp4", fmt.Sprintf(":%d", s.Port))
if err == nil {
s.OnStop(s.Listener.Close)
}
return
}
func (s *SinglePortTCP) Go() (err error) {
for {
var packet rtp.Packet
var lenBytes [2]byte
conn, err := s.Listener.Accept()
if err != nil {
return err
}
_, err = io.ReadFull(conn, lenBytes[:])
if err != nil {
return err
}
packetLength := int(lenBytes[0])<<8 | int(lenBytes[1])
packetBytes := make([]byte, packetLength+2)
packetBytes[0] = lenBytes[0]
packetBytes[1] = lenBytes[1]
_, err = io.ReadFull(conn, packetBytes[2:])
if err != nil {
return err
}
err = packet.Unmarshal(packetBytes[2:])
if err != nil {
return err
}
r, _ := s.LoadOrStore(&SinglePortReader{
SSRC: packet.SSRC,
Mouth: make(chan []byte, 10),
})
r.Mouth <- packetBytes
r.ReadCloser = conn
}
}

View File

@@ -57,8 +57,8 @@ type Receiver struct {
ListenAddr string
net.Listener
StreamMode StreamMode
SSRC uint32 // RTP SSRC
RTPMouth chan []byte
SinglePort io.ReadCloser
}
type PSReceiver struct {
@@ -105,8 +105,8 @@ func (p *Receiver) Start() (err error) {
rtpReader = NewRTPPayloadReader(NewRTPTCPReader(conn))
p.BufReader = util.NewBufReader(rtpReader)
case StreamModeTCPPassive:
var conn net.Conn
if p.SSRC == 0 {
var conn io.ReadCloser
if p.SinglePort == nil {
p.Info("start new listener", "addr", p.ListenAddr)
p.Listener, err = net.Listen("tcp4", p.ListenAddr)
if err != nil {
@@ -116,8 +116,7 @@ func (p *Receiver) Start() (err error) {
p.OnStop(p.Listener.Close)
conn, err = p.Accept()
} else {
conn, err = p.Accept()
//TODO: 公用监听端口
conn = p.SinglePort
}
if err != nil {
p.Error("accept", "err", err)
@@ -127,16 +126,20 @@ func (p *Receiver) Start() (err error) {
rtpReader = NewRTPPayloadReader(NewRTPTCPReader(conn))
p.BufReader = util.NewBufReader(rtpReader)
case StreamModeUDP:
var conn io.ReadCloser
if p.SinglePort == nil {
var udpAddr *net.UDPAddr
udpAddr, err = net.ResolveUDPAddr("udp4", p.ListenAddr)
if err != nil {
return
}
var conn net.Conn
conn, err = net.ListenUDP("udp4", udpAddr)
if err != nil {
return
}
} else {
conn = p.SinglePort
}
p.OnStop(conn.Close)
rtpReader = NewRTPPayloadReader(NewRTPUDPReader(conn))
p.BufReader = util.NewBufReader(rtpReader)

View File

@@ -3,6 +3,7 @@ package rtsp
import (
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"net"
"reflect"
@@ -536,6 +537,9 @@ func (r *Receiver) Receive() (err error) {
if pps, err = base64.StdEncoding.DecodeString(sprops[1]); err != nil {
return err
}
if len(sps) < 4 {
return errors.New("sps too short")
}
}
if ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil {
return err