完善Source和Sink

This commit is contained in:
DESKTOP-COJOJSE\lenovo
2023-11-18 17:57:05 +08:00
parent bffd89375e
commit 33ec8159f1
13 changed files with 517 additions and 54 deletions

View File

@@ -1,5 +1,6 @@
{
"gop_cache": 0,
"probe_timeout": 2000,
"rtmp": {
"enable": true,

17
main.go
View File

@@ -1,5 +1,22 @@
package main
import (
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/stream"
)
func CreateTransStream(protocol stream.Protocol, streams []utils.AVStream) stream.ITransStream {
if stream.ProtocolRtmp == protocol {
}
return nil
}
func init() {
stream.TransStreamFactory = CreateTransStream
}
func main() {
}

View File

@@ -13,11 +13,12 @@ type Publisher struct {
func NewPublisher(sourceId string) *Publisher {
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}}
muxer := &libflv.DeMuxer{}
//设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
muxer.SetHandler(publisher)
return publisher
}
// OnVideo 从rtmpchunk解析过来的数据
// OnVideo 从rtmpchunk解析过来的视频
func (p *Publisher) OnVideo(data []byte, ts uint32) {
_ = p.deMuxer.InputVideo(data, ts)
}

View File

@@ -8,8 +8,9 @@ import (
"net/http"
)
// Session 负责除RTMP连接和断开以外的所有生命周期处理
type Session interface {
Input(conn net.Conn, data []byte) error
Input(conn net.Conn, data []byte) error //接受网络数据包再交由Stack处理
Close()
}
@@ -44,13 +45,13 @@ func (s *sessionImpl) OnPublish(app, stream_ string, response chan avformat.Hook
func (s *sessionImpl) OnPlay(app, stream string, response chan avformat.HookState) {
s.streamId = app + "/" + stream
sink := &Sink{}
s.SessionImpl.OnPlay(sink, nil, func() {
s.handle = sink
response <- http.StatusOK
}, func(state avformat.HookState) {
response <- state
})
//sink := &Sink{}
//s.SessionImpl.OnPlay(sink, nil, func() {
// s.handle = sink
// response <- http.StatusOK
//}, func(state avformat.HookState) {
// response <- state
//})
}
func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
@@ -58,6 +59,4 @@ func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
}
func (s *sessionImpl) Close() {
//TODO implement me
panic("implement me")
}

4
rtmp/rtmp_transtream.go Normal file
View File

@@ -0,0 +1,4 @@
package rtmp
type TransStream struct {
}

View File

@@ -51,6 +51,8 @@ func init() {
}
type AppConfig_ struct {
Rtmp RtmpConfig
Hook HookConfig
GOPCache int `json:"gop_cache"` //缓存GOP个数不是时长
ProbeTimeout int `json:"probe_timeout"`
Rtmp RtmpConfig
Hook HookConfig
}

29
stream/memory_pool.go Normal file
View File

@@ -0,0 +1,29 @@
package stream
// MemoryPool
// 从解复用阶段拼凑成完整的AVPacket开始(写)到GOP缓存结束(释放),整个过程都使用池中内存
type MemoryPool interface {
Allocate(size int) []byte
Free(size int)
}
func NewMemoryPool(capacity int) MemoryPool {
pool := &memoryPool{
data: make([]byte, capacity),
}
return pool
}
type memoryPool struct {
data []byte
size int
}
func (m *memoryPool) Allocate(size int) []byte {
return nil
}
func (m *memoryPool) Free(size int) {
}

77
stream/ring_buffer.go Normal file
View File

@@ -0,0 +1,77 @@
package stream
type RingBuffer interface {
IsEmpty() bool
IsFull() bool
Push(value interface{})
Pop() interface{}
Head() interface{}
Tail() interface{}
Size() int
}
func NewRingBuffer(capacity int) RingBuffer {
r := &ringBuffer{
data: make([]interface{}, capacity),
head: 0,
tail: 0,
size: 0,
}
return r
}
type ringBuffer struct {
data []interface{}
head int
tail int
size int
}
func (r *ringBuffer) IsEmpty() bool {
return r.size == 0
}
func (r *ringBuffer) IsFull() bool {
return r.size == cap(r.data)
}
func (r *ringBuffer) Push(value interface{}) {
if r.IsFull() {
r.Pop()
}
r.data[r.tail] = value
r.tail = (r.tail + 1) % cap(r.data)
r.size++
}
func (r *ringBuffer) Pop() interface{} {
if r.IsEmpty() {
return nil
}
element := r.data[r.head]
r.head = (r.head + 1) % cap(r.data)
r.size--
return element
}
func (r *ringBuffer) Head() interface{} {
return r.data[r.head]
}
func (r *ringBuffer) Tail() interface{} {
return r.data[r.tail]
}
func (r *ringBuffer) Size() int {
return r.size
}

29
stream/ringbuffer_test.go Normal file
View File

@@ -0,0 +1,29 @@
package stream
import (
"fmt"
"testing"
)
func TestRingBuffer(t *testing.T) {
buffer := NewRingBuffer(10)
full := buffer.IsFull()
empty := buffer.IsEmpty()
head := buffer.Head()
tail := buffer.Tail()
pop := buffer.Pop()
println(full)
println(empty)
println(head)
println(tail)
println(pop)
for i := 0; i < 100; i++ {
buffer.Push(i)
}
for !buffer.IsEmpty() {
i := buffer.Pop()
println(fmt.Sprintf("element:%d", i.(int)))
}
}

View File

@@ -2,8 +2,10 @@ package stream
import "github.com/yangjiechina/avformat/utils"
type SinkId string
type ISink interface {
Id() string
Id() SinkId
Input(data []byte)
@@ -11,16 +13,23 @@ type ISink interface {
SourceId() string
Protocol() int
Protocol() Protocol
State() int
SetState(state int)
DisableVideo() bool
EnableVideo() bool
// SetEnableVideo 允许客户端只拉取音频流
SetEnableVideo(enable bool)
// DesiredAudioCodecId 允许客户端拉取指定的音频流
DesiredAudioCodecId() utils.AVCodecID
// DesiredVideoCodecId DescribeVideoCodecId 允许客户端拉取指定的视频流
DesiredVideoCodecId() utils.AVCodecID
Close()
}
@@ -28,7 +37,7 @@ func AddSinkToWaitingQueue(streamId string, sink ISink) {
}
func RemoveSinkFromWaitingQueue(streamId, sinkId string) ISink {
func RemoveSinkFromWaitingQueue(streamId, sinkId SinkId) ISink {
return nil
}
@@ -37,11 +46,16 @@ func PopWaitingSinks(streamId string) []ISink {
}
type SinkImpl struct {
id string
protocol Protocol
enableVideo bool
desiredAudioCodecId utils.AVCodecID
desiredVideoCodecId utils.AVCodecID
}
func (s *SinkImpl) Id() string {
//TODO implement me
panic("implement me")
return s.id
}
func (s *SinkImpl) Input(data []byte) {
@@ -59,9 +73,8 @@ func (s *SinkImpl) SourceId() string {
panic("implement me")
}
func (s *SinkImpl) Protocol() int {
//TODO implement me
panic("implement me")
func (s *SinkImpl) Protocol() Protocol {
return s.protocol
}
func (s *SinkImpl) State() int {
@@ -74,18 +87,15 @@ func (s *SinkImpl) SetState(state int) {
panic("implement me")
}
func (s *SinkImpl) DisableVideo() bool {
//TODO implement me
panic("implement me")
func (s *SinkImpl) EnableVideo() bool {
return s.enableVideo
}
func (s *SinkImpl) SetEnableVideo(enable bool) {
//TODO implement me
panic("implement me")
s.enableVideo = enable
}
func (s *SinkImpl) Close() {
//TODO implement me
panic("implement me")
}

View File

@@ -1,14 +1,14 @@
package stream
import (
"github.com/yangjiechina/avformat"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/transcode"
"time"
)
type TransStreamId uint32
// SourceType Source 推流类型
type SourceType uint32
type SourceType byte
// Protocol 输出协议
type Protocol uint32
@@ -56,7 +56,7 @@ type ISource interface {
// TranscodeStreams 返回转码的Streams
TranscodeStreams() []utils.AVStream
// AddSink 添加Sink, 在此之前Sink已经握手、授权通过. 如果Source还未WriteHeader将Sink添加到等待队列.
// AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader将Sink添加到等待队列.
// 匹配拉流的编码器, 创建TransMuxer或向存在TransMuxer添加Sink
AddSink(sink ISink) bool
@@ -82,20 +82,22 @@ type SourceImpl struct {
recordSink ISink //每个Source唯一的一个录制流
audioTranscoders []transcode.ITranscoder //音频解码器
videoTranscoders []transcode.ITranscoder //视频解码器
transcodeStreams []utils.AVStream //从音视频解码器中获得的AVStream
originStreams StreamManager //推流的音视频Streams
allStreams StreamManager //推流Streams+转码器获得的Streams
completed bool
probeTimer *time.Timer
//所有的输出协议, 持有Sink
transStreams map[TransStreamId]TransStream
transStreams map[TransStreamId]ITransStream
}
func (s *SourceImpl) Id() string {
//TODO implement me
panic("implement me")
return s.Id_
}
func (s *SourceImpl) Input(data []byte) {
//TODO implement me
panic("implement me")
s.deMuxer.Input(data)
}
func (s *SourceImpl) CreateTransDeMuxer() ITransDeMuxer {
@@ -109,18 +111,70 @@ func (s *SourceImpl) CreateTranscoder(src utils.AVStream, dst utils.AVStream) tr
}
func (s *SourceImpl) OriginStreams() []utils.AVStream {
//TODO implement me
panic("implement me")
return s.originStreams.All()
}
func (s *SourceImpl) TranscodeStreams() []utils.AVStream {
//TODO implement me
panic("implement me")
return s.allStreams.All()
}
func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID) bool {
if ProtocolRtmp == protocol || ProtocolFlv == protocol {
}
return true
}
func (s *SourceImpl) AddSink(sink ISink) bool {
//TODO implement me
panic("implement me")
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
audioStream := s.originStreams.FindStreamWithType(utils.AVMediaTypeAudio)
videoStream := s.originStreams.FindStreamWithType(utils.AVMediaTypeVideo)
disableAudio := audioStream == nil
disableVideo := videoStream == nil || !sink.EnableVideo()
if disableAudio && disableVideo {
return false
}
//不支持对期望编码的流封装. 降级
if (utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId) && !IsSupportMux(sink.Protocol(), audioCodecId, videoCodecId) {
audioCodecId = utils.AVCodecIdNONE
videoCodecId = utils.AVCodecIdNONE
}
if !disableAudio && utils.AVCodecIdNONE == audioCodecId {
audioCodecId = audioStream.CodecId()
}
if !disableVideo && utils.AVCodecIdNONE == videoCodecId {
videoCodecId = videoStream.CodecId()
}
//创建音频转码器
if !disableAudio && audioCodecId != audioStream.CodecId() {
avformat.Assert(false)
}
//创建视频转码器
if !disableVideo && videoCodecId != videoStream.CodecId() {
avformat.Assert(false)
}
var streams [5]utils.AVStream
var index int
for _, stream := range s.originStreams.All() {
if disableVideo && stream.Type() == utils.AVMediaTypeVideo {
continue
}
streams[index] = stream
index++
}
//transStreamId := GenerateTransStreamId(sink.Protocol(), streams[:]...)
TransStreamFactory(sink.Protocol(), streams[:])
return false
}
func (s *SourceImpl) RemoveSink(tid TransStreamId, sinkId string) bool {
@@ -134,21 +188,32 @@ func (s *SourceImpl) Close() {
}
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
//TODO implement me
panic("implement me")
s.originStreams.Add(stream)
s.allStreams.Add(stream)
if len(s.originStreams.All()) == 1 {
s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, s.writeHeader)
}
}
// 从DeMuxer解析完Stream后, 处理等待Sinks
func (s *SourceImpl) writeHeader() {
avformat.Assert(!s.completed)
s.probeTimer.Stop()
s.completed = true
sinks := PopWaitingSinks(s.Id_)
for _, sink := range sinks {
s.AddSink(sink)
}
}
func (s *SourceImpl) OnDeMuxStreamDone() {
//TODO implement me
panic("implement me")
s.writeHeader()
}
func (s *SourceImpl) OnDeMuxPacket(index int, packet *utils.AVPacket2) {
//TODO implement me
panic("implement me")
func (s *SourceImpl) OnDeMuxPacket(index int, packet utils.AVPacket) {
}
func (s *SourceImpl) OnDeMuxDone() {
//TODO implement me
panic("implement me")
}

133
stream/stream_manager.go Normal file
View File

@@ -0,0 +1,133 @@
package stream
import (
"github.com/yangjiechina/avformat"
"github.com/yangjiechina/avformat/utils"
)
type Track interface {
Stream() utils.AVStream
Cache() RingBuffer
AddPacket(packet utils.AVPacket)
}
// 封装stream 增加GOP管理
type track struct {
stream utils.AVStream
cache RingBuffer
duration int
keyFrameDts int64 //最近一个关键帧的Dts
}
func (t *track) Stream() utils.AVStream {
return t.stream
}
func (t *track) Cache() RingBuffer {
return t.cache
}
func (t *track) AddPacket(packet utils.AVPacket) {
if t.cache.IsEmpty() && !packet.KeyFrame() {
return
}
t.cache.Push(packet)
if packet.KeyFrame() {
t.keyFrameDts = packet.Dts()
}
//以最近的关键帧时间戳开始丢弃缓存超过duration长度的帧
//至少需要保障当前GOP完整
//head := t.cache.Head().(utils.AVPacket)
//for farthest := t.keyFrameDts - int64(t.duration); t.cache.Size() > 1 && t.cache.Head().(utils.AVPacket).Dts() < farthest; {
// t.cache.Pop()
//}
}
func NewTrack(stream utils.AVStream, cacheSeconds int) Track {
t := &track{stream: stream, duration: cacheSeconds * 1000}
if cacheSeconds > 0 {
if utils.AVMediaTypeVideo == stream.Type() {
t.cache = NewRingBuffer(cacheSeconds * 30 * 2)
} else if utils.AVMediaTypeAudio == stream.Type() {
t.cache = NewRingBuffer(cacheSeconds * 50 * 2)
}
}
return t
}
type StreamManager struct {
streams []utils.AVStream
completed bool
}
func (s *StreamManager) Add(stream utils.AVStream) {
for _, stream_ := range s.streams {
avformat.Assert(stream_.Type() != stream.Type())
avformat.Assert(stream_.CodecId() != stream.CodecId())
}
s.streams = append(s.streams, stream)
//按照AVCodecId升序排序
for i := 0; i < len(s.streams); i++ {
for j := 1; j < len(s.streams)-i; j++ {
tmp := s.streams[j-1]
if s.streams[j].CodecId() < tmp.CodecId() {
s.streams[j-1] = s.streams[j]
s.streams[j] = tmp
}
}
}
}
func (s *StreamManager) FindStream(id utils.AVCodecID) utils.AVStream {
for _, stream_ := range s.streams {
if stream_.CodecId() == id {
return stream_
}
}
return nil
}
func (s *StreamManager) FindStreamWithType(mediaType utils.AVMediaType) utils.AVStream {
for _, stream_ := range s.streams {
if stream_.Type() == mediaType {
return stream_
}
}
return nil
}
func (s *StreamManager) FindStreams(id utils.AVCodecID) []utils.AVStream {
var streams []utils.AVStream
for _, stream_ := range s.streams {
if stream_.CodecId() == id {
streams = append(streams, stream_)
}
}
return streams
}
func (s *StreamManager) FindStreamsWithType(mediaType utils.AVMediaType) []utils.AVStream {
var streams []utils.AVStream
for _, stream_ := range s.streams {
if stream_.Type() == mediaType {
streams = append(streams, stream_)
}
}
return streams
}
func (s *StreamManager) All() []utils.AVStream {
return s.streams
}

View File

@@ -1,4 +1,100 @@
package stream
type TransStream struct {
import (
"github.com/yangjiechina/avformat"
"github.com/yangjiechina/avformat/utils"
)
// TransStreamId 每个传输流的唯一Id由协议+流Id组成
type TransStreamId uint64
// AVCodecID转为byte的对应关系
var narrowCodecIds map[int]byte
func init() {
narrowCodecIds = map[int]byte{
int(utils.AVCodecIdH263): 0x1,
int(utils.AVCodecIdH264): 0x2,
int(utils.AVCodecIdH265): 0x3,
int(utils.AVCodecIdAV1): 0x4,
int(utils.AVCodecIdVP8): 0x5,
int(utils.AVCodecIdVP9): 0x6,
int(utils.AVCodecIdAAC): 101,
int(utils.AVCodecIdMP3): 102,
int(utils.AVCodecIdOPUS): 103,
}
}
// GenerateTransStreamId 根据传入的推拉流协议和编码器ID生成StreamId
// 请确保ids根据值升序排序传参
/*func GenerateTransStreamId(protocol Protocol, ids ...utils.AVCodecID) TransStreamId {
len_ := len(ids)
avformat.Assert(len_ > 0 && len_ < 8)
var streamId uint64
streamId = uint64(protocol) << 56
for i, id := range ids {
bId, ok := narrowCodecIds[int(id)]
avformat.Assert(ok)
streamId |= uint64(bId) << (48 - i*8)
}
return TransStreamId(streamId)
}*/
func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId {
len_ := len(ids)
avformat.Assert(len_ > 0 && len_ < 8)
var streamId uint64
streamId = uint64(protocol) << 56
for i, id := range ids {
bId, ok := narrowCodecIds[int(id.CodecId())]
avformat.Assert(ok)
streamId |= uint64(bId) << (48 - i*8)
}
return TransStreamId(streamId)
}
var TransStreamFactory func(protocol Protocol, streams []utils.AVStream) ITransStream
type ITransStream interface {
AddTrack(stream utils.AVStream)
WriteHeader()
AddSink(sink ISink)
RemoveSink(id SinkId)
AllSink() []ISink
}
type TransStreamImpl struct {
sinks map[SinkId]ISink
muxer avformat.Muxer
tracks []utils.AVStream
}
func (t *TransStreamImpl) AddTrack(stream utils.AVStream) {
t.tracks = append(t.tracks, stream)
}
func (t *TransStreamImpl) AddSink(sink ISink) {
t.sinks[sink.Id()] = sink
}
func (t *TransStreamImpl) RemoveSink(id SinkId) {
delete(t.sinks, id)
}
func (t *TransStreamImpl) AllSink() []ISink {
//TODO implement me
panic("implement me")
}