重构输出流转发, TransStream不再持有Sink

This commit is contained in:
yangjiechina
2024-10-28 19:15:53 +08:00
parent 9090e28077
commit ec707c8dc1
27 changed files with 894 additions and 747 deletions

View File

@@ -7,5 +7,5 @@ import (
)
func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink {
return &stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn)}
return &stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn), TCPStreaming: true}
}

View File

@@ -9,24 +9,25 @@ import (
)
const (
// HttpFlvBlockHeaderSize 在每块HttpFlv块头部,预留指定大小的数据, 用于描述flv数据块的长度信息
//实际发送流 http-flv: |length\r\n|flv data\r\n
//方便封装 http-flv-block: |block size[4]|skip count[2]|length\r\n|flv data\r\n
//skip count是因为flv-length不固定, 需要一个字段说明, 跳过多少字节才是http-flv数据
// HttpFlvBlockHeaderSize 在每块http-flv流的头部,预留指定大小的数据, 用于描述flv数据块的长度信息
// http-flv是以文件流的形式传输http流, 格式如下: length\r\n|flv data\r\n
// 我们对http-flv-block的封装: |block size[4]|skip count[2]|length\r\n|flv data\r\n
// skip count是因为length长度不固定, 需要一个字段说明, 跳过多少字节才是http-flv数据
HttpFlvBlockHeaderSize = 20
)
type httpTransStream struct {
type TransStream struct {
stream.TCPTransStream
muxer libflv.Muxer
mwBuffer stream.MergeWritingBuffer
header []byte
headerSize int
headerTagSize int
}
func (t *httpTransStream) Input(packet utils.AVPacket) error {
func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
var flvSize int
var data []byte
var videoKey bool
@@ -45,39 +46,42 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
videoKey = packet.KeyFrame()
}
//关键帧都放在切片头部,所以需要创建新切片, 发送当前切片剩余流
if videoKey && !t.mwBuffer.IsNewSegment() {
t.forceFlushSegment()
// 关键帧都放在切片头部,所以遇到关键帧创建新切片, 发送当前切片剩余流
if videoKey && !t.MWBuffer.IsNewSegment() {
segment := t.forceFlushSegment()
t.AppendOutStreamBuffer(segment)
}
var n int
var separatorSize int
//新的合并写切片, 预留包长字节
if t.mwBuffer.IsNewSegment() {
// 新的合并写切片, 预留包长字节
if t.MWBuffer.IsNewSegment() {
separatorSize = HttpFlvBlockHeaderSize
//10字节描述flv包长, 前2个字节描述无效字节长度
// 10字节描述flv包长, 前2个字节描述无效字节长度
n = HttpFlvBlockHeaderSize
}
//切片末尾, 预留换行符
if t.mwBuffer.IsFull(dts) {
// 切片末尾, 预留换行符
if t.MWBuffer.IsFull(dts) {
separatorSize += 2
}
//分配flv block
bytes := t.mwBuffer.Allocate(separatorSize+flvSize, dts, videoKey)
// 分配flv block
bytes := t.MWBuffer.Allocate(separatorSize+flvSize, dts, videoKey)
n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false)
copy(bytes[n:], data)
//合并写满再发
if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 {
t.sendUnpackedSegment(segment)
// 合并写满再发
if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
// 已经分配末尾换行符内存
t.AppendOutStreamBuffer(t.FormatSegment(segment))
}
return nil
return t.OutBuffer[:t.OutBufferSize], 0, true, nil
}
func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
func (t *TransStream) AddTrack(stream utils.AVStream) error {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
return err
}
@@ -93,35 +97,7 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
return nil
}
func (t *httpTransStream) AddSink(sink stream.Sink) error {
utils.Assert(t.headerSize > 0)
t.TCPTransStream.AddSink(sink)
//发送sequence header
t.sendSegment(sink, t.header[:t.headerSize])
//发送当前内存池已有的合并写切片
first := true
t.mwBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
if first {
//修改第一个flv tag的pre tag size
binary.BigEndian.PutUint32(bytes[20:], uint32(t.headerTagSize))
first = false
}
//遍历发送合并写切片
var index int
for ; index < len(bytes); index += 4 {
size := binary.BigEndian.Uint32(bytes[index:])
t.sendSegment(sink, bytes[index:index+4+int(size)])
index += int(size)
}
})
return nil
}
func (t *httpTransStream) WriteHeader() error {
func (t *TransStream) WriteHeader() error {
t.headerSize += t.muxer.WriteHeader(t.header[HttpFlvBlockHeaderSize:])
for _, track := range t.BaseTransStream.Tracks {
@@ -140,76 +116,110 @@ func (t *httpTransStream) WriteHeader() error {
t.headerTagSize = n - 15 + len(data) + 11
}
//加上末尾换行符
// 加上末尾换行符
t.headerSize += 2
t.writeSeparator(t.header[:t.headerSize])
t.mwBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
return nil
}
func (t *httpTransStream) Close() error {
//发送剩余的流
if !t.mwBuffer.IsNewSegment() {
t.forceFlushSegment()
func (t *TransStream) ReadExtraData(_ int64) ([][]byte, int64, error) {
utils.Assert(t.headerSize > 0)
// 发送sequence header
return [][]byte{t.GetHttpFLVBlock(t.header[:t.headerSize])}, 0, nil
}
func (t *TransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
t.ClearOutStreamBuffer()
// 发送当前内存池已有的合并写切片
t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
if t.OutBufferSize < 1 {
// 修改第一个flv tag的pre tag size
binary.BigEndian.PutUint32(bytes[HttpFlvBlockHeaderSize:], uint32(t.headerTagSize))
}
// 遍历发送合并写切片
var index int
for ; index < len(bytes); index += 4 {
size := binary.BigEndian.Uint32(bytes[index:])
t.AppendOutStreamBuffer(t.GetHttpFLVBlock(bytes[index : index+4+int(size)]))
index += int(size)
}
})
return t.OutBuffer[:t.OutBufferSize], 0, nil
}
func (t *TransStream) Close() ([][]byte, int64, error) {
t.ClearOutStreamBuffer()
// 发送剩余的流
if !t.MWBuffer.IsNewSegment() {
if segment := t.forceFlushSegment(); len(segment) > 0 {
t.AppendOutStreamBuffer(segment)
}
}
return nil
return t.OutBuffer[:t.OutBufferSize], 0, nil
}
func (t *httpTransStream) forceFlushSegment() {
t.mwBuffer.Reserve(2)
segment := t.mwBuffer.FlushSegment()
t.sendUnpackedSegment(segment)
// 保存为完整的http-flv切片
func (t *TransStream) forceFlushSegment() []byte {
// 预览末尾换行符
t.MWBuffer.Reserve(2)
segment := t.MWBuffer.FlushSegment()
return t.FormatSegment(segment)
}
// 为单个sink发送flv切片, 切片已经添加分隔符
func (t *httpTransStream) sendSegment(sink stream.Sink, data []byte) error {
return sink.Input(data[t.computeSkipCount(data):])
// GetHttpFLVBlock 跳过头部的无效数据返回http-flv块
func (t *TransStream) GetHttpFLVBlock(data []byte) []byte {
return data[t.computeSkipCount(data):]
}
// 发送还未添加包长和换行符的切片
func (t *httpTransStream) sendUnpackedSegment(segment []byte) {
// FormatSegment 为切片添加包长和换行符
func (t *TransStream) FormatSegment(segment []byte) []byte {
t.writeSeparator(segment)
skip := t.computeSkipCount(segment)
t.SendPacket(segment[skip:])
return t.GetHttpFLVBlock(segment)
}
func (t *httpTransStream) computeSkipCount(data []byte) int {
func (t *TransStream) computeSkipCount(data []byte) int {
return int(6 + binary.BigEndian.Uint16(data[4:]))
}
// 为http-flv数据块添加长度和换行符
// @dst http-flv数据块, 头部需要空出HttpFlvBlockLengthSize字节长度, 末尾空出2字节换行符
func (t *httpTransStream) writeSeparator(dst []byte) {
//http-flv: length\r\n|flv data\r\n
//http-flv-block: |block size[4]|skip count[2]|length\r\n|flv data\r\n
func (t *TransStream) writeSeparator(dst []byte) {
// http-flv: length\r\n|flv data\r\n
// http-flv-block: |block size[4]|skip count[2]|length\r\n|flv data\r\n
//写block size
// 写block size
binary.BigEndian.PutUint32(dst, uint32(len(dst)-4))
//写flv实际长度字符串, 16进制表达
// 写flv实际长度字符串, 16进制表达
flvSize := len(dst) - HttpFlvBlockHeaderSize - 2
hexStr := fmt.Sprintf("%X", flvSize)
//+2是跳过length后的换行符
// +2是跳过length后的换行符
n := len(hexStr) + 2
copy(dst[HttpFlvBlockHeaderSize-n:], hexStr)
//写跳过字节数量
//-6是block size和skip count字段合计长度
// 写跳过字节数量
// -6是block size和skip count字段合计长度
skipCount := HttpFlvBlockHeaderSize - n - 6
binary.BigEndian.PutUint16(dst[4:], uint16(skipCount))
//flv length字段和flv数据之间的换行符
// flv length字段和flv数据之间的换行符
dst[HttpFlvBlockHeaderSize-2] = 0x0D
dst[HttpFlvBlockHeaderSize-1] = 0x0A
//末尾换行符
// 末尾换行符
dst[len(dst)-2] = 0x0D
dst[len(dst)-1] = 0x0A
}
func NewHttpTransStream() stream.TransStream {
return &httpTransStream{
return &TransStream{
muxer: libflv.NewMuxer(),
header: make([]byte, 1024),
headerSize: HttpFlvBlockHeaderSize,

View File

@@ -1,7 +1,6 @@
package gb28181
import (
"encoding/binary"
"github.com/lkmio/avformat/librtp"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
@@ -20,7 +19,6 @@ type ForwardSink struct {
setup SetupType
socket transport.ITransport
ssrc uint32
buffer *stream.ReceiveBuffer //发送缓冲区
}
func (f *ForwardSink) OnConnected(conn net.Conn) []byte {
@@ -41,7 +39,7 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
f.Close()
}
func (f *ForwardSink) Input(data []byte) error {
func (f *ForwardSink) Write(index int, data [][]byte, ts int64) error {
if SetupUDP != f.setup && f.Conn == nil {
return nil
}
@@ -52,23 +50,13 @@ func (f *ForwardSink) Input(data []byte) error {
}
// 修改为与上级协商的SSRC
librtp.ModifySSRC(data, f.ssrc)
librtp.ModifySSRC(data[0], f.ssrc)
if SetupUDP == f.setup {
// UDP转发, 不拷贝直接发送
f.socket.(*transport.UDPClient).Write(data)
f.socket.(*transport.UDPClient).Write(data[0][2:])
} else {
// TCP转发, 拷贝一次再发送
block := f.buffer.GetBlock()
copy(block[2:], data)
binary.BigEndian.PutUint16(block, uint16(len(data)))
if _, err := f.Conn.Write(block[:2+len(data)]); err == nil {
return nil
} else if _, ok := err.(*transport.ZeroWindowSizeError); ok {
log.Sugar.Errorf("发送缓冲区阻塞")
f.Conn.Close()
f.Conn = nil
if _, err := f.Conn.Write(data[0]); err != nil {
return err
}
}
@@ -107,6 +95,7 @@ func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stre
return nil, 0, err
}
sink.TCPStreaming = true
sink.socket = server
} else if SetupPassive == setup {
client := transport.TCPClient{}
@@ -135,14 +124,11 @@ func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stre
return nil, 0, err
}
sink.TCPStreaming = true
sink.socket = &client
} else {
utils.Assert(false)
}
if SetupUDP != setup {
sink.buffer = stream.NewReceiveBuffer(RTPOverTCPPacketSize, TcpStreamForwardBufferBlockSize)
}
return sink, sink.socket.ListenPort(), nil
}

View File

@@ -1,6 +1,7 @@
package gb28181
import (
"encoding/binary"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
)
@@ -8,14 +9,29 @@ import (
// ForwardStream 国标级联转发流, 下级推什么, 就向上级发什么.
type ForwardStream struct {
stream.BaseTransStream
buffer *stream.ReceiveBuffer
}
func (f *ForwardStream) WriteHeader() error {
return nil
}
func (f *ForwardStream) WrapData(data []byte) []byte {
block := f.buffer.GetBlock()
copy(block[2:], data)
binary.BigEndian.PutUint16(block, uint16(len(data)))
return block
}
func (f *ForwardStream) OutStreamBufferCapacity() int {
return f.buffer.BlockCount()
}
func NewTransStream() (stream.TransStream, error) {
return &ForwardStream{BaseTransStream: stream.BaseTransStream{Protocol_: stream.TransStreamGBStreamForward}}, nil
return &ForwardStream{
BaseTransStream: stream.BaseTransStream{Protocol: stream.TransStreamGBStreamForward},
buffer: stream.NewReceiveBuffer(1500, 512),
}, nil
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {

View File

@@ -69,18 +69,21 @@ func (source *BaseGBSource) Init(receiveQueueSize int) {
source.PublishSource.Init(receiveQueueSize)
}
// Input 输入rtp包, 处理PS流, 负责解析->封装->推流. 所有GBSource, 均到此处处理, 在event协程调用此函数
// Input 输入rtp包, 处理PS流, 负责解析->封装->推流
func (source *BaseGBSource) Input(data []byte) error {
// 国标级联转发
for _, transStream := range source.TransStreams {
if transStream.Protocol() != stream.TransStreamGBStreamForward {
if transStream.GetProtocol() != stream.TransStreamGBStreamForward {
continue
}
transStream.(*ForwardStream).SendPacket(data)
bytes := transStream.(*ForwardStream).WrapData(data)
rtpPacket := [1][]byte{bytes}
source.DispatchBuffer(transStream, -1, rtpPacket[:], -1, true)
}
packet := rtp.Packet{}
packet.Marshal()
_ = packet.Unmarshal(data)
return source.deMuxerCtx.Input(packet.Payload)
}
@@ -228,13 +231,13 @@ func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int
func (source *BaseGBSource) Close() {
log.Sugar.Infof("GB28181推流结束 ssrc:%d %s", source.ssrc, source.PublishSource.String())
//释放收流端口
// 释放收流端口
if source.transport != nil {
source.transport.Close()
source.transport = nil
}
//删除ssrc关联
// 删除ssrc关联
if !stream.AppConfig.GB28181.IsMultiPort() {
if SharedTCPServer != nil {
SharedTCPServer.filter.RemoveSource(source.ssrc)

View File

@@ -27,7 +27,19 @@ func (s *M3U8Sink) SendM3U8Data(data *string) error {
return nil
}
func (s *M3U8Sink) Start() {
func (s *M3U8Sink) StartStreaming(transStream stream.TransStream) error {
hls := transStream.(*TransStream)
if hls.m3u8.Size() > 0 {
if err := s.SendM3U8Data(&hls.m3u8StringFormat); err != nil {
return err
}
} else {
// m3u8文件中还没有切片时, 将sink添加到等待队列
hls.m3u8Sinks[s.GetID()] = s
}
// 开启拉流超时计时器, 如果拉流端查时间没有拉流, 关闭sink
timeout := time.Duration(stream.AppConfig.IdleTimeout)
if timeout < time.Second {
timeout = time.Duration(stream.AppConfig.Hls.Duration) * 2 * 3 * time.Second
@@ -43,6 +55,8 @@ func (s *M3U8Sink) Start() {
s.playTimer.Reset(timeout)
})
return nil
}
func (s *M3U8Sink) GetM3U8String() string {
@@ -67,7 +81,7 @@ func (s *M3U8Sink) Close() {
func NewM3U8Sink(id stream.SinkID, sourceId string, cb func(m3u8 []byte), sessionId string) stream.Sink {
return &M3U8Sink{
BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamHls},
BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamHls, TCPStreaming: true},
cb: cb,
sessionId: sessionId,
}

View File

@@ -16,12 +16,12 @@ type tsContext struct {
writeBuffer []byte // ts流的缓冲区, 由TSMuxer使用. 减少用户态和内核态交互以及磁盘IO频率
writeBufferSize int // 已缓存TS流大小
url string // @See transStream.tsUrl
url string // @See TransStream.tsUrl
path string // ts切片位于磁盘中的绝对路径
file *os.File // ts切片文件句柄
}
type transStream struct {
type TransStream struct {
stream.BaseTransStream
muxer libmpeg.TSMuxer
context *tsContext
@@ -39,9 +39,9 @@ type transStream struct {
m3u8StringFormat string // 一个协程写, 多个协程读, 不用加锁保护
}
func (t *transStream) Input(packet utils.AVPacket) error {
func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
if packet.Index() >= t.muxer.TrackCount() {
return fmt.Errorf("track not available")
return nil, -1, false, fmt.Errorf("track not available")
}
// 创建一下个切片
@@ -51,31 +51,33 @@ func (t *transStream) Input(packet utils.AVPacket) error {
if t.context.file != nil {
err := t.flushSegment(false)
if err != nil {
return err
return nil, -1, false, err
}
}
// 创建新的切片
if err := t.createSegment(); err != nil {
return err
return nil, -1, false, err
}
}
pts := packet.ConvertPts(90000)
dts := packet.ConvertDts(90000)
if utils.AVMediaTypeVideo == packet.MediaType() {
return t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), pts, dts, packet.KeyFrame())
t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), pts, dts, packet.KeyFrame())
} else {
return t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame())
t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame())
}
return nil, -1, true, nil
}
func (t *transStream) AddTrack(stream utils.AVStream) error {
err := t.BaseTransStream.AddTrack(stream)
if err != nil {
func (t *TransStream) AddTrack(stream utils.AVStream) error {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
return err
}
var err error
if utils.AVMediaTypeVideo == stream.Type() {
data := stream.CodecParameters().AnnexBExtraData()
_, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), data)
@@ -85,26 +87,15 @@ func (t *transStream) AddTrack(stream utils.AVStream) error {
return err
}
func (t *transStream) WriteHeader() error {
func (t *TransStream) WriteHeader() error {
return t.createSegment()
}
func (t *transStream) AddSink(sink stream.Sink) error {
t.BaseTransStream.AddSink(sink)
if t.m3u8.Size() > 0 {
return sink.(*M3U8Sink).SendM3U8Data(&t.m3u8StringFormat)
}
t.m3u8Sinks[sink.GetID()] = sink.(*M3U8Sink)
return nil
}
func (t *transStream) onTSWrite(data []byte) {
func (t *TransStream) onTSWrite(data []byte) {
t.context.writeBufferSize += len(data)
}
func (t *transStream) onTSAlloc(size int) []byte {
func (t *TransStream) onTSAlloc(size int) []byte {
n := len(t.context.writeBuffer) - t.context.writeBufferSize
if n < size {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
@@ -114,7 +105,7 @@ func (t *transStream) onTSAlloc(size int) []byte {
return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size]
}
func (t *transStream) flushSegment(end bool) error {
func (t *TransStream) flushSegment(end bool) error {
defer func() {
t.context.segmentSeq++
}()
@@ -165,7 +156,7 @@ func (t *transStream) flushSegment(end bool) error {
}
// 创建一个新的ts切片
func (t *transStream) createSegment() error {
func (t *TransStream) createSegment() error {
t.muxer.Reset()
var tsFile *os.File
@@ -187,7 +178,7 @@ func (t *transStream) createSegment() error {
return err
}
// 继续创建, 认为是文件名冲突, 并且文件已经被打开.
// 继续创建TS文件, 认为是文件名冲突, 并且文件已经被打开.
t.context.segmentSeq++
}
@@ -196,7 +187,7 @@ func (t *transStream) createSegment() error {
return nil
}
func (t *transStream) Close() error {
func (t *TransStream) Close() ([][]byte, int64, error) {
var err error
if t.context.file != nil {
@@ -215,7 +206,7 @@ func (t *transStream) Close() error {
t.m3u8File = nil
}
return err
return nil, 0, err
}
func DeleteOldSegments(id string) {
@@ -256,7 +247,7 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
return nil, err
}
stream_ := &transStream{
transStream := &TransStream{
m3u8Name: m3u8Name,
tsFormat: tsFormat,
tsUrl: tsUrl,
@@ -267,22 +258,22 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
// 创建TS封装器
muxer := libmpeg.NewTSMuxer()
muxer.SetWriteHandler(stream_.onTSWrite)
muxer.SetAllocHandler(stream_.onTSAlloc)
muxer.SetWriteHandler(transStream.onTSWrite)
muxer.SetAllocHandler(transStream.onTSAlloc)
// ts封装上下文对象
stream_.context = &tsContext{
transStream.context = &tsContext{
segmentSeq: 0,
writeBuffer: make([]byte, 1024*1024),
writeBufferSize: 0,
}
stream_.muxer = muxer
stream_.m3u8 = NewM3U8Writer(playlistLength)
stream_.m3u8File = file
transStream.muxer = muxer
transStream.m3u8 = NewM3U8Writer(playlistLength)
transStream.m3u8File = file
stream_.m3u8Sinks = make(map[stream.SinkID]*M3U8Sink, 24)
return stream_, nil
transStream.m3u8Sinks = make(map[stream.SinkID]*M3U8Sink, 24)
return transStream, nil
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {

View File

@@ -88,7 +88,7 @@ func main() {
log.Sugar.Info("启动rtsp服务成功 addr:", rtspAddr.String())
}
log.Sugar.Info("启动Http服务 addr:", stream.ListenAddr(stream.AppConfig.Http.Port))
log.Sugar.Info("启动http服务 addr:", stream.ListenAddr(stream.AppConfig.Http.Port))
go startApiServer(net.JoinHostPort(stream.AppConfig.ListenIP, strconv.Itoa(stream.AppConfig.Http.Port)))
//单端口模式下, 启动时就创建收流端口
@@ -101,7 +101,7 @@ func main() {
}
gb28181.SharedUDPServer = server
log.Sugar.Info("启动GB28181 UDP收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
log.Sugar.Info("启动GB28181 udp收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
}
if stream.AppConfig.GB28181.IsEnableTCP() {
@@ -111,7 +111,7 @@ func main() {
}
gb28181.SharedTCPServer = server
log.Sugar.Info("启动GB28181 TCP收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
log.Sugar.Info("启动GB28181 tcp收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
}
}

View File

@@ -36,14 +36,6 @@
<button onclick="play()"> 播放</button>
</div>
<div id="dialog" style="display:none;">
<p id="call_title">确实要执行操作吗?</p>
<div class="modal-footer">
<button id="confirmBtn">接听</button>
<button id="cancelBtn">拒绝</button>
</div>
</div>
<div style="margin-top: 10px;">
<div style="float: left">
<video id="videoview" width="310" autoplay muted controls ></video>

View File

@@ -1,13 +1,16 @@
package rtc
import (
"fmt"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"time"
)
type sink struct {
type Sink struct {
stream.BaseSink
offer string
@@ -20,31 +23,117 @@ type sink struct {
cb func(sdp string)
}
func NewSink(id stream.SinkID, sourceId string, offer string, cb func(sdp string)) stream.Sink {
return &sink{stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtc}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
}
func (s *Sink) StartStreaming(transStream stream.TransStream) error {
// 创建PeerConnection
var videoTrack *webrtc.TrackLocalStaticSample
s.setTrackCount(transStream.TrackCount())
func (s *sink) setTrackCount(count int) {
connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{})
connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
})
tracks := transStream.GetTracks()
for index, track := range tracks {
var mimeType string
var id string
if utils.AVCodecIdH264 == track.CodecId() {
mimeType = webrtc.MimeTypeH264
} else if utils.AVCodecIdH265 == track.CodecId() {
mimeType = webrtc.MimeTypeH265
} else if utils.AVCodecIdAV1 == track.CodecId() {
mimeType = webrtc.MimeTypeAV1
} else if utils.AVCodecIdVP8 == track.CodecId() {
mimeType = webrtc.MimeTypeVP8
} else if utils.AVCodecIdVP9 == track.CodecId() {
mimeType = webrtc.MimeTypeVP9
} else if utils.AVCodecIdOPUS == track.CodecId() {
mimeType = webrtc.MimeTypeOpus
} else if utils.AVCodecIdPCMALAW == track.CodecId() {
mimeType = webrtc.MimeTypePCMA
} else if utils.AVCodecIdPCMMULAW == track.CodecId() {
mimeType = webrtc.MimeTypePCMU
} else {
log.Sugar.Errorf("codec %s not compatible with webrtc", track.CodecId())
continue
}
if utils.AVMediaTypeAudio == track.Type() {
id = "audio"
} else {
id = "video"
}
videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
if err != nil {
panic(err)
} else if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
return err
} else if _, err = connection.AddTrack(videoTrack); err != nil {
return err
}
s.addTrack(index, videoTrack)
}
if len(connection.GetTransceivers()) == 0 {
return fmt.Errorf("no track added")
} else if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: s.offer}); err != nil {
return err
}
complete := webrtc.GatheringCompletePromise(connection)
answer, err := connection.CreateAnswer(nil)
if err != nil {
return err
} else if err = connection.SetLocalDescription(answer); err != nil {
return err
}
<-complete
connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
s.state = state
log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), s.GetID(), s.SourceID)
if state > webrtc.ICEConnectionStateDisconnected {
log.Sugar.Errorf("webrtc peer断开连接 sink: %v source :%s", s.GetID(), s.SourceID)
s.Close()
}
})
s.peer = connection
// offer的sdp, 应答给http请求
s.cb(connection.LocalDescription().SDP)
return nil
}
func (s *Sink) setTrackCount(count int) {
s.tracks = make([]*webrtc.TrackLocalStaticSample, count)
}
func (s *sink) addTrack(index int, track *webrtc.TrackLocalStaticSample) error {
func (s *Sink) addTrack(index int, track *webrtc.TrackLocalStaticSample) error {
s.tracks[index] = track
return nil
}
func (s *sink) SendHeader(data []byte) error {
s.cb(string(data))
return nil
}
func (s *sink) input(index int, data []byte, ts uint32) error {
func (s *Sink) Write(index int, data [][]byte, ts int64) error {
if s.tracks[index] == nil {
return nil
}
return s.tracks[index].WriteSample(media.Sample{
Data: data,
Duration: time.Duration(ts) * time.Millisecond,
})
for _, bytes := range data {
err := s.tracks[index].WriteSample(media.Sample{
Data: bytes,
Duration: time.Duration(ts) * time.Millisecond,
})
if err != nil {
return err
}
}
return nil
}
func NewSink(id stream.SinkID, sourceId string, offer string, cb func(sdp string)) stream.Sink {
return &Sink{stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtc, TCPStreaming: false}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
}

View File

@@ -1,9 +1,7 @@
package rtc
import (
"fmt"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
@@ -18,119 +16,27 @@ type transStream struct {
stream.BaseTransStream
}
func (t *transStream) Input(packet utils.AVPacket) error {
for _, iSink := range t.Sinks {
sink_ := iSink.(*sink)
if sink_.state < webrtc.ICEConnectionStateConnected {
continue
func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
if utils.AVMediaTypeAudio == packet.MediaType() {
t.AppendOutStreamBuffer(packet.Data())
} else if utils.AVMediaTypeVideo == packet.MediaType() {
if packet.KeyFrame() {
extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData()
t.AppendOutStreamBuffer(extra)
}
if utils.AVMediaTypeAudio == packet.MediaType() {
sink_.input(packet.Index(), packet.Data(), uint32(packet.Duration(1000)))
} else if utils.AVMediaTypeVideo == packet.MediaType() {
if packet.KeyFrame() {
extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData()
sink_.input(packet.Index(), extra, 0)
}
sink_.input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), uint32(packet.Duration(1000)))
}
t.AppendOutStreamBuffer(packet.Data())
}
return nil
}
func (t *transStream) AddSink(sink_ stream.Sink) error {
//创建PeerConnection
var videoTrack *webrtc.TrackLocalStaticSample
rtcSink := sink_.(*sink)
rtcSink.setTrackCount(len(t.Tracks))
connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{})
connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
})
for index, track := range t.Tracks {
var mimeType string
var id string
if utils.AVCodecIdH264 == track.CodecId() {
mimeType = webrtc.MimeTypeH264
} else if utils.AVCodecIdH265 == track.CodecId() {
mimeType = webrtc.MimeTypeH265
} else if utils.AVCodecIdAV1 == track.CodecId() {
mimeType = webrtc.MimeTypeAV1
} else if utils.AVCodecIdVP8 == track.CodecId() {
mimeType = webrtc.MimeTypeVP8
} else if utils.AVCodecIdVP9 == track.CodecId() {
mimeType = webrtc.MimeTypeVP9
} else if utils.AVCodecIdOPUS == track.CodecId() {
mimeType = webrtc.MimeTypeOpus
} else if utils.AVCodecIdPCMALAW == track.CodecId() {
mimeType = webrtc.MimeTypePCMA
} else if utils.AVCodecIdPCMMULAW == track.CodecId() {
mimeType = webrtc.MimeTypePCMU
} else {
log.Sugar.Errorf("codec %d not compatible with webrtc", track.CodecId())
continue
}
if utils.AVMediaTypeAudio == track.Type() {
id = "audio"
} else {
id = "video"
}
videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
if err != nil {
panic(err)
} else if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
return err
} else if _, err = connection.AddTrack(videoTrack); err != nil {
return err
}
rtcSink.addTrack(index, videoTrack)
}
if len(connection.GetTransceivers()) == 0 {
return fmt.Errorf("no track added")
} else if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: rtcSink.offer}); err != nil {
return err
}
complete := webrtc.GatheringCompletePromise(connection)
answer, err := connection.CreateAnswer(nil)
if err != nil {
return err
} else if err = connection.SetLocalDescription(answer); err != nil {
return err
}
<-complete
connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
rtcSink.state = state
log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), rtcSink.GetID(), rtcSink.SourceID)
if state > webrtc.ICEConnectionStateDisconnected {
log.Sugar.Errorf("webrtc peer断开连接 sink:%v source:%s", rtcSink.GetID(), rtcSink.SourceID)
rtcSink.Close()
}
})
rtcSink.peer = connection
rtcSink.SendHeader([]byte(connection.LocalDescription().SDP))
return t.BaseTransStream.AddSink(sink_)
return t.OutBuffer[:t.OutBufferSize], int64(uint32(packet.Duration(1000))), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil
}
func (t *transStream) WriteHeader() error {
return nil
}
func NewTransStream() stream.TransStream {
t := &transStream{}
return t
}
func InitConfig() {
setting := webrtc.SettingEngine{}
var ips []string
@@ -145,11 +51,11 @@ func InitConfig() {
panic(err)
}
//设置公网ip和监听端口
// 设置公网ip和监听端口
setting.SetICEUDPMux(webrtc.NewICEUDPMux(nil, udpListener))
setting.SetNAT1To1IPs(ips, webrtc.ICECandidateTypeHost)
//注册音视频编码器
// 注册音视频编码器
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
@@ -163,6 +69,11 @@ func InitConfig() {
webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(setting))
}
func NewTransStream() stream.TransStream {
t := &transStream{}
return t
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewTransStream(), nil
}

View File

@@ -12,11 +12,11 @@ type Sink struct {
stack *librtmp.Stack
}
func (s *Sink) Start() {
_ = s.stack.SendStreamBeginChunk(s.Conn)
func (s *Sink) StartStreaming(_ stream.TransStream) error {
return s.stack.SendStreamBeginChunk(s.Conn)
}
func (s *Sink) Flush() {
func (s *Sink) StopStreaming(_ stream.TransStream) {
_ = s.stack.SendStreamEOFChunk(s.Conn)
}
@@ -27,7 +27,7 @@ func (s *Sink) Close() {
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, stack *librtmp.Stack) stream.Sink {
return &Sink{
BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreate, Protocol: stream.TransStreamRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE},
BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreate, Protocol: stream.TransStreamRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE, TCPStreaming: true},
stack: stack,
}
}

View File

@@ -18,23 +18,18 @@ type transStream struct {
muxer libflv.Muxer
audioChunk librtmp.Chunk
videoChunk librtmp.Chunk
//合并写内存泄露问题: 推流结束后, mwBuffer的data一直释放不掉, 只有拉流全部断开之后, 才会释放该内存.
//起初怀疑是代码层哪儿有问题, 但是测试发现如果将合并写切片再拷贝一次发送 给sink, 推流结束后mwBuffer的data内存块释放没问题, 只有拷贝的内存块未释放. 所以排除了代码层造成内存泄露的可能性.
//看来是conn在write后还会持有data. 查阅代码发现, 的确如此. 向fd发送数据前buffer会引用data, 但是后续没有赋值为nil, 取消引用. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694
mwBuffer stream.MergeWritingBuffer //合并写同时作为, 用户态的发送缓冲区
}
func (t *transStream) Input(packet utils.AVPacket) error {
utils.Assert(t.BaseTransStream.Completed)
func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
var data []byte
var chunk *librtmp.Chunk
var videoPkt bool
var videoKey bool
//rtmp chunk消息体的数据大小
// rtmp chunk消息体的数据大小
var payloadSize int
//先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小
// 先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小
var chunkPayloadOffset int
var dts int64
var pts int64
@@ -57,32 +52,32 @@ func (t *transStream) Input(packet utils.AVPacket) error {
payloadSize += chunkPayloadOffset + len(data)
}
//遇到视频关键帧, 发送剩余的流, 创建新切片
// 遇到视频关键帧, 发送剩余的流, 创建新切片
if videoKey {
if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 {
t.SendPacket(segment)
if segment := t.MWBuffer.FlushSegment(); len(segment) > 0 {
t.AppendOutStreamBuffer(segment)
}
}
//分配内存
//固定type0
// 分配内存
// 固定type0
chunkHeaderSize := 12
//type3chunk数量
// type3chunk数量
numChunks := (payloadSize - 1) / t.chunkSize
rtmpMsgSize := chunkHeaderSize + payloadSize + numChunks
//如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳
// 如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳
if dts >= 0xFFFFFF && dts <= 0xFFFFFFFF {
rtmpMsgSize += (1 + numChunks) * 4
}
allocate := t.mwBuffer.Allocate(rtmpMsgSize, dts, videoKey)
allocate := t.MWBuffer.Allocate(rtmpMsgSize, dts, videoKey)
//写chunk header
// 写chunk header
chunk.Length = payloadSize
chunk.Timestamp = uint32(dts)
n := chunk.ToBytes(allocate)
//写flv
// 写flv
if videoPkt {
n += t.muxer.WriteVideoData(allocate[n:], uint32(ct), packet.KeyFrame(), false)
} else {
@@ -92,25 +87,30 @@ func (t *transStream) Input(packet utils.AVPacket) error {
n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
utils.Assert(len(allocate) == n)
//合并写满了再发
if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 {
t.SendPacket(segment)
// 合并写满了再发
if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
t.AppendOutStreamBuffer(segment)
}
return nil
return t.OutBuffer[:t.OutBufferSize], 0, true, nil
}
func (t *transStream) AddSink(sink stream.Sink) error {
func (t *transStream) ReadExtraData(_ int64) ([][]byte, int64, error) {
utils.Assert(t.headerSize > 0)
t.TCPTransStream.AddSink(sink)
//发送sequence header
sink.Input(t.header[:t.headerSize])
// 发送sequence header
return [][]byte{t.header[:t.headerSize]}, 0, nil
}
//发送当前内存池已有的合并写切片
t.mwBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
sink.Input(bytes)
func (t *transStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
t.ClearOutStreamBuffer()
// 发送当前内存池已有的合并写切片
t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
t.AppendOutStreamBuffer(bytes)
})
return nil
return t.OutBuffer[:t.OutBufferSize], 0, nil
}
func (t *transStream) WriteHeader() error {
@@ -136,7 +136,7 @@ func (t *transStream) WriteHeader() error {
utils.Assert(audioStream != nil || videoStream != nil)
//初始化
// 初始化
t.BaseTransStream.Completed = true
t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer()
@@ -148,7 +148,7 @@ func (t *transStream) WriteHeader() error {
t.muxer.AddVideoTrack(videoCodecId)
}
//统一生成rtmp拉流需要的数据头(chunk+sequence header)
// 生成推流的数据头(chunk+sequence header)
var n int
if audioStream != nil {
n += t.muxer.WriteAudioData(t.header[12:], true)
@@ -174,17 +174,19 @@ func (t *transStream) WriteHeader() error {
}
t.headerSize = n
t.mwBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
return nil
}
func (t *transStream) Close() error {
func (t *transStream) Close() ([][]byte, int64, error) {
t.ClearOutStreamBuffer()
//发送剩余的流
if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 {
t.SendPacket(segment)
if segment := t.MWBuffer.FlushSegment(); len(segment) > 0 {
t.AppendOutStreamBuffer(segment)
}
return nil
return t.OutBuffer[:t.OutBufferSize], 0, nil
}
func NewTransStream(chunkSize int) stream.TransStream {

View File

@@ -66,7 +66,7 @@ func (h handler) Process(session *session, method string, url_ *url.URL, headers
//确保拉流要经过授权
state, ok := method2StateMap[method]
if ok && state > SessionStateSetup && session.sink_ == nil {
if ok && state > SessionStateSetup && session.sink == nil {
return fmt.Errorf("please establish a session first")
}
@@ -107,17 +107,17 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) {
var response *http.Response
var body []byte
//校验密码
// 校验密码
if h.password != "" {
var success bool
var ok bool
authorization := request.headers.Get("Authorization")
if authorization != "" {
params, err := parseAuthParams(authorization)
success = err == nil && DoAuthenticatePlainTextPassword(params, h.password)
ok = err == nil && DoAuthenticatePlainTextPassword(params, h.password)
}
if !success {
if !ok {
response401 := NewResponse(http.StatusUnauthorized, request.headers.Get("Cseq"))
response401.Header.Set("WWW-Authenticate", generateAuthHeader("lkm"))
return response401, nil, nil
@@ -125,26 +125,27 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) {
}
sinkId := stream.NetAddr2SinkId(request.session.conn.RemoteAddr())
sink_ := NewSink(sinkId, request.sourceId, request.session.conn, func(sdp string) {
sink := NewSink(sinkId, request.sourceId, request.session.conn, func(sdp string) {
// 响应sdp回调
response = NewOKResponse(request.headers.Get("Cseq"))
response.Header.Set("Content-Type", "application/sdp")
request.session.response(response, []byte(sdp))
})
sink_.SetUrlValues(request.url.Query())
_, code := stream.PreparePlaySink(sink_)
sink.SetUrlValues(request.url.Query())
_, code := stream.PreparePlaySinkWithReady(sink, false)
if utils.HookStateOK != code {
return nil, nil, fmt.Errorf("hook failed. code:%d", code)
return nil, nil, fmt.Errorf("hook failed. code: %d", code)
}
request.session.sink_ = sink_.(*sink)
request.session.sink = sink.(*Sink)
return nil, body, err
}
func (h handler) OnSetup(request Request) (*http.Response, []byte, error) {
var response *http.Response
//修复rtsp拉流携带参数,参数解析失败.
// 修复解析拉流携带的参数失败问题
params := strings.ReplaceAll(request.url.RawQuery, "/?", "&")
query, err := url.ParseQuery(params)
if err != nil {
@@ -196,14 +197,14 @@ func (h handler) OnSetup(request Request) (*http.Response, []byte, error) {
}
ssrc := 0xFFFFFFFF
rtpPort, rtcpPort, err := request.session.sink_.addSender(index, tcp, uint32(ssrc))
rtpPort, rtcpPort, err := request.session.sink.AddSender(index, tcp, uint32(ssrc))
if err != nil {
return nil, nil, err
}
responseHeader := transportHeader
if tcp {
//修改interleaved为实际的stream index
// 修改interleaved为实际的stream index
responseHeader += ";interleaved=" + fmt.Sprintf("%d-%d", index, index)
} else {
responseHeader += ";server_port=" + fmt.Sprintf("%d-%d", rtpPort, rtcpPort)
@@ -225,7 +226,14 @@ func (h handler) OnPlay(request Request) (*http.Response, []byte, error) {
response.Header.Set("Session", sessionHeader)
}
request.session.sink_.playing = true
sink := request.session.sink
sink.SetReady(true)
source := stream.SourceManager.Find(sink.GetSourceID())
if source == nil {
return nil, nil, fmt.Errorf("Source with ID %s does not exist.", request.sourceId)
}
source.AddSink(sink)
return response, nil, nil
}
@@ -239,7 +247,7 @@ func (h handler) OnPause(request Request) (*http.Response, []byte, error) {
return response, nil, nil
}
func newHandler(password string) *handler {
func NewHandler(password string) *handler {
h := handler{
methods: make(map[string]reflect.Value, 10),
password: password,

View File

@@ -16,7 +16,7 @@ type Server interface {
func NewServer(password string) Server {
return &server{
handler: newHandler(password),
handler: NewHandler(password),
}
}

View File

@@ -44,7 +44,7 @@ func init() {
type session struct {
conn net.Conn
sink_ *sink
sink *Sink
sessionId string
writeBuffer *bytes.Buffer //响应体缓冲区
state SessionState
@@ -89,9 +89,9 @@ func (s *session) close() {
s.conn = nil
}
if s.sink_ != nil {
s.sink_.Close()
s.sink_ = nil
if s.sink != nil {
s.sink.Close()
s.sink = nil
}
}

View File

@@ -15,34 +15,31 @@ var (
TransportManger transport.Manager
)
// rtsp拉流sink
// Sink rtsp拉流sink
// 对于udp而言, 每个sink维护多个transport
// tcp直接单端口传输
type sink struct {
// tcp使用信令链路传输
type Sink struct {
stream.BaseSink
senders []*librtp.RtpSender //一个rtsp源可能存在多个流, 每个流都需要拉取
sdpCb func(sdp string) //rtsp_stream生成sdp后使用该回调给rtsp_session, 响应describe
tcp bool //tcp拉流标记
playing bool //是否已经收到play请求
senders []*librtp.RtpSender // 一个rtsp源, 可能存在多个流, 每个流都需要拉取
sdpCb func(sdp string) // sdp回调, 响应describe
}
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
return &sink{
stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtsp, Conn: conn},
nil,
cb,
false,
false,
func (s *Sink) StartStreaming(transStream stream.TransStream) error {
if s.senders == nil {
s.senders = make([]*librtp.RtpSender, transStream.TrackCount())
}
// sdp回调给sink, sink应答给describe请求
if s.sdpCb != nil {
s.sdpCb(transStream.(*TranStream).sdp)
s.sdpCb = nil
}
return nil
}
func (s *sink) setSenderCount(count int) {
s.senders = make([]*librtp.RtpSender, count)
}
func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, error) {
func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, error) {
utils.Assert(index < cap(s.senders))
utils.Assert(s.senders[index] == nil)
@@ -54,9 +51,8 @@ func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
SSRC: ssrc,
}
//tcp使用信令链路
if tcp {
s.tcp = true
s.TCPStreaming = true
} else {
sender.Rtp, err = TransportManger.NewUDPServer("0.0.0.0")
if err != nil {
@@ -83,39 +79,43 @@ func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
return rtpPort, rtcpPort, err
}
func (s *sink) input(index int, data []byte, rtpTime uint32) error {
//拉流方还没有连上来
utils.Assert(index < cap(s.senders))
func (s *Sink) Write(index int, data [][]byte, rtpTime int64) error {
// 拉流方还没有连上来
if index >= cap(s.senders) || s.senders[index] == nil {
return nil
}
sender := s.senders[index]
sender.PktCount++
sender.OctetCount += len(data)
if s.tcp {
s.Conn.Write(data)
} else {
//发送rtcp sr包
sender.RtpConn.Write(data)
for _, bytes := range data {
sender := s.senders[index]
sender.PktCount++
sender.OctetCount += len(bytes)
if s.TCPStreaming {
s.Conn.Write(bytes)
} else {
//发送rtcp sr包
sender.RtpConn.Write(bytes[OverTcpHeaderSize:])
if sender.RtcpConn == nil || sender.PktCount%100 != 0 {
return nil
if sender.RtcpConn == nil || sender.PktCount%100 != 0 {
continue
}
nano := uint64(time.Now().UnixNano())
ntp := (nano/1000000000 + 2208988800<<32) | (nano % 1000000000)
sr := rtcp.SenderReport{
SSRC: sender.SSRC,
NTPTime: ntp,
RTPTime: uint32(rtpTime),
PacketCount: uint32(sender.PktCount),
OctetCount: uint32(sender.OctetCount),
}
marshal, err := sr.Marshal()
if err != nil {
log.Sugar.Errorf("创建rtcp sr消息失败 err:%s msg:%v", err.Error(), sr)
}
sender.RtcpConn.Write(marshal)
}
nano := uint64(time.Now().UnixNano())
ntp := (nano/1000000000 + 2208988800<<32) | (nano % 1000000000)
sr := rtcp.SenderReport{
SSRC: sender.SSRC,
NTPTime: ntp,
RTPTime: rtpTime,
PacketCount: uint32(sender.PktCount),
OctetCount: uint32(sender.OctetCount),
}
marshal, err := sr.Marshal()
if err != nil {
log.Sugar.Errorf("创建rtcp sr消息失败 err:%s msg:%v", err.Error(), sr)
}
sender.RtcpConn.Write(marshal)
}
return nil
@@ -123,22 +123,10 @@ func (s *sink) input(index int, data []byte, rtpTime uint32) error {
// 拉流链路是否已经连接上
// 拉流测发送了play请求, 并且对于udp而言, 还需要收到nat穿透包
func (s *sink) isConnected(index int) bool {
return s.playing && (s.tcp || (s.senders[index] != nil && s.senders[index].RtpConn != nil))
func (s *Sink) isConnected(index int) bool {
return s.TCPStreaming || (s.senders[index] != nil && s.senders[index].RtpConn != nil)
}
// 发送rtp包总数
func (s *sink) pktCount(index int) int {
return s.senders[index].PktCount
}
// SendHeader 回调rtsp stream的sdp信息
func (s *sink) SendHeader(data []byte) error {
s.sdpCb(string(data))
return nil
}
func (s *sink) Close() {
func (s *Sink) Close() {
s.BaseSink.Close()
for _, sender := range s.senders {
@@ -155,3 +143,11 @@ func (s *sink) Close() {
}
}
}
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
return &Sink{
stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtsp, Conn: conn},
nil,
cb,
}
}

View File

@@ -17,136 +17,87 @@ const (
OverTcpMagic = 0x24
)
// rtsp传输流封装
// TranStream rtsp传输流封装
// 低延迟是rtsp特性, 所以不考虑实现GOP缓存
type tranStream struct {
type TranStream struct {
stream.BaseTransStream
addr net.IPAddr
addrType string
urlFormat string
rtpTracks []*rtspTrack
rtpTracks []*Track
sdp string
buffer *stream.ReceiveBuffer
}
// rtpMuxer申请输出流内存的回调
// 无论是tcp/udp拉流, 均使用同一块内存, 并且给tcp预留4字节的包长.
func (t *tranStream) onAllocBuffer(params interface{}) []byte {
return t.rtpTracks[params.(int)].buffer[OverTcpHeaderSize:]
}
// onRtpPacket 所有封装后的RTP流都将回调于此
func (t *tranStream) onRtpPacket(data []byte, timestamp uint32, params interface{}) {
//params传递track索引
index := params.(int)
track := t.rtpTracks[index]
//保存带有sps和ssp等编码信息的rtp包, 对所有sink通用
if track.cache && track.extraDataBuffer == nil {
bytes := make([]byte, OverTcpHeaderSize+len(data))
copy(bytes[OverTcpHeaderSize:], data)
track.tmpExtraDataBuffer = append(track.tmpExtraDataBuffer, bytes)
t.overTCP(bytes, index)
return
}
//将rtp包发送给各个sink
for _, value := range t.Sinks {
sink_ := value.(*sink)
if !sink_.isConnected(index) {
continue
}
//为刚刚连接上的sink, 发送sps和pps等rtp包
if sink_.pktCount(index) < 1 && utils.AVMediaTypeVideo == track.mediaType {
seq := binary.BigEndian.Uint16(data[2:])
count := len(track.extraDataBuffer)
for i, rtp := range track.extraDataBuffer {
//回滚rtp包的序号
librtp.RollbackSeq(rtp[OverTcpHeaderSize:], int(seq)-(count-i-1))
if sink_.tcp {
sink_.input(index, rtp, 0)
} else {
sink_.input(index, rtp[OverTcpHeaderSize:], timestamp)
}
}
}
end := OverTcpHeaderSize + len(data)
t.overTCP(track.buffer[:end], index)
//发送rtp包
if sink_.tcp {
sink_.input(index, track.buffer[:end], 0)
} else {
sink_.input(index, data, timestamp)
}
}
}
func (t *tranStream) overTCP(data []byte, channel int) {
func (t *TranStream) OverTCP(data []byte, channel int) {
data[0] = OverTcpMagic
data[1] = byte(channel)
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
}
func (t *tranStream) Input(packet utils.AVPacket) error {
stream_ := t.rtpTracks[packet.Index()]
func (t *TranStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
t.ClearOutStreamBuffer()
var ts uint32
track := t.rtpTracks[packet.Index()]
track.seq = track.muxer.GetHeader().Seq
if utils.AVMediaTypeAudio == packet.MediaType() {
stream_.muxer.Input(packet.Data(), uint32(packet.ConvertPts(stream_.rate)))
ts = uint32(packet.ConvertPts(track.rate))
t.PackRtpPayload(track.muxer, packet.Index(), packet.Data(), ts)
} else if utils.AVMediaTypeVideo == packet.MediaType() {
ts = uint32(packet.ConvertPts(track.rate))
data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]))
t.PackRtpPayload(track.muxer, packet.Index(), data, ts)
}
//将sps和pps按照单一模式打包
if stream_.extraDataBuffer == nil {
if !packet.KeyFrame() {
return nil
}
return t.OutBuffer[:t.OutBufferSize], int64(ts), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil
}
stream_.cache = true
parameters := t.BaseTransStream.Tracks[packet.Index()].CodecParameters()
if utils.AVCodecIdH265 == packet.CodecId() {
bytes := parameters.(*utils.HEVCCodecData).VPS()
stream_.muxer.Input(bytes[0], uint32(packet.ConvertPts(stream_.rate)))
}
spsBytes := parameters.SPS()
ppsBytes := parameters.PPS()
stream_.muxer.Input(spsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
stream_.muxer.Input(ppsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
stream_.extraDataBuffer = stream_.tmpExtraDataBuffer
func (t *TranStream) ReadExtraData(ts int64) ([][]byte, int64, error) {
// 返回视频编码数据的rtp包
for _, track := range t.rtpTracks {
if utils.AVMediaTypeVideo != track.mediaType {
continue
}
data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]))
stream_.muxer.Input(data, uint32(packet.ConvertPts(stream_.rate)))
// 回滚序号和时间戳
index := int(track.seq) - len(track.extraDataBuffer)
for i, bytes := range track.extraDataBuffer {
librtp.RollbackSeq(bytes[OverTcpHeaderSize:], index+i+1)
binary.BigEndian.PutUint32(bytes[OverTcpHeaderSize+4:], uint32(ts))
}
return track.extraDataBuffer, ts, nil
}
return nil
return nil, ts, nil
}
func (t *tranStream) AddSink(sink_ stream.Sink) error {
sink_.(*sink).setSenderCount(len(t.BaseTransStream.Tracks))
if err := sink_.(*sink).SendHeader([]byte(t.sdp)); err != nil {
return err
}
return t.BaseTransStream.AddSink(sink_)
func (t *TranStream) PackRtpPayload(muxer librtp.Muxer, channel int, data []byte, timestamp uint32) {
var index int
muxer.Input(data, timestamp, func() []byte {
index = t.buffer.Index()
block := t.buffer.GetBlock()
return block[OverTcpHeaderSize:]
}, func(bytes []byte) {
packet := t.buffer.Get(index)[:OverTcpHeaderSize+len(bytes)]
t.OverTCP(packet, channel)
t.AppendOutStreamBuffer(packet)
})
}
func (t *tranStream) AddTrack(stream utils.AVStream) error {
func (t *TranStream) AddTrack(stream utils.AVStream) error {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
return err
}
payloadType, ok := librtp.CodecIdPayloads[stream.CodecId()]
if !ok {
return fmt.Errorf("no payload type was found for codecid:%d", stream.CodecId())
return fmt.Errorf("no payload type was found for codecid: %d", stream.CodecId())
}
//创建RTP封装
// 创建RTP封装
var muxer librtp.Muxer
if utils.AVCodecIdH264 == stream.CodecId() {
muxer = librtp.NewH264Muxer(payloadType.Pt, 0, 0xFFFFFFFF)
@@ -158,25 +109,51 @@ func (t *tranStream) AddTrack(stream utils.AVStream) error {
muxer = librtp.NewMuxer(payloadType.Pt, 0, 0xFFFFFFFF)
}
muxer.SetAllocHandler(t.onAllocBuffer)
muxer.SetWriteHandler(t.onRtpPacket)
t.rtpTracks = append(t.rtpTracks, NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, stream.Type()))
muxer.SetParams(len(t.rtpTracks) - 1)
index := len(t.rtpTracks) - 1
// 将sps和pps按照单一模式打包
bufferIndex := t.buffer.Index()
if utils.AVMediaTypeVideo == stream.Type() {
parameters := stream.CodecParameters()
if utils.AVCodecIdH265 == stream.CodecId() {
bytes := parameters.(*utils.HEVCCodecData).VPS()
t.PackRtpPayload(muxer, index, bytes[0], 0)
}
spsBytes := parameters.SPS()
ppsBytes := parameters.PPS()
t.PackRtpPayload(muxer, index, spsBytes[0], 0)
t.PackRtpPayload(muxer, index, ppsBytes[0], 0)
// 拷贝扩展数据的rtp包
size := t.buffer.Index() - bufferIndex
extraRtpBuffer := make([][]byte, size)
for i := 0; i < size; i++ {
src := t.buffer.Get(bufferIndex + i)
dst := make([]byte, len(src))
copy(dst, src)
extraRtpBuffer[i] = dst[:OverTcpHeaderSize+binary.BigEndian.Uint16(dst[2:])]
}
t.rtpTracks[index].extraDataBuffer = extraRtpBuffer
}
return nil
}
func (t *tranStream) Close() error {
func (t *TranStream) Close() ([][]byte, int64, error) {
for _, track := range t.rtpTracks {
if track != nil {
track.Close()
}
}
return nil
return nil, 0, nil
}
func (t *tranStream) WriteHeader() error {
func (t *TranStream) WriteHeader() error {
description := sdp.SessionDescription{
Version: 0,
Origin: sdp.Origin{
@@ -261,9 +238,11 @@ func (t *tranStream) WriteHeader() error {
}
func NewTransStream(addr net.IPAddr, urlFormat string) stream.TransStream {
t := &tranStream{
t := &TranStream{
addr: addr,
urlFormat: urlFormat,
// 在将AVPacket打包rtp时, 会使用多个buffer块, 回环覆盖多个rtp块, 如果是TCP拉流并且网络不好, 推流的数据会错乱.
buffer: stream.NewReceiveBuffer(1500, 1024),
}
if addr.IP.To4() != nil {

View File

@@ -5,33 +5,25 @@ import (
"github.com/lkmio/avformat/utils"
)
// 对rtsp每路输出流的封装
type rtspTrack struct {
// Track RtspTrack 对rtsp每路输出流的封装
type Track struct {
pt byte
rate int
mediaType utils.AVMediaType
seq uint16
buffer []byte //buffer of rtp packet
muxer librtp.Muxer
cache bool
extraDataBuffer [][]byte //缓存带有编码信息的rtp包, 对所有sink通用
tmpExtraDataBuffer [][]byte //缓存带有编码信息的rtp包, 整个过程会多次回调(sps->pps->sei...), 先保存到临时区, 最后再缓存到extraDataBuffer
muxer librtp.Muxer
extraDataBuffer [][]byte // 缓存带有编码信息的rtp包, 对所有sink通用
}
func (r *rtspTrack) Close() {
if r.muxer != nil {
r.muxer.Close()
r.muxer = nil
}
func (r *Track) Close() {
}
func NewRTSPTrack(muxer librtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *rtspTrack {
stream := &rtspTrack{
func NewRTSPTrack(muxer librtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track {
stream := &Track{
pt: pt,
rate: rate,
muxer: muxer,
buffer: make([]byte, 1500),
mediaType: mediaType,
}

View File

@@ -231,16 +231,16 @@ func init() {
// AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写.
type AppConfig_ struct {
GOPCache bool `json:"gop_cache"` //是否开启GOP缓存只缓存一组音视频
GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
ProbeTimeout int `json:"probe_timeout"` //收流解析AVStream的超时时间
WriteTimeout int `json:"write_timeout"` //Server向TCP拉流Conn发包的超时时间, 超过该时间, 直接主动断开Conn. 客户端重新拉流的成本小于服务器缓存成本.
WriteBufferNumber int `json:"-"`
PublicIP string `json:"public_ip"`
ListenIP string `json:"listen_ip"`
IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
Debug bool `json:"debug"` //debug模式, 开启将保存推流
GOPCache bool `json:"gop_cache"` // 是否开启GOP缓存只缓存一组音视频
GOPBufferSize int `json:"gop_buffer_size"` // 预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
ProbeTimeout int `json:"probe_timeout"` // 收流解析AVStream的超时时间
WriteTimeout int `json:"write_timeout"` // Server向TCP拉流Conn发包的超时时间, 超过该时间, 直接主动断开Conn. 客户端重新拉流的成本小于服务器缓存成本.
WriteBufferCapacity int `json:"-"` // 发送缓冲区容量大小, 缓冲区由多个内存块构成.
PublicIP string `json:"public_ip"`
ListenIP string `json:"listen_ip"`
IdleTimeout int64 `json:"idle_timeout"` // 多长时间(单位秒)没有拉流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
ReceiveTimeout int64 `json:"receive_timeout"` // 多长时间(单位秒)没有收到流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
Debug bool `json:"debug"` // debug模式, 开启将保存推流
//缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例.
@@ -284,7 +284,7 @@ func SetDefaultConfig(config_ *AppConfig_) {
config_.MergeWriteLatency = limitInt(350, 2000, config_.MergeWriteLatency) //最低缓存350毫秒数据才发送 最高缓存2秒数据才发送
config_.ProbeTimeout = limitInt(2000, 5000, config_.MergeWriteLatency) //2-5秒内必须解析完AVStream
config_.WriteTimeout = limitInt(2000, 10000, config_.WriteTimeout)
config_.WriteBufferNumber = config_.WriteTimeout/config_.MergeWriteLatency + 1
config_.WriteBufferCapacity = config_.WriteTimeout/config_.MergeWriteLatency + 1
config_.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config_.Log.Level)
config_.Log.MaxSize = limitMin(1, config_.Log.MaxSize)

View File

@@ -7,6 +7,10 @@ import (
)
func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
return PreparePlaySinkWithReady(sink, true)
}
func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hooks.IsEnableOnPlay() {
@@ -20,6 +24,7 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
response = hook
}
sink.SetReady(ok)
source := SourceManager.Find(sink.GetSourceID())
if source == nil {
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID())

View File

@@ -27,6 +27,8 @@ type MergeWritingBuffer interface {
// ReadSegmentsFromKeyFrameIndex 从最近的关键帧读取切片
ReadSegmentsFromKeyFrameIndex(cb func([]byte))
Capacity() int
}
type mwBlock struct {
@@ -39,17 +41,13 @@ type mwBlock struct {
type mergeWritingBuffer struct {
mwBlocks []mwBlock
//空闲合并写块
keyFrameFreeMWBlocks collections.LinkedList[collections.MemoryPool]
noneKeyFreeFrameMWBlocks collections.LinkedList[collections.MemoryPool]
index int // 当前切片位于mwBlocks的索引
startTS int64 // 当前切片的开始时间
duration int // 当前切片时长
index int //当前切片位于mwBlocks的索引
startTS int64 //当前切片的开始时间
duration int //当前切片时长
lastKeyFrameIndex int //最新关键帧所在切片的索引
keyFrameCount int //关键帧计数
existVideo bool //是否存在视频
lastKeyFrameIndex int // 最新关键帧所在切片的索引
keyFrameCount int // 关键帧计数
existVideo bool // 是否存在视频
keyFrameBufferMaxLength int
nonKeyFrameBufferMaxLength int
@@ -205,11 +203,15 @@ func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) {
}
}
func (m *mergeWritingBuffer) Capacity() int {
return cap(m.mwBlocks)
}
func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer {
//开启GOP缓存, 输出流也缓存整个GOP
// 开启GOP缓存, 输出流也缓存整个GOP
var blocks []mwBlock
if existVideo {
blocks = make([]mwBlock, AppConfig.WriteBufferNumber)
blocks = make([]mwBlock, AppConfig.WriteBufferCapacity)
} else {
blocks = make([]mwBlock, 1)
}

View File

@@ -17,12 +17,24 @@ type ReceiveBuffer struct {
index int
}
func (r *ReceiveBuffer) Index() int {
return r.index
}
func (r *ReceiveBuffer) Get(index int) []byte {
return r.data[index*r.blockSize : (index+1)*r.blockSize]
}
func (r *ReceiveBuffer) GetBlock() []byte {
bytes := r.data[r.index*r.blockSize:]
r.index = (r.index + 1) % r.blockCount
return bytes[:r.blockSize]
}
func (r *ReceiveBuffer) BlockCount() int {
return r.blockCount
}
func NewReceiveBuffer(blockSize, blockCount int) *ReceiveBuffer {
return &ReceiveBuffer{blockSize: blockSize, blockCount: blockCount, data: make([]byte, blockSize*blockCount), index: 0}
}

View File

@@ -16,9 +16,7 @@ type Sink interface {
GetSourceID() string
Input(data []byte) error
SendHeader(data []byte) error
Write(index int, data [][]byte, ts int64) error
GetTransStreamID() TransStreamID
@@ -61,11 +59,25 @@ type Sink interface {
SetUrlValues(values url.Values)
Start()
// StartStreaming Source向Sink开始推流时调用
StartStreaming(stream TransStream) error
Flush()
// StopStreaming Source向Sink停止推流时调用
StopStreaming(stream TransStream)
GetConn() net.Conn
IsTCPStreaming() bool
GetSentPacketCount() int
SetSentPacketCount(int)
IncreaseSentPacketCount()
IsReady() bool
SetReady(ok bool)
}
type BaseSink struct {
@@ -76,14 +88,17 @@ type BaseSink struct {
TransStreamID TransStreamID
disableVideo bool
lock sync.RWMutex
HasSentKeyVideo bool // 是否已经发送视频关键帧未开启GOP缓存的情况下为避免播放花屏发送的首个视频帧必须为关键帧
lock sync.RWMutex
DesiredAudioCodecId_ utils.AVCodecID
DesiredVideoCodecId_ utils.AVCodecID
Conn net.Conn
urlValues url.Values // 拉流时携带的Url参数
Conn net.Conn // 拉流信令链路
TCPStreaming bool // 是否是TCP流式拉流
urlValues url.Values // 拉流时携带的Url参数
SentPacketCount int // 发包计数
Ready bool
}
func (s *BaseSink) GetID() SinkID {
@@ -94,20 +109,19 @@ func (s *BaseSink) SetID(id SinkID) {
s.ID = id
}
func (s *BaseSink) Input(data []byte) error {
func (s *BaseSink) Write(index int, data [][]byte, ts int64) error {
if s.Conn != nil {
_, err := s.Conn.Write(data)
return err
for _, bytes := range data {
_, err := s.Conn.Write(bytes)
if err != nil {
return err
}
}
}
return nil
}
func (s *BaseSink) SendHeader(data []byte) error {
return s.Input(data)
}
func (s *BaseSink) GetSourceID() string {
return s.SourceID
}
@@ -133,7 +147,7 @@ func (s *BaseSink) UnLock() {
}
func (s *BaseSink) GetState() SessionState {
utils.Assert(!s.lock.TryLock())
//utils.Assert(!s.lock.TryLock())
return s.State
}
@@ -161,11 +175,8 @@ func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID {
}
// Close 做如下事情:
// 1. Sink如果正在拉流,删除任务交给Source处理. 否则直接从等待队列删除Sink.
// 1. Sink如果正在拉流, 删除任务交给Source处理, 否则直接从等待队列删除Sink.
// 2. 发送PlayDoneHook事件
// 什么时候调用Close? 是否考虑线程安全?
// 拉流断开连接,不需要考虑线程安全
// 踢流走source管道删除,并且关闭Conn
func (s *BaseSink) Close() {
if SessionStateClosed == s.State {
return
@@ -195,9 +206,10 @@ func (s *BaseSink) Close() {
}
if state == SessionStateTransferring {
// 从Source中删除Sink
source := SourceManager.Find(s.SourceID)
source.RemoveSink(s)
// 从source中删除sink, 如果source为nil, 已经结束推流.
if source := SourceManager.Find(s.SourceID); source != nil {
source.RemoveSink(s)
}
} else if state == SessionStateWait {
// 从等待队列中删除Sink
RemoveSinkFromWaitingQueue(s.SourceID, s.ID)
@@ -225,14 +237,38 @@ func (s *BaseSink) SetUrlValues(values url.Values) {
s.urlValues = values
}
func (s *BaseSink) Start() {
func (s *BaseSink) StartStreaming(stream TransStream) error {
return nil
}
func (s *BaseSink) Flush() {
func (s *BaseSink) StopStreaming(stream TransStream) {
}
func (s *BaseSink) GetConn() net.Conn {
return s.Conn
}
func (s *BaseSink) IsTCPStreaming() bool {
return s.TCPStreaming
}
func (s *BaseSink) GetSentPacketCount() int {
return s.SentPacketCount
}
func (s *BaseSink) SetSentPacketCount(count int) {
s.SentPacketCount = count
}
func (s *BaseSink) IncreaseSentPacketCount() {
s.SentPacketCount++
}
func (s *BaseSink) IsReady() bool {
return s.Ready
}
func (s *BaseSink) SetReady(ok bool) {
s.Ready = ok
}

View File

@@ -2,6 +2,7 @@ package stream
import (
"fmt"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/collections"
"github.com/lkmio/lkm/log"
"net"
@@ -90,12 +91,15 @@ type Source interface {
// PostEvent 切换到主协程执行当前函数
PostEvent(cb func())
// LastPacketTime 返回最近收流时间戳
LastPacketTime() time.Time
SetLastPacketTime(time2 time.Time)
// SinkCount 返回拉流计数
SinkCount() int
// LastStreamEndTime 返回最近结束拉流时间戳
LastStreamEndTime() time.Time
SetReceiveDataTimer(timer *time.Timer)
@@ -111,6 +115,8 @@ type Source interface {
CreateTime() time.Time
SetCreateTime(time time.Time)
PlaySink(sin Sink)
}
type PublishSource struct {
@@ -138,7 +144,9 @@ type PublishSource struct {
receiveDataTimer *time.Timer // 收流超时计时器
idleTimer *time.Timer // 拉流空闲计时器
TransStreams map[TransStreamID]TransStream //所有的输出流, 持有Sink
TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink
Sinks map[SinkID]Sink // 保存所有Sink
TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
streamPipe chan []byte // 推流数据管道
mainContextEvents chan func() // 切换到主协程执行函数的事件管道
@@ -202,6 +210,10 @@ func (s *PublishSource) Init(receiveQueueSize int) {
// -2是为了保证从管道取到流, 到处理完流整个过程安全的, 不会被覆盖
s.streamPipe = make(chan []byte, receiveQueueSize-2)
s.mainContextEvents = make(chan func(), 128)
s.TransStreams = make(map[TransStreamID]TransStream, 10)
s.Sinks = make(map[SinkID]Sink, 128)
s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
}
func (s *PublishSource) CreateDefaultOutStreams() {
@@ -225,14 +237,15 @@ func (s *PublishSource) CreateDefaultOutStreams() {
streams := s.OriginStreams()
utils.Assert(len(streams) > 0)
hlsStream, err := s.CreateTransStream(TransStreamHls, streams)
id := GenerateTransStreamID(TransStreamHls, streams...)
hlsStream, err := s.CreateTransStream(id, TransStreamHls, streams)
if err != nil {
panic(err)
}
s.dispatchGOPBuffer(hlsStream)
s.DispatchGOPBuffer(hlsStream)
s.hlsStream = hlsStream
s.TransStreams[GenerateTransStreamID(TransStreamHls, streams...)] = s.hlsStream
s.TransStreams[id] = s.hlsStream
}
}
@@ -279,8 +292,8 @@ func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils
return true
}
func (s *PublishSource) CreateTransStream(protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
log.Sugar.Debugf("创建%s-stream source:%s", protocol.ToString(), s.ID)
func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
log.Sugar.Debugf("创建%s-stream source: %s", protocol.ToString(), s.ID)
transStream, err := CreateTransStream(s, protocol, streams)
if err != nil {
@@ -288,22 +301,71 @@ func (s *PublishSource) CreateTransStream(protocol TransStreamProtocol, streams
return nil, err
}
for _, avStream := range streams {
transStream.AddTrack(avStream)
for _, track := range streams {
transStream.AddTrack(track)
}
transStream.Init()
transStream.SetID(id)
// 创建输出流对应的拉流队列
s.TransStreamSinks[id] = make(map[SinkID]Sink, 128)
_ = transStream.WriteHeader()
return transStream, err
}
func (s *PublishSource) dispatchGOPBuffer(transStream TransStream) {
func (s *PublishSource) DispatchGOPBuffer(transStream TransStream) {
s.gopBuffer.PeekAll(func(packet utils.AVPacket) {
transStream.Input(packet)
s.DispatchPacket(transStream, packet)
})
}
// DispatchPacket 分发AVPacket
func (s *PublishSource) DispatchPacket(transStream TransStream, packet utils.AVPacket) {
data, timestamp, videoKey, err := transStream.Input(packet)
if err != nil || len(data) < 1 {
return
}
s.DispatchBuffer(transStream, packet.Index(), data, timestamp, videoKey)
}
// DispatchBuffer 分发传输流
func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, videoKey bool) {
sinks := s.TransStreamSinks[transStream.GetID()]
exist := transStream.IsExistVideo()
for _, sink := range sinks {
// 如果存在视频, 确保向sink发送的第一帧是关键帧
if exist && sink.GetSentPacketCount() < 1 {
if !videoKey {
continue
}
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
s.write(sink, index, extraData, timestamp)
}
}
s.write(sink, index, data, timestamp)
}
}
// 向sink推流
func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int64) {
err := sink.Write(index, data, timestamp)
if err == nil {
sink.IncreaseSentPacketCount()
//return
}
// 内核发送缓冲区满, 清空sink的发送缓冲区, 等下次关键帧时再尝试发送。
//_, ok := err.(*transport.ZeroWindowSizeError)
//if ok {
// conn, ok := sink.GetConn().(*transport.Conn)
//}
}
// 创建sink需要的输出流
func (s *PublishSource) doAddSink(sink Sink) bool {
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
@@ -352,14 +414,10 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
}
transStreamId := GenerateTransStreamID(sink.GetProtocol(), streams[:size]...)
transStream, ok := s.TransStreams[transStreamId]
if !ok {
if s.TransStreams == nil {
s.TransStreams = make(map[TransStreamID]TransStream, 10)
}
transStream, exist := s.TransStreams[transStreamId]
if !exist {
var err error
transStream, err = s.CreateTransStream(sink.GetProtocol(), streams[:size])
transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), streams[:size])
if err != nil {
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
return false
@@ -377,19 +435,50 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("AddSink失败, sink已经断开连接 %s", sink.String())
} else {
transStream.AddSink(sink)
sink.SetState(SessionStateTransferring)
}
sink.SetState(SessionStateTransferring)
}
err := sink.StartStreaming(transStream)
if err != nil {
log.Sugar.Errorf("开始推流失败 err: %s", err.Error())
return false
}
// 还没准备好推流, 暂不推流
if !sink.IsReady() {
return true
}
// TCP拉流开启异步发包
conn, ok := sink.GetConn().(*transport.Conn)
if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
conn.EnableAsyncWriteMode(transStream.OutStreamBufferCapacity() - 2)
}
// 发送已有的缓存数据
// 此处发送缓存数据必须要存在关键帧的输出流才发否则等DispatchPacket时再发送extra。
data, timestamp, _ := transStream.ReadKeyFrameBuffer()
if len(data) > 0 {
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
s.write(sink, 0, extraData, timestamp)
}
s.write(sink, 0, data, timestamp)
}
// 累加拉流计数
if s.recordSink != sink {
s.sinkCount++
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
}
// 新建传输流,发送缓存的音视频帧
if !ok && AppConfig.GOPCache && s.existVideo {
s.dispatchGOPBuffer(transStream)
s.Sinks[sink.GetID()] = sink
s.TransStreamSinks[transStreamId][sink.GetID()] = sink
// 新建传输流,发送已经缓存的音视频帧
if !exist && AppConfig.GOPCache && s.existVideo {
s.DispatchGOPBuffer(transStream)
}
return true
@@ -415,38 +504,31 @@ func (s *PublishSource) RemoveSink(sink Sink) {
func (s *PublishSource) RemoveSinkWithID(id SinkID) {
s.PostEvent(func() {
for _, transStream := range s.TransStreams {
if sink, _ := transStream.RemoveSink(id); sink != nil {
s.doRemoveSink(sink)
break
}
sink, ok := s.Sinks[id]
if ok {
s.doRemoveSink(sink)
}
})
}
func (s *PublishSource) doRemoveSink(sink Sink) bool {
id := sink.GetTransStreamID()
if id > 0 {
transStream := s.TransStreams[id]
transStreamSinks := s.TransStreamSinks[sink.GetTransStreamID()]
delete(s.Sinks, sink.GetID())
delete(transStreamSinks, sink.GetID())
// 从输出流中删除Sink
_, b := transStream.RemoveSink(sink.GetID())
if b {
s.sinkCount--
s.lastStreamEndTime = time.Now()
HookPlayDoneEvent(sink)
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
return true
}
s.sinkCount--
s.lastStreamEndTime = time.Now()
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
if sink.GetProtocol() == TransStreamHls {
// 从HLS拉流队列删除Sink
SinkManager.Remove(sink.GetID())
}
// 从等待队列中删除Sink
_, b := RemoveSinkFromWaitingQueue(sink.GetSourceID(), sink.GetID())
// 从HLS拉流队列删除
SinkManager.Remove(sink.GetID())
return b
sink.StopStreaming(s.TransStreams[sink.GetTransStreamID()])
HookPlayDoneEvent(sink)
return true
}
func (s *PublishSource) SetState(state SessionState) {
@@ -458,7 +540,9 @@ func (s *PublishSource) DoClose() {
return
}
log.Sugar.Infof("关闭推流源 id: %s", s.ID)
s.closed = true
log.Sugar.Infof("关闭推流源: %s", s.ID)
if s.TransDeMuxer != nil {
s.TransDeMuxer.Close()
@@ -479,6 +563,7 @@ func (s *PublishSource) DoClose() {
s.gopBuffer = nil
}
// 停止所有计时器
if s.probeTimer != nil {
s.probeTimer.Stop()
}
@@ -491,6 +576,7 @@ func (s *PublishSource) DoClose() {
s.idleTimer.Stop()
}
// 关闭录制流
if s.recordSink != nil {
s.recordSink.Close()
}
@@ -503,36 +589,46 @@ func (s *PublishSource) DoClose() {
log.Sugar.Errorf("删除源失败 source:%s err:%s", s.ID, err.Error())
}
// 将所有Sink添加到等待队列
// 关闭所有输出流
for _, transStream := range s.TransStreams {
transStream.Close()
transStream.PopAllSink(func(sink Sink) {
sink.SetTransStreamID(0)
if s.recordSink == sink {
return
}
{
sink.Lock()
defer sink.UnLock()
if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String())
} else {
sink.SetState(SessionStateWait)
AddSinkToWaitingQueue(s.ID, sink)
}
}
if SessionStateClosed != sink.GetState() {
sink.Flush()
}
})
// 发送剩余包
data, ts, _ := transStream.Close()
if len(data) > 0 {
s.DispatchBuffer(transStream, -1, data, ts, true)
}
}
// 将所有sink添加到等待队列
for _, sink := range s.Sinks {
transStreamID := sink.GetTransStreamID()
sink.SetTransStreamID(0)
if s.recordSink == sink {
return
}
{
sink.Lock()
if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String())
} else {
sink.SetState(SessionStateWait)
AddSinkToWaitingQueue(s.ID, sink)
}
sink.UnLock()
}
if SessionStateClosed != sink.GetState() {
sink.StopStreaming(s.TransStreams[transStreamID])
}
}
s.closed = true
s.TransStreams = nil
s.Sinks = nil
s.TransStreamSinks = nil
// 异步hook
go func() {
if s.Conn != nil {
s.Conn.Close()
@@ -582,12 +678,12 @@ func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
// 创建GOPBuffer
if AppConfig.GOPCache && s.existVideo && s.gopBuffer == nil {
s.gopBuffer = NewStreamBuffer()
//设置GOP缓存溢出回调
// 设置GOP缓存溢出回调
s.gopBuffer.SetDiscardHandler(s.OnDiscardPacket)
}
}
// 解析完所有track后, 做一些初始化工作
// 解析完所有track后, 创建各种输出流
func (s *PublishSource) writeHeader() {
if s.completed {
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.ID)
@@ -600,7 +696,7 @@ func (s *PublishSource) writeHeader() {
}
if len(s.originStreams.All()) == 0 {
log.Sugar.Errorf("没有一路, 删除source:%s", s.ID)
log.Sugar.Errorf("没有一路track, 删除source: %s", s.ID)
s.DoClose()
return
}
@@ -608,7 +704,7 @@ func (s *PublishSource) writeHeader() {
// 创建录制流和HLS
s.CreateDefaultOutStreams()
// 将等待队列的Sink添加到输出流队列
// 将等待队列的sink添加到输出流队列
sinks := PopWaitingSinks(s.ID)
if s.recordSink != nil {
sinks = append(sinks, s.recordSink)
@@ -659,8 +755,8 @@ func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
}
// 分发给各个传输流
for _, stream_ := range s.TransStreams {
stream_.Input(packet)
for _, transStream := range s.TransStreams {
s.DispatchPacket(transStream, packet)
}
// 未开启GOP缓存或只存在音频流, 释放掉内存
@@ -716,3 +812,9 @@ func (s *PublishSource) CreateTime() time.Time {
func (s *PublishSource) SetCreateTime(time time.Time) {
s.createTime = time
}
func (s *PublishSource) PlaySink(sink Sink) {
s.PostEvent(func() {
})
}

View File

@@ -36,13 +36,13 @@ const (
)
const (
SessionStateCreate = SessionState(1) //新建状态
SessionStateHandshaking = SessionState(2) //握手中
SessionStateHandshakeFailure = SessionState(3) //握手失败
SessionStateHandshakeDone = SessionState(4) //握手完成
SessionStateWait = SessionState(5) //位于等待队列中
SessionStateTransferring = SessionState(6) //推拉流中
SessionStateClosed = SessionState(7) //关闭状态
SessionStateCreate = SessionState(1) // 新建状态
SessionStateHandshaking = SessionState(2) // 握手中
SessionStateHandshakeFailure = SessionState(3) // 握手失败
SessionStateHandshakeDone = SessionState(4) // 握手完成
SessionStateWait = SessionState(5) // 位于等待队列中
SessionStateTransferring = SessionState(6) // 推拉流中
SessionStateClosed = SessionState(7) // 关闭状态
)
func (s SourceType) ToString() string {

View File

@@ -1,53 +1,68 @@
package stream
import (
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
)
// TransStream 将AVPacket封装成传输流转发给各个Sink
// TransStream 将AVPacket封装成传输流
type TransStream interface {
Init()
GetID() TransStreamID
Input(packet utils.AVPacket) error
SetID(id TransStreamID)
Input(packet utils.AVPacket) ([][]byte, int64, bool, error)
AddTrack(stream utils.AVStream) error
TrackCount() int
GetTracks() []utils.AVStream
WriteHeader() error
AddSink(sink Sink) error
// GetProtocol 返回输出流协议
GetProtocol() TransStreamProtocol
ExistSink(id SinkID) bool
// ReadExtraData 获取封装后的编码器扩展数据
ReadExtraData(timestamp int64) ([][]byte, int64, error)
RemoveSink(id SinkID) (Sink, bool)
// ReadKeyFrameBuffer 读取已经缓存的包含关键视频帧的输出流
ReadKeyFrameBuffer() ([][]byte, int64, error)
PopAllSink(handler func(sink Sink))
Close() ([][]byte, int64, error)
AllSink() []Sink
ClearOutStreamBuffer()
Close() error
AppendOutStreamBuffer(buffer []byte)
SendPacket(data []byte) error
// OutStreamBufferCapacity 返回输出流缓冲区的容量大小, 输出流缓冲区同时作为向sink推流的发送缓冲区, 容量大小决定向sink异步推流的队列大小;
OutStreamBufferCapacity() int
Protocol() TransStreamProtocol
IsExistVideo() bool
}
type BaseTransStream struct {
Sinks map[SinkID]Sink
//muxer stream.Muxer
ID TransStreamID
Tracks []utils.AVStream
Completed bool
ExistVideo bool
Protocol_ TransStreamProtocol
Protocol TransStreamProtocol
OutBuffer [][]byte // 完成封装的输出流队列
OutBufferSize int
}
func (t *BaseTransStream) Init() {
t.Sinks = make(map[SinkID]Sink, 64)
func (t *BaseTransStream) GetID() TransStreamID {
return t.ID
}
func (t *BaseTransStream) Input(packet utils.AVPacket) error {
return nil
func (t *BaseTransStream) SetID(id TransStreamID) {
t.ID = id
}
func (t *BaseTransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
return nil, -1, false, nil
}
func (t *BaseTransStream) AddTrack(stream utils.AVStream) error {
@@ -58,82 +73,68 @@ func (t *BaseTransStream) AddTrack(stream utils.AVStream) error {
return nil
}
func (t *BaseTransStream) AddSink(sink Sink) error {
t.Sinks[sink.GetID()] = sink
sink.Start()
return nil
func (t *BaseTransStream) Close() ([][]byte, int64, error) {
return nil, 0, nil
}
func (t *BaseTransStream) ExistSink(id SinkID) bool {
_, ok := t.Sinks[id]
return ok
func (t *BaseTransStream) GetProtocol() TransStreamProtocol {
return t.Protocol
}
func (t *BaseTransStream) RemoveSink(id SinkID) (Sink, bool) {
sink, ok := t.Sinks[id]
if ok {
delete(t.Sinks, id)
func (t *BaseTransStream) ClearOutStreamBuffer() {
t.OutBufferSize = 0
}
func (t *BaseTransStream) AppendOutStreamBuffer(buffer []byte) {
if t.OutBufferSize+1 > len(t.OutBuffer) {
// 扩容
size := (t.OutBufferSize + 1) * 2
newBuffer := make([][]byte, size)
for i := 0; i < t.OutBufferSize; i++ {
newBuffer[i] = t.OutBuffer[i]
}
t.OutBuffer = newBuffer
}
return sink, ok
t.OutBuffer[t.OutBufferSize] = buffer
t.OutBufferSize++
}
func (t *BaseTransStream) PopAllSink(handler func(sink Sink)) {
for _, sink := range t.Sinks {
handler(sink)
}
t.Sinks = nil
func (t *BaseTransStream) OutStreamBufferCapacity() int {
return 0
}
func (t *BaseTransStream) AllSink() []Sink {
//TODO implement me
panic("implement me")
func (t *BaseTransStream) TrackCount() int {
return len(t.Tracks)
}
func (t *BaseTransStream) Close() error {
return nil
func (t *BaseTransStream) GetTracks() []utils.AVStream {
return t.Tracks
}
func (t *BaseTransStream) SendPacket(data []byte) error {
for _, sink := range t.Sinks {
sink.Input(data)
}
return nil
func (t *BaseTransStream) IsExistVideo() bool {
return t.ExistVideo
}
func (t *BaseTransStream) Protocol() TransStreamProtocol {
return t.Protocol_
func (t *BaseTransStream) ReadExtraData(timestamp int64) ([][]byte, int64, error) {
return nil, 0, nil
}
func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
return nil, 0, nil
}
type TCPTransStream struct {
BaseTransStream
// 合并写内存泄露问题: 推流结束后, mwBuffer的data一直释放不掉, 只有拉流全部断开之后, 才会释放该内存.
// 起初怀疑是代码层哪儿有问题, 但是测试发现如果将合并写切片再拷贝一次发送 给sink, 推流结束后mwBuffer的data内存块释放没问题, 只有拷贝的内存块未释放. 所以排除了代码层造成内存泄露的可能性.
// 看来是conn在write后还会持有data. 查阅代码发现, 的确如此. 向fd发送数据前buffer会引用data, 但是后续没有赋值为nil, 取消引用. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694
MWBuffer MergeWritingBuffer //合并写缓冲区, 同时作为用户态的发送缓冲区
}
func (t *TCPTransStream) AddSink(sink Sink) error {
if err := t.BaseTransStream.AddSink(sink); err != nil {
return err
}
if sink.GetConn() != nil {
sink.GetConn().(*transport.Conn).EnableAsyncWriteMode(AppConfig.WriteBufferNumber - 1)
}
return nil
}
func (t *TCPTransStream) SendPacket(data []byte) error {
for _, sink := range t.Sinks {
err := sink.Input(data)
if err == nil {
continue
}
if _, ok := err.(*transport.ZeroWindowSizeError); ok {
log.Sugar.Errorf("发送超时, 强制断开连接 sink:%s", sink.String())
sink.GetConn().Close()
}
}
return nil
func (t *TCPTransStream) OutStreamBufferCapacity() int {
utils.Assert(t.MWBuffer != nil)
return t.MWBuffer.Capacity()
}