diff --git a/api.go b/api.go index 16e056c..cbcd354 100644 --- a/api.go +++ b/api.go @@ -97,10 +97,9 @@ func startApiServer(addr string) { func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) { //请求参数 v := &struct { - Source string `json:"source"` //SourceId - Transport string `json:"transport,omitempty"` - Setup string `json:"setup"` //active/passive - SSRC uint32 `json:"ssrc,omitempty"` + Source string `json:"source"` //SourceId + Setup string `json:"setup"` //active/passive + SSRC uint32 `json:"ssrc,omitempty"` }{} //返回监听的端口 @@ -129,9 +128,17 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) { return } - tcp := strings.Contains(v.Transport, "tcp") + tcp := true var active bool - if tcp && "active" == v.Setup { + if v.Setup == "passive" { + } else if v.Setup == "active" { + active = true + } else { + tcp = false + //udp收流 + } + + if tcp && active { if !stream.AppConfig.GB28181.IsMultiPort() { err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "创建GB28181 Source失败, 单端口模式下不能主动拉流"} } else if !tcp { @@ -143,8 +150,6 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) { if err != nil { return } - - active = true } _, port, err := gb28181.NewGBSource(v.Source, v.SSRC, tcp, active) diff --git a/gb28181/source.go b/gb28181/source.go index c5fb874..fa8bb83 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -40,9 +40,7 @@ type GBSource interface { TransportType() TransportType - PrepareTransDeMuxer(id string, ssrc uint32) - - PreparePublishSource(conn net.Conn, ssrc uint32, source GBSource) + PreparePublish(conn net.Conn, ssrc uint32, source GBSource) SetConn(conn net.Conn) @@ -56,108 +54,8 @@ type BaseGBSource struct { audioStream utils.AVStream videoStream utils.AVStream - ssrc uint32 - transport transport.ITransport - receiveBuffer *stream.ReceiveBuffer -} - -func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint16, error) { - if tcp { - utils.Assert(stream.AppConfig.GB28181.EnableTCP()) - } else { - utils.Assert(stream.AppConfig.GB28181.EnableUDP()) - } - - if active { - utils.Assert(tcp && stream.AppConfig.GB28181.EnableTCP() && stream.AppConfig.GB28181.IsMultiPort()) - } - - var source GBSource - var port uint16 - var err error - - if active { - source, port, err = NewActiveSource() - } else if tcp { - source = NewPassiveSource() - } else { - source = NewUDPSource() - } - - if err != nil { - return nil, 0, err - } - - //单端口模式,绑定ssrc - if !stream.AppConfig.GB28181.IsMultiPort() { - var success bool - if tcp { - success = SharedTCPServer.filter.AddSource(ssrc, source) - } else { - success = SharedUDPServer.filter.AddSource(ssrc, source) - } - - if !success { - return nil, 0, fmt.Errorf("ssrc conflict") - } - - port = stream.AppConfig.GB28181.Port[0] - } else if !active { - if tcp { - err := TransportManger.AllocTransport(true, func(port_ uint16) error { - - addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", stream.AppConfig.GB28181.Addr, port_)) - server, err := NewTCPServer(addr, NewSingleFilter(source)) - if err != nil { - - return err - } - - source.(*PassiveSource).transport = server.tcp - port = port_ - return nil - }) - - if err != nil { - return nil, 0, err - } - } else { - err := TransportManger.AllocTransport(false, func(port_ uint16) error { - - addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", stream.AppConfig.GB28181.Addr, port_)) - server, err := NewUDPServer(addr, NewSingleFilter(source)) - if err != nil { - return err - } - - source.(*UDPSource).transport = server.udp - port = port_ - return nil - }) - - if err != nil { - return nil, 0, err - } - } - } - - source.PrepareTransDeMuxer(id, ssrc) - _, state := stream.PreparePublishSource(source, false) - if utils.HookStateOK != state { - return nil, 0, fmt.Errorf("error code %d", state) - } - - var bufferBlockCount int - if active || tcp { - bufferBlockCount = stream.ReceiveBufferTCPBlockCount - } else { - bufferBlockCount = stream.ReceiveBufferUdpBlockCount - } - - source.SetType(stream.SourceType28181) - source.Init(source.Input, source.Close, bufferBlockCount) - go source.LoopEvent() - return source, port, err + ssrc uint32 + transport transport.ITransport } func (source *BaseGBSource) InputRtp(pkt *rtp.Packet) error { @@ -168,14 +66,14 @@ func (source *BaseGBSource) Transport() TransportType { panic("implement me") } -func (source *BaseGBSource) PrepareTransDeMuxer(id string, ssrc uint32) { - source.Id_ = id - source.ssrc = ssrc +func (source *BaseGBSource) Init(inputCB func(data []byte) error, closeCB func(), receiveQueueSize int) { source.deMuxerCtx = libmpeg.NewPSDeMuxerContext(make([]byte, PsProbeBufferSize)) source.deMuxerCtx.SetHandler(source) + source.SetType(stream.SourceType28181) + source.PublishSource.Init(inputCB, closeCB, receiveQueueSize) } -// Input 输入PS流 +// Input 解析PS流, 确保在loop event协程调用此函数 func (source *BaseGBSource) Input(data []byte) error { return source.deMuxerCtx.Input(data) } @@ -345,18 +243,9 @@ func (source *BaseGBSource) SetSSRC(ssrc uint32) { source.ssrc = ssrc } -func (source *BaseGBSource) SetReceiveBuffer(buffer *stream.ReceiveBuffer) { - source.receiveBuffer = buffer -} - -func (source *BaseGBSource) ReceiveBuffer() *stream.ReceiveBuffer { - return source.receiveBuffer -} - -func (source *BaseGBSource) PreparePublishSource(conn net.Conn, ssrc uint32, source_ GBSource) { +func (source *BaseGBSource) PreparePublish(conn net.Conn, ssrc uint32, source_ GBSource) { source.SetConn(conn) source.SetSSRC(ssrc) - source.SetState(stream.SessionStateTransferring) if stream.AppConfig.Hook.EnablePublishEvent() { @@ -372,3 +261,102 @@ func (source *BaseGBSource) PreparePublishSource(conn net.Conn, ssrc uint32, sou }() } } + +// NewGBSource 创建gb源,返回监听的收流端口 +func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint16, error) { + if tcp { + utils.Assert(stream.AppConfig.GB28181.EnableTCP()) + } else { + utils.Assert(stream.AppConfig.GB28181.EnableUDP()) + } + + if active { + utils.Assert(tcp && stream.AppConfig.GB28181.EnableTCP() && stream.AppConfig.GB28181.IsMultiPort()) + } + + var source GBSource + var port uint16 + var err error + + if active { + source, port, err = NewActiveSource() + } else if tcp { + source = NewPassiveSource() + } else { + source = NewUDPSource() + } + + if err != nil { + return nil, 0, err + } + + //单端口模式,绑定ssrc + if !stream.AppConfig.GB28181.IsMultiPort() { + var success bool + if tcp { + success = SharedTCPServer.filter.AddSource(ssrc, source) + } else { + success = SharedUDPServer.filter.AddSource(ssrc, source) + } + + if !success { + return nil, 0, fmt.Errorf("ssrc conflict") + } + + port = stream.AppConfig.GB28181.Port[0] + } else if !active { + if tcp { + err := TransportManger.AllocTransport(true, func(port_ uint16) error { + + addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", stream.AppConfig.GB28181.Addr, port_)) + server, err := NewTCPServer(addr, NewSingleFilter(source)) + if err != nil { + + return err + } + + source.(*PassiveSource).transport = server.tcp + port = port_ + return nil + }) + + if err != nil { + return nil, 0, err + } + } else { + err := TransportManger.AllocTransport(false, func(port_ uint16) error { + + addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", stream.AppConfig.GB28181.Addr, port_)) + server, err := NewUDPServer(addr, NewSingleFilter(source)) + if err != nil { + return err + } + + source.(*UDPSource).transport = server.udp + port = port_ + return nil + }) + + if err != nil { + return nil, 0, err + } + } + } + + var bufferBlockCount int + if active || tcp { + bufferBlockCount = stream.ReceiveBufferTCPBlockCount + } else { + bufferBlockCount = stream.ReceiveBufferUdpBlockCount + } + + source.SetId(id) + source.SetSSRC(ssrc) + source.Init(source.Input, source.Close, bufferBlockCount) + if _, state := stream.PreparePublishSource(source, false); utils.HookStateOK != state { + return nil, 0, fmt.Errorf("error code %d", state) + } + + go source.LoopEvent() + return source, port, err +} diff --git a/gb28181/source_passive.go b/gb28181/source_passive.go index fd40bc3..23ca294 100644 --- a/gb28181/source_passive.go +++ b/gb28181/source_passive.go @@ -16,8 +16,8 @@ func (t PassiveSource) TransportType() TransportType { return TransportTypeTCPPassive } +// InputRtp tcp收流,直接解析ps流. func (t PassiveSource) InputRtp(pkt *rtp.Packet) error { - //TCP收流, 解析rtp后直接送给ps解析 t.Input(pkt.Payload) return nil } diff --git a/gb28181/source_udp.go b/gb28181/source_udp.go index 5cabce0..e4c6d3e 100644 --- a/gb28181/source_udp.go +++ b/gb28181/source_udp.go @@ -25,7 +25,7 @@ func (u UDPSource) TransportType() TransportType { return TransportTypeUDP } -// InputRtp UDP收流会先拷贝rtp包,交给jitter buffer处理后再发给source +// InputRtp udp收流会先拷贝rtp包,交给jitter buffer处理后再发给source func (u UDPSource) InputRtp(pkt *rtp.Packet) error { block := u.receiveBuffer.GetBlock() @@ -33,12 +33,9 @@ func (u UDPSource) InputRtp(pkt *rtp.Packet) error { pkt.Payload = block[:len(pkt.Payload)] u.jitterBuffer.Push(pkt) - for { - pkt, _ := u.jitterBuffer.Pop() - if pkt == nil { - return nil - } - - u.PublishSource.Input(pkt.Payload) + for rtp, _ := u.jitterBuffer.Pop(); rtp != nil; rtp, _ = u.jitterBuffer.Pop() { + u.PublishSource.Input(rtp.Payload) } + + return nil } diff --git a/gb28181/tcp_server.go b/gb28181/tcp_server.go index 3491213..eeb9402 100644 --- a/gb28181/tcp_server.go +++ b/gb28181/tcp_server.go @@ -1,10 +1,8 @@ package gb28181 import ( - "github.com/pion/rtp" "github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/lkm/log" - "github.com/yangjiechina/lkm/stream" "net" ) @@ -13,10 +11,46 @@ type TCPServer struct { filter Filter } -type TCPSession struct { - source GBSource - decoder *transport.LengthFieldFrameDecoder - receiveBuffer *stream.ReceiveBuffer +func (T *TCPServer) OnConnected(conn net.Conn) []byte { + log.Sugar.Infof("GB28181连接 conn:%s", conn.RemoteAddr().String()) + + con := conn.(*transport.Conn) + session := NewTCPSession(conn, T.filter) + con.Data = session + + //TCP使用ReceiveBuffer区别在于,多端口模式从第一包就使用ReceiveBuffer, 单端口模式先解析出ssrc, 找到source. 后续再使用ReceiveBuffer. + if session.source != nil { + return session.receiveBuffer.GetBlock() + } + return nil +} + +func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte { + session := conn.(*transport.Conn).Data.(*TCPSession) + + //单端口收流 + if session.source == nil { + //直接传给解码器, 先根据ssrc找到source. 后续还是会直接传给source + session.Input(data) + } else { + session.source.(*PassiveSource).PublishSource.Input(data) + } + + if session.source != nil { + return session.receiveBuffer.GetBlock() + } + return nil +} + +func (T *TCPServer) OnDisConnected(conn net.Conn, err error) { + log.Sugar.Infof("GB28181断开连接 conn:%s", conn.RemoteAddr().String()) + + con := conn.(*transport.Conn) + if con.Data != nil && con.Data.(*TCPSession).source != nil { + con.Data.(*TCPSession).source.Close() + } + + con.Data = nil } func NewTCPServer(addr net.Addr, filter Filter) (*TCPServer, error) { @@ -33,81 +67,3 @@ func NewTCPServer(addr net.Addr, filter Filter) (*TCPServer, error) { server.tcp = tcp return server, nil } - -func (T *TCPServer) OnConnected(conn net.Conn) []byte { - log.Sugar.Infof("GB28181连接 conn:%s", conn.RemoteAddr().String()) - - con := conn.(*transport.Conn) - session := &TCPSession{} - if stream.AppConfig.GB28181.IsMultiPort() { - session.source = T.filter.(*singleFilter).source - session.source.SetConn(con) - session.receiveBuffer = stream.NewTCPReceiveBuffer() - } - - session.decoder = transport.NewLengthFieldFrameDecoder(0xFFFF, 2, func(bytes []byte) { - packet := rtp.Packet{} - err := packet.Unmarshal(bytes) - if err != nil { - log.Sugar.Errorf("解析rtp失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String()) - return - } - - //单端口模式,ssrc匹配source - if session.source == nil { - //匹配不到直接关闭链接 - source := T.filter.FindSource(packet.SSRC) - if source == nil { - conn.Close() - return - } - - session.source = source - session.receiveBuffer = stream.NewTCPReceiveBuffer() - session.source.SetConn(con) - - //直接丢给ps解析器, 虽然是非线程安全, 但是是阻塞执行的, 不会和后续走loop event的包冲突 - session.source.InputRtp(&packet) - } - - if stream.SessionStateHandshakeDone == session.source.State() { - session.source.PreparePublishSource(conn, packet.SSRC, session.source) - } - - session.source.InputRtp(&packet) - }) - - con.Data = session - - if session.source != nil { - return session.receiveBuffer.GetBlock() - } - - return nil -} - -func (T *TCPServer) OnPacket(conn net.Conn, data []byte) []byte { - session := conn.(*transport.Conn).Data.(*TCPSession) - - //单端口收流 - if session.source == nil { - //直接传给解码器, 先根据ssrc找到source. 后续还是会直接传给source - if err := session.decoder.Input(data); err != nil { - conn.Close() - } - } else { - session.source.Input(data) - } - - return session.receiveBuffer.GetBlock() -} - -func (T *TCPServer) OnDisConnected(conn net.Conn, err error) { - log.Sugar.Infof("GB28181断开连接 conn:%s", conn.RemoteAddr().String()) - - con := conn.(*transport.Conn) - if con.Data != nil && con.Data.(*TCPSession).source != nil { - con.Data.(*TCPSession).source.Close() - } - con.Data = nil -} diff --git a/gb28181/tcp_session.go b/gb28181/tcp_session.go new file mode 100644 index 0000000..9169880 --- /dev/null +++ b/gb28181/tcp_session.go @@ -0,0 +1,77 @@ +package gb28181 + +import ( + "encoding/hex" + "github.com/pion/rtp" + "github.com/yangjiechina/avformat/transport" + "github.com/yangjiechina/lkm/log" + "github.com/yangjiechina/lkm/stream" + "net" +) + +type TCPSession struct { + conn net.Conn + source GBSource + decoder *transport.LengthFieldFrameDecoder + receiveBuffer *stream.ReceiveBuffer +} + +// Input 处理source收到的流 +func (t *TCPSession) Input(data []byte) error { + if err := t.decoder.Input(data); err != nil { + t.conn.Close() + } + + return nil +} + +func (t *TCPSession) Init(source GBSource) { + t.source = source + t.source.SetConn(t.conn) + //重新设置收流回调 + t.source.SetInputCb(t.Input) + t.receiveBuffer = stream.NewTCPReceiveBuffer() +} + +func NewTCPSession(conn net.Conn, filter Filter) *TCPSession { + session := &TCPSession{ + conn: conn, + } + + if stream.AppConfig.GB28181.IsMultiPort() { + session.Init(filter.(*singleFilter).source) + } + + session.decoder = transport.NewLengthFieldFrameDecoder(0xFFFF, 2, func(bytes []byte) { + packet := rtp.Packet{} + err := packet.Unmarshal(bytes) + if err != nil { + log.Sugar.Errorf("解析rtp失败 err:%s conn:%s data:%s", err.Error(), conn.RemoteAddr().String(), hex.EncodeToString(bytes)) + + conn.Close() + return + } + + //单端口模式,ssrc匹配source + if session.source == nil { + //匹配不到直接关闭链接 + source := filter.FindSource(packet.SSRC) + if source == nil { + log.Sugar.Errorf("gb28181推流失败 ssrc:%x配置不到source conn:%s data:%s", packet.SSRC, session.conn.RemoteAddr().String(), hex.EncodeToString(bytes)) + + conn.Close() + return + } + + session.Init(source) + } + + if stream.SessionStateHandshakeDone == session.source.State() { + session.source.PreparePublish(conn, packet.SSRC, session.source) + } + + session.source.InputRtp(&packet) + }) + + return session +} diff --git a/gb28181/udp_server.go b/gb28181/udp_server.go index 7d9b203..4bf8cc9 100644 --- a/gb28181/udp_server.go +++ b/gb28181/udp_server.go @@ -13,6 +13,36 @@ type UDPServer struct { filter Filter } +func (U UDPServer) OnConnected(conn net.Conn) []byte { + return nil +} + +func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte { + packet := rtp.Packet{} + err := packet.Unmarshal(data) + if err != nil { + log.Sugar.Errorf("解析rtp失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String()) + return nil + } + + source := U.filter.FindSource(packet.SSRC) + if source == nil { + log.Sugar.Errorf("ssrc匹配source失败 ssrc:%x conn:%s", packet.SSRC, conn.RemoteAddr().String()) + return nil + } + + if stream.SessionStateHandshakeDone == source.State() { + source.PreparePublish(conn, packet.SSRC, source) + } + + source.InputRtp(&packet) + return nil +} + +func (U UDPServer) OnDisConnected(conn net.Conn, err error) { + +} + func NewUDPServer(addr net.Addr, filter Filter) (*UDPServer, error) { server := &UDPServer{ filter: filter, @@ -26,34 +56,3 @@ func NewUDPServer(addr net.Addr, filter Filter) (*UDPServer, error) { server.udp = udp return server, nil } - -func (U UDPServer) OnConnected(conn net.Conn) []byte { - return nil -} - -func (U UDPServer) OnPacket(conn net.Conn, data []byte) []byte { - packet := rtp.Packet{} - err := packet.Unmarshal(data) - - if err != nil { - log.Sugar.Errorf("解析rtp失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String()) - return nil - } - - source := U.filter.FindSource(packet.SSRC) - if source == nil { - log.Sugar.Errorf("ssrc匹配source失败 ssrc:%x conn:%s", packet.SSRC, conn.RemoteAddr().String()) - return nil - } - - if stream.SessionStateHandshakeDone == source.State() { - source.PreparePublishSource(conn, packet.SSRC, source) - } - - source.InputRtp(&packet) - return nil -} - -func (U UDPServer) OnDisConnected(conn net.Conn, err error) { - -} diff --git a/rtmp/rtmp_server.go b/rtmp/rtmp_server.go index f545d5b..79a7b41 100644 --- a/rtmp/rtmp_server.go +++ b/rtmp/rtmp_server.go @@ -50,8 +50,6 @@ func (s *server) OnConnected(conn net.Conn) []byte { } func (s *server) OnPacket(conn net.Conn, data []byte) []byte { - log.Sugar.Infof("rtmp包大小:%d", len(data)) - t := conn.(*transport.Conn) session := t.Data.(*Session) err := session.Input(conn, data) diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index d17986c..6048d5d 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -38,7 +38,7 @@ func (s *Session) OnPublish(app, stream_ string, response chan utils.HookState) //设置推流的音视频回调 s.stack.SetOnPublishHandler(source) - //初始化放在add source前面, 以防add-init空窗期, 拉流队列空指针. + //初始化放在add source前面, 以防add后再init,空窗期拉流队列空指针. source.Init(source.Input, source.Close, stream.ReceiveBufferTCPBlockCount) //推流事件Source统一处理, 是否已经存在, Hook回调.... diff --git a/rtsp/rtsp_sink.go b/rtsp/rtsp_sink.go index cb026d1..0d048a2 100644 --- a/rtsp/rtsp_sink.go +++ b/rtsp/rtsp_sink.go @@ -66,7 +66,7 @@ func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro if err == nil { //创建rtp udp server - sender.Rtp = &transport.UDPTransport{} + sender.Rtp = &transport.UDPServer{} sender.Rtp.SetHandler2(nil, sender.OnRTPPacket, nil) err = sender.Rtp.Bind(addr) } @@ -80,7 +80,7 @@ func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro if err == nil { //创建rtcp udp server - sender.Rtcp = &transport.UDPTransport{} + sender.Rtcp = &transport.UDPServer{} sender.Rtcp.SetHandler2(nil, sender.OnRTCPPacket, nil) err = sender.Rtcp.Bind(addr) } else { diff --git a/stream/receive_buffer.go b/stream/receive_buffer.go index 101dcb6..4955f06 100644 --- a/stream/receive_buffer.go +++ b/stream/receive_buffer.go @@ -1,9 +1,9 @@ package stream const ( - ReceiveBufferUdpBlockCount = 200 + ReceiveBufferUdpBlockCount = 300 - ReceiveBufferTCPBlockCount = 100 + ReceiveBufferTCPBlockCount = 50 ) // ReceiveBuffer 收流缓冲区. 网络收流->解析流->封装流->发送流是同步的,从解析到发送可能耗时,从而影响读取网络流. 使用收流缓冲区,可有效降低出现此问题的概率. @@ -19,7 +19,7 @@ type ReceiveBuffer struct { func (r *ReceiveBuffer) GetBlock() []byte { bytes := r.data[r.index*r.blockSize:] - r.index = r.index + 1%r.blockCount + r.index = (r.index + 1) % r.blockCount return bytes[:r.blockSize] } diff --git a/stream/source.go b/stream/source.go index f987321..469405c 100644 --- a/stream/source.go +++ b/stream/source.go @@ -3,7 +3,6 @@ package stream import ( "fmt" "github.com/yangjiechina/lkm/log" - "math" "net" "time" @@ -56,6 +55,8 @@ type Source interface { // Id Source的唯一ID/** Id() string + SetId(id string) + // Input 输入推流数据 //@Return bool fatal error.释放Source Input(data []byte) error @@ -173,6 +174,10 @@ func (s *PublishSource) Id() string { return s.Id_ } +func (s *PublishSource) SetId(id string) { + s.Id_ = id +} + func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), receiveQueueSize int) { s.inputCB = inputCB s.closeCB = closeCB @@ -234,7 +239,7 @@ func (s *PublishSource) LoopEvent() { for { select { case data := <-s.inputDataEvent: - if !s.closed { + if s.closed { break } @@ -242,10 +247,6 @@ func (s *PublishSource) LoopEvent() { s.lastPacketTime = time.Now() } - if s.state == SessionStateHandshakeDone { - s.state = SessionStateTransferring - } - if err := s.inputCB(data); err != nil { log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", s.Id_, err.Error()) s.Close() @@ -622,7 +623,8 @@ func (s *PublishSource) StartReceiveDataTimer() { } } - s.receiveDataTimer.Reset(time.Duration(math.Abs(float64(time.Duration(AppConfig.ReceiveTimeout) - dis)))) + //对精度没要求 + s.receiveDataTimer.Reset(time.Duration(AppConfig.ReceiveTimeout)) }) } @@ -643,7 +645,7 @@ func (s *PublishSource) StartIdleTimer() { } } - s.idleTimer.Reset(time.Duration(math.Abs(float64(AppConfig.IdleTimeout - int64(dis))))) + s.idleTimer.Reset(time.Duration(AppConfig.IdleTimeout)) }) }