mirror of
https://github.com/lkmio/lkm.git
synced 2025-10-05 15:16:49 +08:00
完善rtmp server
This commit is contained in:
@@ -2,27 +2,114 @@ package rtmp
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat/libflv"
|
"github.com/yangjiechina/avformat/libflv"
|
||||||
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"github.com/yangjiechina/live-server/stream"
|
"github.com/yangjiechina/live-server/stream"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Publisher struct {
|
type Publisher struct {
|
||||||
stream.SourceImpl
|
stream.SourceImpl
|
||||||
deMuxer libflv.DeMuxer
|
deMuxer libflv.DeMuxer
|
||||||
|
audioMemoryPool stream.MemoryPool
|
||||||
|
videoMemoryPool stream.MemoryPool
|
||||||
|
audioPacket []byte
|
||||||
|
videoPacket []byte
|
||||||
|
|
||||||
|
audioUnmark bool
|
||||||
|
videoUnmark bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPublisher(sourceId string) *Publisher {
|
func NewPublisher(sourceId string) *Publisher {
|
||||||
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}}
|
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}}
|
||||||
muxer := &libflv.DeMuxer{}
|
publisher.deMuxer = libflv.DeMuxer{}
|
||||||
//设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
|
//设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
|
||||||
muxer.SetHandler(publisher)
|
publisher.deMuxer.SetHandler(publisher)
|
||||||
|
|
||||||
|
//创建内存池
|
||||||
|
publisher.audioMemoryPool = stream.NewMemoryPool(48000 * (stream.AppConfig.GOPCache + 1))
|
||||||
|
if stream.AppConfig.GOPCache > 0 {
|
||||||
|
//以每秒钟4M码率大小创建内存池
|
||||||
|
publisher.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8 * stream.AppConfig.GOPCache)
|
||||||
|
} else {
|
||||||
|
publisher.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8)
|
||||||
|
}
|
||||||
return publisher
|
return publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnVideo 从rtmpchunk解析过来的视频包
|
func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) {
|
||||||
|
tmp := stream_.Extra()
|
||||||
|
bytes := make([]byte, len(tmp))
|
||||||
|
copy(bytes, tmp)
|
||||||
|
stream_.SetExtraData(bytes)
|
||||||
|
|
||||||
|
if utils.AVMediaTypeAudio == stream_.Type() {
|
||||||
|
p.audioMemoryPool.FreeTail(len(p.audioPacket))
|
||||||
|
} else if utils.AVMediaTypeVideo == stream_.Type() {
|
||||||
|
p.videoMemoryPool.FreeTail(len(p.videoPacket))
|
||||||
|
}
|
||||||
|
|
||||||
|
p.SourceImpl.OnDeMuxStream(stream_)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) OnDeMuxStreamDone() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) OnDeMuxPacket(index int, packet utils.AVPacket) {
|
||||||
|
p.SourceImpl.OnDeMuxPacket(index, packet)
|
||||||
|
|
||||||
|
if stream.AppConfig.GOPCache > 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if utils.AVMediaTypeAudio == packet.MediaType() {
|
||||||
|
p.audioMemoryPool.FreeHead(len(packet.Data()))
|
||||||
|
} else if utils.AVMediaTypeVideo == packet.MediaType() {
|
||||||
|
p.videoMemoryPool.FreeHead(len(packet.Data()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) OnDeMuxDone() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnVideo 从rtm chunk解析过来的视频包
|
||||||
func (p *Publisher) OnVideo(data []byte, ts uint32) {
|
func (p *Publisher) OnVideo(data []byte, ts uint32) {
|
||||||
|
if data == nil {
|
||||||
|
data = p.videoMemoryPool.Fetch()
|
||||||
|
p.videoUnmark = false
|
||||||
|
}
|
||||||
|
|
||||||
|
p.videoPacket = data
|
||||||
_ = p.deMuxer.InputVideo(data, ts)
|
_ = p.deMuxer.InputVideo(data, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Publisher) OnAudio(data []byte, ts uint32) {
|
func (p *Publisher) OnAudio(data []byte, ts uint32) {
|
||||||
|
if data == nil {
|
||||||
|
data = p.audioMemoryPool.Fetch()
|
||||||
|
p.audioUnmark = false
|
||||||
|
}
|
||||||
|
|
||||||
|
p.audioPacket = data
|
||||||
_ = p.deMuxer.InputAudio(data, ts)
|
_ = p.deMuxer.InputAudio(data, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnPartPacket 从rtmp解析过来的部分音视频包
|
||||||
|
func (p *Publisher) OnPartPacket(index int, data []byte, first bool) {
|
||||||
|
//audio
|
||||||
|
if index == 0 {
|
||||||
|
if p.audioUnmark {
|
||||||
|
p.audioMemoryPool.Mark()
|
||||||
|
p.audioUnmark = true
|
||||||
|
}
|
||||||
|
|
||||||
|
p.audioMemoryPool.Write(data)
|
||||||
|
//video
|
||||||
|
} else if index == 1 {
|
||||||
|
if p.videoUnmark {
|
||||||
|
p.videoMemoryPool.Mark()
|
||||||
|
p.videoUnmark = true
|
||||||
|
}
|
||||||
|
|
||||||
|
p.videoMemoryPool.Write(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -1,8 +1,8 @@
|
|||||||
package rtmp
|
package rtmp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat"
|
|
||||||
"github.com/yangjiechina/avformat/transport"
|
"github.com/yangjiechina/avformat/transport"
|
||||||
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"net"
|
"net"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -17,7 +17,7 @@ type serverImpl struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverImpl) Start(addr net.Addr) error {
|
func (s *serverImpl) Start(addr net.Addr) error {
|
||||||
avformat.Assert(s.tcp == nil)
|
utils.Assert(s.tcp == nil)
|
||||||
|
|
||||||
server := &transport.TCPServer{}
|
server := &transport.TCPServer{}
|
||||||
server.SetHandler(s)
|
server.SetHandler(s)
|
||||||
@@ -37,7 +37,7 @@ func (s *serverImpl) Close() {
|
|||||||
|
|
||||||
func (s *serverImpl) OnConnected(conn net.Conn) {
|
func (s *serverImpl) OnConnected(conn net.Conn) {
|
||||||
t := conn.(*transport.Conn)
|
t := conn.(*transport.Conn)
|
||||||
t.Data = NewSession()
|
t.Data = NewSession(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
|
func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
|
||||||
|
@@ -1,11 +1,10 @@
|
|||||||
package rtmp
|
package rtmp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat"
|
|
||||||
"github.com/yangjiechina/avformat/librtmp"
|
"github.com/yangjiechina/avformat/librtmp"
|
||||||
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"github.com/yangjiechina/live-server/stream"
|
"github.com/yangjiechina/live-server/stream"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Session 负责除RTMP连接和断开以外的所有生命周期处理
|
// Session 负责除RTMP连接和断开以外的所有生命周期处理
|
||||||
@@ -15,43 +14,49 @@ type Session interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSession() *sessionImpl {
|
func NewSession(conn net.Conn) Session {
|
||||||
impl := &sessionImpl{}
|
impl := &sessionImpl{}
|
||||||
stack := librtmp.NewStack(impl)
|
stack := librtmp.NewStack(impl)
|
||||||
impl.stack = stack
|
impl.stack = stack
|
||||||
|
impl.conn = conn
|
||||||
return impl
|
return impl
|
||||||
}
|
}
|
||||||
|
|
||||||
type sessionImpl struct {
|
type sessionImpl struct {
|
||||||
stream.SessionImpl
|
stream.SessionImpl
|
||||||
|
//解析rtmp协议栈
|
||||||
stack *librtmp.Stack
|
stack *librtmp.Stack
|
||||||
//publisher/sink
|
//publisher/sink
|
||||||
handle interface{}
|
handle interface{}
|
||||||
|
conn net.Conn
|
||||||
|
|
||||||
streamId string
|
streamId string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sessionImpl) OnPublish(app, stream_ string, response chan avformat.HookState) {
|
func (s *sessionImpl) OnPublish(app, stream_ string, response chan utils.HookState) {
|
||||||
s.streamId = app + "/" + stream_
|
s.streamId = app + "/" + stream_
|
||||||
publisher := NewPublisher(s.streamId)
|
publisher := NewPublisher(s.streamId)
|
||||||
s.stack.SetOnPublishHandler(publisher)
|
s.stack.SetOnPublishHandler(publisher)
|
||||||
|
s.stack.SetOnTransDeMuxerHandler(publisher)
|
||||||
|
//stream.SessionImpl统一处理, Source是否已经存在, Hook回调....
|
||||||
s.SessionImpl.OnPublish(publisher, nil, func() {
|
s.SessionImpl.OnPublish(publisher, nil, func() {
|
||||||
s.handle = publisher
|
s.handle = publisher
|
||||||
response <- http.StatusOK
|
response <- utils.HookStateOK
|
||||||
}, func(state avformat.HookState) {
|
}, func(state utils.HookState) {
|
||||||
response <- state
|
response <- state
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sessionImpl) OnPlay(app, stream string, response chan avformat.HookState) {
|
func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState) {
|
||||||
s.streamId = app + "/" + stream
|
s.streamId = app + "/" + stream_
|
||||||
//sink := &Sink{}
|
|
||||||
//s.SessionImpl.OnPlay(sink, nil, func() {
|
sink := NewSink(stream.GenerateSinkId(s.conn), s.conn)
|
||||||
// s.handle = sink
|
s.SessionImpl.OnPlay(sink, nil, func() {
|
||||||
// response <- http.StatusOK
|
s.handle = sink
|
||||||
//}, func(state avformat.HookState) {
|
response <- utils.HookStateOK
|
||||||
// response <- state
|
}, func(state utils.HookState) {
|
||||||
//})
|
response <- state
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
|
func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
|
||||||
|
@@ -1,7 +1,11 @@
|
|||||||
package rtmp
|
package rtmp
|
||||||
|
|
||||||
import "github.com/yangjiechina/live-server/stream"
|
import (
|
||||||
|
"github.com/yangjiechina/avformat/utils"
|
||||||
|
"github.com/yangjiechina/live-server/stream"
|
||||||
|
"net"
|
||||||
|
)
|
||||||
|
|
||||||
type Sink struct {
|
func NewSink(id stream.SinkId, conn net.Conn) stream.ISink {
|
||||||
stream.SinkImpl
|
return &stream.SinkImpl{Id_: id, Protocol_: stream.ProtocolRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE}
|
||||||
}
|
}
|
||||||
|
@@ -1,4 +1,158 @@
|
|||||||
package rtmp
|
package rtmp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/yangjiechina/avformat/libflv"
|
||||||
|
"github.com/yangjiechina/avformat/librtmp"
|
||||||
|
"github.com/yangjiechina/avformat/utils"
|
||||||
|
"github.com/yangjiechina/live-server/stream"
|
||||||
|
)
|
||||||
|
|
||||||
type TransStream struct {
|
type TransStream struct {
|
||||||
|
stream.TransStreamImpl
|
||||||
|
chunkSize int
|
||||||
|
header []byte //音视频头chunk
|
||||||
|
headerSize int
|
||||||
|
muxer *libflv.Muxer
|
||||||
|
|
||||||
|
audioChunk librtmp.Chunk
|
||||||
|
videoChunk librtmp.Chunk
|
||||||
|
|
||||||
|
memoryPool stream.MemoryPool
|
||||||
|
transBuffer stream.StreamBuffer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TransStream) Input(packet utils.AVPacket) {
|
||||||
|
utils.Assert(t.TransStreamImpl.Completed)
|
||||||
|
var data []byte
|
||||||
|
var chunk *librtmp.Chunk
|
||||||
|
var videoPkt bool
|
||||||
|
|
||||||
|
if utils.AVMediaTypeAudio == packet.MediaType() {
|
||||||
|
data = packet.Data()
|
||||||
|
chunk = &t.audioChunk
|
||||||
|
} else if utils.AVMediaTypeVideo == packet.MediaType() {
|
||||||
|
videoPkt = true
|
||||||
|
data = packet.AVCCPacketData()
|
||||||
|
chunk = &t.videoChunk
|
||||||
|
}
|
||||||
|
|
||||||
|
length := len(data)
|
||||||
|
//rtmp chunk消息体的数据大小
|
||||||
|
payloadSize := 5 + length
|
||||||
|
payloadSize += payloadSize / t.chunkSize
|
||||||
|
|
||||||
|
//分配内存
|
||||||
|
t.memoryPool.Mark()
|
||||||
|
allocate := t.memoryPool.Allocate(12 + payloadSize)
|
||||||
|
|
||||||
|
//写chunk头
|
||||||
|
chunk.Length = payloadSize
|
||||||
|
chunk.Timestamp = uint32(packet.Dts())
|
||||||
|
n := chunk.ToBytes(allocate)
|
||||||
|
utils.Assert(n == 12)
|
||||||
|
|
||||||
|
//写flv
|
||||||
|
ct := packet.Pts() - packet.Dts()
|
||||||
|
if videoPkt {
|
||||||
|
n += t.muxer.WriteVideoData(allocate, uint32(ct), packet.KeyFrame(), false)
|
||||||
|
} else {
|
||||||
|
n += t.muxer.WriteAudioData(allocate, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
for length > 0 {
|
||||||
|
min := utils.MinInt(length, t.chunkSize)
|
||||||
|
copy(allocate[n:], data[:min])
|
||||||
|
n += min
|
||||||
|
|
||||||
|
length -= min
|
||||||
|
data = data[min:]
|
||||||
|
|
||||||
|
//写一个ChunkType3用作分割
|
||||||
|
if length > 0 {
|
||||||
|
if videoPkt {
|
||||||
|
allocate[n] = (0x3 << 6) | byte(librtmp.ChunkStreamIdVideo)
|
||||||
|
} else {
|
||||||
|
allocate[n] = (0x3 << 6) | byte(librtmp.ChunkStreamIdAudio)
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rtmpData := t.memoryPool.Fetch()
|
||||||
|
ret := t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && utils.AVMediaTypeVideo == packet.MediaType(), packet.Dts())
|
||||||
|
if ret {
|
||||||
|
//发送给sink
|
||||||
|
|
||||||
|
for _, sink := range t.Sinks {
|
||||||
|
sink.Input(rtmpData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TransStream) AddSink(sink stream.ISink) {
|
||||||
|
t.TransStreamImpl.AddSink(sink)
|
||||||
|
|
||||||
|
t.transBuffer.Peek(func(packet interface{}) {
|
||||||
|
sink.Input(packet.([]byte))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TransStream) onDiscardPacket(pkt interface{}) {
|
||||||
|
bytes := pkt.([]byte)
|
||||||
|
t.memoryPool.FreeHead(len(bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TransStream) WriteHeader() error {
|
||||||
|
utils.Assert(t.Tracks != nil)
|
||||||
|
utils.Assert(!t.TransStreamImpl.Completed)
|
||||||
|
|
||||||
|
var audioStream utils.AVStream
|
||||||
|
var videoStream utils.AVStream
|
||||||
|
var audioCodecId utils.AVCodecID
|
||||||
|
var videoCodecId utils.AVCodecID
|
||||||
|
|
||||||
|
for _, track := range t.Tracks {
|
||||||
|
if utils.AVMediaTypeAudio == track.Type() {
|
||||||
|
audioStream = track
|
||||||
|
audioCodecId = audioStream.CodecId()
|
||||||
|
t.audioChunk = librtmp.NewAudioChunk()
|
||||||
|
} else if utils.AVMediaTypeAudio == track.Type() {
|
||||||
|
videoStream = track
|
||||||
|
videoCodecId = videoStream.CodecId()
|
||||||
|
t.videoChunk = librtmp.NewVideoChunk()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
utils.Assert(audioStream != nil || videoStream != nil)
|
||||||
|
|
||||||
|
//初始化
|
||||||
|
t.header = make([]byte, 1024)
|
||||||
|
t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0)
|
||||||
|
t.memoryPool = stream.NewMemoryPool(1024 * 1024 * 2)
|
||||||
|
t.transBuffer = stream.NewStreamBuffer(2000)
|
||||||
|
t.transBuffer.SetDiscardHandler(t.onDiscardPacket)
|
||||||
|
|
||||||
|
var n int
|
||||||
|
if audioStream != nil {
|
||||||
|
n += t.muxer.WriteAudioData(t.header, true)
|
||||||
|
extra := audioStream.Extra()
|
||||||
|
copy(t.header[n:], extra)
|
||||||
|
n += len(extra)
|
||||||
|
}
|
||||||
|
|
||||||
|
if videoStream != nil {
|
||||||
|
n += t.muxer.WriteAudioData(t.header[n:], true)
|
||||||
|
extra := videoStream.Extra()
|
||||||
|
copy(t.header[n:], extra)
|
||||||
|
n += len(extra)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.headerSize = n
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTransStream(chunkSize int) stream.ITransStream {
|
||||||
|
transStream := &TransStream{chunkSize: chunkSize}
|
||||||
|
return transStream
|
||||||
}
|
}
|
||||||
|
@@ -1,16 +1,36 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
// MemoryPool
|
import (
|
||||||
// 从解复用阶段,拼凑成完整的AVPacket开始(写),到GOP缓存结束(释放),整个过程都使用池中内存
|
"github.com/yangjiechina/avformat/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MemoryPool 从解复用阶段,拼凑成完整的AVPacket开始(写),到GOP缓存结束(释放),整个过程都使用池中内存
|
||||||
|
// 类似环形缓冲区, 区别在于,写入的内存块是连续的、整块内存.
|
||||||
type MemoryPool interface {
|
type MemoryPool interface {
|
||||||
|
// Mark 标记一块写的内存地址
|
||||||
|
//使用流程 Mark->Write/Allocate....->Fetch/Reset
|
||||||
|
Mark()
|
||||||
|
|
||||||
|
Write(data []byte)
|
||||||
|
|
||||||
Allocate(size int) []byte
|
Allocate(size int) []byte
|
||||||
|
|
||||||
Free(size int)
|
Fetch() []byte
|
||||||
|
|
||||||
|
// Reset 清空此次Write的标记,本次缓存的数据无效
|
||||||
|
Reset()
|
||||||
|
|
||||||
|
// FreeHead 从头部释放指定大小内存
|
||||||
|
FreeHead(size int)
|
||||||
|
|
||||||
|
// FreeTail 从尾部释放指定大小内存
|
||||||
|
FreeTail(size int)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMemoryPool(capacity int) MemoryPool {
|
func NewMemoryPool(capacity int) MemoryPool {
|
||||||
pool := &memoryPool{
|
pool := &memoryPool{
|
||||||
data: make([]byte, capacity),
|
data: make([]byte, capacity),
|
||||||
|
capacity: capacity,
|
||||||
}
|
}
|
||||||
|
|
||||||
return pool
|
return pool
|
||||||
@@ -18,12 +38,79 @@ func NewMemoryPool(capacity int) MemoryPool {
|
|||||||
|
|
||||||
type memoryPool struct {
|
type memoryPool struct {
|
||||||
data []byte
|
data []byte
|
||||||
size int
|
ptrStart uintptr
|
||||||
|
ptrEnd uintptr
|
||||||
|
//剩余的可用内存空间不足以为此次write
|
||||||
|
capacity int
|
||||||
|
head int
|
||||||
|
tail int
|
||||||
|
|
||||||
|
//保存开始索引
|
||||||
|
mark int
|
||||||
|
}
|
||||||
|
|
||||||
|
// 根据head和tail计算出可用的内存地址
|
||||||
|
func (m *memoryPool) allocate(size int) []byte {
|
||||||
|
if m.capacity-m.tail < size {
|
||||||
|
//使用从头释放的内存
|
||||||
|
if m.tail-m.mark+size <= m.head {
|
||||||
|
copy(m.data, m.data[m.mark:m.tail])
|
||||||
|
m.capacity = m.mark
|
||||||
|
m.tail = m.tail - m.mark
|
||||||
|
m.mark = 0
|
||||||
|
} else {
|
||||||
|
|
||||||
|
//扩容
|
||||||
|
capacity := (cap(m.data) + m.tail - m.mark + size) * 3 / 2
|
||||||
|
bytes := make([]byte, capacity)
|
||||||
|
//不对之前的内存进行复制, 已经被AVPacket引用, 自行GC
|
||||||
|
copy(bytes, m.data[m.mark:m.tail])
|
||||||
|
m.data = bytes
|
||||||
|
m.capacity = capacity
|
||||||
|
m.tail = m.tail - m.mark
|
||||||
|
m.mark = 0
|
||||||
|
m.head = 0
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes := m.data[m.tail:]
|
||||||
|
m.tail += size
|
||||||
|
return bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryPool) Mark() {
|
||||||
|
m.mark = m.tail
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryPool) Write(data []byte) {
|
||||||
|
allocate := m.allocate(len(data))
|
||||||
|
copy(allocate, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryPool) Allocate(size int) []byte {
|
func (m *memoryPool) Allocate(size int) []byte {
|
||||||
return nil
|
return m.allocate(size)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryPool) Free(size int) {
|
func (m *memoryPool) Fetch() []byte {
|
||||||
|
return m.data[m.mark:m.tail]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryPool) Reset() {
|
||||||
|
m.tail = m.mark
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryPool) FreeHead(size int) {
|
||||||
|
m.head += size
|
||||||
|
if m.head == m.tail {
|
||||||
|
m.head = 0
|
||||||
|
m.tail = 0
|
||||||
|
} else if m.head >= m.capacity {
|
||||||
|
m.head = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryPool) FreeTail(size int) {
|
||||||
|
m.tail -= size
|
||||||
|
utils.Assert(m.tail >= 0)
|
||||||
}
|
}
|
||||||
|
25
stream/memory_pool_test.go
Normal file
25
stream/memory_pool_test.go
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/hex"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMemoryPool(t *testing.T) {
|
||||||
|
bytes := make([]byte, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
bytes[i] = byte(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
pool := NewMemoryPool(5)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
pool.Mark()
|
||||||
|
pool.Write(bytes)
|
||||||
|
fetch := pool.Fetch()
|
||||||
|
println(hex.Dump(fetch))
|
||||||
|
|
||||||
|
if i%2 == 0 {
|
||||||
|
pool.FreeHead(len(fetch))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -14,6 +14,8 @@ type RingBuffer interface {
|
|||||||
Tail() interface{}
|
Tail() interface{}
|
||||||
|
|
||||||
Size() int
|
Size() int
|
||||||
|
|
||||||
|
All() ([]interface{}, []interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRingBuffer(capacity int) RingBuffer {
|
func NewRingBuffer(capacity int) RingBuffer {
|
||||||
@@ -75,3 +77,11 @@ func (r *ringBuffer) Tail() interface{} {
|
|||||||
func (r *ringBuffer) Size() int {
|
func (r *ringBuffer) Size() int {
|
||||||
return r.size
|
return r.size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ringBuffer) All() ([]interface{}, []interface{}) {
|
||||||
|
if r.head < r.tail {
|
||||||
|
return r.data[r.head:r.tail], nil
|
||||||
|
} else {
|
||||||
|
return r.data[r.head:], r.data[:r.tail]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -1,19 +1,19 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Session 封装推拉流Session 统一管理,统一 hook回调
|
// Session 封装推拉流Session 统一管理,统一 hook回调
|
||||||
type Session interface {
|
type Session interface {
|
||||||
OnPublish(source ISource, pra map[string]interface{}, success func(), failure func(state avformat.HookState))
|
OnPublish(source ISource, pra map[string]interface{}, success func(), failure func(state utils.HookState))
|
||||||
|
|
||||||
OnPublishDone()
|
OnPublishDone()
|
||||||
|
|
||||||
OnPlay(sink ISink, pra map[string]interface{}, success func(), failure func(state avformat.HookState))
|
OnPlay(sink ISink, pra map[string]interface{}, success func(), failure func(state utils.HookState))
|
||||||
|
|
||||||
OnPlayDone(pra map[string]interface{}, success func(), failure func(state avformat.HookState))
|
OnPlayDone(pra map[string]interface{}, success func(), failure func(state utils.HookState))
|
||||||
}
|
}
|
||||||
|
|
||||||
type SessionImpl struct {
|
type SessionImpl struct {
|
||||||
@@ -30,19 +30,19 @@ func (s *SessionImpl) AddInfoParams(data map[string]interface{}) {
|
|||||||
data["remoteAddr"] = s.remoteAddr
|
data["remoteAddr"] = s.remoteAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SessionImpl) OnPublish(source_ ISource, pra map[string]interface{}, success func(), failure func(state avformat.HookState)) {
|
func (s *SessionImpl) OnPublish(source_ ISource, pra map[string]interface{}, success func(), failure func(state utils.HookState)) {
|
||||||
//streamId 已经被占用
|
//streamId 已经被占用
|
||||||
source := SourceManager.Find(s.stream)
|
source := SourceManager.Find(s.stream)
|
||||||
if source != nil {
|
if source != nil {
|
||||||
failure(avformat.HookStateOccupy)
|
failure(utils.HookStateOccupy)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !AppConfig.Hook.EnableOnPublish() {
|
if !AppConfig.Hook.EnableOnPublish() {
|
||||||
if err := SourceManager.Add(source_); err != nil {
|
if err := SourceManager.Add(source_); err == nil {
|
||||||
success()
|
success()
|
||||||
} else {
|
} else {
|
||||||
failure(avformat.HookStateOccupy)
|
failure(utils.HookStateOccupy)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
@@ -54,18 +54,18 @@ func (s *SessionImpl) OnPublish(source_ ISource, pra map[string]interface{}, suc
|
|||||||
|
|
||||||
s.AddInfoParams(pra)
|
s.AddInfoParams(pra)
|
||||||
err := s.DoPublish(pra, func(response *http.Response) {
|
err := s.DoPublish(pra, func(response *http.Response) {
|
||||||
if err := SourceManager.Add(source_); err != nil {
|
if err := SourceManager.Add(source_); err == nil {
|
||||||
success()
|
success()
|
||||||
} else {
|
} else {
|
||||||
failure(avformat.HookStateOccupy)
|
failure(utils.HookStateOccupy)
|
||||||
}
|
}
|
||||||
}, func(response *http.Response, err error) {
|
}, func(response *http.Response, err error) {
|
||||||
failure(avformat.HookStateFailure)
|
failure(utils.HookStateFailure)
|
||||||
})
|
})
|
||||||
|
|
||||||
//hook地址连接失败
|
//hook地址连接失败
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failure(avformat.HookStateFailure)
|
failure(utils.HookStateFailure)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -74,7 +74,7 @@ func (s *SessionImpl) OnPublishDone() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SessionImpl) OnPlay(sink ISink, pra map[string]interface{}, success func(), failure func(state avformat.HookState)) {
|
func (s *SessionImpl) OnPlay(sink ISink, pra map[string]interface{}, success func(), failure func(state utils.HookState)) {
|
||||||
f := func() {
|
f := func() {
|
||||||
source := SourceManager.Find(s.stream)
|
source := SourceManager.Find(s.stream)
|
||||||
if source == nil {
|
if source == nil {
|
||||||
@@ -99,15 +99,15 @@ func (s *SessionImpl) OnPlay(sink ISink, pra map[string]interface{}, success fun
|
|||||||
f()
|
f()
|
||||||
success()
|
success()
|
||||||
}, func(response *http.Response, err error) {
|
}, func(response *http.Response, err error) {
|
||||||
failure(avformat.HookStateFailure)
|
failure(utils.HookStateFailure)
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failure(avformat.HookStateFailure)
|
failure(utils.HookStateFailure)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SessionImpl) OnPlayDone(pra map[string]interface{}, success func(), failure func(state avformat.HookState)) {
|
func (s *SessionImpl) OnPlayDone(pra map[string]interface{}, success func(), failure func(state utils.HookState)) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -1,15 +1,16 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import "github.com/yangjiechina/avformat/utils"
|
import (
|
||||||
|
"github.com/yangjiechina/avformat/utils"
|
||||||
|
"net"
|
||||||
|
)
|
||||||
|
|
||||||
type SinkId string
|
type SinkId interface{}
|
||||||
|
|
||||||
type ISink interface {
|
type ISink interface {
|
||||||
Id() SinkId
|
Id() SinkId
|
||||||
|
|
||||||
Input(data []byte)
|
Input(data []byte) error
|
||||||
|
|
||||||
Send(buffer utils.ByteBuffer)
|
|
||||||
|
|
||||||
SourceId() string
|
SourceId() string
|
||||||
|
|
||||||
@@ -33,6 +34,26 @@ type ISink interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GenerateSinkId 根据Conn生成SinkId IPV4使用一个uint64, IPV6使用String
|
||||||
|
func GenerateSinkId(conn net.Conn) SinkId {
|
||||||
|
network := conn.RemoteAddr().Network()
|
||||||
|
if "tcp" == network {
|
||||||
|
id := uint64(utils.BytesToInt(conn.RemoteAddr().(*net.TCPAddr).IP.To4()))
|
||||||
|
id <<= 32
|
||||||
|
id |= uint64(conn.RemoteAddr().(*net.TCPAddr).Port << 16)
|
||||||
|
|
||||||
|
return id
|
||||||
|
} else if "udp" == network {
|
||||||
|
id := uint64(utils.BytesToInt(conn.RemoteAddr().(*net.UDPAddr).IP.To4()))
|
||||||
|
id <<= 32
|
||||||
|
id |= uint64(conn.RemoteAddr().(*net.UDPAddr).Port << 16)
|
||||||
|
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn.RemoteAddr().String()
|
||||||
|
}
|
||||||
|
|
||||||
func AddSinkToWaitingQueue(streamId string, sink ISink) {
|
func AddSinkToWaitingQueue(streamId string, sink ISink) {
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -46,35 +67,37 @@ func PopWaitingSinks(streamId string) []ISink {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SinkImpl struct {
|
type SinkImpl struct {
|
||||||
id string
|
Id_ SinkId
|
||||||
protocol Protocol
|
sourceId string
|
||||||
|
Protocol_ Protocol
|
||||||
enableVideo bool
|
enableVideo bool
|
||||||
|
|
||||||
desiredAudioCodecId utils.AVCodecID
|
DesiredAudioCodecId_ utils.AVCodecID
|
||||||
desiredVideoCodecId utils.AVCodecID
|
DesiredVideoCodecId_ utils.AVCodecID
|
||||||
|
|
||||||
|
Conn net.Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) Id() string {
|
func (s *SinkImpl) Id() SinkId {
|
||||||
return s.id
|
return s.Id_
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) Input(data []byte) {
|
func (s *SinkImpl) Input(data []byte) error {
|
||||||
//TODO implement me
|
if s.Conn != nil {
|
||||||
panic("implement me")
|
_, err := s.Conn.Write(data)
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SinkImpl) Send(buffer utils.ByteBuffer) {
|
return err
|
||||||
//TODO implement me
|
}
|
||||||
panic("implement me")
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) SourceId() string {
|
func (s *SinkImpl) SourceId() string {
|
||||||
//TODO implement me
|
return s.sourceId
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) Protocol() Protocol {
|
func (s *SinkImpl) Protocol() Protocol {
|
||||||
return s.protocol
|
return s.Protocol_
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) State() int {
|
func (s *SinkImpl) State() int {
|
||||||
@@ -95,7 +118,14 @@ func (s *SinkImpl) SetEnableVideo(enable bool) {
|
|||||||
s.enableVideo = enable
|
s.enableVideo = enable
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) Close() {
|
func (s *SinkImpl) DesiredAudioCodecId() utils.AVCodecID {
|
||||||
//TODO implement me
|
return s.DesiredAudioCodecId_
|
||||||
panic("implement me")
|
}
|
||||||
|
|
||||||
|
func (s *SinkImpl) DesiredVideoCodecId() utils.AVCodecID {
|
||||||
|
return s.DesiredVideoCodecId_
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SinkImpl) Close() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -1,7 +1,7 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat"
|
"fmt"
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"github.com/yangjiechina/live-server/transcode"
|
"github.com/yangjiechina/live-server/transcode"
|
||||||
"time"
|
"time"
|
||||||
@@ -84,6 +84,7 @@ type SourceImpl struct {
|
|||||||
videoTranscoders []transcode.ITranscoder //视频解码器
|
videoTranscoders []transcode.ITranscoder //视频解码器
|
||||||
originStreams StreamManager //推流的音视频Streams
|
originStreams StreamManager //推流的音视频Streams
|
||||||
allStreams StreamManager //推流Streams+转码器获得的Streams
|
allStreams StreamManager //推流Streams+转码器获得的Streams
|
||||||
|
buffers []StreamBuffer
|
||||||
|
|
||||||
completed bool
|
completed bool
|
||||||
probeTimer *time.Timer
|
probeTimer *time.Timer
|
||||||
@@ -153,12 +154,12 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
|
|||||||
|
|
||||||
//创建音频转码器
|
//创建音频转码器
|
||||||
if !disableAudio && audioCodecId != audioStream.CodecId() {
|
if !disableAudio && audioCodecId != audioStream.CodecId() {
|
||||||
avformat.Assert(false)
|
utils.Assert(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
//创建视频转码器
|
//创建视频转码器
|
||||||
if !disableVideo && videoCodecId != videoStream.CodecId() {
|
if !disableVideo && videoCodecId != videoStream.CodecId() {
|
||||||
avformat.Assert(false)
|
utils.Assert(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
var streams [5]utils.AVStream
|
var streams [5]utils.AVStream
|
||||||
@@ -172,32 +173,53 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
|
|||||||
index++
|
index++
|
||||||
}
|
}
|
||||||
|
|
||||||
//transStreamId := GenerateTransStreamId(sink.Protocol(), streams[:]...)
|
transStreamId := GenerateTransStreamId(sink.Protocol(), streams[:]...)
|
||||||
TransStreamFactory(sink.Protocol(), streams[:])
|
transStream, ok := s.transStreams[transStreamId]
|
||||||
|
if ok {
|
||||||
|
transStream = TransStreamFactory(sink.Protocol(), streams[:])
|
||||||
|
s.transStreams[transStreamId] = transStream
|
||||||
|
|
||||||
|
for i := 0; i < index; i++ {
|
||||||
|
transStream.AddTrack(streams[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = transStream.WriteHeader()
|
||||||
|
}
|
||||||
|
|
||||||
|
transStream.AddSink(sink)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) RemoveSink(tid TransStreamId, sinkId string) bool {
|
func (s *SourceImpl) RemoveSink(tid TransStreamId, sinkId string) bool {
|
||||||
//TODO implement me
|
return true
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) Close() {
|
func (s *SourceImpl) Close() {
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
|
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
|
||||||
|
if s.completed {
|
||||||
|
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.Id_)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
s.originStreams.Add(stream)
|
s.originStreams.Add(stream)
|
||||||
s.allStreams.Add(stream)
|
s.allStreams.Add(stream)
|
||||||
if len(s.originStreams.All()) == 1 {
|
if len(s.originStreams.All()) == 1 {
|
||||||
s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, s.writeHeader)
|
s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, s.writeHeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//为每个Stream创建对于的Buffer
|
||||||
|
if AppConfig.GOPCache > 0 {
|
||||||
|
buffer := NewStreamBuffer(int64(AppConfig.GOPCache))
|
||||||
|
s.buffers = append(s.buffers, buffer)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 从DeMuxer解析完Stream后, 处理等待Sinks
|
// 从DeMuxer解析完Stream后, 处理等待Sinks
|
||||||
func (s *SourceImpl) writeHeader() {
|
func (s *SourceImpl) writeHeader() {
|
||||||
avformat.Assert(!s.completed)
|
utils.Assert(!s.completed)
|
||||||
s.probeTimer.Stop()
|
s.probeTimer.Stop()
|
||||||
s.completed = true
|
s.completed = true
|
||||||
|
|
||||||
@@ -212,7 +234,14 @@ func (s *SourceImpl) OnDeMuxStreamDone() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) OnDeMuxPacket(index int, packet utils.AVPacket) {
|
func (s *SourceImpl) OnDeMuxPacket(index int, packet utils.AVPacket) {
|
||||||
|
if AppConfig.GOPCache > 0 {
|
||||||
|
buffer := s.buffers[packet.Index()]
|
||||||
|
buffer.AddPacket(packet, packet.KeyFrame(), packet.Dts())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, stream := range s.transStreams {
|
||||||
|
stream.Input(packet)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) OnDeMuxDone() {
|
func (s *SourceImpl) OnDeMuxDone() {
|
||||||
|
108
stream/stream_buffer.go
Normal file
108
stream/stream_buffer.go
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
package stream
|
||||||
|
|
||||||
|
// StreamBuffer GOP缓存
|
||||||
|
type StreamBuffer interface {
|
||||||
|
|
||||||
|
// AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败
|
||||||
|
AddPacket(packet interface{}, key bool, ts int64) bool
|
||||||
|
|
||||||
|
// SetDiscardHandler 设置丢弃帧时的回调
|
||||||
|
SetDiscardHandler(handler func(packet interface{}))
|
||||||
|
|
||||||
|
Peek(handler func(packet interface{}))
|
||||||
|
|
||||||
|
Duration() int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type streamBuffer struct {
|
||||||
|
buffer RingBuffer
|
||||||
|
duration int64
|
||||||
|
|
||||||
|
keyFrameDts int64 //最近一个关键帧的Dts
|
||||||
|
FarthestKeyFrameDts int64 //最远一个关键帧的Dts
|
||||||
|
|
||||||
|
discardHandler func(packet interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type element struct {
|
||||||
|
ts int64
|
||||||
|
key bool
|
||||||
|
pkt interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStreamBuffer(duration int64) StreamBuffer {
|
||||||
|
return &streamBuffer{duration: duration, buffer: NewRingBuffer(1000)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamBuffer) AddPacket(packet interface{}, key bool, ts int64) bool {
|
||||||
|
if s.buffer.IsEmpty() {
|
||||||
|
if !key {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
s.FarthestKeyFrameDts = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
s.buffer.Push(element{ts, key, packet})
|
||||||
|
if key {
|
||||||
|
s.keyFrameDts = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
//丢弃处理
|
||||||
|
//以最近的关键帧时间戳开始,丢弃缓存超过duration长度的帧
|
||||||
|
//至少需要保障当前GOP完整
|
||||||
|
//暂时不考虑以下情况:
|
||||||
|
// 1. 音频收流正常,视频长时间没收流,待视频恢复后。 会造成在此期间,多余的音频帧被丢弃,播放时有画面,没声音.
|
||||||
|
// 2. 视频反之亦然
|
||||||
|
if !key {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for farthest := s.keyFrameDts - s.duration; s.buffer.Size() > 1 && s.buffer.Head().(element).ts < farthest; {
|
||||||
|
ele := s.buffer.Pop().(element)
|
||||||
|
|
||||||
|
//重新设置最早的关键帧时间戳
|
||||||
|
if ele.key && ele.ts != s.FarthestKeyFrameDts {
|
||||||
|
s.FarthestKeyFrameDts = ele.ts
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.discardHandler != nil {
|
||||||
|
s.discardHandler(ele.pkt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamBuffer) SetDiscardHandler(handler func(packet interface{})) {
|
||||||
|
s.discardHandler = handler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamBuffer) Peek(handler func(packet interface{})) {
|
||||||
|
head, tail := s.buffer.All()
|
||||||
|
|
||||||
|
if head == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, value := range head {
|
||||||
|
handler(value.(element).pkt)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tail == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, value := range tail {
|
||||||
|
handler(value.(element).pkt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamBuffer) Duration() int64 {
|
||||||
|
head := s.buffer.Head()
|
||||||
|
tail := s.buffer.Tail()
|
||||||
|
|
||||||
|
if head == nil || tail == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return tail.(element).ts - head.(element).ts
|
||||||
|
}
|
@@ -1,66 +1,9 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat"
|
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Track interface {
|
|
||||||
Stream() utils.AVStream
|
|
||||||
|
|
||||||
Cache() RingBuffer
|
|
||||||
|
|
||||||
AddPacket(packet utils.AVPacket)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 封装stream 增加GOP管理
|
|
||||||
type track struct {
|
|
||||||
stream utils.AVStream
|
|
||||||
cache RingBuffer
|
|
||||||
duration int
|
|
||||||
keyFrameDts int64 //最近一个关键帧的Dts
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *track) Stream() utils.AVStream {
|
|
||||||
return t.stream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *track) Cache() RingBuffer {
|
|
||||||
return t.cache
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *track) AddPacket(packet utils.AVPacket) {
|
|
||||||
if t.cache.IsEmpty() && !packet.KeyFrame() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.cache.Push(packet)
|
|
||||||
if packet.KeyFrame() {
|
|
||||||
t.keyFrameDts = packet.Dts()
|
|
||||||
}
|
|
||||||
|
|
||||||
//以最近的关键帧时间戳开始,丢弃缓存超过duration长度的帧
|
|
||||||
//至少需要保障当前GOP完整
|
|
||||||
//head := t.cache.Head().(utils.AVPacket)
|
|
||||||
//for farthest := t.keyFrameDts - int64(t.duration); t.cache.Size() > 1 && t.cache.Head().(utils.AVPacket).Dts() < farthest; {
|
|
||||||
// t.cache.Pop()
|
|
||||||
//}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTrack(stream utils.AVStream, cacheSeconds int) Track {
|
|
||||||
t := &track{stream: stream, duration: cacheSeconds * 1000}
|
|
||||||
|
|
||||||
if cacheSeconds > 0 {
|
|
||||||
if utils.AVMediaTypeVideo == stream.Type() {
|
|
||||||
t.cache = NewRingBuffer(cacheSeconds * 30 * 2)
|
|
||||||
} else if utils.AVMediaTypeAudio == stream.Type() {
|
|
||||||
t.cache = NewRingBuffer(cacheSeconds * 50 * 2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
|
|
||||||
type StreamManager struct {
|
type StreamManager struct {
|
||||||
streams []utils.AVStream
|
streams []utils.AVStream
|
||||||
completed bool
|
completed bool
|
||||||
@@ -68,8 +11,8 @@ type StreamManager struct {
|
|||||||
|
|
||||||
func (s *StreamManager) Add(stream utils.AVStream) {
|
func (s *StreamManager) Add(stream utils.AVStream) {
|
||||||
for _, stream_ := range s.streams {
|
for _, stream_ := range s.streams {
|
||||||
avformat.Assert(stream_.Type() != stream.Type())
|
utils.Assert(stream_.Type() != stream.Type())
|
||||||
avformat.Assert(stream_.CodecId() != stream.CodecId())
|
utils.Assert(stream_.CodecId() != stream.CodecId())
|
||||||
}
|
}
|
||||||
|
|
||||||
s.streams = append(s.streams, stream)
|
s.streams = append(s.streams, stream)
|
||||||
|
@@ -30,14 +30,14 @@ func init() {
|
|||||||
// 请确保ids根据值升序排序传参
|
// 请确保ids根据值升序排序传参
|
||||||
/*func GenerateTransStreamId(protocol Protocol, ids ...utils.AVCodecID) TransStreamId {
|
/*func GenerateTransStreamId(protocol Protocol, ids ...utils.AVCodecID) TransStreamId {
|
||||||
len_ := len(ids)
|
len_ := len(ids)
|
||||||
avformat.Assert(len_ > 0 && len_ < 8)
|
utils.Assert(len_ > 0 && len_ < 8)
|
||||||
|
|
||||||
var streamId uint64
|
var streamId uint64
|
||||||
streamId = uint64(protocol) << 56
|
streamId = uint64(protocol) << 56
|
||||||
|
|
||||||
for i, id := range ids {
|
for i, id := range ids {
|
||||||
bId, ok := narrowCodecIds[int(id)]
|
bId, ok := narrowCodecIds[int(id)]
|
||||||
avformat.Assert(ok)
|
utils.Assert(ok)
|
||||||
|
|
||||||
streamId |= uint64(bId) << (48 - i*8)
|
streamId |= uint64(bId) << (48 - i*8)
|
||||||
}
|
}
|
||||||
@@ -47,14 +47,14 @@ func init() {
|
|||||||
|
|
||||||
func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId {
|
func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId {
|
||||||
len_ := len(ids)
|
len_ := len(ids)
|
||||||
avformat.Assert(len_ > 0 && len_ < 8)
|
utils.Assert(len_ > 0 && len_ < 8)
|
||||||
|
|
||||||
var streamId uint64
|
var streamId uint64
|
||||||
streamId = uint64(protocol) << 56
|
streamId = uint64(protocol) << 56
|
||||||
|
|
||||||
for i, id := range ids {
|
for i, id := range ids {
|
||||||
bId, ok := narrowCodecIds[int(id.CodecId())]
|
bId, ok := narrowCodecIds[int(id.CodecId())]
|
||||||
avformat.Assert(ok)
|
utils.Assert(ok)
|
||||||
|
|
||||||
streamId |= uint64(bId) << (48 - i*8)
|
streamId |= uint64(bId) << (48 - i*8)
|
||||||
}
|
}
|
||||||
@@ -65,9 +65,11 @@ func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStream
|
|||||||
var TransStreamFactory func(protocol Protocol, streams []utils.AVStream) ITransStream
|
var TransStreamFactory func(protocol Protocol, streams []utils.AVStream) ITransStream
|
||||||
|
|
||||||
type ITransStream interface {
|
type ITransStream interface {
|
||||||
|
Input(packet utils.AVPacket)
|
||||||
|
|
||||||
AddTrack(stream utils.AVStream)
|
AddTrack(stream utils.AVStream)
|
||||||
|
|
||||||
WriteHeader()
|
WriteHeader() error
|
||||||
|
|
||||||
AddSink(sink ISink)
|
AddSink(sink ISink)
|
||||||
|
|
||||||
@@ -77,21 +79,27 @@ type ITransStream interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type TransStreamImpl struct {
|
type TransStreamImpl struct {
|
||||||
sinks map[SinkId]ISink
|
Sinks map[SinkId]ISink
|
||||||
muxer avformat.Muxer
|
muxer avformat.Muxer
|
||||||
tracks []utils.AVStream
|
Tracks []utils.AVStream
|
||||||
|
transBuffer MemoryPool //每个TransStream也缓存封装后的流
|
||||||
|
Completed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TransStreamImpl) Input(packet utils.AVPacket) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStreamImpl) AddTrack(stream utils.AVStream) {
|
func (t *TransStreamImpl) AddTrack(stream utils.AVStream) {
|
||||||
t.tracks = append(t.tracks, stream)
|
t.Tracks = append(t.Tracks, stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStreamImpl) AddSink(sink ISink) {
|
func (t *TransStreamImpl) AddSink(sink ISink) {
|
||||||
t.sinks[sink.Id()] = sink
|
t.Sinks[sink.Id()] = sink
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStreamImpl) RemoveSink(id SinkId) {
|
func (t *TransStreamImpl) RemoveSink(id SinkId) {
|
||||||
delete(t.sinks, id)
|
delete(t.Sinks, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStreamImpl) AllSink() []ISink {
|
func (t *TransStreamImpl) AllSink() []ISink {
|
||||||
|
Reference in New Issue
Block a user