diff --git a/api.go b/api.go index 27904b9..c00e71c 100644 --- a/api.go +++ b/api.go @@ -75,7 +75,7 @@ func startApiServer(addr string) { //TCP主动,设置连接地址 apiServer.router.HandleFunc("/api/v1/gb28181/source/connect", apiServer.connectGBSource) apiServer.router.HandleFunc("/api/v1/gb28181/source/close", apiServer.closeGBSource) - apiServer.router.HandleFunc("/api/v1/gb28181/gc/force", func(writer http.ResponseWriter, request *http.Request) { + apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) { runtime.GC() writer.WriteHeader(http.StatusOK) }) diff --git a/config.json b/config.json index d79ada3..0566421 100644 --- a/config.json +++ b/config.json @@ -2,6 +2,7 @@ "gop_cache": true, "gop_buffer_size": 8192000, "probe_timeout": 2000, + "write_timeout": 5000, "mw_latency": 350, "listen_ip" : "0.0.0.0", "public_ip": "192.168.2.148", @@ -55,14 +56,14 @@ "hook": { "enable": true, "timeout": 10, - "on_publish": "http://localhost:8081/api/v1/on_publish", - "on_publish_done": "http://localhost:8081/api/v1/on_publish_done", - "on_play" : "http://localhost:8081/api/v1/on_play", - "on_play_done" : "http://localhost:8081/api/on_play_done", + "on_publish": "http://localhost:9000/api/v1/hook/on_publish", + "on_publish_done": "http://localhost:9000/api/v1/hook/on_publish_done", + "on_play" : "http://localhost:9000/api/v1/hook/on_play", + "on_play_done" : "http://localhost:9000/api/on_play_done", - "on_record": "http://localhost:8081/api/v1/on_reocrd", - "on_idle_timeout": "http://localhost:8081/api/v1/on_idle_timeout", - "on_receive_timeout": "http://localhost:8081/api/v1/on_recv_timeout" + "on_record": "http://localhost:9000/api/v1/hook/on_reocrd", + "on_idle_timeout": "http://localhost:9000/api/v1/hook/on_idle_timeout", + "on_receive_timeout": "http://localhost:9000/api/v1/hook/on_recv_timeout" }, "log": { diff --git a/flv/http_flv.go b/flv/http_flv.go index ce07329..5eb2984 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -14,21 +14,8 @@ const ( HttpFlvBlockLengthSize = 20 ) -type HttpFlvBlock struct { - pktSize uint32 - skipCount uint16 -} - -var separator []byte - -func init() { - separator = make([]byte, 2) - separator[0] = 0x0D - separator[1] = 0x0A -} - type httpTransStream struct { - stream.BaseTransStream + stream.TCPTransStream muxer libflv.Muxer mwBuffer stream.MergeWritingBuffer @@ -57,9 +44,9 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { } //发送剩余数据 - if videoKey && !t.mwBuffer.IsEmpty() { + if videoKey && !t.mwBuffer.IsNewSegment() { t.mwBuffer.Reserve(2) - segment := t.mwBuffer.PopSegment() + segment := t.mwBuffer.FlushSegment() t.sendUnpackedSegment(segment) } @@ -67,7 +54,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { var separatorSize int //新的合并写切片, 预留包长字节 - if t.mwBuffer.IsCompleted() { + if t.mwBuffer.IsNewSegment() { separatorSize = HttpFlvBlockLengthSize //10字节描述flv包长, 前2个字节描述无效字节长度 n = HttpFlvBlockLengthSize @@ -79,7 +66,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { } //分配flv block - bytes := t.mwBuffer.Allocate(separatorSize + flvSize) + bytes := t.mwBuffer.Allocate(separatorSize+flvSize, dts, videoKey) n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false) copy(bytes[n:], data) @@ -87,8 +74,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error { //每一个合并写切片开始和预留长度所需的字节数 //合并写切片末尾加上换行符 //长度是16进制字符串 - segment := t.mwBuffer.PeekCompletedSegment(dts) - if len(segment) > 0 { + if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 { t.sendUnpackedSegment(segment) } return nil @@ -129,24 +115,27 @@ func (t *httpTransStream) computeSkipCount(data []byte) int { func (t *httpTransStream) AddSink(sink stream.Sink) error { utils.Assert(t.headerSize > 0) - t.BaseTransStream.AddSink(sink) + t.TCPTransStream.AddSink(sink) //发送sequence header t.sendSegment(sink, t.header[:t.headerSize]) //发送当前内存池已有的合并写切片 - segmentList := t.mwBuffer.SegmentList() - if len(segmentList) > 0 { - //修改第一个flv tag的pre tag size - binary.BigEndian.PutUint32(segmentList[20:], uint32(t.headerTagSize)) + first := true + t.mwBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) { + if first { + //修改第一个flv tag的pre tag size + binary.BigEndian.PutUint32(bytes[20:], uint32(t.headerTagSize)) + first = false + } //遍历发送合并写切片 var index int - for ; index < len(segmentList); index += 4 { - size := binary.BigEndian.Uint32(segmentList[index:]) - t.sendSegment(sink, segmentList[index:index+4+int(size)]) + for ; index < len(bytes); index += 4 { + size := binary.BigEndian.Uint32(bytes[index:]) + t.sendSegment(sink, bytes[index:index+4+int(size)]) index += int(size) } - } + }) return nil } @@ -208,8 +197,7 @@ func (t *httpTransStream) WriteHeader() error { func (t *httpTransStream) Close() error { //发送剩余的流 - segment := t.mwBuffer.PopSegment() - if len(segment) > 0 { + if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 { t.sendUnpackedSegment(segment) } return nil diff --git a/flv/ws_conn.go b/flv/ws_conn.go index df8c078..38a842d 100644 --- a/flv/ws_conn.go +++ b/flv/ws_conn.go @@ -2,6 +2,7 @@ package flv import ( "github.com/gorilla/websocket" + "github.com/yangjiechina/avformat/transport" "net" "time" ) @@ -15,6 +16,8 @@ func (w WSConn) Read(b []byte) (n int, err error) { } func (w WSConn) Write(b []byte) (n int, err error) { + //输入http-flv数据 + //去掉不需要的换行符 var offset int for i := 2; i < len(b); i++ { if b[i-2] == 0x0D && b[i-1] == 0x0A { @@ -31,5 +34,5 @@ func (w WSConn) SetDeadline(t time.Time) error { } func NewWSConn(conn *websocket.Conn) net.Conn { - return &WSConn{conn} + return transport.NewConn(&WSConn{conn}) } diff --git a/hls/m3u8.go b/hls/m3u8.go index bd05d58..6fa6cdd 100644 --- a/hls/m3u8.go +++ b/hls/m3u8.go @@ -2,7 +2,7 @@ package hls import ( "bytes" - "github.com/yangjiechina/lkm/stream" + "github.com/yangjiechina/lkm/collections" "math" "strconv" ) @@ -59,7 +59,7 @@ type M3U8Writer interface { func NewM3U8Writer(len int) M3U8Writer { return &m3u8Writer{ stringBuffer: bytes.NewBuffer(make([]byte, 0, 1024*10)), - playlist: stream.NewQueue(len), + playlist: collections.NewQueue(len), } } @@ -72,7 +72,7 @@ type Segment struct { type m3u8Writer struct { stringBuffer *bytes.Buffer - playlist *stream.Queue + playlist *collections.Queue } func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int, path string) { diff --git a/jt1078/jt_session.go b/jt1078/jt_session.go index ae32fc1..bc40cb2 100644 --- a/jt1078/jt_session.go +++ b/jt1078/jt_session.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/lkm/collections" "github.com/yangjiechina/lkm/log" "github.com/yangjiechina/lkm/stream" "net" @@ -41,8 +42,8 @@ type Session struct { videoIndex int audioStream utils.AVStream videoStream utils.AVStream - audioBuffer stream.MemoryPool - videoBuffer stream.MemoryPool + audioBuffer collections.MemoryPool + videoBuffer collections.MemoryPool rtpPacket *RtpPacket receiveBuffer *stream.ReceiveBuffer } diff --git a/main.go b/main.go index 296dab6..0168540 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ func init() { } if stream.AppConfig.Rtsp.IsMultiPort() { - rtsp.TransportManger = transport.NewTransportManager(uint16(stream.AppConfig.Rtsp.Port[0]), uint16(stream.AppConfig.Rtsp.Port[1])) + rtsp.TransportManger = transport.NewTransportManager(uint16(stream.AppConfig.Rtsp.Port[1]), uint16(stream.AppConfig.Rtsp.Port[2])) } indent, _ := json.MarshalIndent(stream.AppConfig, "", "\t") @@ -59,8 +59,8 @@ func main() { panic(err) } - impl := rtmp.NewServer() - err = impl.Start(rtmpAddr) + server := rtmp.NewServer() + err = server.Start(rtmpAddr) if err != nil { panic(err) } @@ -74,8 +74,8 @@ func main() { panic(rtspAddr) } - rtspServer := rtsp.NewServer(stream.AppConfig.Rtsp.Password) - err = rtspServer.Start(rtspAddr) + server := rtsp.NewServer(stream.AppConfig.Rtsp.Password) + err = server.Start(rtspAddr) if err != nil { panic(err) } @@ -125,10 +125,8 @@ func main() { log.Sugar.Info("启动jt1078服务成功 addr:", jtAddr.String()) } - loadConfigError := http.ListenAndServe(":19999", nil) - if loadConfigError != nil { - panic(loadConfigError) + err := http.ListenAndServe(":19999", nil) + if err != nil { + panic(err) } - - select {} } diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index 250c1ca..ddbaaf6 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -8,7 +8,7 @@ import ( ) type transStream struct { - stream.BaseTransStream + stream.TCPTransStream chunkSize int @@ -22,7 +22,7 @@ type transStream struct { //合并写内存泄露问题: 推流结束后, mwBuffer的data一直释放不掉, 只有拉流全部断开之后, 才会释放该内存. //起初怀疑是代码层哪儿有问题, 但是测试发现如果将合并写切片再拷贝一次发送 给sink, 推流结束后,mwBuffer的data内存块释放没问题, 只有拷贝的内存块未释放. 所以排除了代码层造成内存泄露的可能性. //看来是conn在write后还会持有data. 查阅代码发现, 的确如此. 向fd发送数据前buffer会引用data, 但是后续没有赋值为nil, 取消引用. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694 - mwBuffer stream.MergeWritingBuffer + mwBuffer stream.MergeWritingBuffer //合并写同时作为, 用户态的发送缓冲区 } func (t *transStream) Input(packet utils.AVPacket) error { @@ -62,16 +62,15 @@ func (t *transStream) Input(packet utils.AVPacket) error { payloadSize += chunkPayloadOffset + len(data) } - //遇到视频关键帧, 发送剩余的流 + //遇到视频关键帧, 发送剩余的流, 创建新切片 if videoKey { - segment := t.mwBuffer.PopSegment() - if len(segment) > 0 { + if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 { t.SendPacket(segment) } } //分配内存 - allocate := t.mwBuffer.Allocate(chunkHeaderSize + payloadSize + ((payloadSize - 1) / t.chunkSize)) + allocate := t.mwBuffer.Allocate(chunkHeaderSize+payloadSize+((payloadSize-1)/t.chunkSize), dts, videoKey) //写rtmp chunk header chunk.Length = payloadSize @@ -84,11 +83,10 @@ func (t *transStream) Input(packet utils.AVPacket) error { } else { n += t.muxer.WriteAudioData(allocate[chunkHeaderSize:], false) } - n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset) - segment := t.mwBuffer.PeekCompletedSegment(dts) - if len(segment) > 0 { + //合并写满了再发 + if segment := t.mwBuffer.PeekCompletedSegment(); len(segment) > 0 { t.SendPacket(segment) } return nil @@ -97,17 +95,14 @@ func (t *transStream) Input(packet utils.AVPacket) error { func (t *transStream) AddSink(sink stream.Sink) error { utils.Assert(t.headerSize > 0) - t.BaseTransStream.AddSink(sink) + t.TCPTransStream.AddSink(sink) //发送sequence header sink.Input(t.header[:t.headerSize]) //发送当前内存池已有的合并写切片 - segmentList := t.mwBuffer.SegmentList() - if len(segmentList) > 0 { - sink.Input(segmentList) - return nil - } - + t.mwBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) { + sink.Input(bytes) + }) return nil } @@ -179,8 +174,7 @@ func (t *transStream) WriteHeader() error { func (t *transStream) Close() error { //发送剩余的流 - segment := t.mwBuffer.PopSegment() - if len(segment) > 0 { + if segment := t.mwBuffer.FlushSegment(); len(segment) > 0 { t.SendPacket(segment) } return nil diff --git a/stream/config.go b/stream/config.go index 67b1487..842ee8f 100644 --- a/stream/config.go +++ b/stream/config.go @@ -10,6 +10,7 @@ import ( "os" "strconv" "strings" + "time" ) const ( @@ -219,14 +220,16 @@ func init() { // AppConfig_ GOP缓存和合并写必须保持一致,同时开启或关闭. 关闭GOP缓存,是为了降低延迟,很难理解又另外开启合并写. type AppConfig_ struct { - GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 - GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小 - ProbeTimeout int `json:"probe_timeout"` - PublicIP string `json:"public_ip"` - ListenIP string `json:"listen_ip"` - IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. - ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. - Debug bool `json:"debug"` //debug模式, 开启将保存推流 + GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 + GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小 + ProbeTimeout int `json:"probe_timeout"` //收流解析AVStream的超时时间 + WriteTimeout int `json:"write_timeout"` //Server向TCP拉流Conn发包的超时时间, 超过该时间, 直接主动断开Conn. 客户端重新拉流的成本小于服务器缓存成本. + WriteBufferNumber int `json:"-"` + PublicIP string `json:"public_ip"` + ListenIP string `json:"listen_ip"` + IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. + ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. + Debug bool `json:"debug"` //debug模式, 开启将保存推流 //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. //合并写的大小范围,应当大于一帧的时长,不超过一组GOP的时长,在实际发送流的时候也会遵循此条例. @@ -268,11 +271,17 @@ func SetDefaultConfig(config_ *AppConfig_) { config_.GOPBufferSize = limitInt(4096*1024/8, 2048*1024*10, config_.GOPBufferSize) //最低4M码率 最高160M码率 config_.MergeWriteLatency = limitInt(350, 2000, config_.MergeWriteLatency) //最低缓存350毫秒数据才发送 最高缓存2秒数据才发送 config_.ProbeTimeout = limitInt(2000, 5000, config_.MergeWriteLatency) //2-5秒内必须解析完AVStream + config_.WriteTimeout = limitInt(2000, 10000, config_.WriteTimeout) + config_.WriteBufferNumber = config_.WriteTimeout/config_.MergeWriteLatency + 1 config_.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config_.Log.Level) config_.Log.MaxSize = limitMin(1, config_.Log.MaxSize) config_.Log.MaxBackup = limitMin(1, config_.Log.MaxBackup) config_.Log.MaxAge = limitMin(1, config_.Log.MaxAge) + + config_.IdleTimeout *= int64(time.Second) + config_.ReceiveTimeout *= int64(time.Second) + config_.Hook.Timeout *= int64(time.Second) } func limitMin(min, value int) int { diff --git a/stream/gop_buffer.go b/stream/gop_buffer.go index ffb41ed..ddde894 100644 --- a/stream/gop_buffer.go +++ b/stream/gop_buffer.go @@ -1,6 +1,9 @@ package stream -import "github.com/yangjiechina/avformat/utils" +import ( + "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/lkm/collections" +) // GOPBuffer GOP缓存 type GOPBuffer interface { @@ -23,13 +26,13 @@ type GOPBuffer interface { } type streamBuffer struct { - buffer RingBuffer + buffer collections.RingBuffer existVideoKeyFrame bool discardHandler func(packet utils.AVPacket) } func NewStreamBuffer() GOPBuffer { - return &streamBuffer{buffer: NewRingBuffer(1000), existVideoKeyFrame: false} + return &streamBuffer{buffer: collections.NewRingBuffer(1000), existVideoKeyFrame: false} } func (s *streamBuffer) AddPacket(packet utils.AVPacket) bool { diff --git a/stream/mw_buffer.go b/stream/mw_buffer.go index 034f84c..206fba1 100644 --- a/stream/mw_buffer.go +++ b/stream/mw_buffer.go @@ -1,125 +1,225 @@ package stream +import ( + "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/lkm/collections" +) + // MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存 -// 和GOP缓存一样, 也以视频关键帧为界. 遇到视频关键帧, 发送剩余输出流, 清空buffer - +// 包含多个合并写块, 循环使用, 至少需要等到第二个I帧才开始循环. webrtcI帧间隔可能会高达几十秒, +// 容量根据write_timeout发送超时和合并写时间来计算, write_timeout/mw_latency.如果I帧间隔大于发送超时时间, 则需要创建新的块. type MergeWritingBuffer interface { - Allocate(size int) []byte + Allocate(size int, ts int64, videoKey bool) []byte - // PeekCompletedSegment 返回当前完整合并写切片 - PeekCompletedSegment(ts int64) []byte + // PeekCompletedSegment 返回当前完整切片, 如果不满, 返回nil. + PeekCompletedSegment() []byte - // PopSegment 返回当前合并写切片, 并清空内存池 - PopSegment() []byte - - // SegmentList 返回所有完整切片 - SegmentList() []byte + // FlushSegment 保存当前切片, 创建新的切片 + FlushSegment() []byte IsFull(ts int64) bool - IsCompleted() bool + // IsNewSegment 新切片, 还未写数据 + IsNewSegment() bool - IsEmpty() bool + // Reserve 从当前切片中预留指定长度数据 + Reserve(number int) - Reserve(count int) + // ReadSegmentsFromKeyFrameIndex 从最近的关键帧读取切片 + ReadSegmentsFromKeyFrameIndex(cb func([]byte)) } type mergeWritingBuffer struct { - transStreamBuffer MemoryPool + mwBlocks []collections.MemoryPool - segmentOffset int //当前合并写包位于memoryPool的开始偏移量 + //空闲合并写块 + freeKeyFrameMWBlocks collections.LinkedList[collections.MemoryPool] + freeNoneKeyFrameMWBlocks collections.LinkedList[collections.MemoryPool] - prePacketTS int64 //前一个包的时间戳 + index int //当前切片位于mwBlocks的索引 + startTS int64 //当前切片的开始时间 + duration int //当前切片时长 + + lastKeyFrameIndex int //最新关键帧所在切片的索引 + existVideo bool //是否存在视频 + + keyFrameBufferMaxLength int + nonKeyFrameBufferMaxLength int + keyFrameMap map[int]int } -func (m *mergeWritingBuffer) Allocate(size int) []byte { - return m.transStreamBuffer.Allocate(size) +func (m *mergeWritingBuffer) createMWBlock(videoKey bool) collections.MemoryPool { + if videoKey { + return collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength) + } else { + return collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength) + } } -func (m *mergeWritingBuffer) PeekCompletedSegment(ts int64) []byte { - if !AppConfig.GOPCache { - data, _ := m.transStreamBuffer.Data() - m.transStreamBuffer.Clear() +func (m *mergeWritingBuffer) grow() { + pools := make([]collections.MemoryPool, cap(m.mwBlocks)*3/2) + for i := 0; i < cap(m.mwBlocks); i++ { + pools[i] = m.mwBlocks[i] + } + + m.mwBlocks = pools +} + +func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte { + if !AppConfig.GOPCache || !m.existVideo { + return m.mwBlocks[0].Allocate(size) + } + + utils.Assert(ts != -1) + + //新的切片 + if m.startTS == -1 { + if _, ok := m.keyFrameMap[m.index]; ok { + delete(m.keyFrameMap, m.index) + } + + if m.mwBlocks[m.index] == nil { + //创建内存块 + m.mwBlocks[m.index] = m.createMWBlock(videoKey) + } else { + //循环使用 + if !videoKey { + + //I帧间隔长, 不够写一组GOP, 扩容! + if len(m.keyFrameMap) < 1 { + capacity := len(m.mwBlocks) + m.grow() + m.index = capacity + + m.mwBlocks[m.index] = m.createMWBlock(videoKey) + } + } + + m.mwBlocks[m.index].Clear() + } + + m.startTS = ts + } + + if videoKey { + //请务必确保关键帧帧从新的切片开始 + //外部遇到关键帧请先调用FlushSegment + utils.Assert(m.mwBlocks[m.index].IsEmpty()) + m.lastKeyFrameIndex = m.index + m.keyFrameMap[m.index] = m.index + } + + if ts < m.startTS { + m.startTS = ts + } + + m.duration = int(ts - m.startTS) + return m.mwBlocks[m.index].Allocate(size) +} + +func (m *mergeWritingBuffer) FlushSegment() []byte { + if m.mwBlocks[m.index] == nil { + return nil + } + + data, _ := m.mwBlocks[m.index].Data() + if len(data) == 0 { + return nil + } + + //更新缓冲长度 + if m.lastKeyFrameIndex == m.index && m.keyFrameBufferMaxLength < len(data) { + m.keyFrameBufferMaxLength = len(data) * 3 / 2 + } else if m.lastKeyFrameIndex != m.index && m.nonKeyFrameBufferMaxLength < len(data) { + m.nonKeyFrameBufferMaxLength = len(data) * 3 / 2 + } + + m.index = (m.index + 1) % cap(m.mwBlocks) + m.startTS = -1 + m.duration = 0 + return data +} + +func (m *mergeWritingBuffer) PeekCompletedSegment() []byte { + if !AppConfig.GOPCache || !m.existVideo { + data, _ := m.mwBlocks[0].Data() + m.mwBlocks[0].Clear() return data } - if m.prePacketTS == -1 { - m.prePacketTS = ts - } - - if ts < m.prePacketTS { - m.prePacketTS = ts - } - - if int(ts-m.prePacketTS) < AppConfig.MergeWriteLatency { + if m.duration < AppConfig.MergeWriteLatency { return nil } - head, _ := m.transStreamBuffer.Data() - data := head[m.segmentOffset:] - - m.segmentOffset = len(head) - m.prePacketTS = -1 - - return data + return m.FlushSegment() } func (m *mergeWritingBuffer) IsFull(ts int64) bool { - if m.prePacketTS == -1 { + if m.startTS == -1 { return false } - return int(ts-m.prePacketTS) >= AppConfig.MergeWriteLatency + return int(ts-m.startTS) >= AppConfig.MergeWriteLatency } -func (m *mergeWritingBuffer) IsCompleted() bool { - data, _ := m.transStreamBuffer.Data() - return m.segmentOffset == len(data) -} - -func (m *mergeWritingBuffer) IsEmpty() bool { - data, _ := m.transStreamBuffer.Data() - return len(data) <= m.segmentOffset -} - -func (m *mergeWritingBuffer) Reserve(count int) { - _ = m.transStreamBuffer.Allocate(count) -} - -func (m *mergeWritingBuffer) PopSegment() []byte { - if !AppConfig.GOPCache { - return nil +func (m *mergeWritingBuffer) IsNewSegment() bool { + if m.mwBlocks[m.index] == nil { + return true } - head, _ := m.transStreamBuffer.Data() - data := head[m.segmentOffset:] - m.transStreamBuffer.Clear() - m.segmentOffset = 0 - m.prePacketTS = -1 - return data + data, _ := m.mwBlocks[m.index].Data() + return len(data) == 0 } -func (m *mergeWritingBuffer) SegmentList() []byte { - if !AppConfig.GOPCache { - return nil +func (m *mergeWritingBuffer) Reserve(number int) { + utils.Assert(m.mwBlocks[m.index] != nil) + + m.mwBlocks[m.index].Reserve(number) +} + +func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) { + if m.lastKeyFrameIndex < 0 || m.index == m.lastKeyFrameIndex { + return } - head, _ := m.transStreamBuffer.Data() - return head[:m.segmentOffset] + for i := m.lastKeyFrameIndex; i < cap(m.mwBlocks); i++ { + if m.mwBlocks[i] == nil { + continue + } + + data, _ := m.mwBlocks[i].Data() + cb(data) + } + + //回调循环使用的头部数据 + if m.index < m.lastKeyFrameIndex { + for i := 0; i < m.index; i++ { + data, _ := m.mwBlocks[i].Data() + cb(data) + } + } } func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { //开启GOP缓存, 输出流也缓存整个GOP - bufferSize := AppConfig.GOPBufferSize - if existVideo && !AppConfig.GOPCache { - bufferSize = 1024 * 1000 - } else if !existVideo { - bufferSize = 48000 * 10 + var blocks []collections.MemoryPool + if existVideo { + blocks = make([]collections.MemoryPool, AppConfig.WriteBufferNumber) + } else { + blocks = make([]collections.MemoryPool, 1) + } + + if !existVideo || !AppConfig.GOPCache { + blocks[0] = collections.NewDirectMemoryPool(1024 * 100) } return &mergeWritingBuffer{ - transStreamBuffer: NewDirectMemoryPool(bufferSize), - segmentOffset: 0, - prePacketTS: -1, + keyFrameBufferMaxLength: AppConfig.MergeWriteLatency * 1024 * 2, + nonKeyFrameBufferMaxLength: AppConfig.MergeWriteLatency * 1024 / 2, + mwBlocks: blocks, + startTS: -1, + lastKeyFrameIndex: -1, + existVideo: existVideo, + keyFrameMap: make(map[int]int, 5), } } diff --git a/stream/sink.go b/stream/sink.go index f1b6e6a..5db6c6b 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -64,6 +64,8 @@ type Sink interface { Start() Flush() + + GetConn() net.Conn } // GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String @@ -244,3 +246,7 @@ func (s *BaseSink) Start() { func (s *BaseSink) Flush() { } + +func (s *BaseSink) GetConn() net.Conn { + return s.Conn +} diff --git a/stream/source.go b/stream/source.go index 0f3f06f..1c85ee5 100644 --- a/stream/source.go +++ b/stream/source.go @@ -3,6 +3,7 @@ package stream import ( "encoding/json" "fmt" + "github.com/yangjiechina/lkm/collections" "github.com/yangjiechina/lkm/log" "net" "net/url" @@ -91,7 +92,7 @@ type Source interface { Close() // FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池 - FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool + FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool // OnDiscardPacket GOP缓存溢出回调, 释放AVPacket OnDiscardPacket(pkt utils.AVPacket) @@ -140,15 +141,15 @@ type PublishSource struct { state SessionState Conn net.Conn - TransDeMuxer stream.DeMuxer //负责从推流协议中解析出AVStream和AVPacket - recordSink Sink //每个Source的录制流 - hlsStream TransStream //如果开开启HLS传输流, 不等拉流时, 创建直接生成 - audioTranscoders []transcode.Transcoder //音频解码器 - videoTranscoders []transcode.Transcoder //视频解码器 - originStreams StreamManager //推流的音视频Streams - allStreams StreamManager //推流Streams+转码器获得的Stream - pktBuffers [8]MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存. - gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频 + TransDeMuxer stream.DeMuxer //负责从推流协议中解析出AVStream和AVPacket + recordSink Sink //每个Source的录制流 + hlsStream TransStream //如果开开启HLS传输流, 不等拉流时, 创建直接生成 + audioTranscoders []transcode.Transcoder //音频解码器 + videoTranscoders []transcode.Transcoder //视频解码器 + originStreams StreamManager //推流的音视频Streams + allStreams StreamManager //推流Streams+转码器获得的Stream + pktBuffers [8]collections.MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存. + gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频 existVideo bool //是否存在视频 completed bool @@ -228,21 +229,21 @@ func (s *PublishSource) CreateDefaultOutStreams() { } // FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池 -func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool { +func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool { if index >= cap(s.pktBuffers) { panic("流路数过多...") } if s.pktBuffers[index] == nil { if utils.AVMediaTypeAudio == mediaType { - s.pktBuffers[index] = NewRbMemoryPool(48000 * 64) + s.pktBuffers[index] = collections.NewRbMemoryPool(48000 * 64) } else if AppConfig.GOPCache { //开启GOP缓存 - s.pktBuffers[index] = NewRbMemoryPool(AppConfig.GOPBufferSize) + s.pktBuffers[index] = collections.NewRbMemoryPool(AppConfig.GOPBufferSize) } else { //未开启GOP缓存 //1M缓存大小, 单帧绰绰有余 - s.pktBuffers[index] = NewRbMemoryPool(1024 * 1000) + s.pktBuffers[index] = collections.NewRbMemoryPool(1024 * 1000) } } diff --git a/stream/trans_stream.go b/stream/trans_stream.go index d1ba0da..ebd2c5f 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -1,7 +1,9 @@ package stream import ( + "github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/lkm/log" ) // TransStream 将AVPacket封装成传输流,转发给各个Sink @@ -97,3 +99,32 @@ func (t *BaseTransStream) SendPacket(data []byte) error { return nil } + +type TCPTransStream struct { + BaseTransStream +} + +func (t *TCPTransStream) AddSink(sink Sink) error { + if err := t.BaseTransStream.AddSink(sink); err != nil { + return err + } + + sink.GetConn().(*transport.Conn).EnableAsyncWriteMode(AppConfig.WriteBufferNumber - 1) + return nil +} + +func (t *TCPTransStream) SendPacket(data []byte) error { + for _, sink := range t.Sinks { + err := sink.Input(data) + if err == nil { + continue + } + + if _, ok := err.(*transport.ZeroWindowSizeError); ok { + log.Sugar.Errorf("发送超时, 强制删除 sink:%s", sink.PrintInfo()) + go sink.Close() + } + } + + return nil +}