重构合并写实现, TCP流使用异步发送

This commit is contained in:
yangjiechina
2024-07-10 20:17:37 +08:00
parent e27f12f5a4
commit 99658f417a
14 changed files with 309 additions and 174 deletions

2
api.go
View File

@@ -75,7 +75,7 @@ func startApiServer(addr string) {
//TCP主动,设置连接地址 //TCP主动,设置连接地址
apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", apiServer.connectGBSource) apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", apiServer.connectGBSource)
apiServer.router.HandleFunc("/api/v1/gb28181/source/close", apiServer.closeGBSource) apiServer.router.HandleFunc("/api/v1/gb28181/source/close", apiServer.closeGBSource)
apiServer.router.HandleFunc("/api/v1/gb28181/gc/force", func(writer http.ResponseWriter, request *http.Request) { apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) {
runtime.GC() runtime.GC()
writer.WriteHeader(http.StatusOK) writer.WriteHeader(http.StatusOK)
}) })

View File

@@ -2,6 +2,7 @@
"gop_cache": true, "gop_cache": true,
"gop_buffer_size": 8192000, "gop_buffer_size": 8192000,
"probe_timeout": 2000, "probe_timeout": 2000,
"write_timeout": 5000,
"mw_latency": 350, "mw_latency": 350,
"listen_ip" : "0.0.0.0", "listen_ip" : "0.0.0.0",
"public_ip": "192.168.2.148", "public_ip": "192.168.2.148",
@@ -55,14 +56,14 @@
"hook": { "hook": {
"enable": true, "enable": true,
"timeout": 10, "timeout": 10,
"on_publish": "http://localhost:8081/api/v1/on_publish", "on_publish": "http://localhost:9000/api/v1/hook/on_publish",
"on_publish_done": "http://localhost:8081/api/v1/on_publish_done", "on_publish_done": "http://localhost:9000/api/v1/hook/on_publish_done",
"on_play" : "http://localhost:8081/api/v1/on_play", "on_play" : "http://localhost:9000/api/v1/hook/on_play",
"on_play_done" : "http://localhost:8081/api/on_play_done", "on_play_done" : "http://localhost:9000/api/on_play_done",
"on_record": "http://localhost:8081/api/v1/on_reocrd", "on_record": "http://localhost:9000/api/v1/hook/on_reocrd",
"on_idle_timeout": "http://localhost:8081/api/v1/on_idle_timeout", "on_idle_timeout": "http://localhost:9000/api/v1/hook/on_idle_timeout",
"on_receive_timeout": "http://localhost:8081/api/v1/on_recv_timeout" "on_receive_timeout": "http://localhost:9000/api/v1/hook/on_recv_timeout"
}, },
"log": { "log": {

View File

@@ -14,21 +14,8 @@ const (
HttpFlvBlockLengthSize = 20 HttpFlvBlockLengthSize = 20
) )
type HttpFlvBlock struct {
pktSize uint32
skipCount uint16
}
var separator []byte
func init() {
separator = make([]byte, 2)
separator[0] = 0x0D
separator[1] = 0x0A
}
type httpTransStream struct { type httpTransStream struct {
stream.BaseTransStream stream.TCPTransStream
muxer libflv.Muxer muxer libflv.Muxer
mwBuffer stream.MergeWritingBuffer mwBuffer stream.MergeWritingBuffer
@@ -57,9 +44,9 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
} }
//发送剩余数据 //发送剩余数据
if videoKey && !t.mwBuffer.IsEmpty() { if videoKey && !t.mwBuffer.IsNewSegment() {
t.mwBuffer.Reserve(2) t.mwBuffer.Reserve(2)
segment := t.mwBuffer.PopSegment() segment := t.mwBuffer.FlushSegment()
t.sendUnpackedSegment(segment) t.sendUnpackedSegment(segment)
} }
@@ -67,7 +54,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
var separatorSize int var separatorSize int
//新的合并写切片, 预留包长字节 //新的合并写切片, 预留包长字节
if t.mwBuffer.IsCompleted() { if t.mwBuffer.IsNewSegment() {
separatorSize = HttpFlvBlockLengthSize separatorSize = HttpFlvBlockLengthSize
//10字节描述flv包长, 前2个字节描述无效字节长度 //10字节描述flv包长, 前2个字节描述无效字节长度
n = HttpFlvBlockLengthSize n = HttpFlvBlockLengthSize
@@ -79,7 +66,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
} }
//分配flv block //分配flv block
bytes := t.mwBuffer.Allocate(separatorSize + flvSize) bytes := t.mwBuffer.Allocate(separatorSize+flvSize, dts, videoKey)
n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false) n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false)
copy(bytes[n:], data) copy(bytes[n:], data)
@@ -87,8 +74,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
//每一个合并写切片开始和预留长度所需的字节数 //每一个合并写切片开始和预留长度所需的字节数
//合并写切片末尾加上换行符 //合并写切片末尾加上换行符
//长度是16进制字符串 //长度是16进制字符串
segment := t.mwBuffer.PeekCompletedSegment(dts) if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 {
if len(segment) > 0 {
t.sendUnpackedSegment(segment) t.sendUnpackedSegment(segment)
} }
return nil return nil
@@ -129,24 +115,27 @@ func (t *httpTransStream) computeSkipCount(data []byte) int {
func (t *httpTransStream) AddSink(sink stream.Sink) error { func (t *httpTransStream) AddSink(sink stream.Sink) error {
utils.Assert(t.headerSize > 0) utils.Assert(t.headerSize > 0)
t.BaseTransStream.AddSink(sink) t.TCPTransStream.AddSink(sink)
//发送sequence header //发送sequence header
t.sendSegment(sink, t.header[:t.headerSize]) t.sendSegment(sink, t.header[:t.headerSize])
//发送当前内存池已有的合并写切片 //发送当前内存池已有的合并写切片
segmentList := t.mwBuffer.SegmentList() first := true
if len(segmentList) > 0 { t.mwBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
//修改第一个flv tag的pre tag size if first {
binary.BigEndian.PutUint32(segmentList[20:], uint32(t.headerTagSize)) //修改第一个flv tag的pre tag size
binary.BigEndian.PutUint32(bytes[20:], uint32(t.headerTagSize))
first = false
}
//遍历发送合并写切片 //遍历发送合并写切片
var index int var index int
for ; index < len(segmentList); index += 4 { for ; index < len(bytes); index += 4 {
size := binary.BigEndian.Uint32(segmentList[index:]) size := binary.BigEndian.Uint32(bytes[index:])
t.sendSegment(sink, segmentList[index:index+4+int(size)]) t.sendSegment(sink, bytes[index:index+4+int(size)])
index += int(size) index += int(size)
} }
} })
return nil return nil
} }
@@ -208,8 +197,7 @@ func (t *httpTransStream) WriteHeader() error {
func (t *httpTransStream) Close() error { func (t *httpTransStream) Close() error {
//发送剩余的流 //发送剩余的流
segment := t.mwBuffer.PopSegment() if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 {
if len(segment) > 0 {
t.sendUnpackedSegment(segment) t.sendUnpackedSegment(segment)
} }
return nil return nil

View File

@@ -2,6 +2,7 @@ package flv
import ( import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/yangjiechina/avformat/transport"
"net" "net"
"time" "time"
) )
@@ -15,6 +16,8 @@ func (w WSConn) Read(b []byte) (n int, err error) {
} }
func (w WSConn) Write(b []byte) (n int, err error) { func (w WSConn) Write(b []byte) (n int, err error) {
//输入http-flv数据
//去掉不需要的换行符
var offset int var offset int
for i := 2; i < len(b); i++ { for i := 2; i < len(b); i++ {
if b[i-2] == 0x0D && b[i-1] == 0x0A { if b[i-2] == 0x0D && b[i-1] == 0x0A {
@@ -31,5 +34,5 @@ func (w WSConn) SetDeadline(t time.Time) error {
} }
func NewWSConn(conn *websocket.Conn) net.Conn { func NewWSConn(conn *websocket.Conn) net.Conn {
return &WSConn{conn} return transport.NewConn(&WSConn{conn})
} }

View File

@@ -2,7 +2,7 @@ package hls
import ( import (
"bytes" "bytes"
"github.com/yangjiechina/lkm/stream" "github.com/yangjiechina/lkm/collections"
"math" "math"
"strconv" "strconv"
) )
@@ -59,7 +59,7 @@ type M3U8Writer interface {
func NewM3U8Writer(len int) M3U8Writer { func NewM3U8Writer(len int) M3U8Writer {
return &m3u8Writer{ return &m3u8Writer{
stringBuffer: bytes.NewBuffer(make([]byte, 0, 1024*10)), stringBuffer: bytes.NewBuffer(make([]byte, 0, 1024*10)),
playlist: stream.NewQueue(len), playlist: collections.NewQueue(len),
} }
} }
@@ -72,7 +72,7 @@ type Segment struct {
type m3u8Writer struct { type m3u8Writer struct {
stringBuffer *bytes.Buffer stringBuffer *bytes.Buffer
playlist *stream.Queue playlist *collections.Queue
} }
func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int, path string) { func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int, path string) {

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/collections"
"github.com/yangjiechina/lkm/log" "github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream" "github.com/yangjiechina/lkm/stream"
"net" "net"
@@ -41,8 +42,8 @@ type Session struct {
videoIndex int videoIndex int
audioStream utils.AVStream audioStream utils.AVStream
videoStream utils.AVStream videoStream utils.AVStream
audioBuffer stream.MemoryPool audioBuffer collections.MemoryPool
videoBuffer stream.MemoryPool videoBuffer collections.MemoryPool
rtpPacket *RtpPacket rtpPacket *RtpPacket
receiveBuffer *stream.ReceiveBuffer receiveBuffer *stream.ReceiveBuffer
} }

18
main.go
View File

@@ -44,7 +44,7 @@ func init() {
} }
if stream.AppConfig.Rtsp.IsMultiPort() { if stream.AppConfig.Rtsp.IsMultiPort() {
rtsp.TransportManger = transport.NewTransportManager(uint16(stream.AppConfig.Rtsp.Port[0]), uint16(stream.AppConfig.Rtsp.Port[1])) rtsp.TransportManger = transport.NewTransportManager(uint16(stream.AppConfig.Rtsp.Port[1]), uint16(stream.AppConfig.Rtsp.Port[2]))
} }
indent, _ := json.MarshalIndent(stream.AppConfig, "", "\t") indent, _ := json.MarshalIndent(stream.AppConfig, "", "\t")
@@ -59,8 +59,8 @@ func main() {
panic(err) panic(err)
} }
impl := rtmp.NewServer() server := rtmp.NewServer()
err = impl.Start(rtmpAddr) err = server.Start(rtmpAddr)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -74,8 +74,8 @@ func main() {
panic(rtspAddr) panic(rtspAddr)
} }
rtspServer := rtsp.NewServer(stream.AppConfig.Rtsp.Password) server := rtsp.NewServer(stream.AppConfig.Rtsp.Password)
err = rtspServer.Start(rtspAddr) err = server.Start(rtspAddr)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -125,10 +125,8 @@ func main() {
log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String()) log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String())
} }
loadConfigError := http.ListenAndServe(":19999", nil) err := http.ListenAndServe(":19999", nil)
if loadConfigError != nil { if err != nil {
panic(loadConfigError) panic(err)
} }
select {}
} }

