diff --git a/gb28181/forward_sink.go b/gb28181/forward_sink.go index 2cb3ccc..86f11a7 100644 --- a/gb28181/forward_sink.go +++ b/gb28181/forward_sink.go @@ -10,11 +10,6 @@ import ( "net" ) -const ( - TcpStreamForwardBufferBlockSize = 1024 - RTPOverTCPPacketSize = 1600 -) - type ForwardSink struct { stream.BaseSink setup SetupType @@ -26,7 +21,7 @@ func (f *ForwardSink) OnConnected(conn net.Conn) []byte { log.Sugar.Infof("级联连接 conn: %s", conn.RemoteAddr()) f.Conn = conn - f.Conn.(*transport.Conn).EnableAsyncWriteMode(TcpStreamForwardBufferBlockSize - 2) + f.Conn.(*transport.Conn).EnableAsyncWriteMode(512) return nil } @@ -41,24 +36,18 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) { } func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { + // TCP等待连接后再转发数据 if SetupUDP != f.setup && f.Conn == nil { return nil } - if len(data)+2 > RTPOverTCPPacketSize { - log.Sugar.Errorf("国标级联转发流失败 rtp包过长, 长度:%d, 最大允许:%d", len(data), RTPOverTCPPacketSize) - return nil - } - // 修改为与上级协商的SSRC rtp.ModifySSRC(data[0].Get(), f.ssrc) if SetupUDP == f.setup { f.socket.(*transport.UDPClient).Write(data[0].Get()[2:]) } else { - if _, err := f.Conn.Write(data[0].Get()); err != nil { - return err - } + return f.BaseSink.Write(index, data, ts) } return nil diff --git a/gb28181/forward_stream.go b/gb28181/forward_stream.go index 7b850b0..87e1768 100644 --- a/gb28181/forward_stream.go +++ b/gb28181/forward_stream.go @@ -2,34 +2,57 @@ package gb28181 import ( "encoding/binary" + "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" + "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" ) // ForwardStream 国标级联转发流, 下级推什么, 就向上级发什么. type ForwardStream struct { stream.BaseTransStream - buffer *stream.ReceiveBuffer + rtpBuffers *collections.Queue[*collections.ReferenceCounter[[]byte]] } func (f *ForwardStream) WriteHeader() error { return nil } -func (f *ForwardStream) WrapData(data []byte) []byte { - block := f.buffer.GetBlock() - copy(block[2:], data) - binary.BigEndian.PutUint16(block, uint16(len(data))) - return block -} +func (f *ForwardStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { + size := 2 + uint16(len(packet.Data)) + if size > stream.UDPReceiveBufferSize { + log.Sugar.Errorf("国标级联转发流失败 rtp包过长, 长度:%d, 最大允许:%d", len(packet.Data), stream.UDPReceiveBufferSize) + return nil, 0, false, nil + } -func (f *ForwardStream) Capacity() int { - return f.buffer.BlockCount() + // 释放rtp包 + for f.rtpBuffers.Size() > 0 { + rtp := f.rtpBuffers.Peek(0) + if rtp.UseCount() > 1 { + break + } + + f.rtpBuffers.Pop() + + // 放回池中 + data := rtp.Get() + stream.UDPReceiveBufferPool.Put(data[:cap(data)]) + } + + bytes := stream.UDPReceiveBufferPool.Get().([]byte) + binary.BigEndian.PutUint16(bytes, size-2) + copy(bytes[2:], packet.Data) + + rtp := collections.NewReferenceCounter(bytes[:size]) + f.rtpBuffers.Push(rtp) + // 每帧都当关键帧, 直接发给上级 + return []*collections.ReferenceCounter[[]byte]{rtp}, -1, true, nil } func NewTransStream() (stream.TransStream, error) { return &ForwardStream{ BaseTransStream: stream.BaseTransStream{Protocol: stream.TransStreamGBStreamForward}, - buffer: stream.NewReceiveBuffer(1500, 512), + rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](1024), }, nil } diff --git a/gb28181/source.go b/gb28181/source.go index c533653..911543e 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -3,7 +3,6 @@ package gb28181 import ( "fmt" "github.com/lkmio/avformat" - "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/lkm/log" "github.com/lkmio/lkm/stream" @@ -77,15 +76,9 @@ func (source *BaseGBSource) Init(receiveQueueSize int) { // Input 输入rtp包, 处理PS流, 负责解析->封装->推流 func (source *BaseGBSource) Input(data []byte) error { // 国标级联转发 - for _, transStream := range source.TransStreams { - if transStream.GetProtocol() != stream.TransStreamGBStreamForward { - continue - } - - bytes := transStream.(*ForwardStream).WrapData(data) - rtpPacket := [1]*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(bytes)} - - source.DispatchBuffer(transStream, -1, rtpPacket[:], -1, true) + if source.ForwardTransStream != nil { + packet := avformat.AVPacket{Data: data} + source.DispatchPacket(source.ForwardTransStream, &packet) } packet := rtp.Packet{} diff --git a/rtsp/rtsp_sink.go b/rtsp/rtsp_sink.go index 1699dbf..815ec6d 100644 --- a/rtsp/rtsp_sink.go +++ b/rtsp/rtsp_sink.go @@ -54,6 +54,7 @@ func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro if tcp { s.TCPStreaming = true + s.BaseSink.EnableAsyncWriteMode(512) } else { sender.Rtp, err = TransportManger.NewUDPServer() if err != nil { @@ -86,14 +87,17 @@ func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rt return nil } - for _, bytes := range data { + for i, bytes := range data { sender := s.senders[index] sender.PktCount++ sender.OctetCount += len(bytes.Get()) if s.TCPStreaming { - s.Conn.Write(bytes.Get()) + // 一次发送会花屏? + // return s.BaseSink.Write(index, data, rtpTime) + s.BaseSink.Write(index, data[i:i+1], rtpTime) + //s.Conn.Write(bytes.Get()) } else { - //发送rtcp sr包 + // 发送rtcp sr包 sender.RtpConn.Write(bytes.Get()[OverTcpHeaderSize:]) if sender.RtcpConn == nil || sender.PktCount%100 != 0 { diff --git a/rtsp/rtsp_stream.go b/rtsp/rtsp_stream.go index affd86e..8b57822 100644 --- a/rtsp/rtsp_stream.go +++ b/rtsp/rtsp_stream.go @@ -31,7 +31,8 @@ type TransStream struct { //oldTracks []*Track oldTracks map[byte]uint16 sdp string - buffer *stream.ReceiveBuffer // 保存封装后的rtp包 + + rtpBuffers *collections.Queue[*collections.ReferenceCounter[[]byte]] } func (t *TransStream) OverTCP(data []byte, channel int) { @@ -41,26 +42,37 @@ func (t *TransStream) OverTCP(data []byte, channel int) { } func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) { - t.ClearOutStreamBuffer() + // 释放rtp包 + for t.rtpBuffers.Size() > 0 { + rtp := t.rtpBuffers.Peek(0) + if rtp.UseCount() > 1 { + break + } + + t.rtpBuffers.Pop() + + // 放回池中 + data := rtp.Get() + stream.UDPReceiveBufferPool.Put(data[:cap(data)]) + } var ts uint32 + var result []*collections.ReferenceCounter[[]byte] track := t.RtspTracks[packet.Index] if utils.AVMediaTypeAudio == packet.MediaType { ts = uint32(packet.ConvertPts(track.Rate)) - t.PackRtpPayload(track, packet.Index, packet.Data, ts) + result = t.PackRtpPayload(track, packet.Index, packet.Data, ts) } else if utils.AVMediaTypeVideo == packet.MediaType { ts = uint32(packet.ConvertPts(track.Rate)) annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet) data := avc.RemoveStartCode(annexBData) - t.PackRtpPayload(track, packet.Index, data, ts) + result = t.PackRtpPayload(track, packet.Index, data, ts) } - return t.OutBuffer[:t.OutBufferSize], int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil + return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil } func (t *TransStream) ReadExtraData(ts int64) ([]*collections.ReferenceCounter[[]byte], int64, error) { - t.ClearOutStreamBuffer() - // 返回视频编码数据的rtp包 for _, track := range t.RtspTracks { if utils.AVMediaTypeVideo != track.MediaType { @@ -69,37 +81,39 @@ func (t *TransStream) ReadExtraData(ts int64) ([]*collections.ReferenceCounter[[ // 回滚序号和时间戳 index := int(track.StartSeq) - len(track.ExtraDataBuffer) - for i, bytes := range track.ExtraDataBuffer { - rtp.RollbackSeq(bytes[OverTcpHeaderSize:], index+i+1) - binary.BigEndian.PutUint32(bytes[OverTcpHeaderSize+4:], uint32(ts)) + for i, packet := range track.ExtraDataBuffer { + rtp.RollbackSeq(packet.Get()[OverTcpHeaderSize:], index+i+1) + binary.BigEndian.PutUint32(packet.Get()[OverTcpHeaderSize+4:], uint32(ts)) } - for _, data := range track.ExtraDataBuffer { - t.AppendOutStreamBuffer(collections.NewReferenceCounter(data)) - } - - return t.OutBuffer[:t.OutBufferSize], ts, nil + // 目前只有视频需要发送扩展数据的rtp包, 所以直接返回 + return track.ExtraDataBuffer, ts, nil } return nil, ts, nil } -func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, timestamp uint32) { - var index int +// PackRtpPayload 打包返回rtp over tcp的数据包 +func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, timestamp uint32) []*collections.ReferenceCounter[[]byte] { + var result []*collections.ReferenceCounter[[]byte] + var packet []byte // 保存开始序号 track.StartSeq = track.Muxer.GetHeader().Seq track.Muxer.Input(data, timestamp, func() []byte { - index = t.buffer.Index() - block := t.buffer.GetBlock() - return block[OverTcpHeaderSize:] + packet = stream.UDPReceiveBufferPool.Get().([]byte) + return packet[OverTcpHeaderSize:] }, func(bytes []byte) { track.EndSeq = track.Muxer.GetHeader().Seq + overTCPPacket := packet[:OverTcpHeaderSize+len(bytes)] + t.OverTCP(overTCPPacket, channel) - packet := t.buffer.Get(index)[:OverTcpHeaderSize+len(bytes)] - t.OverTCP(packet, channel) - t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet)) + refPacket := collections.NewReferenceCounter(overTCPPacket) + result = append(result, refPacket) + t.rtpBuffers.Push(refPacket) }) + + return result } func (t *TransStream) AddTrack(track *stream.Track) error { @@ -133,33 +147,32 @@ func (t *TransStream) AddTrack(track *stream.Track) error { rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.MediaType) t.RtspTracks = append(t.RtspTracks, rtspTrack) - index := len(t.RtspTracks) - 1 + trackIndex := len(t.RtspTracks) - 1 // 将sps和pps按照单一模式打包 - bufferIndex := t.buffer.Index() + var extraDataPackets []*collections.ReferenceCounter[[]byte] + packAndAdd := func(data []byte) { + packets := t.PackRtpPayload(rtspTrack, trackIndex, data, 0) + for _, packet := range packets { + extraDataPackets = append(extraDataPackets, packet) + // 出队列, 单独保存 + t.rtpBuffers.Pop() + } + } + if utils.AVMediaTypeVideo == track.Stream.MediaType { parameters := track.Stream.CodecParameters if utils.AVCodecIdH265 == track.Stream.CodecID { bytes := parameters.(*avformat.HEVCCodecData).VPS() - t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(bytes[0]), 0) + packAndAdd(avc.RemoveStartCode(bytes[0])) } spsBytes := parameters.SPS() ppsBytes := parameters.PPS() - t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(spsBytes[0]), 0) - t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(ppsBytes[0]), 0) + packAndAdd(avc.RemoveStartCode(spsBytes[0])) + packAndAdd(avc.RemoveStartCode(ppsBytes[0])) - // 拷贝扩展数据的rtp包 - size := t.buffer.Index() - bufferIndex - extraRtpBuffer := make([][]byte, size) - for i := 0; i < size; i++ { - src := t.buffer.Get(bufferIndex + i) - dst := make([]byte, len(src)) - copy(dst, src) - extraRtpBuffer[i] = dst[:OverTcpHeaderSize+binary.BigEndian.Uint16(dst[2:])] - } - - t.RtspTracks[index].ExtraDataBuffer = extraRtpBuffer + t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets } return nil @@ -261,11 +274,10 @@ func (t *TransStream) WriteHeader() error { func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16) stream.TransStream { t := &TransStream{ - addr: addr, - urlFormat: urlFormat, - // 在将AVPacket打包rtp时, 会使用多个buffer块, 回环覆盖多个rtp块, 如果是TCP拉流并且网络不好, 推流的数据会错乱. - buffer: stream.NewReceiveBuffer(1500, 1024), - oldTracks: oldTracks, + addr: addr, + urlFormat: urlFormat, + oldTracks: oldTracks, + rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](512), } if addr.IP.To4() != nil { diff --git a/rtsp/rtsp_track.go b/rtsp/rtsp_track.go index 70b29f7..fa73ad2 100644 --- a/rtsp/rtsp_track.go +++ b/rtsp/rtsp_track.go @@ -1,12 +1,12 @@ package rtsp - import ( + "github.com/lkmio/avformat/collections" "github.com/lkmio/avformat/utils" "github.com/lkmio/rtp" ) -// Track RtspTrack 对rtsp每路输出流的封装 +// Track rtsp每路输出流的封装 type Track struct { PT byte Rate int @@ -15,7 +15,7 @@ type Track struct { EndSeq uint16 Muxer rtp.Muxer - ExtraDataBuffer [][]byte // 缓存带有编码信息的rtp包, 对所有sink通用 + ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用 } func (r *Track) Close() { diff --git a/stream/receive_buffer.go b/stream/receive_buffer.go index 9465d98..84e582a 100644 --- a/stream/receive_buffer.go +++ b/stream/receive_buffer.go @@ -24,36 +24,3 @@ var ( }, } ) - -// ReceiveBuffer 收流缓冲区. 网络收流->解析流->封装流->发送流是同步的,从解析到发送可能耗时,从而影响读取网络流. 使用收流缓冲区,可有效降低出现此问题的概率. -// 从网络IO读取数据->送给解复用器, 此过程需做到无内存拷贝 -// rtmp和1078推流直接使用ReceiveBuffer -// 国标推流,UDP收流都要经过jitter buffer处理, 还是需要拷贝一次, 没必要使用ReceiveBuffer. TCP全都使用ReceiveBuffer, 区别在于多端口模式, 第一包传给source, 单端口模式先解析出ssrc, 找到source. 后续再传给source. -type ReceiveBuffer struct { - blockCapacity int // 单个内存块的容量 - blockCount int // 内存块数量 - data []byte // 由一块大内存分割成多个块使用 - index int // 使用到第几块的索引 -} - -func (r *ReceiveBuffer) Index() int { - return r.index -} - -func (r *ReceiveBuffer) Get(index int) []byte { - return r.data[index*r.blockCapacity : (index+1)*r.blockCapacity] -} - -func (r *ReceiveBuffer) GetBlock() []byte { - bytes := r.data[r.index*r.blockCapacity:] - r.index = (r.index + 1) % r.blockCount - return bytes[:r.blockCapacity] -} - -func (r *ReceiveBuffer) BlockCount() int { - return r.blockCount -} - -func NewReceiveBuffer(blockSize, blockCount int) *ReceiveBuffer { - return &ReceiveBuffer{blockCapacity: blockSize, blockCount: blockCount, data: make([]byte, blockSize*blockCount), index: 0} -} diff --git a/stream/source.go b/stream/source.go index 3beeddd..bb18d48 100644 --- a/stream/source.go +++ b/stream/source.go @@ -135,6 +135,7 @@ type PublishSource struct { existVideo bool // 是否存在视频 TransStreams map[TransStreamID]TransStream // 所有输出流 + ForwardTransStream TransStream // 转发流 sinks map[SinkID]Sink // 保存所有Sink TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink streamEndInfo *StreamEndInfo // 之前推流源信息 @@ -286,6 +287,11 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream s.TransStreamSinks[id] = make(map[SinkID]Sink, 128) _ = transStream.WriteHeader() + // 设置转发流 + if TransStreamGBStreamForward == transStream.GetProtocol() { + s.ForwardTransStream = transStream + } + return transStream, err } @@ -428,6 +434,12 @@ func (s *PublishSource) doAddSink(sink Sink, resume bool) bool { } } + err := sink.StartStreaming(transStream) + if err != nil { + log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), s.ID) + return false + } + // 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流 if !sink.IsReady() { return true @@ -439,12 +451,6 @@ func (s *PublishSource) doAddSink(sink Sink, resume bool) bool { log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) } - err := sink.StartStreaming(transStream) - if err != nil { - log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), s.ID) - return false - } - s.sinks[sink.GetID()] = sink s.TransStreamSinks[transStreamId][sink.GetID()] = sink