diff --git a/pkg/util/collection.go b/pkg/util/collection.go index cc3ca77..48d559d 100644 --- a/pkg/util/collection.go +++ b/pkg/util/collection.go @@ -161,3 +161,49 @@ func (c *Collection[K, T]) Clear() { c.m = nil c.Length = 0 } + +// LoadOrStore 返回键的现有值(如果存在),否则存储并返回给定的值。 +// loaded 结果表示是否找到了值,如果为 true 则表示找到了现有值,false 表示存储了新值。 +func (c *Collection[K, T]) LoadOrStore(item T) (actual T, loaded bool) { + key := item.GetKey() + if c.L != nil { + c.L.Lock() + defer c.L.Unlock() + } + + // 先尝试获取现有值 + if c.m != nil { + actual, loaded = c.m[key] + } else { + for _, v := range c.Items { + if v.GetKey() == key { + actual = v + loaded = true + break + } + } + } + + // 如果没有找到现有值,则存储新值 + if !loaded { + c.Items = append(c.Items, item) + if c.Length > 100 || c.m != nil { + if c.m == nil { + c.m = make(map[K]T) + for _, v := range c.Items { + c.m[v.GetKey()] = v + } + } + c.m[key] = item + } + c.Length++ + actual = item + + // 触发添加监听器 + for _, listener := range c.addListeners { + listener(item) + } + } + + return actual, loaded +} diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index aa4cc96..fec0838 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "math/rand" - "net" "net/url" "strconv" "strings" @@ -136,7 +135,8 @@ func (d *Dialog) Start() (err error) { d.pullCtx.GoToStepConst(StepSIPPrepare) //defer d.gb.dialogs.Remove(d) - if d.StreamMode == mrtp.StreamModeTCPPassive { + switch d.StreamMode { + case mrtp.StreamModeTCPPassive: if d.gb.tcpPort > 0 { d.MediaPort = d.gb.tcpPort } else { @@ -151,7 +151,7 @@ func (d *Dialog) Start() (err error) { d.MediaPort = d.gb.MediaPort[0] } } - } else if d.StreamMode == mrtp.StreamModeUDP { + case mrtp.StreamModeUDP: if d.gb.udpPort > 0 { d.MediaPort = d.gb.udpPort } else { @@ -367,10 +367,6 @@ func (d *Dialog) Run() (err error) { d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient } } - err = d.session.Ack(d.gb) - if err != nil { - d.gb.Error("ack session err", err) - } // 移动到流数据接收步骤 d.pullCtx.GoToStepConst(pkg.StepStreaming) @@ -383,31 +379,41 @@ func (d *Dialog) Run() (err error) { case mrtp.StreamModeTCPPassive: if d.gb.tcpPort > 0 { d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort) - if d.gb.netListener != nil { - d.Info("use gb.netListener", d.gb.netListener.Addr()) - pub.Listener = d.gb.netListener - } else { - d.Info("listen tcp4", fmt.Sprintf(":%d", d.gb.tcpPort)) - pub.Listener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", d.gb.tcpPort)) - d.gb.netListener = pub.Listener + reader := &gb28181.SinglePortReader{ + SSRC: d.SSRC, + Mouth: make(chan []byte, 1), } - pub.SSRC = d.SSRC + reader, _ = d.gb.singlePorts.LoadOrStore(reader) + pub.SinglePort = reader + d.OnStop(func() { + reader.Close() + d.gb.singlePorts.Remove(reader) + }) } pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) case mrtp.StreamModeUDP: if d.gb.udpPort > 0 { d.Info("into single port mode, use gb.udpPort", d.gb.udpPort) - if d.gb.netUDPListener != nil { - d.Info("use gb.netUDPListener", d.gb.netUDPListener.LocalAddr()) - } else { - d.Info("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort)) - d.gb.netUDPListener, _ = util.ListenUDP(fmt.Sprintf(":%d", d.gb.udpPort), 1024*1024*4) + reader := &gb28181.SinglePortReader{ + SSRC: d.SSRC, + Mouth: make(chan []byte, 100), } + reader, _ = d.gb.singlePorts.LoadOrStore(reader) + pub.SinglePort = reader + d.OnStop(func() { + reader.Close() + d.gb.singlePorts.Remove(reader) + }) } - pub.SSRC = d.SSRC pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) } pub.StreamMode = d.StreamMode + + err = d.session.Ack(d.gb) + if err != nil { + d.gb.Error("ack session err", err) + } + return d.RunTask(&pub) } diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index 1300f55..e9f64fe 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "log/slog" - "net" "net/http" "os" "regexp" @@ -18,7 +17,6 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" - "github.com/pion/rtp" m7s "m7s.live/v5" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" @@ -54,7 +52,7 @@ type GB28181Plugin struct { ua *sipgo.UserAgent server *sipgo.Server devices task.WorkCollection[string, *Device] - dialogs task.WorkCollection[string, *Dialog] + dialogs util.Collection[string, *Dialog] forwardDialogs util.Collection[uint32, *ForwardDialog] platforms task.WorkCollection[string, *Platform] tcpPorts chan uint16 @@ -65,10 +63,9 @@ type GB28181Plugin struct { deviceRegisterManager task.WorkCollection[string, *DeviceRegisterQueueTask] Platforms []*gb28181.PlatformModel channels util.Collection[string, *Channel] - netListener net.Listener udpPorts chan uint16 udpPort uint16 - netUDPListener *net.UDPConn + singlePorts util.Collection[uint32, *gb28181.SinglePortReader] } var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{ @@ -83,26 +80,6 @@ var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{ NewPullProxy: NewPullProxy, }) -func (gb *GB28181Plugin) Dispose() { - if gb.netListener != nil { - gb.Info("gb28181 plugin dispose") - err := gb.netListener.Close() - if err != nil { - gb.Error("Close netListener error", "error", err) - } else { - gb.Info("netListener closed") - } - } - if gb.netUDPListener != nil { - err := gb.netUDPListener.Close() - if err != nil { - gb.Error("Close netUDPListener error", "error", err) - } else { - gb.Info("netUDPListener closed") - } - } -} - func init() { sip.SIPDebug = true } @@ -176,9 +153,10 @@ func (gb *GB28181Plugin) Start() (err error) { gb.AddTask(&catalogHandlerQueueTask) gb.AddTask(&gb.devices) gb.AddTask(&gb.platforms) - gb.AddTask(&gb.dialogs) gb.AddTask(&gb.deviceRegisterManager) + gb.dialogs.L = new(sync.RWMutex) gb.forwardDialogs.L = new(sync.RWMutex) + gb.singlePorts.L = new(sync.RWMutex) gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua gb.server.OnMessage(gb.OnMessage) gb.server.OnRegister(gb.OnRegister) @@ -188,39 +166,21 @@ func (gb *GB28181Plugin) Start() (err error) { gb.server.OnNotify(gb.OnNotify) if gb.MediaPort.Valid() { - gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1])) - gb.tcpPorts = make(chan uint16, gb.MediaPort.Size()) - gb.udpPorts = make(chan uint16, gb.MediaPort.Size()) + gb.SetDescription("media port", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1])) if gb.MediaPort.Size() == 0 { gb.tcpPort = gb.MediaPort[0] - gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort)) - //support udp - { - gb.udpPort = gb.MediaPort[0] - gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4) - - if err != nil { - gb.Error("start listen", "err", err) - return errors.New("start udp listen, err" + err.Error()) - } - go gb.ReadUdpInsinglePort() - } - } else if gb.MediaPort.Size() == 1 { - gb.tcpPort = gb.MediaPort[0] + 1 - gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort)) - //support udp - { - gb.udpPort = gb.MediaPort[0] + 1 - gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4) - - if err != nil { - gb.Error("start listen", "err", err) - return errors.New("start udp listen, err" + err.Error()) - } - - go gb.ReadUdpInsinglePort() - } + gb.AddTask(&gb28181.SinglePortTCP{ + Port: gb.tcpPort, + Collection: &gb.singlePorts, + }) + gb.udpPort = gb.MediaPort[0] + gb.AddTask(&gb28181.SinglePortUDP{ + Port: gb.udpPort, + Collection: &gb.singlePorts, + }) } else { + gb.tcpPorts = make(chan uint16, gb.MediaPort.Size()) + gb.udpPorts = make(chan uint16, gb.MediaPort.Size()) for i := range gb.MediaPort.Size() { gb.tcpPorts <- gb.MediaPort[0] + i gb.udpPorts <- gb.MediaPort[0] + i @@ -1058,19 +1018,3 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) { return } } - -func (gb *GB28181Plugin) ReadUdpInsinglePort() (err error) { - buffer := make(util.Buffer, 1024*1024) - var rtpPacket rtp.Packet - for { - n, _, err := gb.netUDPListener.ReadFromUDP(buffer) - if err != nil { - return err - } - - ps := buffer[:n] - if err := rtpPacket.Unmarshal(ps); err != nil { - continue - } - } -} diff --git a/plugin/gb28181/pkg/single_port.go b/plugin/gb28181/pkg/single_port.go new file mode 100644 index 0000000..23e61d1 --- /dev/null +++ b/plugin/gb28181/pkg/single_port.go @@ -0,0 +1,139 @@ +package gb28181 + +import ( + "fmt" + "io" + "net" + + "github.com/pion/rtp" + "m7s.live/v5/pkg/task" + "m7s.live/v5/pkg/util" +) + +type SinglePortReader struct { + SSRC uint32 + io.ReadCloser + buffered util.Buffer + Mouth chan []byte +} + +func (s *SinglePortReader) GetKey() uint32 { + return s.SSRC +} + +func (s *SinglePortReader) Read(buf []byte) (n int, err error) { + if s.buffered.Len() > 0 { + return s.buffered.Read(buf) + } + if s.ReadCloser != nil { + return s.ReadCloser.Read(buf) + } + s.buffered = <-s.Mouth + return s.buffered.Read(buf) +} + +func (s *SinglePortReader) Close() error { + if s.ReadCloser != nil { + return s.ReadCloser.Close() + } + return nil +} + +type SinglePortUDP struct { + task.Task + Port uint16 + conn *net.UDPConn + *util.Collection[uint32, *SinglePortReader] +} + +type SinglePortTCP struct { + task.Task + Port uint16 + net.Listener + *util.Collection[uint32, *SinglePortReader] +} + +func (s *SinglePortUDP) Start() (err error) { + addr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf(":%d", s.Port)) + if err != nil { + return err + } + s.conn, err = net.ListenUDP("udp4", addr) + if err == nil { + s.OnStop(func() { + s.conn.Close() + }) + } + return +} + +func (s *SinglePortUDP) Go() (err error) { + buffer := make([]byte, 2048) // 足够大的缓冲区来接收UDP包 + for { + n, _, err := s.conn.ReadFromUDP(buffer) + if err != nil { + return err + } + + var packet rtp.Packet + err = packet.Unmarshal(buffer[:n]) + if err != nil { + continue // 忽略无法解析的包 + } + + r, _ := s.LoadOrStore(&SinglePortReader{ + SSRC: packet.SSRC, + Mouth: make(chan []byte, 100), + }) + + // 创建一个新的缓冲区,包含当前接收到的数据 + packetBytes := make([]byte, n) + copy(packetBytes, buffer[:n]) + select { + case r.Mouth <- packetBytes: + default: + // 如果通道已满,则忽略该包 + } + } +} + +func (s *SinglePortTCP) Start() (err error) { + s.Listener, err = net.Listen("tcp4", fmt.Sprintf(":%d", s.Port)) + if err == nil { + s.OnStop(s.Listener.Close) + } + return +} + +func (s *SinglePortTCP) Go() (err error) { + for { + var packet rtp.Packet + var lenBytes [2]byte + conn, err := s.Listener.Accept() + if err != nil { + return err + } + _, err = io.ReadFull(conn, lenBytes[:]) + if err != nil { + return err + } + packetLength := int(lenBytes[0])<<8 | int(lenBytes[1]) + packetBytes := make([]byte, packetLength+2) + packetBytes[0] = lenBytes[0] + packetBytes[1] = lenBytes[1] + _, err = io.ReadFull(conn, packetBytes[2:]) + if err != nil { + return err + } + err = packet.Unmarshal(packetBytes[2:]) + if err != nil { + return err + } + r, _ := s.LoadOrStore(&SinglePortReader{ + SSRC: packet.SSRC, + Mouth: make(chan []byte, 10), + }) + r.Mouth <- packetBytes + r.ReadCloser = conn + } +} diff --git a/plugin/rtp/pkg/transceiver.go b/plugin/rtp/pkg/transceiver.go index 1ead725..9859973 100644 --- a/plugin/rtp/pkg/transceiver.go +++ b/plugin/rtp/pkg/transceiver.go @@ -57,8 +57,8 @@ type Receiver struct { ListenAddr string net.Listener StreamMode StreamMode - SSRC uint32 // RTP SSRC RTPMouth chan []byte + SinglePort io.ReadCloser } type PSReceiver struct { @@ -105,8 +105,8 @@ func (p *Receiver) Start() (err error) { rtpReader = NewRTPPayloadReader(NewRTPTCPReader(conn)) p.BufReader = util.NewBufReader(rtpReader) case StreamModeTCPPassive: - var conn net.Conn - if p.SSRC == 0 { + var conn io.ReadCloser + if p.SinglePort == nil { p.Info("start new listener", "addr", p.ListenAddr) p.Listener, err = net.Listen("tcp4", p.ListenAddr) if err != nil { @@ -116,8 +116,7 @@ func (p *Receiver) Start() (err error) { p.OnStop(p.Listener.Close) conn, err = p.Accept() } else { - conn, err = p.Accept() - //TODO: 公用监听端口 + conn = p.SinglePort } if err != nil { p.Error("accept", "err", err) @@ -127,15 +126,19 @@ func (p *Receiver) Start() (err error) { rtpReader = NewRTPPayloadReader(NewRTPTCPReader(conn)) p.BufReader = util.NewBufReader(rtpReader) case StreamModeUDP: - var udpAddr *net.UDPAddr - udpAddr, err = net.ResolveUDPAddr("udp4", p.ListenAddr) - if err != nil { - return - } - var conn net.Conn - conn, err = net.ListenUDP("udp4", udpAddr) - if err != nil { - return + var conn io.ReadCloser + if p.SinglePort == nil { + var udpAddr *net.UDPAddr + udpAddr, err = net.ResolveUDPAddr("udp4", p.ListenAddr) + if err != nil { + return + } + conn, err = net.ListenUDP("udp4", udpAddr) + if err != nil { + return + } + } else { + conn = p.SinglePort } p.OnStop(conn.Close) rtpReader = NewRTPPayloadReader(NewRTPUDPReader(conn)) diff --git a/plugin/rtsp/pkg/transceiver.go b/plugin/rtsp/pkg/transceiver.go index e96845c..b9eb244 100644 --- a/plugin/rtsp/pkg/transceiver.go +++ b/plugin/rtsp/pkg/transceiver.go @@ -3,6 +3,7 @@ package rtsp import ( "encoding/base64" "encoding/hex" + "errors" "fmt" "net" "reflect" @@ -536,6 +537,9 @@ func (r *Receiver) Receive() (err error) { if pps, err = base64.StdEncoding.DecodeString(sprops[1]); err != nil { return err } + if len(sps) < 4 { + return errors.New("sps too short") + } } if ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil { return err