View File

@@ -8,7 +8,7 @@ import (
) )
type transStream struct { type transStream struct {
stream.BaseTransStream stream.TCPTransStream
chunkSize int chunkSize int
@@ -22,7 +22,7 @@ type transStream struct {
//合并写内存泄露问题: 推流结束后, mwBuffer的data一直释放不掉, 只有拉流全部断开之后, 才会释放该内存. //合并写内存泄露问题: 推流结束后, mwBuffer的data一直释放不掉, 只有拉流全部断开之后, 才会释放该内存.
//起初怀疑是代码层哪儿有问题, 但是测试发现如果将合并写切片再拷贝一次发送 给sink, 推流结束后mwBuffer的data内存块释放没问题, 只有拷贝的内存块未释放. 所以排除了代码层造成内存泄露的可能性. //起初怀疑是代码层哪儿有问题, 但是测试发现如果将合并写切片再拷贝一次发送 给sink, 推流结束后mwBuffer的data内存块释放没问题, 只有拷贝的内存块未释放. 所以排除了代码层造成内存泄露的可能性.
//看来是conn在write后还会持有data. 查阅代码发现, 的确如此. 向fd发送数据前buffer会引用data, 但是后续没有赋值为nil, 取消引用. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694 //看来是conn在write后还会持有data. 查阅代码发现, 的确如此. 向fd发送数据前buffer会引用data, 但是后续没有赋值为nil, 取消引用. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694
mwBuffer stream.MergeWritingBuffer mwBuffer stream.MergeWritingBuffer //合并写同时作为, 用户态的发送缓冲区
} }
func (t *transStream) Input(packet utils.AVPacket) error { func (t *transStream) Input(packet utils.AVPacket) error {
@@ -62,16 +62,15 @@ func (t *transStream) Input(packet utils.AVPacket) error {
payloadSize += chunkPayloadOffset + len(data) payloadSize += chunkPayloadOffset + len(data)
} }
//遇到视频关键帧, 发送剩余的流 //遇到视频关键帧, 发送剩余的流, 创建新切片
if videoKey { if videoKey {
segment := t.mwBuffer.PopSegment() if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 {
if len(segment) > 0 {
t.SendPacket(segment) t.SendPacket(segment)
} }
} }
//分配内存 //分配内存
allocate := t.mwBuffer.Allocate(chunkHeaderSize + payloadSize + ((payloadSize - 1) / t.chunkSize)) allocate := t.mwBuffer.Allocate(chunkHeaderSize+payloadSize+((payloadSize-1)/t.chunkSize), dts, videoKey)
//写rtmp chunk header //写rtmp chunk header
chunk.Length = payloadSize chunk.Length = payloadSize
@@ -84,11 +83,10 @@ func (t *transStream) Input(packet utils.AVPacket) error {
} else { } else {
n += t.muxer.WriteAudioData(allocate[chunkHeaderSize:], false) n += t.muxer.WriteAudioData(allocate[chunkHeaderSize:], false)
} }
n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset) n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
segment := t.mwBuffer.PeekCompletedSegment(dts) //合并写满了再发
if len(segment) > 0 { if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 {
t.SendPacket(segment) t.SendPacket(segment)
} }
return nil return nil
@@ -97,17 +95,14 @@ func (t *transStream) Input(packet utils.AVPacket) error {
func (t *transStream) AddSink(sink stream.Sink) error { func (t *transStream) AddSink(sink stream.Sink) error {
utils.Assert(t.headerSize > 0) utils.Assert(t.headerSize > 0)
t.BaseTransStream.AddSink(sink) t.TCPTransStream.AddSink(sink)
//发送sequence header //发送sequence header
sink.Input(t.header[:t.headerSize]) sink.Input(t.header[:t.headerSize])
//发送当前内存池已有的合并写切片 //发送当前内存池已有的合并写切片
segmentList := t.mwBuffer.SegmentList() t.mwBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
if len(segmentList) > 0 { sink.Input(bytes)
sink.Input(segmentList) })
return nil
}
return nil return nil
} }
@@ -179,8 +174,7 @@ func (t *transStream) WriteHeader() error {
func (t *transStream) Close() error { func (t *transStream) Close() error {
//发送剩余的流 //发送剩余的流
segment := t.mwBuffer.PopSegment() if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 {
if len(segment) > 0 {
t.SendPacket(segment) t.SendPacket(segment)
} }
return nil return nil

View File

@@ -10,6 +10,7 @@ import (
"os" "os"
"strconv" "strconv"
"strings" "strings"
"time"
) )
const ( const (
@@ -219,14 +220,16 @@ func init() {
// AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写. // AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写.
type AppConfig_ struct { type AppConfig_ struct {
GOPCache bool `json:"gop_cache"` //是否开启GOP缓存只缓存一组音视频 GOPCache bool `json:"gop_cache"` //是否开启GOP缓存只缓存一组音视频
GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小 GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
ProbeTimeout int `json:"probe_timeout"` ProbeTimeout int `json:"probe_timeout"` //收流解析AVStream的超时时间
PublicIP string `json:"public_ip"` WriteTimeout int `json:"write_timeout"` //Server向TCP拉流Conn发包的超时时间, 超过该时间, 直接主动断开Conn. 客户端重新拉流的成本小于服务器缓存成本.
ListenIP string `json:"listen_ip"` WriteBufferNumber int `json:"-"`
IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. PublicIP string `json:"public_ip"`
ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. ListenIP string `json:"listen_ip"`
Debug bool `json:"debug"` //debug模式, 开启将保存推流 IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
Debug bool `json:"debug"` //debug模式, 开启将保存推流
//缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. //缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例. //合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例.
@@ -268,11 +271,17 @@ func SetDefaultConfig(config_ *AppConfig_) {
config_.GOPBufferSize = limitInt(4096*1024/8, 2048*1024*10, config_.GOPBufferSize) //最低4M码率 最高160M码率 config_.GOPBufferSize = limitInt(4096*1024/8, 2048*1024*10, config_.GOPBufferSize) //最低4M码率 最高160M码率
config_.MergeWriteLatency = limitInt(350, 2000, config_.MergeWriteLatency) //最低缓存350毫秒数据才发送 最高缓存2秒数据才发送 config_.MergeWriteLatency = limitInt(350, 2000, config_.MergeWriteLatency) //最低缓存350毫秒数据才发送 最高缓存2秒数据才发送
config_.ProbeTimeout = limitInt(2000, 5000, config_.MergeWriteLatency) //2-5秒内必须解析完AVStream config_.ProbeTimeout = limitInt(2000, 5000, config_.MergeWriteLatency) //2-5秒内必须解析完AVStream
config_.WriteTimeout = limitInt(2000, 10000, config_.WriteTimeout)
config_.WriteBufferNumber = config_.WriteTimeout/config_.MergeWriteLatency + 1
config_.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config_.Log.Level) config_.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config_.Log.Level)
config_.Log.MaxSize = limitMin(1, config_.Log.MaxSize) config_.Log.MaxSize = limitMin(1, config_.Log.MaxSize)
config_.Log.MaxBackup = limitMin(1, config_.Log.MaxBackup) config_.Log.MaxBackup = limitMin(1, config_.Log.MaxBackup)
config_.Log.MaxAge = limitMin(1, config_.Log.MaxAge) config_.Log.MaxAge = limitMin(1, config_.Log.MaxAge)
config_.IdleTimeout *= int64(time.Second)
config_.ReceiveTimeout *= int64(time.Second)
config_.Hook.Timeout *= int64(time.Second)
} }
func limitMin(min, value int) int { func limitMin(min, value int) int {

View File

@@ -1,6 +1,9 @@
package stream package stream
import "github.com/yangjiechina/avformat/utils" import (
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/collections"
)
// GOPBuffer GOP缓存 // GOPBuffer GOP缓存
type GOPBuffer interface { type GOPBuffer interface {
@@ -23,13 +26,13 @@ type GOPBuffer interface {
} }
type streamBuffer struct { type streamBuffer struct {
buffer RingBuffer buffer collections.RingBuffer
existVideoKeyFrame bool existVideoKeyFrame bool
discardHandler func(packet utils.AVPacket) discardHandler func(packet utils.AVPacket)
} }
func NewStreamBuffer() GOPBuffer { func NewStreamBuffer() GOPBuffer {
return &streamBuffer{buffer: NewRingBuffer(1000), existVideoKeyFrame: false} return &streamBuffer{buffer: collections.NewRingBuffer(1000), existVideoKeyFrame: false}
} }
func (s *streamBuffer) AddPacket(packet utils.AVPacket) bool { func (s *streamBuffer) AddPacket(packet utils.AVPacket) bool {

View File

@@ -1,125 +1,225 @@
package stream package stream
import (
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/collections"
)
// MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存 // MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存
// 和GOP缓存一样, 也以视频关键帧为界. 遇到视频关键帧, 发送剩余输出流, 清空buffer // 包含多个合并写块, 循环使用, 至少需要等到第二个I帧才开始循环. webrtcI帧间隔可能会高达几十秒,
// 容量根据write_timeout发送超时和合并写时间来计算, write_timeout/mw_latency.如果I帧间隔大于发送超时时间, 则需要创建新的块.
type MergeWritingBuffer interface { type MergeWritingBuffer interface {
Allocate(size int) []byte Allocate(size int, ts int64, videoKey bool) []byte
// PeekCompletedSegment 返回当前完整合并写切片 // PeekCompletedSegment 返回当前完整切片, 如果不满, 返回nil.
PeekCompletedSegment(ts int64) []byte PeekCompletedSegment() []byte
// PopSegment 返回当前合并写切片, 并清空内存池 // FlushSegment 保存当前切片, 创建新的切片
PopSegment() []byte FlushSegment() []byte
// SegmentList 返回所有完整切片
SegmentList() []byte
IsFull(ts int64) bool IsFull(ts int64) bool
IsCompleted() bool // IsNewSegment 新切片, 还未写数据
IsNewSegment() bool
IsEmpty() bool // Reserve 从当前切片中预留指定长度数据
Reserve(number int)
Reserve(count int) // ReadSegmentsFromKeyFrameIndex 从最近的关键帧读取切片
ReadSegmentsFromKeyFrameIndex(cb func([]byte))
} }
type mergeWritingBuffer struct { type mergeWritingBuffer struct {
transStreamBuffer MemoryPool mwBlocks []collections.MemoryPool
segmentOffset int //当前合并写包位于memoryPool的开始偏移量 //空闲合并写
freeKeyFrameMWBlocks collections.LinkedList[collections.MemoryPool]
freeNoneKeyFrameMWBlocks collections.LinkedList[collections.MemoryPool]
prePacketTS int64 //前一个包的时间戳 index int //当前切片位于mwBlocks的索引
startTS int64 //当前切片的开始时间
duration int //当前切片时长
lastKeyFrameIndex int //最新关键帧所在切片的索引
existVideo bool //是否存在视频
keyFrameBufferMaxLength int
nonKeyFrameBufferMaxLength int
keyFrameMap map[int]int
} }
func (m *mergeWritingBuffer) Allocate(size int) []byte { func (m *mergeWritingBuffer) createMWBlock(videoKey bool) collections.MemoryPool {
return m.transStreamBuffer.Allocate(size) if videoKey {
return collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength)
} else {
return collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength)
}
} }
func (m *mergeWritingBuffer) PeekCompletedSegment(ts int64) []byte { func (m *mergeWritingBuffer) grow() {
if !AppConfig.GOPCache { pools := make([]collections.MemoryPool, cap(m.mwBlocks)*3/2)
data, _ := m.transStreamBuffer.Data() for i := 0; i < cap(m.mwBlocks); i++ {
m.transStreamBuffer.Clear() pools[i] = m.mwBlocks[i]
}
m.mwBlocks = pools
}
func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte {
if !AppConfig.GOPCache || !m.existVideo {
return m.mwBlocks[0].Allocate(size)
}
utils.Assert(ts != -1)
//新的切片
if m.startTS == -1 {
if _, ok := m.keyFrameMap[m.index]; ok {
delete(m.keyFrameMap, m.index)
}
if m.mwBlocks[m.index] == nil {
//创建内存块
m.mwBlocks[m.index] = m.createMWBlock(videoKey)
} else {
//循环使用
if !videoKey {
//I帧间隔长, 不够写一组GOP, 扩容!
if len(m.keyFrameMap) < 1 {
capacity := len(m.mwBlocks)
m.grow()
m.index = capacity
m.mwBlocks[m.index] = m.createMWBlock(videoKey)
}
}
m.mwBlocks[m.index].Clear()
}
m.startTS = ts
}
if videoKey {
//请务必确保关键帧帧从新的切片开始
//外部遇到关键帧请先调用FlushSegment
utils.Assert(m.mwBlocks[m.index].IsEmpty())
m.lastKeyFrameIndex = m.index
m.keyFrameMap[m.index] = m.index
}
if ts < m.startTS {
m.startTS = ts
}
m.duration = int(ts - m.startTS)
return m.mwBlocks[m.index].Allocate(size)
}
func (m *mergeWritingBuffer) FlushSegment() []byte {
if m.mwBlocks[m.index] == nil {
return nil
}
data, _ := m.mwBlocks[m.index].Data()
if len(data) == 0 {
return nil
}
//更新缓冲长度
if m.lastKeyFrameIndex == m.index && m.keyFrameBufferMaxLength < len(data) {
m.keyFrameBufferMaxLength = len(data) * 3 / 2
} else if m.lastKeyFrameIndex != m.index && m.nonKeyFrameBufferMaxLength < len(data) {
m.nonKeyFrameBufferMaxLength = len(data) * 3 / 2
}
m.index = (m.index + 1) % cap(m.mwBlocks)
m.startTS = -1
m.duration = 0
return data
}
func (m *mergeWritingBuffer) PeekCompletedSegment() []byte {
if !AppConfig.GOPCache || !m.existVideo {
data, _ := m.mwBlocks[0].Data()
m.mwBlocks[0].Clear()
return data return data
} }
if m.prePacketTS == -1 { if m.duration < AppConfig.MergeWriteLatency {
m.prePacketTS = ts
}
if ts < m.prePacketTS {
m.prePacketTS = ts
}
if int(ts-m.prePacketTS) < AppConfig.MergeWriteLatency {
return nil return nil
} }
head, _ := m.transStreamBuffer.Data() return m.FlushSegment()
data := head[m.segmentOffset:]
m.segmentOffset = len(head)
m.prePacketTS = -1
return data
} }
func (m *mergeWritingBuffer) IsFull(ts int64) bool { func (m *mergeWritingBuffer) IsFull(ts int64) bool {
if m.prePacketTS == -1 { if m.startTS == -1 {
return false return false
} }
return int(ts-m.prePacketTS) >= AppConfig.MergeWriteLatency return int(ts-m.startTS) >= AppConfig.MergeWriteLatency
} }
func (m *mergeWritingBuffer) IsCompleted() bool { func (m *mergeWritingBuffer) IsNewSegment() bool {
data, _ := m.transStreamBuffer.Data() if m.mwBlocks[m.index] == nil {
return m.segmentOffset == len(data) return true
}
func (m *mergeWritingBuffer) IsEmpty() bool {
data, _ := m.transStreamBuffer.Data()
return len(data) <= m.segmentOffset
}
func (m *mergeWritingBuffer) Reserve(count int) {
_ = m.transStreamBuffer.Allocate(count)
}
func (m *mergeWritingBuffer) PopSegment() []byte {
if !AppConfig.GOPCache {
return nil
} }
head, _ := m.transStreamBuffer.Data() data, _ := m.mwBlocks[m.index].Data()
data := head[m.segmentOffset:] return len(data) == 0
m.transStreamBuffer.Clear()
m.segmentOffset = 0
m.prePacketTS = -1
return data
} }
func (m *mergeWritingBuffer) SegmentList() []byte { func (m *mergeWritingBuffer) Reserve(number int) {
if !AppConfig.GOPCache { utils.Assert(m.mwBlocks[m.index] != nil)
return nil
m.mwBlocks[m.index].Reserve(number)
}
func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) {
if m.lastKeyFrameIndex < 0 || m.index == m.lastKeyFrameIndex {
return
} }
head, _ := m.transStreamBuffer.Data() for i := m.lastKeyFrameIndex; i < cap(m.mwBlocks); i++ {
return head[:m.segmentOffset] if m.mwBlocks[i] == nil {
continue
}
data, _ := m.mwBlocks[i].Data()
cb(data)
}
//回调循环使用的头部数据
if m.index < m.lastKeyFrameIndex {
for i := 0; i < m.index; i++ {
data, _ := m.mwBlocks[i].Data()
cb(data)
}
}
} }
func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer {
//开启GOP缓存, 输出流也缓存整个GOP //开启GOP缓存, 输出流也缓存整个GOP
bufferSize := AppConfig.GOPBufferSize var blocks []collections.MemoryPool
if existVideo && !AppConfig.GOPCache { if existVideo {
bufferSize = 1024 * 1000 blocks = make([]collections.MemoryPool, AppConfig.WriteBufferNumber)
} else if !existVideo { } else {
bufferSize = 48000 * 10 blocks = make([]collections.MemoryPool, 1)
}
if !existVideo || !AppConfig.GOPCache {
blocks[0] = collections.NewDirectMemoryPool(1024 * 100)
} }
return &mergeWritingBuffer{ return &mergeWritingBuffer{
transStreamBuffer: NewDirectMemoryPool(bufferSize), keyFrameBufferMaxLength: AppConfig.MergeWriteLatency * 1024 * 2,
segmentOffset: 0, nonKeyFrameBufferMaxLength: AppConfig.MergeWriteLatency * 1024 / 2,
prePacketTS: -1, mwBlocks: blocks,
startTS: -1,
lastKeyFrameIndex: -1,
existVideo: existVideo,
keyFrameMap: make(map[int]int, 5),
} }
} }

View File

@@ -64,6 +64,8 @@ type Sink interface {
Start() Start()
Flush() Flush()
GetConn() net.Conn
} }
// GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String // GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String
@@ -244,3 +246,7 @@ func (s *BaseSink) Start() {
func (s *BaseSink) Flush() { func (s *BaseSink) Flush() {
} }
func (s *BaseSink) GetConn() net.Conn {
return s.Conn
}

View File

@@ -3,6 +3,7 @@ package stream
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/yangjiechina/lkm/collections"
"github.com/yangjiechina/lkm/log" "github.com/yangjiechina/lkm/log"
"net" "net"
"net/url" "net/url"
@@ -91,7 +92,7 @@ type Source interface {
Close() Close()
// FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池 // FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池
FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool
// OnDiscardPacket GOP缓存溢出回调, 释放AVPacket // OnDiscardPacket GOP缓存溢出回调, 释放AVPacket
OnDiscardPacket(pkt utils.AVPacket) OnDiscardPacket(pkt utils.AVPacket)
@@ -140,15 +141,15 @@ type PublishSource struct {
state SessionState state SessionState
Conn net.Conn Conn net.Conn
TransDeMuxer stream.DeMuxer //负责从推流协议中解析出AVStream和AVPacket TransDeMuxer stream.DeMuxer //负责从推流协议中解析出AVStream和AVPacket
recordSink Sink //每个Source的录制流 recordSink Sink //每个Source的录制流
hlsStream TransStream //如果开开启HLS传输流, 不等拉流时, 创建直接生成 hlsStream TransStream //如果开开启HLS传输流, 不等拉流时, 创建直接生成
audioTranscoders []transcode.Transcoder //音频解码器 audioTranscoders []transcode.Transcoder //音频解码器
videoTranscoders []transcode.Transcoder //视频解码器 videoTranscoders []transcode.Transcoder //视频解码器
originStreams StreamManager //推流的音视频Streams originStreams StreamManager //推流的音视频Streams
allStreams StreamManager //推流Streams+转码器获得的Stream allStreams StreamManager //推流Streams+转码器获得的Stream
pktBuffers [8]MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存. pktBuffers [8]collections.MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频 gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
existVideo bool //是否存在视频 existVideo bool //是否存在视频
completed bool completed bool
@@ -228,21 +229,21 @@ func (s *PublishSource) CreateDefaultOutStreams() {
} }
// FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池 // FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池
func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool { func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool {
if index >= cap(s.pktBuffers) { if index >= cap(s.pktBuffers) {
panic("流路数过多...") panic("流路数过多...")
} }
if s.pktBuffers[index] == nil { if s.pktBuffers[index] == nil {
if utils.AVMediaTypeAudio == mediaType { if utils.AVMediaTypeAudio == mediaType {
s.pktBuffers[index] = NewRbMemoryPool(48000 * 64) s.pktBuffers[index] = collections.NewRbMemoryPool(48000 * 64)
} else if AppConfig.GOPCache { } else if AppConfig.GOPCache {
//开启GOP缓存 //开启GOP缓存
s.pktBuffers[index] = NewRbMemoryPool(AppConfig.GOPBufferSize) s.pktBuffers[index] = collections.NewRbMemoryPool(AppConfig.GOPBufferSize)
} else { } else {
//未开启GOP缓存 //未开启GOP缓存
//1M缓存大小, 单帧绰绰有余 //1M缓存大小, 单帧绰绰有余
s.pktBuffers[index] = NewRbMemoryPool(1024 * 1000) s.pktBuffers[index] = collections.NewRbMemoryPool(1024 * 1000)
} }
} }

View File

@@ -1,7 +1,9 @@
package stream package stream
import ( import (
"github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/log"
) )
// TransStream 将AVPacket封装成传输流转发给各个Sink // TransStream 将AVPacket封装成传输流转发给各个Sink
@@ -97,3 +99,32 @@ func (t *BaseTransStream) SendPacket(data []byte) error {
return nil return nil
} }
type TCPTransStream struct {
BaseTransStream
}
func (t *TCPTransStream) AddSink(sink Sink) error {
if err := t.BaseTransStream.AddSink(sink); err != nil {
return err
}
sink.GetConn().(*transport.Conn).EnableAsyncWriteMode(AppConfig.WriteBufferNumber - 1)
return nil
}
func (t *TCPTransStream) SendPacket(data []byte) error {
for _, sink := range t.Sinks {
err := sink.Input(data)
if err == nil {
continue
}
if _, ok := err.(*transport.ZeroWindowSizeError); ok {
log.Sugar.Errorf("发送超时, 强制删除 sink:%s", sink.PrintInfo())
go sink.Close()
}
}
return nil
}