feat: sync.pool管理rtsp流和国标级联流

This commit is contained in:
yangjiechina
2025-04-19 22:45:11 +08:00
parent 3fe77446e0
commit 1e951cdfcd
8 changed files with 117 additions and 123 deletions

View File

@@ -10,11 +10,6 @@ import (
"net"
)
const (
TcpStreamForwardBufferBlockSize = 1024
RTPOverTCPPacketSize = 1600
)
type ForwardSink struct {
stream.BaseSink
setup SetupType
@@ -26,7 +21,7 @@ func (f *ForwardSink) OnConnected(conn net.Conn) []byte {
log.Sugar.Infof("级联连接 conn: %s", conn.RemoteAddr())
f.Conn = conn
f.Conn.(*transport.Conn).EnableAsyncWriteMode(TcpStreamForwardBufferBlockSize - 2)
f.Conn.(*transport.Conn).EnableAsyncWriteMode(512)
return nil
}
@@ -41,24 +36,18 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
}
func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error {
// TCP等待连接后再转发数据
if SetupUDP != f.setup && f.Conn == nil {
return nil
}
if len(data)+2 > RTPOverTCPPacketSize {
log.Sugar.Errorf("国标级联转发流失败 rtp包过长, 长度:%d, 最大允许:%d", len(data), RTPOverTCPPacketSize)
return nil
}
// 修改为与上级协商的SSRC
rtp.ModifySSRC(data[0].Get(), f.ssrc)
if SetupUDP == f.setup {
f.socket.(*transport.UDPClient).Write(data[0].Get()[2:])
} else {
if _, err := f.Conn.Write(data[0].Get()); err != nil {
return err
}
return f.BaseSink.Write(index, data, ts)
}
return nil

View File

@@ -2,34 +2,57 @@ package gb28181
import (
"encoding/binary"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
)
// ForwardStream 国标级联转发流, 下级推什么, 就向上级发什么.
type ForwardStream struct {
stream.BaseTransStream
buffer *stream.ReceiveBuffer
rtpBuffers *collections.Queue[*collections.ReferenceCounter[[]byte]]
}
func (f *ForwardStream) WriteHeader() error {
return nil
}
func (f *ForwardStream) WrapData(data []byte) []byte {
block := f.buffer.GetBlock()
copy(block[2:], data)
binary.BigEndian.PutUint16(block, uint16(len(data)))
return block
}
func (f *ForwardStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
size := 2 + uint16(len(packet.Data))
if size > stream.UDPReceiveBufferSize {
log.Sugar.Errorf("国标级联转发流失败 rtp包过长, 长度:%d, 最大允许:%d", len(packet.Data), stream.UDPReceiveBufferSize)
return nil, 0, false, nil
}
func (f *ForwardStream) Capacity() int {
return f.buffer.BlockCount()
// 释放rtp包
for f.rtpBuffers.Size() > 0 {
rtp := f.rtpBuffers.Peek(0)
if rtp.UseCount() > 1 {
break
}
f.rtpBuffers.Pop()
// 放回池中
data := rtp.Get()
stream.UDPReceiveBufferPool.Put(data[:cap(data)])
}
bytes := stream.UDPReceiveBufferPool.Get().([]byte)
binary.BigEndian.PutUint16(bytes, size-2)
copy(bytes[2:], packet.Data)
rtp := collections.NewReferenceCounter(bytes[:size])
f.rtpBuffers.Push(rtp)
// 每帧都当关键帧, 直接发给上级
return []*collections.ReferenceCounter[[]byte]{rtp}, -1, true, nil
}
func NewTransStream() (stream.TransStream, error) {
return &ForwardStream{
BaseTransStream: stream.BaseTransStream{Protocol: stream.TransStreamGBStreamForward},
buffer: stream.NewReceiveBuffer(1500, 512),
rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](1024),
}, nil
}

View File

@@ -3,7 +3,6 @@ package gb28181
import (
"fmt"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
@@ -77,15 +76,9 @@ func (source *BaseGBSource) Init(receiveQueueSize int) {
// Input 输入rtp包, 处理PS流, 负责解析->封装->推流
func (source *BaseGBSource) Input(data []byte) error {
// 国标级联转发
for _, transStream := range source.TransStreams {
if transStream.GetProtocol() != stream.TransStreamGBStreamForward {
continue
}
bytes := transStream.(*ForwardStream).WrapData(data)
rtpPacket := [1]*collections.ReferenceCounter[[]byte]{collections.NewReferenceCounter(bytes)}
source.DispatchBuffer(transStream, -1, rtpPacket[:], -1, true)
if source.ForwardTransStream != nil {
packet := avformat.AVPacket{Data: data}
source.DispatchPacket(source.ForwardTransStream, &packet)
}
packet := rtp.Packet{}

View File

@@ -54,6 +54,7 @@ func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
if tcp {
s.TCPStreaming = true
s.BaseSink.EnableAsyncWriteMode(512)
} else {
sender.Rtp, err = TransportManger.NewUDPServer()
if err != nil {
@@ -86,14 +87,17 @@ func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rt
return nil
}
for _, bytes := range data {
for i, bytes := range data {
sender := s.senders[index]
sender.PktCount++
sender.OctetCount += len(bytes.Get())
if s.TCPStreaming {
s.Conn.Write(bytes.Get())
// 一次发送会花屏?
// return s.BaseSink.Write(index, data, rtpTime)
s.BaseSink.Write(index, data[i:i+1], rtpTime)
//s.Conn.Write(bytes.Get())
} else {
//发送rtcp sr包
// 发送rtcp sr包
sender.RtpConn.Write(bytes.Get()[OverTcpHeaderSize:])
if sender.RtcpConn == nil || sender.PktCount%100 != 0 {

View File

@@ -31,7 +31,8 @@ type TransStream struct {
//oldTracks []*Track
oldTracks map[byte]uint16
sdp string
buffer *stream.ReceiveBuffer // 保存封装后的rtp包
rtpBuffers *collections.Queue[*collections.ReferenceCounter[[]byte]]
}
func (t *TransStream) OverTCP(data []byte, channel int) {
@@ -41,26 +42,37 @@ func (t *TransStream) OverTCP(data []byte, channel int) {
}
func (t *TransStream) Input(packet *avformat.AVPacket) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
t.ClearOutStreamBuffer()
// 释放rtp包
for t.rtpBuffers.Size() > 0 {
rtp := t.rtpBuffers.Peek(0)
if rtp.UseCount() > 1 {
break
}
t.rtpBuffers.Pop()
// 放回池中
data := rtp.Get()
stream.UDPReceiveBufferPool.Put(data[:cap(data)])
}
var ts uint32
var result []*collections.ReferenceCounter[[]byte]
track := t.RtspTracks[packet.Index]
if utils.AVMediaTypeAudio == packet.MediaType {
ts = uint32(packet.ConvertPts(track.Rate))
t.PackRtpPayload(track, packet.Index, packet.Data, ts)
result = t.PackRtpPayload(track, packet.Index, packet.Data, ts)
} else if utils.AVMediaTypeVideo == packet.MediaType {
ts = uint32(packet.ConvertPts(track.Rate))
annexBData := avformat.AVCCPacket2AnnexB(t.BaseTransStream.Tracks[packet.Index].Stream, packet)
data := avc.RemoveStartCode(annexBData)
t.PackRtpPayload(track, packet.Index, data, ts)
result = t.PackRtpPayload(track, packet.Index, data, ts)
}
return t.OutBuffer[:t.OutBufferSize], int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
return result, int64(ts), utils.AVMediaTypeVideo == packet.MediaType && packet.Key, nil
}
func (t *TransStream) ReadExtraData(ts int64) ([]*collections.ReferenceCounter[[]byte], int64, error) {
t.ClearOutStreamBuffer()
// 返回视频编码数据的rtp包
for _, track := range t.RtspTracks {
if utils.AVMediaTypeVideo != track.MediaType {
@@ -69,37 +81,39 @@ func (t *TransStream) ReadExtraData(ts int64) ([]*collections.ReferenceCounter[[
// 回滚序号和时间戳
index := int(track.StartSeq) - len(track.ExtraDataBuffer)
for i, bytes := range track.ExtraDataBuffer {
rtp.RollbackSeq(bytes[OverTcpHeaderSize:], index+i+1)
binary.BigEndian.PutUint32(bytes[OverTcpHeaderSize+4:], uint32(ts))
for i, packet := range track.ExtraDataBuffer {
rtp.RollbackSeq(packet.Get()[OverTcpHeaderSize:], index+i+1)
binary.BigEndian.PutUint32(packet.Get()[OverTcpHeaderSize+4:], uint32(ts))
}
for _, data := range track.ExtraDataBuffer {
t.AppendOutStreamBuffer(collections.NewReferenceCounter(data))
}
return t.OutBuffer[:t.OutBufferSize], ts, nil
// 目前只有视频需要发送扩展数据的rtp包, 所以直接返回
return track.ExtraDataBuffer, ts, nil
}
return nil, ts, nil
}
func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, timestamp uint32) {
var index int
// PackRtpPayload 打包返回rtp over tcp的数据包
func (t *TransStream) PackRtpPayload(track *Track, channel int, data []byte, timestamp uint32) []*collections.ReferenceCounter[[]byte] {
var result []*collections.ReferenceCounter[[]byte]
var packet []byte
// 保存开始序号
track.StartSeq = track.Muxer.GetHeader().Seq
track.Muxer.Input(data, timestamp, func() []byte {
index = t.buffer.Index()
block := t.buffer.GetBlock()
return block[OverTcpHeaderSize:]
packet = stream.UDPReceiveBufferPool.Get().([]byte)
return packet[OverTcpHeaderSize:]
}, func(bytes []byte) {
track.EndSeq = track.Muxer.GetHeader().Seq
overTCPPacket := packet[:OverTcpHeaderSize+len(bytes)]
t.OverTCP(overTCPPacket, channel)
packet := t.buffer.Get(index)[:OverTcpHeaderSize+len(bytes)]
t.OverTCP(packet, channel)
t.AppendOutStreamBuffer(collections.NewReferenceCounter(packet))
refPacket := collections.NewReferenceCounter(overTCPPacket)
result = append(result, refPacket)
t.rtpBuffers.Push(refPacket)
})
return result
}
func (t *TransStream) AddTrack(track *stream.Track) error {
@@ -133,33 +147,32 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
rtspTrack := NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, track.Stream.MediaType)
t.RtspTracks = append(t.RtspTracks, rtspTrack)
index := len(t.RtspTracks) - 1
trackIndex := len(t.RtspTracks) - 1
// 将sps和pps按照单一模式打包
bufferIndex := t.buffer.Index()
var extraDataPackets []*collections.ReferenceCounter[[]byte]
packAndAdd := func(data []byte) {
packets := t.PackRtpPayload(rtspTrack, trackIndex, data, 0)
for _, packet := range packets {
extraDataPackets = append(extraDataPackets, packet)
// 出队列, 单独保存
t.rtpBuffers.Pop()
}
}
if utils.AVMediaTypeVideo == track.Stream.MediaType {
parameters := track.Stream.CodecParameters
if utils.AVCodecIdH265 == track.Stream.CodecID {
bytes := parameters.(*avformat.HEVCCodecData).VPS()
t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(bytes[0]), 0)
packAndAdd(avc.RemoveStartCode(bytes[0]))
}
spsBytes := parameters.SPS()
ppsBytes := parameters.PPS()
t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(spsBytes[0]), 0)
t.PackRtpPayload(rtspTrack, index, avc.RemoveStartCode(ppsBytes[0]), 0)
packAndAdd(avc.RemoveStartCode(spsBytes[0]))
packAndAdd(avc.RemoveStartCode(ppsBytes[0]))
// 拷贝扩展数据的rtp包
size := t.buffer.Index() - bufferIndex
extraRtpBuffer := make([][]byte, size)
for i := 0; i < size; i++ {
src := t.buffer.Get(bufferIndex + i)
dst := make([]byte, len(src))
copy(dst, src)
extraRtpBuffer[i] = dst[:OverTcpHeaderSize+binary.BigEndian.Uint16(dst[2:])]
}
t.RtspTracks[index].ExtraDataBuffer = extraRtpBuffer
t.RtspTracks[trackIndex].ExtraDataBuffer = extraDataPackets
}
return nil
@@ -261,11 +274,10 @@ func (t *TransStream) WriteHeader() error {
func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16) stream.TransStream {
t := &TransStream{
addr: addr,
urlFormat: urlFormat,
// 在将AVPacket打包rtp时, 会使用多个buffer块, 回环覆盖多个rtp块, 如果是TCP拉流并且网络不好, 推流的数据会错乱.
buffer: stream.NewReceiveBuffer(1500, 1024),
oldTracks: oldTracks,
addr: addr,
urlFormat: urlFormat,
oldTracks: oldTracks,
rtpBuffers: collections.NewQueue[*collections.ReferenceCounter[[]byte]](512),
}
if addr.IP.To4() != nil {

View File

@@ -1,12 +1,12 @@
package rtsp
import (
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/rtp"
)
// Track RtspTrack 对rtsp每路输出流的封装
// Track rtsp每路输出流的封装
type Track struct {
PT byte
Rate int
@@ -15,7 +15,7 @@ type Track struct {
EndSeq uint16
Muxer rtp.Muxer
ExtraDataBuffer [][]byte // 缓存带有编码信息的rtp包, 对所有sink通用
ExtraDataBuffer []*collections.ReferenceCounter[[]byte] // 缓存带有编码信息的rtp包, 对所有sink通用
}
func (r *Track) Close() {

View File

@@ -24,36 +24,3 @@ var (
},
}
)
// ReceiveBuffer 收流缓冲区. 网络收流->解析流->封装流->发送流是同步的,从解析到发送可能耗时,从而影响读取网络流. 使用收流缓冲区,可有效降低出现此问题的概率.
// 从网络IO读取数据->送给解复用器, 此过程需做到无内存拷贝
// rtmp和1078推流直接使用ReceiveBuffer
// 国标推流,UDP收流都要经过jitter buffer处理, 还是需要拷贝一次, 没必要使用ReceiveBuffer. TCP全都使用ReceiveBuffer, 区别在于多端口模式, 第一包传给source, 单端口模式先解析出ssrc, 找到source. 后续再传给source.
type ReceiveBuffer struct {
blockCapacity int // 单个内存块的容量
blockCount int // 内存块数量
data []byte // 由一块大内存分割成多个块使用
index int // 使用到第几块的索引
}
func (r *ReceiveBuffer) Index() int {
return r.index
}
func (r *ReceiveBuffer) Get(index int) []byte {
return r.data[index*r.blockCapacity : (index+1)*r.blockCapacity]
}
func (r *ReceiveBuffer) GetBlock() []byte {
bytes := r.data[r.index*r.blockCapacity:]
r.index = (r.index + 1) % r.blockCount
return bytes[:r.blockCapacity]
}
func (r *ReceiveBuffer) BlockCount() int {
return r.blockCount
}
func NewReceiveBuffer(blockSize, blockCount int) *ReceiveBuffer {
return &ReceiveBuffer{blockCapacity: blockSize, blockCount: blockCount, data: make([]byte, blockSize*blockCount), index: 0}
}

View File

@@ -135,6 +135,7 @@ type PublishSource struct {
existVideo bool // 是否存在视频
TransStreams map[TransStreamID]TransStream // 所有输出流
ForwardTransStream TransStream // 转发流
sinks map[SinkID]Sink // 保存所有Sink
TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
streamEndInfo *StreamEndInfo // 之前推流源信息
@@ -286,6 +287,11 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream
s.TransStreamSinks[id] = make(map[SinkID]Sink, 128)
_ = transStream.WriteHeader()
// 设置转发流
if TransStreamGBStreamForward == transStream.GetProtocol() {
s.ForwardTransStream = transStream
}
return transStream, err
}
@@ -428,6 +434,12 @@ func (s *PublishSource) doAddSink(sink Sink, resume bool) bool {
}
}
err := sink.StartStreaming(transStream)
if err != nil {
log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), s.ID)
return false
}
// 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流
if !sink.IsReady() {
return true
@@ -439,12 +451,6 @@ func (s *PublishSource) doAddSink(sink Sink, resume bool) bool {
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
}
err := sink.StartStreaming(transStream)
if err != nil {
log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), s.ID)
return false
}
s.sinks[sink.GetID()] = sink
s.TransStreamSinks[transStreamId][sink.GetID()] = sink