diff --git a/stream/forward_sink.go b/stream/forward_sink.go index 2973aed..9c2ddc6 100644 --- a/stream/forward_sink.go +++ b/stream/forward_sink.go @@ -2,6 +2,7 @@ package stream import ( "encoding/binary" + "fmt" "github.com/lkmio/avformat/collections" "github.com/lkmio/lkm/log" "github.com/lkmio/transport" @@ -34,6 +35,7 @@ func (t TransportType) String() string { type ForwardSink struct { BaseSink socket transport.Transport + spareSocket transport.Transport // 对讲备选udp发送 transportType TransportType receiveTimer *time.Timer ssrc uint32 @@ -68,7 +70,7 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) { func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error { // TCP等待连接后再转发数据 - if TransportTypeUDP != f.transportType && f.Conn == nil { + if TransportTypeUDP != f.transportType && f.Conn == nil && f.spareSocket == nil { return nil } @@ -121,9 +123,15 @@ func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]by processedData = data } - if TransportTypeUDP == f.transportType { + spare := f.Conn == nil && f.spareSocket != nil + if TransportTypeUDP == f.transportType || spare { + var sender = f.socket + if spare { + sender = f.spareSocket + } + for _, datum := range processedData { - f.socket.(*transport.UDPClient).Write(datum.Get()[2:]) + sender.(*transport.UDPClient).Write(datum.Get()[2:]) } } else { return f.BaseSink.Write(index, processedData, ts, keyVideo) @@ -140,6 +148,10 @@ func (f *ForwardSink) Close() { f.socket.Close() } + if f.spareSocket != nil { + f.spareSocket.Close() + } + if f.receiveTimer != nil { f.receiveTimer.Stop() } @@ -175,14 +187,12 @@ func NewForwardSink(transportType TransportType, protocol TransStreamProtocol, s remoteAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, 0, err - } - - client, err := manager.NewUDPClient(remoteAddr) - if err != nil { + } else if client, err := manager.NewUDPClient(remoteAddr); err != nil { return nil, 0, err + } else { + sink.socket = client } - sink.socket = client } else if transportType == TransportTypeTCPClient { client := transport.TCPClient{} err := manager.AllocPort(true, func(port uint16) error { @@ -220,7 +230,20 @@ func NewForwardSink(transportType TransportType, protocol TransStreamProtocol, s tcpServer.SetHandler(sink) tcpServer.Accept() sink.socket = tcpServer - sink.StartReceiveTimer() + + // 同时创建udp发送器, 兼容不支持tcp对讲的设备 + if TransStreamGBTalk == protocol { + localAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", AppConfig.ListenIP, tcpServer.ListenPort())) + remoteAddr, err := net.ResolveUDPAddr("udp", addr) + + udp := &transport.UDPClient{} + err = udp.Connect(localAddr, remoteAddr) + if err == nil { + sink.spareSocket = udp + } + } else { + sink.StartReceiveTimer() + } } return sink, sink.socket.ListenPort(), nil