支持rtmp拉流

This commit is contained in:
yangjiechina
2023-11-27 16:57:04 +08:00
parent 8485f6c857
commit b88be46bb7
3 changed files with 52 additions and 19 deletions

View File

@@ -1,6 +1,7 @@
package rtmp package rtmp
import ( import (
"github.com/yangjiechina/avformat/librtmp"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/stream" "github.com/yangjiechina/live-server/stream"
"net" "net"
@@ -9,7 +10,7 @@ import (
func CreateTransStream(protocol stream.Protocol, streams []utils.AVStream) stream.ITransStream { func CreateTransStream(protocol stream.Protocol, streams []utils.AVStream) stream.ITransStream {
if stream.ProtocolRtmp == protocol { if stream.ProtocolRtmp == protocol {
return &TransStream{} return NewTransStream(librtmp.ChunkSize)
} }
return nil return nil

View File

@@ -26,24 +26,30 @@ func (t *TransStream) Input(packet utils.AVPacket) {
var data []byte var data []byte
var chunk *librtmp.Chunk var chunk *librtmp.Chunk
var videoPkt bool var videoPkt bool
var length int
//rtmp chunk消息体的数据大小
var payloadSize int
if utils.AVMediaTypeAudio == packet.MediaType() { if utils.AVMediaTypeAudio == packet.MediaType() {
data = packet.Data() data = packet.Data()
length = len(data)
chunk = &t.audioChunk chunk = &t.audioChunk
payloadSize += 2 + length
} else if utils.AVMediaTypeVideo == packet.MediaType() { } else if utils.AVMediaTypeVideo == packet.MediaType() {
if packet.KeyFrame() {
println("")
}
videoPkt = true videoPkt = true
data = packet.AVCCPacketData() data = packet.AVCCPacketData()
length = len(data)
chunk = &t.videoChunk chunk = &t.videoChunk
payloadSize += 5 + length
} }
length := len(data) //payloadSize += payloadSize / t.chunkSize
//rtmp chunk消息体的数据大小
payloadSize := 5 + length
payloadSize += payloadSize / t.chunkSize
//分配内存 //分配内存
t.memoryPool.Mark() t.memoryPool.Mark()
allocate := t.memoryPool.Allocate(12 + payloadSize) allocate := t.memoryPool.Allocate(12 + payloadSize + (payloadSize / t.chunkSize))
//写chunk头 //写chunk头
chunk.Length = payloadSize chunk.Length = payloadSize
@@ -54,13 +60,21 @@ func (t *TransStream) Input(packet utils.AVPacket) {
//写flv //写flv
ct := packet.Pts() - packet.Dts() ct := packet.Pts() - packet.Dts()
if videoPkt { if videoPkt {
n += t.muxer.WriteVideoData(allocate, uint32(ct), packet.KeyFrame(), false) n += t.muxer.WriteVideoData(allocate[12:], uint32(ct), packet.KeyFrame(), false)
} else { } else {
n += t.muxer.WriteAudioData(allocate, false) n += t.muxer.WriteAudioData(allocate[12:], false)
} }
first := true
var min int
for length > 0 { for length > 0 {
min := utils.MinInt(length, t.chunkSize) if first {
min = utils.MinInt(length, t.chunkSize-5)
first = false
} else {
min = utils.MinInt(length, t.chunkSize)
}
copy(allocate[n:], data[:min]) copy(allocate[n:], data[:min])
n += min n += min
@@ -78,8 +92,9 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
} }
rtmpData := t.memoryPool.Fetch() rtmpData := t.memoryPool.Fetch()[:n]
ret := t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && utils.AVMediaTypeVideo == packet.MediaType(), packet.Dts()) ret := t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts())
if ret { if ret {
//发送给sink //发送给sink
@@ -88,18 +103,21 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
} }
t.memoryPool.FreeTail()
} }
func (t *TransStream) AddSink(sink stream.ISink) { func (t *TransStream) AddSink(sink stream.ISink) {
t.TransStreamImpl.AddSink(sink) t.TransStreamImpl.AddSink(sink)
utils.Assert(t.headerSize > 0)
sink.Input(t.header[:t.headerSize])
t.transBuffer.Peek(func(packet interface{}) { t.transBuffer.Peek(func(packet interface{}) {
sink.Input(packet.([]byte)) sink.Input(packet.([]byte))
}) })
} }
func (t *TransStream) onDiscardPacket(pkt interface{}) { func (t *TransStream) onDiscardPacket(pkt interface{}) {
//bytes := pkt.([]byte)
t.memoryPool.FreeHead() t.memoryPool.FreeHead()
} }
@@ -117,7 +135,7 @@ func (t *TransStream) WriteHeader() error {
audioStream = track audioStream = track
audioCodecId = audioStream.CodecId() audioCodecId = audioStream.CodecId()
t.audioChunk = librtmp.NewAudioChunk() t.audioChunk = librtmp.NewAudioChunk()
} else if utils.AVMediaTypeAudio == track.Type() { } else if utils.AVMediaTypeVideo == track.Type() {
videoStream = track videoStream = track
videoCodecId = videoStream.CodecId() videoCodecId = videoStream.CodecId()
t.videoChunk = librtmp.NewVideoChunk() t.videoChunk = librtmp.NewVideoChunk()
@@ -127,6 +145,7 @@ func (t *TransStream) WriteHeader() error {
utils.Assert(audioStream != nil || videoStream != nil) utils.Assert(audioStream != nil || videoStream != nil)
//初始化 //初始化
t.TransStreamImpl.Completed = true
t.header = make([]byte, 1024) t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0)
t.memoryPool = stream.NewMemoryPool(1024 * 1024 * 2) t.memoryPool = stream.NewMemoryPool(1024 * 1024 * 2)
@@ -135,17 +154,26 @@ func (t *TransStream) WriteHeader() error {
var n int var n int
if audioStream != nil { if audioStream != nil {
n += t.muxer.WriteAudioData(t.header, true) n += t.muxer.WriteAudioData(t.header[12:], true)
extra := audioStream.Extra() extra := audioStream.Extra()
copy(t.header[n:], extra) copy(t.header[n+12:], extra)
n += len(extra) n += len(extra)
t.audioChunk.Length = n
t.audioChunk.ToBytes(t.header)
n += 12
} }
if videoStream != nil { if videoStream != nil {
n += t.muxer.WriteAudioData(t.header[n:], true) tmp := n
n += t.muxer.WriteVideoData(t.header[n+12:], 0, false, true)
extra := videoStream.Extra() extra := videoStream.Extra()
copy(t.header[n:], extra) copy(t.header[n+12:], extra)
n += len(extra) n += len(extra)
t.videoChunk.Length = 5 + len(extra)
t.videoChunk.ToBytes(t.header[tmp:])
n += 12
} }
t.headerSize = n t.headerSize = n
@@ -153,6 +181,6 @@ func (t *TransStream) WriteHeader() error {
} }
func NewTransStream(chunkSize int) stream.ITransStream { func NewTransStream(chunkSize int) stream.ITransStream {
transStream := &TransStream{chunkSize: chunkSize} transStream := &TransStream{chunkSize: chunkSize, TransStreamImpl: stream.TransStreamImpl{Sinks: make(map[stream.SinkId]stream.ISink, 64)}}
return transStream return transStream
} }

View File

@@ -82,6 +82,10 @@ func (r *ringBuffer) Size() int {
} }
func (r *ringBuffer) All() ([]interface{}, []interface{}) { func (r *ringBuffer) All() ([]interface{}, []interface{}) {
if r.size == 0 {
return nil, nil
}
if r.head < r.tail { if r.head < r.tail {
return r.data[r.head:], r.data[:r.tail] return r.data[r.head:], r.data[:r.tail]
} else { } else {