From ec707c8dc1dc3e513e3227ee39138f2d3d4b624c Mon Sep 17 00:00:00 2001
From: yangjiechina <1534796060@qq.com>
Date: Mon, 28 Oct 2024 19:15:53 +0800
Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E8=BE=93=E5=87=BA=E6=B5=81?=
=?UTF-8?q?=E8=BD=AC=E5=8F=91,=20TransStream=E4=B8=8D=E5=86=8D=E6=8C=81?=
=?UTF-8?q?=E6=9C=89Sink?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
flv/flv_sink.go | 2 +-
flv/http_flv.go | 172 +++++++++++++------------
gb28181/forward_sink.go | 28 +---
gb28181/forward_stream.go | 18 ++-
gb28181/source.go | 13 +-
hls/hls_sink.go | 18 ++-
hls/hls_stream.go | 69 +++++-----
main.go | 6 +-
rtc.html | 8 --
rtc/rtc_sink.go | 121 +++++++++++++++---
rtc/rtc_stream.go | 125 +++---------------
rtmp/rtmp_sink.go | 8 +-
rtmp/rtmp_stream.go | 80 ++++++------
rtsp/rtsp_handler.go | 38 +++---
rtsp/rtsp_server.go | 2 +-
rtsp/rtsp_session.go | 8 +-
rtsp/rtsp_sink.go | 130 +++++++++----------
rtsp/rtsp_stream.go | 187 ++++++++++++---------------
rtsp/rtsp_track.go | 24 ++--
stream/config.go | 22 ++--
stream/hook_sink.go | 5 +
stream/mw_buffer.go | 26 ++--
stream/receive_buffer.go | 12 ++
stream/sink.go | 92 +++++++++----
stream/source.go | 262 ++++++++++++++++++++++++++------------
stream/source_utils.go | 14 +-
stream/trans_stream.go | 151 +++++++++++-----------
27 files changed, 894 insertions(+), 747 deletions(-)
diff --git a/flv/flv_sink.go b/flv/flv_sink.go
index d8a6781..103efe4 100644
--- a/flv/flv_sink.go
+++ b/flv/flv_sink.go
@@ -7,5 +7,5 @@ import (
)
func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink {
- return &stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn)}
+ return &stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn), TCPStreaming: true}
}
diff --git a/flv/http_flv.go b/flv/http_flv.go
index b999981..f4c37c5 100644
--- a/flv/http_flv.go
+++ b/flv/http_flv.go
@@ -9,24 +9,25 @@ import (
)
const (
- // HttpFlvBlockHeaderSize 在每块HttpFlv块头部,预留指定大小的数据, 用于描述flv数据块的长度信息
- //实际发送流 http-flv: |length\r\n|flv data\r\n
- //方便封装 http-flv-block: |block size[4]|skip count[2]|length\r\n|flv data\r\n
- //skip count是因为flv-length不固定, 需要一个字段说明, 跳过多少字节才是http-flv数据
+ // HttpFlvBlockHeaderSize 在每块http-flv流的头部,预留指定大小的数据, 用于描述flv数据块的长度信息
+ // http-flv是以文件流的形式传输http流, 格式如下: length\r\n|flv data\r\n
+ // 我们对http-flv-block的封装: |block size[4]|skip count[2]|length\r\n|flv data\r\n
+ // skip count是因为length长度不固定, 需要一个字段说明, 跳过多少字节才是http-flv数据
HttpFlvBlockHeaderSize = 20
)
-type httpTransStream struct {
+type TransStream struct {
stream.TCPTransStream
muxer libflv.Muxer
- mwBuffer stream.MergeWritingBuffer
header []byte
headerSize int
headerTagSize int
}
-func (t *httpTransStream) Input(packet utils.AVPacket) error {
+func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
+ t.ClearOutStreamBuffer()
+
var flvSize int
var data []byte
var videoKey bool
@@ -45,39 +46,42 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
videoKey = packet.KeyFrame()
}
- //关键帧都放在切片头部,所以需要创建新切片, 发送当前切片剩余流
- if videoKey && !t.mwBuffer.IsNewSegment() {
- t.forceFlushSegment()
+ // 关键帧都放在切片头部,所以遇到关键帧创建新切片, 发送当前切片剩余流
+ if videoKey && !t.MWBuffer.IsNewSegment() {
+ segment := t.forceFlushSegment()
+ t.AppendOutStreamBuffer(segment)
}
var n int
var separatorSize int
- //新的合并写切片, 预留包长字节
- if t.mwBuffer.IsNewSegment() {
+ // 新的合并写切片, 预留包长字节
+ if t.MWBuffer.IsNewSegment() {
separatorSize = HttpFlvBlockHeaderSize
- //10字节描述flv包长, 前2个字节描述无效字节长度
+ // 10字节描述flv包长, 前2个字节描述无效字节长度
n = HttpFlvBlockHeaderSize
}
- //切片末尾, 预留换行符
- if t.mwBuffer.IsFull(dts) {
+ // 切片末尾, 预留换行符
+ if t.MWBuffer.IsFull(dts) {
separatorSize += 2
}
- //分配flv block
- bytes := t.mwBuffer.Allocate(separatorSize+flvSize, dts, videoKey)
+ // 分配flv block
+ bytes := t.MWBuffer.Allocate(separatorSize+flvSize, dts, videoKey)
n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false)
copy(bytes[n:], data)
- //合并写满再发
- if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 {
- t.sendUnpackedSegment(segment)
+ // 合并写满再发
+ if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
+ // 已经分配末尾换行符内存
+ t.AppendOutStreamBuffer(t.FormatSegment(segment))
}
- return nil
+
+ return t.OutBuffer[:t.OutBufferSize], 0, true, nil
}
-func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
+func (t *TransStream) AddTrack(stream utils.AVStream) error {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
return err
}
@@ -93,35 +97,7 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
return nil
}
-func (t *httpTransStream) AddSink(sink stream.Sink) error {
- utils.Assert(t.headerSize > 0)
-
- t.TCPTransStream.AddSink(sink)
- //发送sequence header
- t.sendSegment(sink, t.header[:t.headerSize])
-
- //发送当前内存池已有的合并写切片
- first := true
- t.mwBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
- if first {
- //修改第一个flv tag的pre tag size
- binary.BigEndian.PutUint32(bytes[20:], uint32(t.headerTagSize))
- first = false
- }
-
- //遍历发送合并写切片
- var index int
- for ; index < len(bytes); index += 4 {
- size := binary.BigEndian.Uint32(bytes[index:])
- t.sendSegment(sink, bytes[index:index+4+int(size)])
- index += int(size)
- }
- })
-
- return nil
-}
-
-func (t *httpTransStream) WriteHeader() error {
+func (t *TransStream) WriteHeader() error {
t.headerSize += t.muxer.WriteHeader(t.header[HttpFlvBlockHeaderSize:])
for _, track := range t.BaseTransStream.Tracks {
@@ -140,76 +116,110 @@ func (t *httpTransStream) WriteHeader() error {
t.headerTagSize = n - 15 + len(data) + 11
}
- //加上末尾换行符
+ // 加上末尾换行符
t.headerSize += 2
t.writeSeparator(t.header[:t.headerSize])
- t.mwBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
+ t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
return nil
}
-func (t *httpTransStream) Close() error {
- //发送剩余的流
- if !t.mwBuffer.IsNewSegment() {
- t.forceFlushSegment()
+func (t *TransStream) ReadExtraData(_ int64) ([][]byte, int64, error) {
+ utils.Assert(t.headerSize > 0)
+ // 发送sequence header
+ return [][]byte{t.GetHttpFLVBlock(t.header[:t.headerSize])}, 0, nil
+}
+
+func (t *TransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
+ t.ClearOutStreamBuffer()
+
+ // 发送当前内存池已有的合并写切片
+ t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
+ if t.OutBufferSize < 1 {
+ // 修改第一个flv tag的pre tag size
+ binary.BigEndian.PutUint32(bytes[HttpFlvBlockHeaderSize:], uint32(t.headerTagSize))
+ }
+
+ // 遍历发送合并写切片
+ var index int
+ for ; index < len(bytes); index += 4 {
+ size := binary.BigEndian.Uint32(bytes[index:])
+ t.AppendOutStreamBuffer(t.GetHttpFLVBlock(bytes[index : index+4+int(size)]))
+ index += int(size)
+ }
+ })
+
+ return t.OutBuffer[:t.OutBufferSize], 0, nil
+}
+
+func (t *TransStream) Close() ([][]byte, int64, error) {
+ t.ClearOutStreamBuffer()
+
+ // 发送剩余的流
+ if !t.MWBuffer.IsNewSegment() {
+ if segment := t.forceFlushSegment(); len(segment) > 0 {
+ t.AppendOutStreamBuffer(segment)
+ }
}
- return nil
+
+ return t.OutBuffer[:t.OutBufferSize], 0, nil
}
-func (t *httpTransStream) forceFlushSegment() {
- t.mwBuffer.Reserve(2)
- segment := t.mwBuffer.FlushSegment()
- t.sendUnpackedSegment(segment)
+// 保存为完整的http-flv切片
+func (t *TransStream) forceFlushSegment() []byte {
+ // 预览末尾换行符
+ t.MWBuffer.Reserve(2)
+ segment := t.MWBuffer.FlushSegment()
+ return t.FormatSegment(segment)
}
-// 为单个sink发送flv切片, 切片已经添加分隔符
-func (t *httpTransStream) sendSegment(sink stream.Sink, data []byte) error {
- return sink.Input(data[t.computeSkipCount(data):])
+// GetHttpFLVBlock 跳过头部的无效数据,返回http-flv块
+func (t *TransStream) GetHttpFLVBlock(data []byte) []byte {
+ return data[t.computeSkipCount(data):]
}
-// 发送还未添加包长和换行符的切片
-func (t *httpTransStream) sendUnpackedSegment(segment []byte) {
+// FormatSegment 为切片添加包长和换行符
+func (t *TransStream) FormatSegment(segment []byte) []byte {
t.writeSeparator(segment)
- skip := t.computeSkipCount(segment)
- t.SendPacket(segment[skip:])
+ return t.GetHttpFLVBlock(segment)
}
-func (t *httpTransStream) computeSkipCount(data []byte) int {
+func (t *TransStream) computeSkipCount(data []byte) int {
return int(6 + binary.BigEndian.Uint16(data[4:]))
}
// 为http-flv数据块添加长度和换行符
// @dst http-flv数据块, 头部需要空出HttpFlvBlockLengthSize字节长度, 末尾空出2字节换行符
-func (t *httpTransStream) writeSeparator(dst []byte) {
- //http-flv: length\r\n|flv data\r\n
- //http-flv-block: |block size[4]|skip count[2]|length\r\n|flv data\r\n
+func (t *TransStream) writeSeparator(dst []byte) {
+ // http-flv: length\r\n|flv data\r\n
+ // http-flv-block: |block size[4]|skip count[2]|length\r\n|flv data\r\n
- //写block size
+ // 写block size
binary.BigEndian.PutUint32(dst, uint32(len(dst)-4))
- //写flv实际长度字符串, 16进制表达
+ // 写flv实际长度字符串, 16进制表达
flvSize := len(dst) - HttpFlvBlockHeaderSize - 2
hexStr := fmt.Sprintf("%X", flvSize)
- //+2是跳过length后的换行符
+ // +2是跳过length后的换行符
n := len(hexStr) + 2
copy(dst[HttpFlvBlockHeaderSize-n:], hexStr)
- //写跳过字节数量
- //-6是block size和skip count字段合计长度
+ // 写跳过字节数量
+ // -6是block size和skip count字段合计长度
skipCount := HttpFlvBlockHeaderSize - n - 6
binary.BigEndian.PutUint16(dst[4:], uint16(skipCount))
- //flv length字段和flv数据之间的换行符
+ // flv length字段和flv数据之间的换行符
dst[HttpFlvBlockHeaderSize-2] = 0x0D
dst[HttpFlvBlockHeaderSize-1] = 0x0A
- //末尾换行符
+ // 末尾换行符
dst[len(dst)-2] = 0x0D
dst[len(dst)-1] = 0x0A
}
func NewHttpTransStream() stream.TransStream {
- return &httpTransStream{
+ return &TransStream{
muxer: libflv.NewMuxer(),
header: make([]byte, 1024),
headerSize: HttpFlvBlockHeaderSize,
diff --git a/gb28181/forward_sink.go b/gb28181/forward_sink.go
index e2c6617..2fea04a 100644
--- a/gb28181/forward_sink.go
+++ b/gb28181/forward_sink.go
@@ -1,7 +1,6 @@
package gb28181
import (
- "encoding/binary"
"github.com/lkmio/avformat/librtp"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
@@ -20,7 +19,6 @@ type ForwardSink struct {
setup SetupType
socket transport.ITransport
ssrc uint32
- buffer *stream.ReceiveBuffer //发送缓冲区
}
func (f *ForwardSink) OnConnected(conn net.Conn) []byte {
@@ -41,7 +39,7 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
f.Close()
}
-func (f *ForwardSink) Input(data []byte) error {
+func (f *ForwardSink) Write(index int, data [][]byte, ts int64) error {
if SetupUDP != f.setup && f.Conn == nil {
return nil
}
@@ -52,23 +50,13 @@ func (f *ForwardSink) Input(data []byte) error {
}
// 修改为与上级协商的SSRC
- librtp.ModifySSRC(data, f.ssrc)
+ librtp.ModifySSRC(data[0], f.ssrc)
if SetupUDP == f.setup {
- // UDP转发, 不拷贝直接发送
- f.socket.(*transport.UDPClient).Write(data)
+ f.socket.(*transport.UDPClient).Write(data[0][2:])
} else {
- // TCP转发, 拷贝一次再发送
- block := f.buffer.GetBlock()
- copy(block[2:], data)
- binary.BigEndian.PutUint16(block, uint16(len(data)))
-
- if _, err := f.Conn.Write(block[:2+len(data)]); err == nil {
- return nil
- } else if _, ok := err.(*transport.ZeroWindowSizeError); ok {
- log.Sugar.Errorf("发送缓冲区阻塞")
- f.Conn.Close()
- f.Conn = nil
+ if _, err := f.Conn.Write(data[0]); err != nil {
+ return err
}
}
@@ -107,6 +95,7 @@ func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stre
return nil, 0, err
}
+ sink.TCPStreaming = true
sink.socket = server
} else if SetupPassive == setup {
client := transport.TCPClient{}
@@ -135,14 +124,11 @@ func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stre
return nil, 0, err
}
+ sink.TCPStreaming = true
sink.socket = &client
} else {
utils.Assert(false)
}
- if SetupUDP != setup {
- sink.buffer = stream.NewReceiveBuffer(RTPOverTCPPacketSize, TcpStreamForwardBufferBlockSize)
- }
-
return sink, sink.socket.ListenPort(), nil
}
diff --git a/gb28181/forward_stream.go b/gb28181/forward_stream.go
index 78b2a9b..e7e42a9 100644
--- a/gb28181/forward_stream.go
+++ b/gb28181/forward_stream.go
@@ -1,6 +1,7 @@
package gb28181
import (
+ "encoding/binary"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/stream"
)
@@ -8,14 +9,29 @@ import (
// ForwardStream 国标级联转发流, 下级推什么, 就向上级发什么.
type ForwardStream struct {
stream.BaseTransStream
+ buffer *stream.ReceiveBuffer
}
func (f *ForwardStream) WriteHeader() error {
return nil
}
+func (f *ForwardStream) WrapData(data []byte) []byte {
+ block := f.buffer.GetBlock()
+ copy(block[2:], data)
+ binary.BigEndian.PutUint16(block, uint16(len(data)))
+ return block
+}
+
+func (f *ForwardStream) OutStreamBufferCapacity() int {
+ return f.buffer.BlockCount()
+}
+
func NewTransStream() (stream.TransStream, error) {
- return &ForwardStream{BaseTransStream: stream.BaseTransStream{Protocol_: stream.TransStreamGBStreamForward}}, nil
+ return &ForwardStream{
+ BaseTransStream: stream.BaseTransStream{Protocol: stream.TransStreamGBStreamForward},
+ buffer: stream.NewReceiveBuffer(1500, 512),
+ }, nil
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
diff --git a/gb28181/source.go b/gb28181/source.go
index 81db021..b406234 100644
--- a/gb28181/source.go
+++ b/gb28181/source.go
@@ -69,18 +69,21 @@ func (source *BaseGBSource) Init(receiveQueueSize int) {
source.PublishSource.Init(receiveQueueSize)
}
-// Input 输入rtp包, 处理PS流, 负责解析->封装->推流. 所有GBSource, 均到此处处理, 在event协程调用此函数
+// Input 输入rtp包, 处理PS流, 负责解析->封装->推流
func (source *BaseGBSource) Input(data []byte) error {
// 国标级联转发
for _, transStream := range source.TransStreams {
- if transStream.Protocol() != stream.TransStreamGBStreamForward {
+ if transStream.GetProtocol() != stream.TransStreamGBStreamForward {
continue
}
- transStream.(*ForwardStream).SendPacket(data)
+ bytes := transStream.(*ForwardStream).WrapData(data)
+ rtpPacket := [1][]byte{bytes}
+ source.DispatchBuffer(transStream, -1, rtpPacket[:], -1, true)
}
packet := rtp.Packet{}
+ packet.Marshal()
_ = packet.Unmarshal(data)
return source.deMuxerCtx.Input(packet.Payload)
}
@@ -228,13 +231,13 @@ func (source *BaseGBSource) correctTimestamp(packet utils.AVPacket, dts, pts int
func (source *BaseGBSource) Close() {
log.Sugar.Infof("GB28181推流结束 ssrc:%d %s", source.ssrc, source.PublishSource.String())
- //释放收流端口
+ // 释放收流端口
if source.transport != nil {
source.transport.Close()
source.transport = nil
}
- //删除ssrc关联
+ // 删除ssrc关联
if !stream.AppConfig.GB28181.IsMultiPort() {
if SharedTCPServer != nil {
SharedTCPServer.filter.RemoveSource(source.ssrc)
diff --git a/hls/hls_sink.go b/hls/hls_sink.go
index 39147c1..9751c8c 100644
--- a/hls/hls_sink.go
+++ b/hls/hls_sink.go
@@ -27,7 +27,19 @@ func (s *M3U8Sink) SendM3U8Data(data *string) error {
return nil
}
-func (s *M3U8Sink) Start() {
+func (s *M3U8Sink) StartStreaming(transStream stream.TransStream) error {
+ hls := transStream.(*TransStream)
+
+ if hls.m3u8.Size() > 0 {
+ if err := s.SendM3U8Data(&hls.m3u8StringFormat); err != nil {
+ return err
+ }
+ } else {
+ // m3u8文件中还没有切片时, 将sink添加到等待队列
+ hls.m3u8Sinks[s.GetID()] = s
+ }
+
+ // 开启拉流超时计时器, 如果拉流端查时间没有拉流, 关闭sink
timeout := time.Duration(stream.AppConfig.IdleTimeout)
if timeout < time.Second {
timeout = time.Duration(stream.AppConfig.Hls.Duration) * 2 * 3 * time.Second
@@ -43,6 +55,8 @@ func (s *M3U8Sink) Start() {
s.playTimer.Reset(timeout)
})
+
+ return nil
}
func (s *M3U8Sink) GetM3U8String() string {
@@ -67,7 +81,7 @@ func (s *M3U8Sink) Close() {
func NewM3U8Sink(id stream.SinkID, sourceId string, cb func(m3u8 []byte), sessionId string) stream.Sink {
return &M3U8Sink{
- BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamHls},
+ BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamHls, TCPStreaming: true},
cb: cb,
sessionId: sessionId,
}
diff --git a/hls/hls_stream.go b/hls/hls_stream.go
index 86cdd76..569c5b0 100644
--- a/hls/hls_stream.go
+++ b/hls/hls_stream.go
@@ -16,12 +16,12 @@ type tsContext struct {
writeBuffer []byte // ts流的缓冲区, 由TSMuxer使用. 减少用户态和内核态交互,以及磁盘IO频率
writeBufferSize int // 已缓存TS流大小
- url string // @See transStream.tsUrl
+ url string // @See TransStream.tsUrl
path string // ts切片位于磁盘中的绝对路径
file *os.File // ts切片文件句柄
}
-type transStream struct {
+type TransStream struct {
stream.BaseTransStream
muxer libmpeg.TSMuxer
context *tsContext
@@ -39,9 +39,9 @@ type transStream struct {
m3u8StringFormat string // 一个协程写, 多个协程读, 不用加锁保护
}
-func (t *transStream) Input(packet utils.AVPacket) error {
+func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
if packet.Index() >= t.muxer.TrackCount() {
- return fmt.Errorf("track not available")
+ return nil, -1, false, fmt.Errorf("track not available")
}
// 创建一下个切片
@@ -51,31 +51,33 @@ func (t *transStream) Input(packet utils.AVPacket) error {
if t.context.file != nil {
err := t.flushSegment(false)
if err != nil {
- return err
+ return nil, -1, false, err
}
}
// 创建新的切片
if err := t.createSegment(); err != nil {
- return err
+ return nil, -1, false, err
}
}
pts := packet.ConvertPts(90000)
dts := packet.ConvertDts(90000)
if utils.AVMediaTypeVideo == packet.MediaType() {
- return t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), pts, dts, packet.KeyFrame())
+ t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), pts, dts, packet.KeyFrame())
} else {
- return t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame())
+ t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame())
}
+
+ return nil, -1, true, nil
}
-func (t *transStream) AddTrack(stream utils.AVStream) error {
- err := t.BaseTransStream.AddTrack(stream)
- if err != nil {
+func (t *TransStream) AddTrack(stream utils.AVStream) error {
+ if err := t.BaseTransStream.AddTrack(stream); err != nil {
return err
}
+ var err error
if utils.AVMediaTypeVideo == stream.Type() {
data := stream.CodecParameters().AnnexBExtraData()
_, err = t.muxer.AddTrack(stream.Type(), stream.CodecId(), data)
@@ -85,26 +87,15 @@ func (t *transStream) AddTrack(stream utils.AVStream) error {
return err
}
-func (t *transStream) WriteHeader() error {
+func (t *TransStream) WriteHeader() error {
return t.createSegment()
}
-func (t *transStream) AddSink(sink stream.Sink) error {
- t.BaseTransStream.AddSink(sink)
-
- if t.m3u8.Size() > 0 {
- return sink.(*M3U8Sink).SendM3U8Data(&t.m3u8StringFormat)
- }
-
- t.m3u8Sinks[sink.GetID()] = sink.(*M3U8Sink)
- return nil
-}
-
-func (t *transStream) onTSWrite(data []byte) {
+func (t *TransStream) onTSWrite(data []byte) {
t.context.writeBufferSize += len(data)
}
-func (t *transStream) onTSAlloc(size int) []byte {
+func (t *TransStream) onTSAlloc(size int) []byte {
n := len(t.context.writeBuffer) - t.context.writeBufferSize
if n < size {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
@@ -114,7 +105,7 @@ func (t *transStream) onTSAlloc(size int) []byte {
return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size]
}
-func (t *transStream) flushSegment(end bool) error {
+func (t *TransStream) flushSegment(end bool) error {
defer func() {
t.context.segmentSeq++
}()
@@ -165,7 +156,7 @@ func (t *transStream) flushSegment(end bool) error {
}
// 创建一个新的ts切片
-func (t *transStream) createSegment() error {
+func (t *TransStream) createSegment() error {
t.muxer.Reset()
var tsFile *os.File
@@ -187,7 +178,7 @@ func (t *transStream) createSegment() error {
return err
}
- // 继续创建, 认为是文件名冲突, 并且文件已经被打开.
+ // 继续创建TS文件, 认为是文件名冲突, 并且文件已经被打开.
t.context.segmentSeq++
}
@@ -196,7 +187,7 @@ func (t *transStream) createSegment() error {
return nil
}
-func (t *transStream) Close() error {
+func (t *TransStream) Close() ([][]byte, int64, error) {
var err error
if t.context.file != nil {
@@ -215,7 +206,7 @@ func (t *transStream) Close() error {
t.m3u8File = nil
}
- return err
+ return nil, 0, err
}
func DeleteOldSegments(id string) {
@@ -256,7 +247,7 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
return nil, err
}
- stream_ := &transStream{
+ transStream := &TransStream{
m3u8Name: m3u8Name,
tsFormat: tsFormat,
tsUrl: tsUrl,
@@ -267,22 +258,22 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
// 创建TS封装器
muxer := libmpeg.NewTSMuxer()
- muxer.SetWriteHandler(stream_.onTSWrite)
- muxer.SetAllocHandler(stream_.onTSAlloc)
+ muxer.SetWriteHandler(transStream.onTSWrite)
+ muxer.SetAllocHandler(transStream.onTSAlloc)
// ts封装上下文对象
- stream_.context = &tsContext{
+ transStream.context = &tsContext{
segmentSeq: 0,
writeBuffer: make([]byte, 1024*1024),
writeBufferSize: 0,
}
- stream_.muxer = muxer
- stream_.m3u8 = NewM3U8Writer(playlistLength)
- stream_.m3u8File = file
+ transStream.muxer = muxer
+ transStream.m3u8 = NewM3U8Writer(playlistLength)
+ transStream.m3u8File = file
- stream_.m3u8Sinks = make(map[stream.SinkID]*M3U8Sink, 24)
- return stream_, nil
+ transStream.m3u8Sinks = make(map[stream.SinkID]*M3U8Sink, 24)
+ return transStream, nil
}
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
diff --git a/main.go b/main.go
index 3542205..73a0b99 100644
--- a/main.go
+++ b/main.go
@@ -88,7 +88,7 @@ func main() {
log.Sugar.Info("启动rtsp服务成功 addr:", rtspAddr.String())
}
- log.Sugar.Info("启动Http服务 addr:", stream.ListenAddr(stream.AppConfig.Http.Port))
+ log.Sugar.Info("启动http服务 addr:", stream.ListenAddr(stream.AppConfig.Http.Port))
go startApiServer(net.JoinHostPort(stream.AppConfig.ListenIP, strconv.Itoa(stream.AppConfig.Http.Port)))
//单端口模式下, 启动时就创建收流端口
@@ -101,7 +101,7 @@ func main() {
}
gb28181.SharedUDPServer = server
- log.Sugar.Info("启动GB28181 UDP收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
+ log.Sugar.Info("启动GB28181 udp收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
}
if stream.AppConfig.GB28181.IsEnableTCP() {
@@ -111,7 +111,7 @@ func main() {
}
gb28181.SharedTCPServer = server
- log.Sugar.Info("启动GB28181 TCP收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
+ log.Sugar.Info("启动GB28181 tcp收流端口成功:" + stream.ListenAddr(stream.AppConfig.GB28181.Port[0]))
}
}
diff --git a/rtc.html b/rtc.html
index 6aafeb1..2bc9018 100644
--- a/rtc.html
+++ b/rtc.html
@@ -36,14 +36,6 @@
-
diff --git a/rtc/rtc_sink.go b/rtc/rtc_sink.go
index 31fcb1c..7b76f83 100644
--- a/rtc/rtc_sink.go
+++ b/rtc/rtc_sink.go
@@ -1,13 +1,16 @@
package rtc
import (
+ "fmt"
+ "github.com/lkmio/avformat/utils"
+ "github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"time"
)
-type sink struct {
+type Sink struct {
stream.BaseSink
offer string
@@ -20,31 +23,117 @@ type sink struct {
cb func(sdp string)
}
-func NewSink(id stream.SinkID, sourceId string, offer string, cb func(sdp string)) stream.Sink {
- return &sink{stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtc}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
-}
+func (s *Sink) StartStreaming(transStream stream.TransStream) error {
+ // 创建PeerConnection
+ var videoTrack *webrtc.TrackLocalStaticSample
+ s.setTrackCount(transStream.TrackCount())
-func (s *sink) setTrackCount(count int) {
+ connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{})
+ connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
+
+ })
+
+ tracks := transStream.GetTracks()
+ for index, track := range tracks {
+ var mimeType string
+ var id string
+ if utils.AVCodecIdH264 == track.CodecId() {
+ mimeType = webrtc.MimeTypeH264
+ } else if utils.AVCodecIdH265 == track.CodecId() {
+ mimeType = webrtc.MimeTypeH265
+ } else if utils.AVCodecIdAV1 == track.CodecId() {
+ mimeType = webrtc.MimeTypeAV1
+ } else if utils.AVCodecIdVP8 == track.CodecId() {
+ mimeType = webrtc.MimeTypeVP8
+ } else if utils.AVCodecIdVP9 == track.CodecId() {
+ mimeType = webrtc.MimeTypeVP9
+ } else if utils.AVCodecIdOPUS == track.CodecId() {
+ mimeType = webrtc.MimeTypeOpus
+ } else if utils.AVCodecIdPCMALAW == track.CodecId() {
+ mimeType = webrtc.MimeTypePCMA
+ } else if utils.AVCodecIdPCMMULAW == track.CodecId() {
+ mimeType = webrtc.MimeTypePCMU
+ } else {
+ log.Sugar.Errorf("codec %s not compatible with webrtc", track.CodecId())
+ continue
+ }
+
+ if utils.AVMediaTypeAudio == track.Type() {
+ id = "audio"
+ } else {
+ id = "video"
+ }
+
+ videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
+ if err != nil {
+ panic(err)
+ } else if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
+ return err
+ } else if _, err = connection.AddTrack(videoTrack); err != nil {
+ return err
+ }
+
+ s.addTrack(index, videoTrack)
+ }
+
+ if len(connection.GetTransceivers()) == 0 {
+ return fmt.Errorf("no track added")
+ } else if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: s.offer}); err != nil {
+ return err
+ }
+
+ complete := webrtc.GatheringCompletePromise(connection)
+ answer, err := connection.CreateAnswer(nil)
+ if err != nil {
+ return err
+ } else if err = connection.SetLocalDescription(answer); err != nil {
+ return err
+ }
+
+ <-complete
+ connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
+ s.state = state
+ log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), s.GetID(), s.SourceID)
+
+ if state > webrtc.ICEConnectionStateDisconnected {
+ log.Sugar.Errorf("webrtc peer断开连接 sink: %v source :%s", s.GetID(), s.SourceID)
+ s.Close()
+ }
+ })
+
+ s.peer = connection
+ // offer的sdp, 应答给http请求
+ s.cb(connection.LocalDescription().SDP)
+ return nil
+}
+func (s *Sink) setTrackCount(count int) {
s.tracks = make([]*webrtc.TrackLocalStaticSample, count)
}
-func (s *sink) addTrack(index int, track *webrtc.TrackLocalStaticSample) error {
+func (s *Sink) addTrack(index int, track *webrtc.TrackLocalStaticSample) error {
s.tracks[index] = track
return nil
}
-func (s *sink) SendHeader(data []byte) error {
- s.cb(string(data))
- return nil
-}
-
-func (s *sink) input(index int, data []byte, ts uint32) error {
+func (s *Sink) Write(index int, data [][]byte, ts int64) error {
if s.tracks[index] == nil {
return nil
}
- return s.tracks[index].WriteSample(media.Sample{
- Data: data,
- Duration: time.Duration(ts) * time.Millisecond,
- })
+ for _, bytes := range data {
+ err := s.tracks[index].WriteSample(media.Sample{
+ Data: bytes,
+ Duration: time.Duration(ts) * time.Millisecond,
+ })
+
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func NewSink(id stream.SinkID, sourceId string, offer string, cb func(sdp string)) stream.Sink {
+ return &Sink{stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtc, TCPStreaming: false}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
}
diff --git a/rtc/rtc_stream.go b/rtc/rtc_stream.go
index 620a87f..4b1373b 100644
--- a/rtc/rtc_stream.go
+++ b/rtc/rtc_stream.go
@@ -1,9 +1,7 @@
package rtc
import (
- "fmt"
"github.com/lkmio/avformat/utils"
- "github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
@@ -18,119 +16,27 @@ type transStream struct {
stream.BaseTransStream
}
-func (t *transStream) Input(packet utils.AVPacket) error {
- for _, iSink := range t.Sinks {
- sink_ := iSink.(*sink)
- if sink_.state < webrtc.ICEConnectionStateConnected {
- continue
+func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
+ t.ClearOutStreamBuffer()
+
+ if utils.AVMediaTypeAudio == packet.MediaType() {
+ t.AppendOutStreamBuffer(packet.Data())
+ } else if utils.AVMediaTypeVideo == packet.MediaType() {
+ if packet.KeyFrame() {
+ extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData()
+ t.AppendOutStreamBuffer(extra)
}
- if utils.AVMediaTypeAudio == packet.MediaType() {
- sink_.input(packet.Index(), packet.Data(), uint32(packet.Duration(1000)))
- } else if utils.AVMediaTypeVideo == packet.MediaType() {
- if packet.KeyFrame() {
- extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().AnnexBExtraData()
- sink_.input(packet.Index(), extra, 0)
- }
-
- sink_.input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), uint32(packet.Duration(1000)))
- }
+ t.AppendOutStreamBuffer(packet.Data())
}
- return nil
-}
-
-func (t *transStream) AddSink(sink_ stream.Sink) error {
- //创建PeerConnection
- var videoTrack *webrtc.TrackLocalStaticSample
- rtcSink := sink_.(*sink)
- rtcSink.setTrackCount(len(t.Tracks))
- connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{})
- connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
-
- })
-
- for index, track := range t.Tracks {
- var mimeType string
- var id string
- if utils.AVCodecIdH264 == track.CodecId() {
- mimeType = webrtc.MimeTypeH264
- } else if utils.AVCodecIdH265 == track.CodecId() {
- mimeType = webrtc.MimeTypeH265
- } else if utils.AVCodecIdAV1 == track.CodecId() {
- mimeType = webrtc.MimeTypeAV1
- } else if utils.AVCodecIdVP8 == track.CodecId() {
- mimeType = webrtc.MimeTypeVP8
- } else if utils.AVCodecIdVP9 == track.CodecId() {
- mimeType = webrtc.MimeTypeVP9
- } else if utils.AVCodecIdOPUS == track.CodecId() {
- mimeType = webrtc.MimeTypeOpus
- } else if utils.AVCodecIdPCMALAW == track.CodecId() {
- mimeType = webrtc.MimeTypePCMA
- } else if utils.AVCodecIdPCMMULAW == track.CodecId() {
- mimeType = webrtc.MimeTypePCMU
- } else {
- log.Sugar.Errorf("codec %d not compatible with webrtc", track.CodecId())
- continue
- }
-
- if utils.AVMediaTypeAudio == track.Type() {
- id = "audio"
- } else {
- id = "video"
- }
-
- videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
- if err != nil {
- panic(err)
- } else if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
- return err
- } else if _, err = connection.AddTrack(videoTrack); err != nil {
- return err
- }
-
- rtcSink.addTrack(index, videoTrack)
- }
-
- if len(connection.GetTransceivers()) == 0 {
- return fmt.Errorf("no track added")
- } else if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: rtcSink.offer}); err != nil {
- return err
- }
-
- complete := webrtc.GatheringCompletePromise(connection)
- answer, err := connection.CreateAnswer(nil)
- if err != nil {
- return err
- } else if err = connection.SetLocalDescription(answer); err != nil {
- return err
- }
-
- <-complete
- connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
- rtcSink.state = state
- log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), rtcSink.GetID(), rtcSink.SourceID)
-
- if state > webrtc.ICEConnectionStateDisconnected {
- log.Sugar.Errorf("webrtc peer断开连接 sink:%v source:%s", rtcSink.GetID(), rtcSink.SourceID)
- rtcSink.Close()
- }
- })
-
- rtcSink.peer = connection
- rtcSink.SendHeader([]byte(connection.LocalDescription().SDP))
- return t.BaseTransStream.AddSink(sink_)
+ return t.OutBuffer[:t.OutBufferSize], int64(uint32(packet.Duration(1000))), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil
}
func (t *transStream) WriteHeader() error {
return nil
}
-func NewTransStream() stream.TransStream {
- t := &transStream{}
- return t
-}
-
func InitConfig() {
setting := webrtc.SettingEngine{}
var ips []string
@@ -145,11 +51,11 @@ func InitConfig() {
panic(err)
}
- //设置公网ip和监听端口
+ // 设置公网ip和监听端口
setting.SetICEUDPMux(webrtc.NewICEUDPMux(nil, udpListener))
setting.SetNAT1To1IPs(ips, webrtc.ICECandidateTypeHost)
- //注册音视频编码器
+ // 注册音视频编码器
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
@@ -163,6 +69,11 @@ func InitConfig() {
webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(setting))
}
+func NewTransStream() stream.TransStream {
+ t := &transStream{}
+ return t
+}
+
func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewTransStream(), nil
}
diff --git a/rtmp/rtmp_sink.go b/rtmp/rtmp_sink.go
index 6331af6..c17cc81 100644
--- a/rtmp/rtmp_sink.go
+++ b/rtmp/rtmp_sink.go
@@ -12,11 +12,11 @@ type Sink struct {
stack *librtmp.Stack
}
-func (s *Sink) Start() {
- _ = s.stack.SendStreamBeginChunk(s.Conn)
+func (s *Sink) StartStreaming(_ stream.TransStream) error {
+ return s.stack.SendStreamBeginChunk(s.Conn)
}
-func (s *Sink) Flush() {
+func (s *Sink) StopStreaming(_ stream.TransStream) {
_ = s.stack.SendStreamEOFChunk(s.Conn)
}
@@ -27,7 +27,7 @@ func (s *Sink) Close() {
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, stack *librtmp.Stack) stream.Sink {
return &Sink{
- BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreate, Protocol: stream.TransStreamRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE},
+ BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreate, Protocol: stream.TransStreamRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE, TCPStreaming: true},
stack: stack,
}
}
diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go
index 5e4f720..b11cf79 100644
--- a/rtmp/rtmp_stream.go
+++ b/rtmp/rtmp_stream.go
@@ -18,23 +18,18 @@ type transStream struct {
muxer libflv.Muxer
audioChunk librtmp.Chunk
videoChunk librtmp.Chunk
-
- //合并写内存泄露问题: 推流结束后, mwBuffer的data一直释放不掉, 只有拉流全部断开之后, 才会释放该内存.
- //起初怀疑是代码层哪儿有问题, 但是测试发现如果将合并写切片再拷贝一次发送 给sink, 推流结束后,mwBuffer的data内存块释放没问题, 只有拷贝的内存块未释放. 所以排除了代码层造成内存泄露的可能性.
- //看来是conn在write后还会持有data. 查阅代码发现, 的确如此. 向fd发送数据前buffer会引用data, 但是后续没有赋值为nil, 取消引用. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694
- mwBuffer stream.MergeWritingBuffer //合并写同时作为, 用户态的发送缓冲区
}
-func (t *transStream) Input(packet utils.AVPacket) error {
- utils.Assert(t.BaseTransStream.Completed)
+func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
+ t.ClearOutStreamBuffer()
var data []byte
var chunk *librtmp.Chunk
var videoPkt bool
var videoKey bool
- //rtmp chunk消息体的数据大小
+ // rtmp chunk消息体的数据大小
var payloadSize int
- //先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小
+ // 先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小
var chunkPayloadOffset int
var dts int64
var pts int64
@@ -57,32 +52,32 @@ func (t *transStream) Input(packet utils.AVPacket) error {
payloadSize += chunkPayloadOffset + len(data)
}
- //遇到视频关键帧, 发送剩余的流, 创建新切片
+ // 遇到视频关键帧, 发送剩余的流, 创建新切片
if videoKey {
- if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 {
- t.SendPacket(segment)
+ if segment := t.MWBuffer.FlushSegment(); len(segment) > 0 {
+ t.AppendOutStreamBuffer(segment)
}
}
- //分配内存
- //固定type0
+ // 分配内存
+ // 固定type0
chunkHeaderSize := 12
- //type3chunk数量
+ // type3chunk数量
numChunks := (payloadSize - 1) / t.chunkSize
rtmpMsgSize := chunkHeaderSize + payloadSize + numChunks
- //如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳
+ // 如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳
if dts >= 0xFFFFFF && dts <= 0xFFFFFFFF {
rtmpMsgSize += (1 + numChunks) * 4
}
- allocate := t.mwBuffer.Allocate(rtmpMsgSize, dts, videoKey)
+ allocate := t.MWBuffer.Allocate(rtmpMsgSize, dts, videoKey)
- //写chunk header
+ // 写chunk header
chunk.Length = payloadSize
chunk.Timestamp = uint32(dts)
n := chunk.ToBytes(allocate)
- //写flv
+ // 写flv
if videoPkt {
n += t.muxer.WriteVideoData(allocate[n:], uint32(ct), packet.KeyFrame(), false)
} else {
@@ -92,25 +87,30 @@ func (t *transStream) Input(packet utils.AVPacket) error {
n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
utils.Assert(len(allocate) == n)
- //合并写满了再发
- if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 {
- t.SendPacket(segment)
+ // 合并写满了再发
+ if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
+ t.AppendOutStreamBuffer(segment)
}
- return nil
+
+ return t.OutBuffer[:t.OutBufferSize], 0, true, nil
}
-func (t *transStream) AddSink(sink stream.Sink) error {
+func (t *transStream) ReadExtraData(_ int64) ([][]byte, int64, error) {
utils.Assert(t.headerSize > 0)
- t.TCPTransStream.AddSink(sink)
- //发送sequence header
- sink.Input(t.header[:t.headerSize])
+ // 发送sequence header
+ return [][]byte{t.header[:t.headerSize]}, 0, nil
+}
- //发送当前内存池已有的合并写切片
- t.mwBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
- sink.Input(bytes)
+func (t *transStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
+ t.ClearOutStreamBuffer()
+
+ // 发送当前内存池已有的合并写切片
+ t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
+ t.AppendOutStreamBuffer(bytes)
})
- return nil
+
+ return t.OutBuffer[:t.OutBufferSize], 0, nil
}
func (t *transStream) WriteHeader() error {
@@ -136,7 +136,7 @@ func (t *transStream) WriteHeader() error {
utils.Assert(audioStream != nil || videoStream != nil)
- //初始化
+ // 初始化
t.BaseTransStream.Completed = true
t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer()
@@ -148,7 +148,7 @@ func (t *transStream) WriteHeader() error {
t.muxer.AddVideoTrack(videoCodecId)
}
- //统一生成rtmp拉流需要的数据头(chunk+sequence header)
+ // 生成推流的数据头(chunk+sequence header)
var n int
if audioStream != nil {
n += t.muxer.WriteAudioData(t.header[12:], true)
@@ -174,17 +174,19 @@ func (t *transStream) WriteHeader() error {
}
t.headerSize = n
-
- t.mwBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
+ t.MWBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
return nil
}
-func (t *transStream) Close() error {
+func (t *transStream) Close() ([][]byte, int64, error) {
+ t.ClearOutStreamBuffer()
+
//发送剩余的流
- if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 {
- t.SendPacket(segment)
+ if segment := t.MWBuffer.FlushSegment(); len(segment) > 0 {
+ t.AppendOutStreamBuffer(segment)
}
- return nil
+
+ return t.OutBuffer[:t.OutBufferSize], 0, nil
}
func NewTransStream(chunkSize int) stream.TransStream {
diff --git a/rtsp/rtsp_handler.go b/rtsp/rtsp_handler.go
index 288ef46..22aef24 100644
--- a/rtsp/rtsp_handler.go
+++ b/rtsp/rtsp_handler.go
@@ -66,7 +66,7 @@ func (h handler) Process(session *session, method string, url_ *url.URL, headers
//确保拉流要经过授权
state, ok := method2StateMap[method]
- if ok && state > SessionStateSetup && session.sink_ == nil {
+ if ok && state > SessionStateSetup && session.sink == nil {
return fmt.Errorf("please establish a session first")
}
@@ -107,17 +107,17 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) {
var response *http.Response
var body []byte
- //校验密码
+ // 校验密码
if h.password != "" {
- var success bool
+ var ok bool
authorization := request.headers.Get("Authorization")
if authorization != "" {
params, err := parseAuthParams(authorization)
- success = err == nil && DoAuthenticatePlainTextPassword(params, h.password)
+ ok = err == nil && DoAuthenticatePlainTextPassword(params, h.password)
}
- if !success {
+ if !ok {
response401 := NewResponse(http.StatusUnauthorized, request.headers.Get("Cseq"))
response401.Header.Set("WWW-Authenticate", generateAuthHeader("lkm"))
return response401, nil, nil
@@ -125,26 +125,27 @@ func (h handler) OnDescribe(request Request) (*http.Response, []byte, error) {
}
sinkId := stream.NetAddr2SinkId(request.session.conn.RemoteAddr())
- sink_ := NewSink(sinkId, request.sourceId, request.session.conn, func(sdp string) {
+ sink := NewSink(sinkId, request.sourceId, request.session.conn, func(sdp string) {
+ // 响应sdp回调
response = NewOKResponse(request.headers.Get("Cseq"))
response.Header.Set("Content-Type", "application/sdp")
request.session.response(response, []byte(sdp))
})
- sink_.SetUrlValues(request.url.Query())
- _, code := stream.PreparePlaySink(sink_)
+ sink.SetUrlValues(request.url.Query())
+ _, code := stream.PreparePlaySinkWithReady(sink, false)
if utils.HookStateOK != code {
- return nil, nil, fmt.Errorf("hook failed. code:%d", code)
+ return nil, nil, fmt.Errorf("hook failed. code: %d", code)
}
- request.session.sink_ = sink_.(*sink)
+ request.session.sink = sink.(*Sink)
return nil, body, err
}
func (h handler) OnSetup(request Request) (*http.Response, []byte, error) {
var response *http.Response
- //修复rtsp拉流携带参数,参数解析失败.
+ // 修复解析拉流携带的参数失败问题
params := strings.ReplaceAll(request.url.RawQuery, "/?", "&")
query, err := url.ParseQuery(params)
if err != nil {
@@ -196,14 +197,14 @@ func (h handler) OnSetup(request Request) (*http.Response, []byte, error) {
}
ssrc := 0xFFFFFFFF
- rtpPort, rtcpPort, err := request.session.sink_.addSender(index, tcp, uint32(ssrc))
+ rtpPort, rtcpPort, err := request.session.sink.AddSender(index, tcp, uint32(ssrc))
if err != nil {
return nil, nil, err
}
responseHeader := transportHeader
if tcp {
- //修改interleaved为实际的stream index
+ // 修改interleaved为实际的stream index
responseHeader += ";interleaved=" + fmt.Sprintf("%d-%d", index, index)
} else {
responseHeader += ";server_port=" + fmt.Sprintf("%d-%d", rtpPort, rtcpPort)
@@ -225,7 +226,14 @@ func (h handler) OnPlay(request Request) (*http.Response, []byte, error) {
response.Header.Set("Session", sessionHeader)
}
- request.session.sink_.playing = true
+ sink := request.session.sink
+ sink.SetReady(true)
+ source := stream.SourceManager.Find(sink.GetSourceID())
+ if source == nil {
+ return nil, nil, fmt.Errorf("Source with ID %s does not exist.", request.sourceId)
+ }
+
+ source.AddSink(sink)
return response, nil, nil
}
@@ -239,7 +247,7 @@ func (h handler) OnPause(request Request) (*http.Response, []byte, error) {
return response, nil, nil
}
-func newHandler(password string) *handler {
+func NewHandler(password string) *handler {
h := handler{
methods: make(map[string]reflect.Value, 10),
password: password,
diff --git a/rtsp/rtsp_server.go b/rtsp/rtsp_server.go
index 328fdc0..28a5998 100644
--- a/rtsp/rtsp_server.go
+++ b/rtsp/rtsp_server.go
@@ -16,7 +16,7 @@ type Server interface {
func NewServer(password string) Server {
return &server{
- handler: newHandler(password),
+ handler: NewHandler(password),
}
}
diff --git a/rtsp/rtsp_session.go b/rtsp/rtsp_session.go
index 732d39c..8940d26 100644
--- a/rtsp/rtsp_session.go
+++ b/rtsp/rtsp_session.go
@@ -44,7 +44,7 @@ func init() {
type session struct {
conn net.Conn
- sink_ *sink
+ sink *Sink
sessionId string
writeBuffer *bytes.Buffer //响应体缓冲区
state SessionState
@@ -89,9 +89,9 @@ func (s *session) close() {
s.conn = nil
}
- if s.sink_ != nil {
- s.sink_.Close()
- s.sink_ = nil
+ if s.sink != nil {
+ s.sink.Close()
+ s.sink = nil
}
}
diff --git a/rtsp/rtsp_sink.go b/rtsp/rtsp_sink.go
index 85457d5..7cb61cd 100644
--- a/rtsp/rtsp_sink.go
+++ b/rtsp/rtsp_sink.go
@@ -15,34 +15,31 @@ var (
TransportManger transport.Manager
)
-// rtsp拉流sink
+// Sink rtsp拉流sink
// 对于udp而言, 每个sink维护多个transport
-// tcp直接单端口传输
-type sink struct {
+// tcp使用信令链路传输
+type Sink struct {
stream.BaseSink
- senders []*librtp.RtpSender //一个rtsp源,可能存在多个流, 每个流都需要拉取
- sdpCb func(sdp string) //rtsp_stream生成sdp后,使用该回调给rtsp_session, 响应describe
-
- tcp bool //tcp拉流标记
- playing bool //是否已经收到play请求
+ senders []*librtp.RtpSender // 一个rtsp源, 可能存在多个流, 每个流都需要拉取
+ sdpCb func(sdp string) // sdp回调, 响应describe
}
-func NewSink(id stream.SinkID, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
- return &sink{
- stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtsp, Conn: conn},
- nil,
- cb,
- false,
- false,
+func (s *Sink) StartStreaming(transStream stream.TransStream) error {
+ if s.senders == nil {
+ s.senders = make([]*librtp.RtpSender, transStream.TrackCount())
}
+
+ // sdp回调给sink, sink应答给describe请求
+ if s.sdpCb != nil {
+ s.sdpCb(transStream.(*TranStream).sdp)
+ s.sdpCb = nil
+ }
+
+ return nil
}
-func (s *sink) setSenderCount(count int) {
- s.senders = make([]*librtp.RtpSender, count)
-}
-
-func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, error) {
+func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, error) {
utils.Assert(index < cap(s.senders))
utils.Assert(s.senders[index] == nil)
@@ -54,9 +51,8 @@ func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
SSRC: ssrc,
}
- //tcp使用信令链路
if tcp {
- s.tcp = true
+ s.TCPStreaming = true
} else {
sender.Rtp, err = TransportManger.NewUDPServer("0.0.0.0")
if err != nil {
@@ -83,39 +79,43 @@ func (s *sink) addSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
return rtpPort, rtcpPort, err
}
-func (s *sink) input(index int, data []byte, rtpTime uint32) error {
- //拉流方还没有连上来
- utils.Assert(index < cap(s.senders))
+func (s *Sink) Write(index int, data [][]byte, rtpTime int64) error {
+ // 拉流方还没有连接上来
+ if index >= cap(s.senders) || s.senders[index] == nil {
+ return nil
+ }
- sender := s.senders[index]
- sender.PktCount++
- sender.OctetCount += len(data)
- if s.tcp {
- s.Conn.Write(data)
- } else {
- //发送rtcp sr包
- sender.RtpConn.Write(data)
+ for _, bytes := range data {
+ sender := s.senders[index]
+ sender.PktCount++
+ sender.OctetCount += len(bytes)
+ if s.TCPStreaming {
+ s.Conn.Write(bytes)
+ } else {
+ //发送rtcp sr包
+ sender.RtpConn.Write(bytes[OverTcpHeaderSize:])
- if sender.RtcpConn == nil || sender.PktCount%100 != 0 {
- return nil
+ if sender.RtcpConn == nil || sender.PktCount%100 != 0 {
+ continue
+ }
+
+ nano := uint64(time.Now().UnixNano())
+ ntp := (nano/1000000000 + 2208988800<<32) | (nano % 1000000000)
+ sr := rtcp.SenderReport{
+ SSRC: sender.SSRC,
+ NTPTime: ntp,
+ RTPTime: uint32(rtpTime),
+ PacketCount: uint32(sender.PktCount),
+ OctetCount: uint32(sender.OctetCount),
+ }
+
+ marshal, err := sr.Marshal()
+ if err != nil {
+ log.Sugar.Errorf("创建rtcp sr消息失败 err:%s msg:%v", err.Error(), sr)
+ }
+
+ sender.RtcpConn.Write(marshal)
}
-
- nano := uint64(time.Now().UnixNano())
- ntp := (nano/1000000000 + 2208988800<<32) | (nano % 1000000000)
- sr := rtcp.SenderReport{
- SSRC: sender.SSRC,
- NTPTime: ntp,
- RTPTime: rtpTime,
- PacketCount: uint32(sender.PktCount),
- OctetCount: uint32(sender.OctetCount),
- }
-
- marshal, err := sr.Marshal()
- if err != nil {
- log.Sugar.Errorf("创建rtcp sr消息失败 err:%s msg:%v", err.Error(), sr)
- }
-
- sender.RtcpConn.Write(marshal)
}
return nil
@@ -123,22 +123,10 @@ func (s *sink) input(index int, data []byte, rtpTime uint32) error {
// 拉流链路是否已经连接上
// 拉流测发送了play请求, 并且对于udp而言, 还需要收到nat穿透包
-func (s *sink) isConnected(index int) bool {
- return s.playing && (s.tcp || (s.senders[index] != nil && s.senders[index].RtpConn != nil))
+func (s *Sink) isConnected(index int) bool {
+ return s.TCPStreaming || (s.senders[index] != nil && s.senders[index].RtpConn != nil)
}
-
-// 发送rtp包总数
-func (s *sink) pktCount(index int) int {
- return s.senders[index].PktCount
-}
-
-// SendHeader 回调rtsp stream的sdp信息
-func (s *sink) SendHeader(data []byte) error {
- s.sdpCb(string(data))
- return nil
-}
-
-func (s *sink) Close() {
+func (s *Sink) Close() {
s.BaseSink.Close()
for _, sender := range s.senders {
@@ -155,3 +143,11 @@ func (s *sink) Close() {
}
}
}
+
+func NewSink(id stream.SinkID, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
+ return &Sink{
+ stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtsp, Conn: conn},
+ nil,
+ cb,
+ }
+}
diff --git a/rtsp/rtsp_stream.go b/rtsp/rtsp_stream.go
index a749630..a280bca 100644
--- a/rtsp/rtsp_stream.go
+++ b/rtsp/rtsp_stream.go
@@ -17,136 +17,87 @@ const (
OverTcpMagic = 0x24
)
-// rtsp传输流封装
+// TranStream rtsp传输流封装
// 低延迟是rtsp特性, 所以不考虑实现GOP缓存
-type tranStream struct {
+type TranStream struct {
stream.BaseTransStream
addr net.IPAddr
addrType string
urlFormat string
- rtpTracks []*rtspTrack
+ rtpTracks []*Track
sdp string
+ buffer *stream.ReceiveBuffer
}
-// rtpMuxer申请输出流内存的回调
-// 无论是tcp/udp拉流, 均使用同一块内存, 并且给tcp预留4字节的包长.
-func (t *tranStream) onAllocBuffer(params interface{}) []byte {
- return t.rtpTracks[params.(int)].buffer[OverTcpHeaderSize:]
-}
-
-// onRtpPacket 所有封装后的RTP流都将回调于此
-func (t *tranStream) onRtpPacket(data []byte, timestamp uint32, params interface{}) {
- //params传递track索引
- index := params.(int)
- track := t.rtpTracks[index]
-
- //保存带有sps和ssp等编码信息的rtp包, 对所有sink通用
- if track.cache && track.extraDataBuffer == nil {
- bytes := make([]byte, OverTcpHeaderSize+len(data))
- copy(bytes[OverTcpHeaderSize:], data)
-
- track.tmpExtraDataBuffer = append(track.tmpExtraDataBuffer, bytes)
- t.overTCP(bytes, index)
- return
- }
-
- //将rtp包发送给各个sink
- for _, value := range t.Sinks {
- sink_ := value.(*sink)
- if !sink_.isConnected(index) {
- continue
- }
-
- //为刚刚连接上的sink, 发送sps和pps等rtp包
- if sink_.pktCount(index) < 1 && utils.AVMediaTypeVideo == track.mediaType {
- seq := binary.BigEndian.Uint16(data[2:])
- count := len(track.extraDataBuffer)
-
- for i, rtp := range track.extraDataBuffer {
- //回滚rtp包的序号
- librtp.RollbackSeq(rtp[OverTcpHeaderSize:], int(seq)-(count-i-1))
- if sink_.tcp {
- sink_.input(index, rtp, 0)
- } else {
- sink_.input(index, rtp[OverTcpHeaderSize:], timestamp)
- }
- }
- }
-
- end := OverTcpHeaderSize + len(data)
- t.overTCP(track.buffer[:end], index)
-
- //发送rtp包
- if sink_.tcp {
- sink_.input(index, track.buffer[:end], 0)
- } else {
- sink_.input(index, data, timestamp)
- }
- }
-}
-
-func (t *tranStream) overTCP(data []byte, channel int) {
+func (t *TranStream) OverTCP(data []byte, channel int) {
data[0] = OverTcpMagic
data[1] = byte(channel)
binary.BigEndian.PutUint16(data[2:], uint16(len(data)-4))
}
-func (t *tranStream) Input(packet utils.AVPacket) error {
- stream_ := t.rtpTracks[packet.Index()]
+func (t *TranStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
+ t.ClearOutStreamBuffer()
+
+ var ts uint32
+ track := t.rtpTracks[packet.Index()]
+ track.seq = track.muxer.GetHeader().Seq
if utils.AVMediaTypeAudio == packet.MediaType() {
- stream_.muxer.Input(packet.Data(), uint32(packet.ConvertPts(stream_.rate)))
+ ts = uint32(packet.ConvertPts(track.rate))
+ t.PackRtpPayload(track.muxer, packet.Index(), packet.Data(), ts)
} else if utils.AVMediaTypeVideo == packet.MediaType() {
+ ts = uint32(packet.ConvertPts(track.rate))
+ data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]))
+ t.PackRtpPayload(track.muxer, packet.Index(), data, ts)
+ }
- //将sps和pps按照单一模式打包
- if stream_.extraDataBuffer == nil {
- if !packet.KeyFrame() {
- return nil
- }
+ return t.OutBuffer[:t.OutBufferSize], int64(ts), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil
+}
- stream_.cache = true
- parameters := t.BaseTransStream.Tracks[packet.Index()].CodecParameters()
-
- if utils.AVCodecIdH265 == packet.CodecId() {
- bytes := parameters.(*utils.HEVCCodecData).VPS()
- stream_.muxer.Input(bytes[0], uint32(packet.ConvertPts(stream_.rate)))
- }
-
- spsBytes := parameters.SPS()
- ppsBytes := parameters.PPS()
-
- stream_.muxer.Input(spsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
- stream_.muxer.Input(ppsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
- stream_.extraDataBuffer = stream_.tmpExtraDataBuffer
+func (t *TranStream) ReadExtraData(ts int64) ([][]byte, int64, error) {
+ // 返回视频编码数据的rtp包
+ for _, track := range t.rtpTracks {
+ if utils.AVMediaTypeVideo != track.mediaType {
+ continue
}
- data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]))
- stream_.muxer.Input(data, uint32(packet.ConvertPts(stream_.rate)))
+ // 回滚序号和时间戳
+ index := int(track.seq) - len(track.extraDataBuffer)
+ for i, bytes := range track.extraDataBuffer {
+ librtp.RollbackSeq(bytes[OverTcpHeaderSize:], index+i+1)
+ binary.BigEndian.PutUint32(bytes[OverTcpHeaderSize+4:], uint32(ts))
+ }
+
+ return track.extraDataBuffer, ts, nil
}
- return nil
+ return nil, ts, nil
}
-func (t *tranStream) AddSink(sink_ stream.Sink) error {
- sink_.(*sink).setSenderCount(len(t.BaseTransStream.Tracks))
- if err := sink_.(*sink).SendHeader([]byte(t.sdp)); err != nil {
- return err
- }
-
- return t.BaseTransStream.AddSink(sink_)
+func (t *TranStream) PackRtpPayload(muxer librtp.Muxer, channel int, data []byte, timestamp uint32) {
+ var index int
+ muxer.Input(data, timestamp, func() []byte {
+ index = t.buffer.Index()
+ block := t.buffer.GetBlock()
+ return block[OverTcpHeaderSize:]
+ }, func(bytes []byte) {
+ packet := t.buffer.Get(index)[:OverTcpHeaderSize+len(bytes)]
+ t.OverTCP(packet, channel)
+ t.AppendOutStreamBuffer(packet)
+ })
}
-func (t *tranStream) AddTrack(stream utils.AVStream) error {
+func (t *TranStream) AddTrack(stream utils.AVStream) error {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
return err
}
payloadType, ok := librtp.CodecIdPayloads[stream.CodecId()]
if !ok {
- return fmt.Errorf("no payload type was found for codecid:%d", stream.CodecId())
+ return fmt.Errorf("no payload type was found for codecid: %d", stream.CodecId())
}
- //创建RTP封装
+ // 创建RTP封装器
var muxer librtp.Muxer
if utils.AVCodecIdH264 == stream.CodecId() {
muxer = librtp.NewH264Muxer(payloadType.Pt, 0, 0xFFFFFFFF)
@@ -158,25 +109,51 @@ func (t *tranStream) AddTrack(stream utils.AVStream) error {
muxer = librtp.NewMuxer(payloadType.Pt, 0, 0xFFFFFFFF)
}
- muxer.SetAllocHandler(t.onAllocBuffer)
- muxer.SetWriteHandler(t.onRtpPacket)
-
t.rtpTracks = append(t.rtpTracks, NewRTSPTrack(muxer, byte(payloadType.Pt), payloadType.ClockRate, stream.Type()))
- muxer.SetParams(len(t.rtpTracks) - 1)
+ index := len(t.rtpTracks) - 1
+
+ // 将sps和pps按照单一模式打包
+ bufferIndex := t.buffer.Index()
+ if utils.AVMediaTypeVideo == stream.Type() {
+ parameters := stream.CodecParameters()
+
+ if utils.AVCodecIdH265 == stream.CodecId() {
+ bytes := parameters.(*utils.HEVCCodecData).VPS()
+ t.PackRtpPayload(muxer, index, bytes[0], 0)
+ }
+
+ spsBytes := parameters.SPS()
+ ppsBytes := parameters.PPS()
+ t.PackRtpPayload(muxer, index, spsBytes[0], 0)
+ t.PackRtpPayload(muxer, index, ppsBytes[0], 0)
+
+ // 拷贝扩展数据的rtp包
+ size := t.buffer.Index() - bufferIndex
+ extraRtpBuffer := make([][]byte, size)
+ for i := 0; i < size; i++ {
+ src := t.buffer.Get(bufferIndex + i)
+ dst := make([]byte, len(src))
+ copy(dst, src)
+ extraRtpBuffer[i] = dst[:OverTcpHeaderSize+binary.BigEndian.Uint16(dst[2:])]
+ }
+
+ t.rtpTracks[index].extraDataBuffer = extraRtpBuffer
+ }
+
return nil
}
-func (t *tranStream) Close() error {
+func (t *TranStream) Close() ([][]byte, int64, error) {
for _, track := range t.rtpTracks {
if track != nil {
track.Close()
}
}
- return nil
+ return nil, 0, nil
}
-func (t *tranStream) WriteHeader() error {
+func (t *TranStream) WriteHeader() error {
description := sdp.SessionDescription{
Version: 0,
Origin: sdp.Origin{
@@ -261,9 +238,11 @@ func (t *tranStream) WriteHeader() error {
}
func NewTransStream(addr net.IPAddr, urlFormat string) stream.TransStream {
- t := &tranStream{
+ t := &TranStream{
addr: addr,
urlFormat: urlFormat,
+ // 在将AVPacket打包rtp时, 会使用多个buffer块, 回环覆盖多个rtp块, 如果是TCP拉流并且网络不好, 推流的数据会错乱.
+ buffer: stream.NewReceiveBuffer(1500, 1024),
}
if addr.IP.To4() != nil {
diff --git a/rtsp/rtsp_track.go b/rtsp/rtsp_track.go
index 3f56b06..6300844 100644
--- a/rtsp/rtsp_track.go
+++ b/rtsp/rtsp_track.go
@@ -5,33 +5,25 @@ import (
"github.com/lkmio/avformat/utils"
)
-// 对rtsp每路输出流的封装
-type rtspTrack struct {
+// Track RtspTrack 对rtsp每路输出流的封装
+type Track struct {
pt byte
rate int
mediaType utils.AVMediaType
+ seq uint16
- buffer []byte //buffer of rtp packet
- muxer librtp.Muxer
- cache bool
-
- extraDataBuffer [][]byte //缓存带有编码信息的rtp包, 对所有sink通用
- tmpExtraDataBuffer [][]byte //缓存带有编码信息的rtp包, 整个过程会多次回调(sps->pps->sei...), 先保存到临时区, 最后再缓存到extraDataBuffer
+ muxer librtp.Muxer
+ extraDataBuffer [][]byte // 缓存带有编码信息的rtp包, 对所有sink通用
}
-func (r *rtspTrack) Close() {
- if r.muxer != nil {
- r.muxer.Close()
- r.muxer = nil
- }
+func (r *Track) Close() {
}
-func NewRTSPTrack(muxer librtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *rtspTrack {
- stream := &rtspTrack{
+func NewRTSPTrack(muxer librtp.Muxer, pt byte, rate int, mediaType utils.AVMediaType) *Track {
+ stream := &Track{
pt: pt,
rate: rate,
muxer: muxer,
- buffer: make([]byte, 1500),
mediaType: mediaType,
}
diff --git a/stream/config.go b/stream/config.go
index 2f54138..4786770 100644
--- a/stream/config.go
+++ b/stream/config.go
@@ -231,16 +231,16 @@ func init() {
// AppConfig_ GOP缓存和合并写必须保持一致,同时开启或关闭. 关闭GOP缓存,是为了降低延迟,很难理解又另外开启合并写.
type AppConfig_ struct {
- GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频
- GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
- ProbeTimeout int `json:"probe_timeout"` //收流解析AVStream的超时时间
- WriteTimeout int `json:"write_timeout"` //Server向TCP拉流Conn发包的超时时间, 超过该时间, 直接主动断开Conn. 客户端重新拉流的成本小于服务器缓存成本.
- WriteBufferNumber int `json:"-"`
- PublicIP string `json:"public_ip"`
- ListenIP string `json:"listen_ip"`
- IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
- ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
- Debug bool `json:"debug"` //debug模式, 开启将保存推流
+ GOPCache bool `json:"gop_cache"` // 是否开启GOP缓存,只缓存一组音视频
+ GOPBufferSize int `json:"gop_buffer_size"` // 预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
+ ProbeTimeout int `json:"probe_timeout"` // 收流解析AVStream的超时时间
+ WriteTimeout int `json:"write_timeout"` // Server向TCP拉流Conn发包的超时时间, 超过该时间, 直接主动断开Conn. 客户端重新拉流的成本小于服务器缓存成本.
+ WriteBufferCapacity int `json:"-"` // 发送缓冲区容量大小, 缓冲区由多个内存块构成.
+ PublicIP string `json:"public_ip"`
+ ListenIP string `json:"listen_ip"`
+ IdleTimeout int64 `json:"idle_timeout"` // 多长时间(单位秒)没有拉流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
+ ReceiveTimeout int64 `json:"receive_timeout"` // 多长时间(单位秒)没有收到流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
+ Debug bool `json:"debug"` // debug模式, 开启将保存推流
//缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围,应当大于一帧的时长,不超过一组GOP的时长,在实际发送流的时候也会遵循此条例.
@@ -284,7 +284,7 @@ func SetDefaultConfig(config_ *AppConfig_) {
config_.MergeWriteLatency = limitInt(350, 2000, config_.MergeWriteLatency) //最低缓存350毫秒数据才发送 最高缓存2秒数据才发送
config_.ProbeTimeout = limitInt(2000, 5000, config_.MergeWriteLatency) //2-5秒内必须解析完AVStream
config_.WriteTimeout = limitInt(2000, 10000, config_.WriteTimeout)
- config_.WriteBufferNumber = config_.WriteTimeout/config_.MergeWriteLatency + 1
+ config_.WriteBufferCapacity = config_.WriteTimeout/config_.MergeWriteLatency + 1
config_.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config_.Log.Level)
config_.Log.MaxSize = limitMin(1, config_.Log.MaxSize)
diff --git a/stream/hook_sink.go b/stream/hook_sink.go
index 730e1a9..9e7a698 100644
--- a/stream/hook_sink.go
+++ b/stream/hook_sink.go
@@ -7,6 +7,10 @@ import (
)
func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
+ return PreparePlaySinkWithReady(sink, true)
+}
+
+func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hooks.IsEnableOnPlay() {
@@ -20,6 +24,7 @@ func PreparePlaySink(sink Sink) (*http.Response, utils.HookState) {
response = hook
}
+ sink.SetReady(ok)
source := SourceManager.Find(sink.GetSourceID())
if source == nil {
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID())
diff --git a/stream/mw_buffer.go b/stream/mw_buffer.go
index 17d08ef..5f1c9e8 100644
--- a/stream/mw_buffer.go
+++ b/stream/mw_buffer.go
@@ -27,6 +27,8 @@ type MergeWritingBuffer interface {
// ReadSegmentsFromKeyFrameIndex 从最近的关键帧读取切片
ReadSegmentsFromKeyFrameIndex(cb func([]byte))
+
+ Capacity() int
}
type mwBlock struct {
@@ -39,17 +41,13 @@ type mwBlock struct {
type mergeWritingBuffer struct {
mwBlocks []mwBlock
- //空闲合并写块
- keyFrameFreeMWBlocks collections.LinkedList[collections.MemoryPool]
- noneKeyFreeFrameMWBlocks collections.LinkedList[collections.MemoryPool]
+ index int // 当前切片位于mwBlocks的索引
+ startTS int64 // 当前切片的开始时间
+ duration int // 当前切片时长
- index int //当前切片位于mwBlocks的索引
- startTS int64 //当前切片的开始时间
- duration int //当前切片时长
-
- lastKeyFrameIndex int //最新关键帧所在切片的索引
- keyFrameCount int //关键帧计数
- existVideo bool //是否存在视频
+ lastKeyFrameIndex int // 最新关键帧所在切片的索引
+ keyFrameCount int // 关键帧计数
+ existVideo bool // 是否存在视频
keyFrameBufferMaxLength int
nonKeyFrameBufferMaxLength int
@@ -205,11 +203,15 @@ func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) {
}
}
+func (m *mergeWritingBuffer) Capacity() int {
+ return cap(m.mwBlocks)
+}
+
func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer {
- //开启GOP缓存, 输出流也缓存整个GOP
+ // 开启GOP缓存, 输出流也缓存整个GOP
var blocks []mwBlock
if existVideo {
- blocks = make([]mwBlock, AppConfig.WriteBufferNumber)
+ blocks = make([]mwBlock, AppConfig.WriteBufferCapacity)
} else {
blocks = make([]mwBlock, 1)
}
diff --git a/stream/receive_buffer.go b/stream/receive_buffer.go
index 4955f06..0ecd55d 100644
--- a/stream/receive_buffer.go
+++ b/stream/receive_buffer.go
@@ -17,12 +17,24 @@ type ReceiveBuffer struct {
index int
}
+func (r *ReceiveBuffer) Index() int {
+ return r.index
+}
+
+func (r *ReceiveBuffer) Get(index int) []byte {
+ return r.data[index*r.blockSize : (index+1)*r.blockSize]
+}
+
func (r *ReceiveBuffer) GetBlock() []byte {
bytes := r.data[r.index*r.blockSize:]
r.index = (r.index + 1) % r.blockCount
return bytes[:r.blockSize]
}
+func (r *ReceiveBuffer) BlockCount() int {
+ return r.blockCount
+}
+
func NewReceiveBuffer(blockSize, blockCount int) *ReceiveBuffer {
return &ReceiveBuffer{blockSize: blockSize, blockCount: blockCount, data: make([]byte, blockSize*blockCount), index: 0}
}
diff --git a/stream/sink.go b/stream/sink.go
index 5e09256..5bc4074 100644
--- a/stream/sink.go
+++ b/stream/sink.go
@@ -16,9 +16,7 @@ type Sink interface {
GetSourceID() string
- Input(data []byte) error
-
- SendHeader(data []byte) error
+ Write(index int, data [][]byte, ts int64) error
GetTransStreamID() TransStreamID
@@ -61,11 +59,25 @@ type Sink interface {
SetUrlValues(values url.Values)
- Start()
+ // StartStreaming Source向Sink开始推流时调用
+ StartStreaming(stream TransStream) error
- Flush()
+ // StopStreaming Source向Sink停止推流时调用
+ StopStreaming(stream TransStream)
GetConn() net.Conn
+
+ IsTCPStreaming() bool
+
+ GetSentPacketCount() int
+
+ SetSentPacketCount(int)
+
+ IncreaseSentPacketCount()
+
+ IsReady() bool
+
+ SetReady(ok bool)
}
type BaseSink struct {
@@ -76,14 +88,17 @@ type BaseSink struct {
TransStreamID TransStreamID
disableVideo bool
- lock sync.RWMutex
- HasSentKeyVideo bool // 是否已经发送视频关键帧,未开启GOP缓存的情况下,为避免播放花屏,发送的首个视频帧必须为关键帧
+ lock sync.RWMutex
DesiredAudioCodecId_ utils.AVCodecID
DesiredVideoCodecId_ utils.AVCodecID
- Conn net.Conn
- urlValues url.Values // 拉流时携带的Url参数
+ Conn net.Conn // 拉流信令链路
+ TCPStreaming bool // 是否是TCP流式拉流
+ urlValues url.Values // 拉流时携带的Url参数
+
+ SentPacketCount int // 发包计数
+ Ready bool
}
func (s *BaseSink) GetID() SinkID {
@@ -94,20 +109,19 @@ func (s *BaseSink) SetID(id SinkID) {
s.ID = id
}
-func (s *BaseSink) Input(data []byte) error {
+func (s *BaseSink) Write(index int, data [][]byte, ts int64) error {
if s.Conn != nil {
- _, err := s.Conn.Write(data)
-
- return err
+ for _, bytes := range data {
+ _, err := s.Conn.Write(bytes)
+ if err != nil {
+ return err
+ }
+ }
}
return nil
}
-func (s *BaseSink) SendHeader(data []byte) error {
- return s.Input(data)
-}
-
func (s *BaseSink) GetSourceID() string {
return s.SourceID
}
@@ -133,7 +147,7 @@ func (s *BaseSink) UnLock() {
}
func (s *BaseSink) GetState() SessionState {
- utils.Assert(!s.lock.TryLock())
+ //utils.Assert(!s.lock.TryLock())
return s.State
}
@@ -161,11 +175,8 @@ func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID {
}
// Close 做如下事情:
-// 1. Sink如果正在拉流,删除任务交给Source处理. 否则直接从等待队列删除Sink.
+// 1. Sink如果正在拉流, 删除任务交给Source处理, 否则直接从等待队列删除Sink.
// 2. 发送PlayDoneHook事件
-// 什么时候调用Close? 是否考虑线程安全?
-// 拉流断开连接,不需要考虑线程安全
-// 踢流走source管道删除,并且关闭Conn
func (s *BaseSink) Close() {
if SessionStateClosed == s.State {
return
@@ -195,9 +206,10 @@ func (s *BaseSink) Close() {
}
if state == SessionStateTransferring {
- // 从Source中删除Sink
- source := SourceManager.Find(s.SourceID)
- source.RemoveSink(s)
+ // 从source中删除sink, 如果source为nil, 已经结束推流.
+ if source := SourceManager.Find(s.SourceID); source != nil {
+ source.RemoveSink(s)
+ }
} else if state == SessionStateWait {
// 从等待队列中删除Sink
RemoveSinkFromWaitingQueue(s.SourceID, s.ID)
@@ -225,14 +237,38 @@ func (s *BaseSink) SetUrlValues(values url.Values) {
s.urlValues = values
}
-func (s *BaseSink) Start() {
-
+func (s *BaseSink) StartStreaming(stream TransStream) error {
+ return nil
}
-func (s *BaseSink) Flush() {
+func (s *BaseSink) StopStreaming(stream TransStream) {
}
func (s *BaseSink) GetConn() net.Conn {
return s.Conn
}
+
+func (s *BaseSink) IsTCPStreaming() bool {
+ return s.TCPStreaming
+}
+
+func (s *BaseSink) GetSentPacketCount() int {
+ return s.SentPacketCount
+}
+
+func (s *BaseSink) SetSentPacketCount(count int) {
+ s.SentPacketCount = count
+}
+
+func (s *BaseSink) IncreaseSentPacketCount() {
+ s.SentPacketCount++
+}
+
+func (s *BaseSink) IsReady() bool {
+ return s.Ready
+}
+
+func (s *BaseSink) SetReady(ok bool) {
+ s.Ready = ok
+}
diff --git a/stream/source.go b/stream/source.go
index 2e1e20d..5ae1335 100644
--- a/stream/source.go
+++ b/stream/source.go
@@ -2,6 +2,7 @@ package stream
import (
"fmt"
+ "github.com/lkmio/avformat/transport"
"github.com/lkmio/lkm/collections"
"github.com/lkmio/lkm/log"
"net"
@@ -90,12 +91,15 @@ type Source interface {
// PostEvent 切换到主协程执行当前函数
PostEvent(cb func())
+ // LastPacketTime 返回最近收流时间戳
LastPacketTime() time.Time
SetLastPacketTime(time2 time.Time)
+ // SinkCount 返回拉流计数
SinkCount() int
+ // LastStreamEndTime 返回最近结束拉流时间戳
LastStreamEndTime() time.Time
SetReceiveDataTimer(timer *time.Timer)
@@ -111,6 +115,8 @@ type Source interface {
CreateTime() time.Time
SetCreateTime(time time.Time)
+
+ PlaySink(sin Sink)
}
type PublishSource struct {
@@ -138,7 +144,9 @@ type PublishSource struct {
receiveDataTimer *time.Timer // 收流超时计时器
idleTimer *time.Timer // 拉流空闲计时器
- TransStreams map[TransStreamID]TransStream //所有的输出流, 持有Sink
+ TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink
+ Sinks map[SinkID]Sink // 保存所有Sink
+ TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
streamPipe chan []byte // 推流数据管道
mainContextEvents chan func() // 切换到主协程执行函数的事件管道
@@ -202,6 +210,10 @@ func (s *PublishSource) Init(receiveQueueSize int) {
// -2是为了保证从管道取到流, 到处理完流整个过程安全的, 不会被覆盖
s.streamPipe = make(chan []byte, receiveQueueSize-2)
s.mainContextEvents = make(chan func(), 128)
+
+ s.TransStreams = make(map[TransStreamID]TransStream, 10)
+ s.Sinks = make(map[SinkID]Sink, 128)
+ s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
}
func (s *PublishSource) CreateDefaultOutStreams() {
@@ -225,14 +237,15 @@ func (s *PublishSource) CreateDefaultOutStreams() {
streams := s.OriginStreams()
utils.Assert(len(streams) > 0)
- hlsStream, err := s.CreateTransStream(TransStreamHls, streams)
+ id := GenerateTransStreamID(TransStreamHls, streams...)
+ hlsStream, err := s.CreateTransStream(id, TransStreamHls, streams)
if err != nil {
panic(err)
}
- s.dispatchGOPBuffer(hlsStream)
+ s.DispatchGOPBuffer(hlsStream)
s.hlsStream = hlsStream
- s.TransStreams[GenerateTransStreamID(TransStreamHls, streams...)] = s.hlsStream
+ s.TransStreams[id] = s.hlsStream
}
}
@@ -279,8 +292,8 @@ func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils
return true
}
-func (s *PublishSource) CreateTransStream(protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
- log.Sugar.Debugf("创建%s-stream source:%s", protocol.ToString(), s.ID)
+func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
+ log.Sugar.Debugf("创建%s-stream source: %s", protocol.ToString(), s.ID)
transStream, err := CreateTransStream(s, protocol, streams)
if err != nil {
@@ -288,22 +301,71 @@ func (s *PublishSource) CreateTransStream(protocol TransStreamProtocol, streams
return nil, err
}
- for _, avStream := range streams {
- transStream.AddTrack(avStream)
+ for _, track := range streams {
+ transStream.AddTrack(track)
}
- transStream.Init()
+ transStream.SetID(id)
+ // 创建输出流对应的拉流队列
+ s.TransStreamSinks[id] = make(map[SinkID]Sink, 128)
_ = transStream.WriteHeader()
return transStream, err
}
-func (s *PublishSource) dispatchGOPBuffer(transStream TransStream) {
+func (s *PublishSource) DispatchGOPBuffer(transStream TransStream) {
s.gopBuffer.PeekAll(func(packet utils.AVPacket) {
- transStream.Input(packet)
+ s.DispatchPacket(transStream, packet)
})
}
+// DispatchPacket 分发AVPacket
+func (s *PublishSource) DispatchPacket(transStream TransStream, packet utils.AVPacket) {
+ data, timestamp, videoKey, err := transStream.Input(packet)
+ if err != nil || len(data) < 1 {
+ return
+ }
+
+ s.DispatchBuffer(transStream, packet.Index(), data, timestamp, videoKey)
+}
+
+// DispatchBuffer 分发传输流
+func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, videoKey bool) {
+ sinks := s.TransStreamSinks[transStream.GetID()]
+ exist := transStream.IsExistVideo()
+ for _, sink := range sinks {
+
+ // 如果存在视频, 确保向sink发送的第一帧是关键帧
+ if exist && sink.GetSentPacketCount() < 1 {
+ if !videoKey {
+ continue
+ }
+
+ if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
+ s.write(sink, index, extraData, timestamp)
+ }
+ }
+
+ s.write(sink, index, data, timestamp)
+ }
+}
+
+// 向sink推流
+func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int64) {
+ err := sink.Write(index, data, timestamp)
+ if err == nil {
+ sink.IncreaseSentPacketCount()
+ //return
+ }
+
+ // 内核发送缓冲区满, 清空sink的发送缓冲区, 等下次关键帧时再尝试发送。
+ //_, ok := err.(*transport.ZeroWindowSizeError)
+ //if ok {
+ // conn, ok := sink.GetConn().(*transport.Conn)
+ //}
+}
+
+// 创建sink需要的输出流
func (s *PublishSource) doAddSink(sink Sink) bool {
// 暂时不考虑多路视频流,意味着只能1路视频流和多路音频流,同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
@@ -352,14 +414,10 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
}
transStreamId := GenerateTransStreamID(sink.GetProtocol(), streams[:size]...)
- transStream, ok := s.TransStreams[transStreamId]
- if !ok {
- if s.TransStreams == nil {
- s.TransStreams = make(map[TransStreamID]TransStream, 10)
- }
-
+ transStream, exist := s.TransStreams[transStreamId]
+ if !exist {
var err error
- transStream, err = s.CreateTransStream(sink.GetProtocol(), streams[:size])
+ transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), streams[:size])
if err != nil {
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
return false
@@ -377,19 +435,50 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("AddSink失败, sink已经断开连接 %s", sink.String())
} else {
- transStream.AddSink(sink)
+ sink.SetState(SessionStateTransferring)
}
- sink.SetState(SessionStateTransferring)
}
+ err := sink.StartStreaming(transStream)
+ if err != nil {
+ log.Sugar.Errorf("开始推流失败 err: %s", err.Error())
+ return false
+ }
+
+ // 还没准备好推流, 暂不推流
+ if !sink.IsReady() {
+ return true
+ }
+
+ // TCP拉流开启异步发包
+ conn, ok := sink.GetConn().(*transport.Conn)
+ if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
+ conn.EnableAsyncWriteMode(transStream.OutStreamBufferCapacity() - 2)
+ }
+
+ // 发送已有的缓存数据
+ // 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。
+ data, timestamp, _ := transStream.ReadKeyFrameBuffer()
+ if len(data) > 0 {
+ if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
+ s.write(sink, 0, extraData, timestamp)
+ }
+
+ s.write(sink, 0, data, timestamp)
+ }
+
+ // 累加拉流计数
if s.recordSink != sink {
s.sinkCount++
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
}
- // 新建传输流,发送缓存的音视频帧
- if !ok && AppConfig.GOPCache && s.existVideo {
- s.dispatchGOPBuffer(transStream)
+ s.Sinks[sink.GetID()] = sink
+ s.TransStreamSinks[transStreamId][sink.GetID()] = sink
+
+ // 新建传输流,发送已经缓存的音视频帧
+ if !exist && AppConfig.GOPCache && s.existVideo {
+ s.DispatchGOPBuffer(transStream)
}
return true
@@ -415,38 +504,31 @@ func (s *PublishSource) RemoveSink(sink Sink) {
func (s *PublishSource) RemoveSinkWithID(id SinkID) {
s.PostEvent(func() {
- for _, transStream := range s.TransStreams {
- if sink, _ := transStream.RemoveSink(id); sink != nil {
- s.doRemoveSink(sink)
- break
- }
+ sink, ok := s.Sinks[id]
+ if ok {
+ s.doRemoveSink(sink)
}
})
}
func (s *PublishSource) doRemoveSink(sink Sink) bool {
- id := sink.GetTransStreamID()
- if id > 0 {
- transStream := s.TransStreams[id]
+ transStreamSinks := s.TransStreamSinks[sink.GetTransStreamID()]
+ delete(s.Sinks, sink.GetID())
+ delete(transStreamSinks, sink.GetID())
- // 从输出流中删除Sink
- _, b := transStream.RemoveSink(sink.GetID())
- if b {
- s.sinkCount--
- s.lastStreamEndTime = time.Now()
- HookPlayDoneEvent(sink)
- log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
- return true
- }
+ s.sinkCount--
+ s.lastStreamEndTime = time.Now()
+
+ log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
+
+ if sink.GetProtocol() == TransStreamHls {
+ // 从HLS拉流队列删除Sink
+ SinkManager.Remove(sink.GetID())
}
- // 从等待队列中删除Sink
- _, b := RemoveSinkFromWaitingQueue(sink.GetSourceID(), sink.GetID())
-
- // 从HLS拉流队列删除
- SinkManager.Remove(sink.GetID())
-
- return b
+ sink.StopStreaming(s.TransStreams[sink.GetTransStreamID()])
+ HookPlayDoneEvent(sink)
+ return true
}
func (s *PublishSource) SetState(state SessionState) {
@@ -458,7 +540,9 @@ func (s *PublishSource) DoClose() {
return
}
- log.Sugar.Infof("关闭推流源 id: %s", s.ID)
+ s.closed = true
+
+ log.Sugar.Infof("关闭推流源: %s", s.ID)
if s.TransDeMuxer != nil {
s.TransDeMuxer.Close()
@@ -479,6 +563,7 @@ func (s *PublishSource) DoClose() {
s.gopBuffer = nil
}
+ // 停止所有计时器
if s.probeTimer != nil {
s.probeTimer.Stop()
}
@@ -491,6 +576,7 @@ func (s *PublishSource) DoClose() {
s.idleTimer.Stop()
}
+ // 关闭录制流
if s.recordSink != nil {
s.recordSink.Close()
}
@@ -503,36 +589,46 @@ func (s *PublishSource) DoClose() {
log.Sugar.Errorf("删除源失败 source:%s err:%s", s.ID, err.Error())
}
- // 将所有Sink添加到等待队列
+ // 关闭所有输出流
for _, transStream := range s.TransStreams {
- transStream.Close()
-
- transStream.PopAllSink(func(sink Sink) {
- sink.SetTransStreamID(0)
- if s.recordSink == sink {
- return
- }
-
- {
- sink.Lock()
- defer sink.UnLock()
-
- if SessionStateClosed == sink.GetState() {
- log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String())
- } else {
- sink.SetState(SessionStateWait)
- AddSinkToWaitingQueue(s.ID, sink)
- }
- }
-
- if SessionStateClosed != sink.GetState() {
- sink.Flush()
- }
- })
+ // 发送剩余包
+ data, ts, _ := transStream.Close()
+ if len(data) > 0 {
+ s.DispatchBuffer(transStream, -1, data, ts, true)
+ }
+ }
+
+ // 将所有sink添加到等待队列
+ for _, sink := range s.Sinks {
+ transStreamID := sink.GetTransStreamID()
+ sink.SetTransStreamID(0)
+ if s.recordSink == sink {
+ return
+ }
+
+ {
+ sink.Lock()
+
+ if SessionStateClosed == sink.GetState() {
+ log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String())
+ } else {
+ sink.SetState(SessionStateWait)
+ AddSinkToWaitingQueue(s.ID, sink)
+ }
+
+ sink.UnLock()
+ }
+
+ if SessionStateClosed != sink.GetState() {
+ sink.StopStreaming(s.TransStreams[transStreamID])
+ }
}
- s.closed = true
s.TransStreams = nil
+ s.Sinks = nil
+ s.TransStreamSinks = nil
+
+ // 异步hook
go func() {
if s.Conn != nil {
s.Conn.Close()
@@ -582,12 +678,12 @@ func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
// 创建GOPBuffer
if AppConfig.GOPCache && s.existVideo && s.gopBuffer == nil {
s.gopBuffer = NewStreamBuffer()
- //设置GOP缓存溢出回调
+ // 设置GOP缓存溢出回调
s.gopBuffer.SetDiscardHandler(s.OnDiscardPacket)
}
}
-// 解析完所有track后, 做一些初始化工作
+// 解析完所有track后, 创建各种输出流
func (s *PublishSource) writeHeader() {
if s.completed {
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.ID)
@@ -600,7 +696,7 @@ func (s *PublishSource) writeHeader() {
}
if len(s.originStreams.All()) == 0 {
- log.Sugar.Errorf("没有一路流, 删除source:%s", s.ID)
+ log.Sugar.Errorf("没有一路track, 删除source: %s", s.ID)
s.DoClose()
return
}
@@ -608,7 +704,7 @@ func (s *PublishSource) writeHeader() {
// 创建录制流和HLS
s.CreateDefaultOutStreams()
- // 将等待队列的Sink添加到输出流队列
+ // 将等待队列的sink添加到输出流队列
sinks := PopWaitingSinks(s.ID)
if s.recordSink != nil {
sinks = append(sinks, s.recordSink)
@@ -659,8 +755,8 @@ func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
}
// 分发给各个传输流
- for _, stream_ := range s.TransStreams {
- stream_.Input(packet)
+ for _, transStream := range s.TransStreams {
+ s.DispatchPacket(transStream, packet)
}
// 未开启GOP缓存或只存在音频流, 释放掉内存
@@ -716,3 +812,9 @@ func (s *PublishSource) CreateTime() time.Time {
func (s *PublishSource) SetCreateTime(time time.Time) {
s.createTime = time
}
+
+func (s *PublishSource) PlaySink(sink Sink) {
+ s.PostEvent(func() {
+
+ })
+}
diff --git a/stream/source_utils.go b/stream/source_utils.go
index 5b5c7d6..c3288cf 100644
--- a/stream/source_utils.go
+++ b/stream/source_utils.go
@@ -36,13 +36,13 @@ const (
)
const (
- SessionStateCreate = SessionState(1) //新建状态
- SessionStateHandshaking = SessionState(2) //握手中
- SessionStateHandshakeFailure = SessionState(3) //握手失败
- SessionStateHandshakeDone = SessionState(4) //握手完成
- SessionStateWait = SessionState(5) //位于等待队列中
- SessionStateTransferring = SessionState(6) //推拉流中
- SessionStateClosed = SessionState(7) //关闭状态
+ SessionStateCreate = SessionState(1) // 新建状态
+ SessionStateHandshaking = SessionState(2) // 握手中
+ SessionStateHandshakeFailure = SessionState(3) // 握手失败
+ SessionStateHandshakeDone = SessionState(4) // 握手完成
+ SessionStateWait = SessionState(5) // 位于等待队列中
+ SessionStateTransferring = SessionState(6) // 推拉流中
+ SessionStateClosed = SessionState(7) // 关闭状态
)
func (s SourceType) ToString() string {
diff --git a/stream/trans_stream.go b/stream/trans_stream.go
index 7afa643..e93191f 100644
--- a/stream/trans_stream.go
+++ b/stream/trans_stream.go
@@ -1,53 +1,68 @@
package stream
import (
- "github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
- "github.com/lkmio/lkm/log"
)
-// TransStream 将AVPacket封装成传输流,转发给各个Sink
+// TransStream 将AVPacket封装成传输流
type TransStream interface {
- Init()
+ GetID() TransStreamID
- Input(packet utils.AVPacket) error
+ SetID(id TransStreamID)
+
+ Input(packet utils.AVPacket) ([][]byte, int64, bool, error)
AddTrack(stream utils.AVStream) error
+ TrackCount() int
+
+ GetTracks() []utils.AVStream
+
WriteHeader() error
- AddSink(sink Sink) error
+ // GetProtocol 返回输出流协议
+ GetProtocol() TransStreamProtocol
- ExistSink(id SinkID) bool
+ // ReadExtraData 获取封装后的编码器扩展数据
+ ReadExtraData(timestamp int64) ([][]byte, int64, error)
- RemoveSink(id SinkID) (Sink, bool)
+ // ReadKeyFrameBuffer 读取已经缓存的包含关键视频帧的输出流
+ ReadKeyFrameBuffer() ([][]byte, int64, error)
- PopAllSink(handler func(sink Sink))
+ Close() ([][]byte, int64, error)
- AllSink() []Sink
+ ClearOutStreamBuffer()
- Close() error
+ AppendOutStreamBuffer(buffer []byte)
- SendPacket(data []byte) error
+ // OutStreamBufferCapacity 返回输出流缓冲区的容量大小, 输出流缓冲区同时作为向sink推流的发送缓冲区, 容量大小决定向sink异步推流的队列大小;
+ OutStreamBufferCapacity() int
- Protocol() TransStreamProtocol
+ IsExistVideo() bool
}
type BaseTransStream struct {
- Sinks map[SinkID]Sink
//muxer stream.Muxer
+ ID TransStreamID
Tracks []utils.AVStream
Completed bool
ExistVideo bool
- Protocol_ TransStreamProtocol
+ Protocol TransStreamProtocol
+
+ OutBuffer [][]byte // 完成封装的输出流队列
+ OutBufferSize int
}
-func (t *BaseTransStream) Init() {
- t.Sinks = make(map[SinkID]Sink, 64)
+func (t *BaseTransStream) GetID() TransStreamID {
+ return t.ID
}
-func (t *BaseTransStream) Input(packet utils.AVPacket) error {
- return nil
+func (t *BaseTransStream) SetID(id TransStreamID) {
+ t.ID = id
+}
+
+func (t *BaseTransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
+ return nil, -1, false, nil
}
func (t *BaseTransStream) AddTrack(stream utils.AVStream) error {
@@ -58,82 +73,68 @@ func (t *BaseTransStream) AddTrack(stream utils.AVStream) error {
return nil
}
-func (t *BaseTransStream) AddSink(sink Sink) error {
- t.Sinks[sink.GetID()] = sink
- sink.Start()
- return nil
+func (t *BaseTransStream) Close() ([][]byte, int64, error) {
+ return nil, 0, nil
}
-func (t *BaseTransStream) ExistSink(id SinkID) bool {
- _, ok := t.Sinks[id]
- return ok
+func (t *BaseTransStream) GetProtocol() TransStreamProtocol {
+ return t.Protocol
}
-func (t *BaseTransStream) RemoveSink(id SinkID) (Sink, bool) {
- sink, ok := t.Sinks[id]
- if ok {
- delete(t.Sinks, id)
+func (t *BaseTransStream) ClearOutStreamBuffer() {
+ t.OutBufferSize = 0
+}
+
+func (t *BaseTransStream) AppendOutStreamBuffer(buffer []byte) {
+ if t.OutBufferSize+1 > len(t.OutBuffer) {
+ // 扩容
+ size := (t.OutBufferSize + 1) * 2
+ newBuffer := make([][]byte, size)
+ for i := 0; i < t.OutBufferSize; i++ {
+ newBuffer[i] = t.OutBuffer[i]
+ }
+
+ t.OutBuffer = newBuffer
}
- return sink, ok
+ t.OutBuffer[t.OutBufferSize] = buffer
+ t.OutBufferSize++
}
-func (t *BaseTransStream) PopAllSink(handler func(sink Sink)) {
- for _, sink := range t.Sinks {
- handler(sink)
- }
-
- t.Sinks = nil
+func (t *BaseTransStream) OutStreamBufferCapacity() int {
+ return 0
}
-func (t *BaseTransStream) AllSink() []Sink {
- //TODO implement me
- panic("implement me")
+func (t *BaseTransStream) TrackCount() int {
+ return len(t.Tracks)
}
-func (t *BaseTransStream) Close() error {
- return nil
+func (t *BaseTransStream) GetTracks() []utils.AVStream {
+ return t.Tracks
}
-func (t *BaseTransStream) SendPacket(data []byte) error {
- for _, sink := range t.Sinks {
- sink.Input(data)
- }
-
- return nil
+func (t *BaseTransStream) IsExistVideo() bool {
+ return t.ExistVideo
}
-func (t *BaseTransStream) Protocol() TransStreamProtocol {
- return t.Protocol_
+func (t *BaseTransStream) ReadExtraData(timestamp int64) ([][]byte, int64, error) {
+ return nil, 0, nil
+}
+
+func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
+ return nil, 0, nil
}
type TCPTransStream struct {
BaseTransStream
+
+ // 合并写内存泄露问题: 推流结束后, mwBuffer的data一直释放不掉, 只有拉流全部断开之后, 才会释放该内存.
+ // 起初怀疑是代码层哪儿有问题, 但是测试发现如果将合并写切片再拷贝一次发送 给sink, 推流结束后,mwBuffer的data内存块释放没问题, 只有拷贝的内存块未释放. 所以排除了代码层造成内存泄露的可能性.
+ // 看来是conn在write后还会持有data. 查阅代码发现, 的确如此. 向fd发送数据前buffer会引用data, 但是后续没有赋值为nil, 取消引用. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694
+ MWBuffer MergeWritingBuffer //合并写缓冲区, 同时作为用户态的发送缓冲区
}
-func (t *TCPTransStream) AddSink(sink Sink) error {
- if err := t.BaseTransStream.AddSink(sink); err != nil {
- return err
- }
-
- if sink.GetConn() != nil {
- sink.GetConn().(*transport.Conn).EnableAsyncWriteMode(AppConfig.WriteBufferNumber - 1)
- }
- return nil
-}
-
-func (t *TCPTransStream) SendPacket(data []byte) error {
- for _, sink := range t.Sinks {
- err := sink.Input(data)
- if err == nil {
- continue
- }
-
- if _, ok := err.(*transport.ZeroWindowSizeError); ok {
- log.Sugar.Errorf("发送超时, 强制断开连接 sink:%s", sink.String())
- sink.GetConn().Close()
- }
- }
-
- return nil
+func (t *TCPTransStream) OutStreamBufferCapacity() int {
+ utils.Assert(t.MWBuffer != nil)
+ return t.MWBuffer.Capacity()
}