diff --git a/go.mod b/go.mod index 0b3f618..c6ff28c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,9 @@ module github.com/yangjiechina/live-server require github.com/yangjiechina/avformat v0.0.0 + +require golang.org/x/sys v0.15.0 // indirect + replace github.com/yangjiechina/avformat => ../avformat go 1.19 diff --git a/main.go b/main.go index 2288fe2..9e52b6d 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,20 @@ package main import ( + "net" + "net/http" + + _ "net/http/pprof" + + "github.com/yangjiechina/avformat/librtmp" "github.com/yangjiechina/avformat/utils" + "github.com/yangjiechina/live-server/rtmp" "github.com/yangjiechina/live-server/stream" ) func CreateTransStream(protocol stream.Protocol, streams []utils.AVStream) stream.ITransStream { if stream.ProtocolRtmp == protocol { - + return rtmp.NewTransStream(librtmp.ChunkSize) } return nil @@ -18,5 +25,24 @@ func init() { } func main() { + stream.AppConfig.GOPCache = 2 + impl := rtmp.NewServer() + addr := "0.0.0.0:1935" + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + panic(err) + } + err = impl.Start(tcpAddr) + if err != nil { + panic(err) + } + + println("启动rtmp服务成功:" + addr) + + loadConfigError := http.ListenAndServe(":19999", nil) + if loadConfigError != nil { + panic(loadConfigError) + } + select {} } diff --git a/rtmp/rtmp_server.go b/rtmp/rtmp_server.go index 95c2de6..e0b6c8d 100644 --- a/rtmp/rtmp_server.go +++ b/rtmp/rtmp_server.go @@ -1,9 +1,10 @@ package rtmp import ( + "net" + "github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/utils" - "net" ) type IServer interface { @@ -12,6 +13,10 @@ type IServer interface { Close() } +func NewServer() IServer { + return &serverImpl{} +} + type serverImpl struct { tcp *transport.TCPServer } diff --git a/rtmp/rtmp_transtream.go b/rtmp/rtmp_transtream.go index 567bf71..4f4e07e 100644 --- a/rtmp/rtmp_transtream.go +++ b/rtmp/rtmp_transtream.go @@ -17,8 +17,10 @@ type TransStream struct { audioChunk librtmp.Chunk videoChunk librtmp.Chunk - memoryPool stream.MemoryPool - transBuffer stream.StreamBuffer + memoryPool stream.MemoryPool + transBuffer stream.StreamBuffer + lastTs int64 + chunkSizeQueue *stream.Queue } func (t *TransStream) Input(packet utils.AVPacket) { @@ -92,19 +94,77 @@ func (t *TransStream) Input(packet utils.AVPacket) { rtmpData := t.memoryPool.Fetch()[:n] ret := true if stream.AppConfig.GOPCache > 0 { - ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) + //ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) + ret = t.transBuffer.AddPacket(packet, packet.KeyFrame() && videoPkt, packet.Dts()) + } + + if !ret || stream.AppConfig.GOPCache < 1 { + t.memoryPool.FreeTail() } if ret { //发送给sink + mergeWriteLatency := int64(350) + + if mergeWriteLatency == 0 { + for _, sink := range t.Sinks { + sink.Input(rtmpData) + } + + return + } + + t.chunkSizeQueue.Push(len(rtmpData)) + if t.lastTs == 0 { + t.transBuffer.Peek(0).(utils.AVPacket).Dts() + } + + if mergeWriteLatency > t.transBuffer.Peek(t.transBuffer.Size()-1).(utils.AVPacket).Dts()-t.lastTs { + return + } + + head, tail := t.memoryPool.Data() + queueHead, queueTail := t.chunkSizeQueue.All() + var offset int + var size int + endTs := t.lastTs + mergeWriteLatency + for i := 0; i < t.transBuffer.Size(); i++ { + pkt := t.transBuffer.Peek(i).(utils.AVPacket) + if pkt.Dts() < t.lastTs { + if i < len(queueHead) { + offset += queueHead[i].(int) + } else { + offset += queueTail[i+1%len(queueTail)].(int) + } + continue + } + if pkt.Dts() > endTs { + break + } + + size += len(pkt.Data()) + t.lastTs = pkt.Dts() + } + + var data1 []byte + var data2 []byte + if offset+size > len(head) { + data1 = head[offset:] + size -= len(head[offset:]) + data2 = tail[:size] + } else { + data1 = head[offset : offset+size] + } for _, sink := range t.Sinks { - sink.Input(rtmpData) - } - } + if data1 != nil { + sink.Input(data1) + } - if stream.AppConfig.GOPCache < 1 { - t.memoryPool.FreeTail() + if data2 != nil { + sink.Input(data2) + } + } } } @@ -114,15 +174,16 @@ func (t *TransStream) AddSink(sink stream.ISink) { utils.Assert(t.headerSize > 0) sink.Input(t.header[:t.headerSize]) - if stream.AppConfig.GOPCache > 0 { - t.transBuffer.PeekAll(func(packet interface{}) { - sink.Input(packet.([]byte)) - }) - } + // if stream.AppConfig.GOPCache > 0 { + // t.transBuffer.PeekAll(func(packet interface{}) { + // sink.Input(packet.([]byte)) + // }) + // } } func (t *TransStream) onDiscardPacket(pkt interface{}) { t.memoryPool.FreeHead() + t.chunkSizeQueue.Pop() } func (t *TransStream) WriteHeader() error { @@ -188,5 +249,6 @@ func (t *TransStream) WriteHeader() error { func NewTransStream(chunkSize int) stream.ITransStream { transStream := &TransStream{chunkSize: chunkSize, TransStreamImpl: stream.TransStreamImpl{Sinks: make(map[stream.SinkId]stream.ISink, 64)}} + transStream.chunkSizeQueue = stream.NewQueue(512) return transStream } diff --git a/stream/memory_pool.go b/stream/memory_pool.go index 8942514..7a011b3 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -25,6 +25,8 @@ type MemoryPool interface { // FreeTail 从尾部释放指定大小内存 FreeTail() + + Data() ([]byte, []byte) } func NewMemoryPool(capacity int) MemoryPool { @@ -135,3 +137,12 @@ func (m *memoryPool) FreeTail() { m.tail = m.capacity } } + +func (m *memoryPool) Data() ([]byte, []byte) { + if m.tail <= m.head { + return m.data[m.head:], m.data[:m.tail] + } else { + return m.data[m.head:m.tail], nil + } + +}