支持enhanced-rtmp中的H265推拉流

This commit is contained in:
yangjiechina
2024-04-16 20:29:48 +08:00
parent ef9a0fea4f
commit 1fd0a2f063
11 changed files with 85 additions and 57 deletions

View File

@@ -59,9 +59,8 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
if videoKey {
head, _ := t.StreamBuffers[0].Data()
if len(head) > t.SegmentOffset {
t.StreamBuffers[0].Mark()
//分配末尾换行符
t.StreamBuffers[0].Allocate(2)
t.StreamBuffers[0].Fetch()
head, _ = t.StreamBuffers[0].Data()
t.writeSeparator(head[t.SegmentOffset:])
@@ -84,12 +83,9 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
separatorSize = 2
}
t.StreamBuffers[0].Mark()
allocate := t.StreamBuffers[0].Allocate(separatorSize + flvSize)
n += t.muxer.Input(allocate[n:], packet.MediaType(), len(data), packet.Dts(), packet.Pts(), packet.KeyFrame(), false)
copy(allocate[n:], data)
_ = t.StreamBuffers[0].Fetch()
if !full {
return nil
}
@@ -115,8 +111,9 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
t.muxer.AddAudioTrack(stream.CodecId(), 0, 0, 0)
} else if utils.AVMediaTypeVideo == stream.Type() {
t.muxer.AddVideoTrack(stream.CodecId())
t.muxer.AddProperty("width", stream.(utils.VideoStream).Width())
t.muxer.AddProperty("height", stream.(utils.VideoStream).Height())
t.muxer.AddProperty("width", stream.CodecParameters().SPSInfo().Width())
t.muxer.AddProperty("height", stream.CodecParameters().SPSInfo().Height())
}
return nil
}

View File

@@ -99,7 +99,7 @@ func (t *transStream) Input(packet utils.AVPacket) error {
pts := packet.ConvertPts(90000)
dts := packet.ConvertDts(90000)
if utils.AVMediaTypeVideo == packet.MediaType() {
return t.muxer.Input(packet.Index(), packet.AnnexBPacketData(), pts, dts, packet.KeyFrame())
return t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()]), pts, dts, packet.KeyFrame())
} else {
return t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame())
}

View File

@@ -34,7 +34,8 @@ func (t *transStream) Input(packet utils.AVPacket) error {
}
sink_.input(packet.Index(), extra, 0)
}
sink_.input(packet.Index(), packet.AnnexBPacketData(), 40)
sink_.input(packet.Index(), packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()]), uint32(packet.Duration(1000)))
}
}

View File

@@ -49,7 +49,7 @@ func NewPublisher(sourceId string, stack *librtmp.Stack, conn net.Conn) Publishe
func (p *publisher) Init() {
//创建内存池
p.audioMemoryPool = stream.NewMemoryPool(48000 * 1)
p.audioMemoryPool = stream.NewMemoryPool(48000 * 64)
if stream.AppConfig.GOPCache {
//以每秒钟4M码率大小创建内存池
p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000)

View File

@@ -24,7 +24,10 @@ func NewTransStream(chunkSize int) stream.ITransStream {
return transStream
}
var count int
func (t *TransStream) Input(packet utils.AVPacket) error {
count++
utils.Assert(t.TransStreamImpl.Completed)
var data []byte
@@ -34,12 +37,16 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
var length int
//rtmp chunk消息体的数据大小
var payloadSize int
//先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小
var chunkPayloadOffset int
ct := packet.Pts() - packet.Dts()
if utils.AVMediaTypeAudio == packet.MediaType() {
data = packet.Data()
length = len(data)
chunk = &t.audioChunk
payloadSize += 2 + length
chunkPayloadOffset = 2
payloadSize += chunkPayloadOffset + length
} else if utils.AVMediaTypeVideo == packet.MediaType() {
videoPkt = true
@@ -47,26 +54,30 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
data = packet.AVCCPacketData()
length = len(data)
chunk = &t.videoChunk
payloadSize += 5 + length
chunkPayloadOffset = t.muxer.ComputeVideoDataSize(uint32(ct))
payloadSize += chunkPayloadOffset + length
}
//遇到视频关键帧,不考虑合并写大小,发送之前剩余的数据.
if videoKey {
tmp := t.StreamBuffers[0]
head, _ := tmp.Data()
t.SendPacket(head[t.SegmentOffset:])
t.SwapStreamBuffer()
if len(head[t.SegmentOffset:]) > 0 {
bytes := head[t.SegmentOffset:]
t.SendPacket(bytes)
//交替使用两块内存
t.SwapStreamBuffer()
}
}
//分配内存
t.StreamBuffers[0].Mark()
allocate := t.StreamBuffers[0].Allocate(12 + payloadSize + (payloadSize / t.chunkSize))
//写chunk头
var ts uint32
if packet.CodecId() == utils.AVCodecIdAAC {
ts = uint32(packet.ConvertPts(libflv.AACFrameSize))
ts = uint32(packet.ConvertDts(libflv.AACFrameSize))
} else {
ts = uint32(packet.ConvertPts(1000))
ts = uint32(packet.ConvertDts(1000))
}
chunk.Length = payloadSize
@@ -75,22 +86,22 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
utils.Assert(n == 12)
//写flv
ct := packet.Pts() - packet.Dts()
if videoPkt {
n += t.muxer.WriteVideoData(allocate[12:], uint32(ct), packet.KeyFrame(), false)
n += chunk.WriteData(allocate[n:], data, t.chunkSize)
n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
} else {
n += t.muxer.WriteAudioData(allocate[12:], false)
n += chunk.WriteData(allocate[n:], data, t.chunkSize)
n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
}
_ = t.StreamBuffers[0].Fetch()[:n]
if t.Full(packet.Pts()) {
//未满合并写大小, 不发送
if !t.Full(packet.Dts()) {
return nil
}
head, _ := t.StreamBuffers[0].Data()
t.SendPacketWithOffset(head[:], t.SegmentOffset)
//发送合并写数据
t.SendPacketWithOffset(head, t.SegmentOffset)
return nil
}

View File

@@ -3,6 +3,8 @@ package rtsp
import (
"encoding/binary"
"fmt"
"github.com/yangjiechina/avformat/libavc"
"github.com/yangjiechina/avformat/libhevc"
"github.com/yangjiechina/avformat/librtp"
"github.com/yangjiechina/avformat/librtsp/sdp"
"github.com/yangjiechina/avformat/utils"
@@ -109,23 +111,23 @@ func (t *tranStream) Input(packet utils.AVPacket) error {
return nil
}
extra, err := t.TransStreamImpl.Tracks[packet.Index()].AnnexBExtraData()
if err != nil {
return err
stream_.cache = true
parameters := t.TransStreamImpl.Tracks[packet.Index()].CodecParameters()
if utils.AVCodecIdH265 == packet.CodecId() {
bytes := parameters.DecoderConfRecord().(*libhevc.HEVCDecoderConfRecord).VPS
stream_.muxer.Input(bytes[0], uint32(packet.ConvertPts(stream_.rate)))
}
var count int
stream_.cache = true
utils.SplitNalU(extra, func(nalu []byte) {
data := utils.RemoveStartCode(nalu)
stream_.muxer.Input(data, uint32(packet.ConvertPts(stream_.rate)))
count++
})
spsBytes := parameters.DecoderConfRecord().SPSBytes()
ppsBytes := parameters.DecoderConfRecord().PPSBytes()
stream_.muxer.Input(spsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
stream_.muxer.Input(ppsBytes[0], uint32(packet.ConvertPts(stream_.rate)))
stream_.header = stream_.tmp
}
data := utils.RemoveStartCode(packet.AnnexBPacketData())
data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()]))
stream_.muxer.Input(data, uint32(packet.ConvertPts(stream_.rate)))
}
@@ -153,7 +155,7 @@ func (t *tranStream) AddTrack(stream utils.AVStream) error {
//创建RTP封装
var muxer librtp.Muxer
if utils.AVCodecIdH264 == stream.CodecId() {
if utils.AVCodecIdH264 == stream.CodecId() || utils.AVCodecIdH265 == stream.CodecId() {
muxer = librtp.NewH264Muxer(payloadType.Pt, 0, 0xFFFFFFFF)
} else if utils.AVCodecIdAAC == stream.CodecId() {
muxer = librtp.NewAACMuxer(payloadType.Pt, 0, 0xFFFFFFFF)

View File

@@ -2,6 +2,7 @@ package rtsp
import (
"fmt"
"github.com/yangjiechina/avformat/libbufio"
"github.com/yangjiechina/avformat/utils"
)
@@ -54,7 +55,7 @@ func (t *transportManager) AllocTransport(tcp bool, cb func(port int)) error {
}
t.nextPort = t.nextPort + 1%t.endPort
t.nextPort = utils.MaxInt(t.nextPort, t.startPort)
t.nextPort = libbufio.MaxInt(t.nextPort, t.startPort)
return nil
}

View File

@@ -6,24 +6,31 @@ import (
// MemoryPool 从解复用阶段拼凑成完整的AVPacket开始(写)到GOP缓存结束(释放),整个过程都使用池中内存
// 类似环形缓冲区, 区别在于,写入的内存块是连续的、整块内存.
// 两种使用方式:
// 1. 已知需要分配内存大小, 直接使用Allocate()函数分配, 并且外部自行操作内存块
// 2. 未知分配内存大小, 先使用Mark()函数,标记内存起始偏移量, 再通过Write()函数将数据拷贝进内存块最后调用Fetch/Reset函数完成或释放内存块
//
// 两种使用方式互斥,不能同时使用.
type MemoryPool interface {
// Mark 标记一块写的内存地址
//使用流程 Mark->Write/Allocate....->Fetch/Reset
// Allocate 分配指定大小的内存
Allocate(size int) []byte
// Mark 标记内存块起始位置
Mark()
// Write 向内存块中写入数据, 必须先调用Mark函数
Write(data []byte)
// Fetch 获取当前内存块必须先调用Mark函数
Fetch() []byte
// Reset 清空写入的数据,本次缓存的数据无效
Reset()
// Reserve 保留指定大小的内存空间
//主要是为了和实现和Write相似功能但是不拷贝, 所以使用流程和Write一样.
Reserve(size int)
Allocate(size int) []byte
Fetch() []byte
// Reset 清空此次Write的标记本次缓存的数据无效
Reset()
// FreeHead 从头部释放指定大小内存
FreeHead()
@@ -127,6 +134,8 @@ func (m *memoryPool) allocate(size int) []byte {
}
func (m *memoryPool) Mark() {
utils.Assert(!m.mark)
m.markIndex = m.tail
m.mark = true
}
@@ -144,8 +153,9 @@ func (m *memoryPool) Reserve(size int) {
}
func (m *memoryPool) Allocate(size int) []byte {
utils.Assert(m.mark)
return m.allocate(size)
m.Mark()
_ = m.allocate(size)
return m.Fetch()
}
func (m *memoryPool) Fetch() []byte {
@@ -163,6 +173,7 @@ func (m *memoryPool) Reset() {
}
func (m *memoryPool) FreeHead() {
utils.Assert(!m.mark)
utils.Assert(!m.blockQueue.IsEmpty())
size := m.blockQueue.Pop().(int)
@@ -177,6 +188,7 @@ func (m *memoryPool) FreeHead() {
}
func (m *memoryPool) FreeTail() {
utils.Assert(!m.mark)
utils.Assert(!m.blockQueue.IsEmpty())
size := m.blockQueue.PopBack().(int)

View File

@@ -1,6 +1,9 @@
package stream
import "github.com/yangjiechina/avformat/utils"
import (
"github.com/yangjiechina/avformat/libbufio"
"github.com/yangjiechina/avformat/utils"
)
type Queue struct {
*ringBuffer
@@ -42,7 +45,7 @@ func (q *Queue) PopBack() interface{} {
value := q.ringBuffer.Tail()
q.size--
q.tail = utils.MaxInt(0, q.tail-1)
q.tail = libbufio.MaxInt(0, q.tail-1)
return value
}

View File

@@ -1,6 +1,7 @@
package stream
import (
"encoding/binary"
"fmt"
"github.com/yangjiechina/avformat/utils"
"net"
@@ -50,13 +51,13 @@ type ISink interface {
func GenerateSinkId(addr net.Addr) SinkId {
network := addr.Network()
if "tcp" == network {
id := uint64(utils.BytesToInt(addr.(*net.TCPAddr).IP.To4()))
id := uint64(binary.BigEndian.Uint32(addr.(*net.TCPAddr).IP.To4()))
id <<= 32
id |= uint64(addr.(*net.TCPAddr).Port << 16)
return id
} else if "udp" == network {
id := uint64(utils.BytesToInt(addr.(*net.UDPAddr).IP.To4()))
id := uint64(binary.BigEndian.Uint32(addr.(*net.UDPAddr).IP.To4()))
id <<= 32
id |= uint64(addr.(*net.UDPAddr).Port << 16)

View File

@@ -198,13 +198,13 @@ func (c *CacheTransStream) Full(ts int64) bool {
}
func (c *CacheTransStream) SwapStreamBuffer() {
utils.Assert(c.ExistVideo)
if c.ExistVideo && AppConfig.MergeWriteLatency > 0 {
tmp := c.StreamBuffers[0]
c.StreamBuffers[0] = c.StreamBuffers[1]
c.StreamBuffers[1] = tmp
}
tmp := c.StreamBuffers[0]
c.StreamBuffers[0] = c.StreamBuffers[1]
c.StreamBuffers[1] = tmp
c.StreamBuffers[0].Clear()
c.PrePacketTS = -1
c.SegmentOffset = 0
}