完善source封装

This commit is contained in:
yangjie
2023-11-29 22:24:18 +08:00
parent 6fd7498ce0
commit d3f37e63a6
5 changed files with 68 additions and 65 deletions

View File

@@ -9,7 +9,6 @@ import (
type Publisher struct {
stream.SourceImpl
deMuxer libflv.DeMuxer
audioMemoryPool stream.MemoryPool
videoMemoryPool stream.MemoryPool
@@ -18,10 +17,11 @@ type Publisher struct {
}
func NewPublisher(sourceId string) *Publisher {
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}, audioUnmark: false, videoUnmark: false}
publisher.deMuxer = libflv.NewDeMuxer()
deMuxer := libflv.NewDeMuxer()
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId, TransDeMuxer: nil}, audioUnmark: false, videoUnmark: false}
//设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
publisher.deMuxer.SetHandler(publisher)
deMuxer.SetHandler(publisher)
publisher.SourceImpl.SetState(stream.SessionStateTransferring)
//创建内存池
publisher.audioMemoryPool = stream.NewMemoryPool(48000 * (stream.AppConfig.GOPCache + 1))
@@ -92,7 +92,7 @@ func (p *Publisher) OnVideo(data []byte, ts uint32) {
p.videoUnmark = false
}
_ = p.deMuxer.InputVideo(data, ts)
//_ = p.SourceImpl.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts)
}
func (p *Publisher) OnAudio(data []byte, ts uint32) {
@@ -101,7 +101,7 @@ func (p *Publisher) OnAudio(data []byte, ts uint32) {
p.audioUnmark = false
}
_ = p.deMuxer.InputAudio(data, ts)
//_ = p.SourceImpl.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts)
}
// OnPartPacket 从rtmp解析过来的部分音视频包

View File

@@ -1,7 +0,0 @@
package rtmp
import "github.com/yangjiechina/avformat"
type TransDeMuxer struct {
avformat.DeMuxerImpl
}

View File

@@ -2,6 +2,7 @@ package stream
import (
"fmt"
"github.com/yangjiechina/avformat"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/transcode"
"time"
@@ -13,6 +14,8 @@ type SourceType byte
// Protocol 输出协议
type Protocol uint32
type SourceEvent byte
const (
SourceTypeRtmp = SourceType(1)
SourceType28181 = SourceType(2)
@@ -25,6 +28,11 @@ const (
ProtocolRtc = Protocol(5)
ProtocolRtmpStr = "rtmp"
SourceEventPlay = SourceEvent(1)
SourceEventPlayDone = SourceEvent(1)
SourceEventInput = SourceEvent(1)
SourceEventClose = SourceEvent(1)
)
// SessionState 推拉流Session状态
@@ -46,9 +54,6 @@ type ISource interface {
// Input 输入推流数据
Input(data []byte)
// CreateTransDeMuxer 创建推流的解服用器
CreateTransDeMuxer() ITransDeMuxer
// CreateTranscoder 创建转码器
CreateTranscoder(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder
@@ -63,7 +68,11 @@ type ISource interface {
AddSink(sink ISink) bool
// RemoveSink 删除Sink/**
RemoveSink(tid TransStreamId, sinkId string) bool
RemoveSink(sink ISink) bool
AddEvent(event SourceEvent, data interface{})
SetState(state SessionState)
// Close 关闭Source
// 停止一切封装和转发流以及转码工作
@@ -71,16 +80,14 @@ type ISource interface {
Close()
}
type onSourceHandler interface {
onDeMuxStream(stream utils.AVStream)
}
type CreateSource func(id string, type_ SourceType, handler avformat.OnDeMuxerHandler)
type SourceImpl struct {
Id_ string
type_ SourceType
state SessionState
deMuxer ITransDeMuxer //负责从推流协议中解析出AVStream和AVPacket
TransDeMuxer avformat.DeMuxer //负责从推流协议中解析出AVStream和AVPacket
recordSink ISink //每个Source唯一的一个录制流
audioTranscoders []transcode.ITranscoder //音频解码器
videoTranscoders []transcode.ITranscoder //视频解码器
@@ -93,6 +100,13 @@ type SourceImpl struct {
//所有的输出协议, 持有Sink
transStreams map[TransStreamId]ITransStream
//sink的拉流和断开拉流事件都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作
//golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件
inputEvent chan []byte
closeEvent chan byte
playingEventQueue chan ISink
playingDoneEventQueue chan ISink
}
func (s *SourceImpl) Id() string {
@@ -100,12 +114,31 @@ func (s *SourceImpl) Id() string {
}
func (s *SourceImpl) Input(data []byte) {
s.deMuxer.Input(data)
if SessionStateTransferring == s.state {
s.inputEvent <- data
} else {
s.TransDeMuxer.Input(data, nil)
}
}
func (s *SourceImpl) CreateTransDeMuxer() ITransDeMuxer {
//TODO implement me
panic("implement me")
func (s *SourceImpl) LoopEvent() {
for {
select {
case data := <-s.inputEvent:
s.TransDeMuxer.Input(data, nil)
break
case sink := <-s.playingEventQueue:
s.AddSink(sink)
break
case sink := <-s.playingDoneEventQueue:
s.AddSink(sink)
break
case _ = <-s.closeEvent:
s.Close()
return
}
}
}
func (s *SourceImpl) CreateTranscoder(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder {
@@ -228,10 +261,26 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
return false
}
func (s *SourceImpl) RemoveSink(tid TransStreamId, sinkId string) bool {
func (s *SourceImpl) RemoveSink(sink ISink) bool {
return true
}
func (s *SourceImpl) AddEvent(event SourceEvent, data interface{}) {
if SourceEventInput == event {
} else if SourceEventPlay == event {
} else if SourceEventPlayDone == event {
} else if SourceEventClose == event {
}
}
func (s *SourceImpl) SetState(state SessionState) {
s.state = state
}
func (s *SourceImpl) Close() {
}

View File

@@ -1,35 +0,0 @@
package stream
import "github.com/yangjiechina/avformat/utils"
// OnTransDeMuxerHandler 解复用器回调 /**
type OnTransDeMuxerHandler interface {
OnDeMuxStream(stream utils.AVStream)
OnDeMuxStreamDone()
OnDeMuxPacket(index int, packet utils.AVPacket)
OnDeMuxDone()
}
type ITransDeMuxer interface {
Input(data []byte)
SetHandler(handler OnTransDeMuxerHandler)
Close()
}
type TransDeMuxerImpl struct {
handler OnTransDeMuxerHandler
}
func (impl *TransDeMuxerImpl) Input(data []byte) {
panic("implement me")
}
func (impl *TransDeMuxerImpl) SetHandler(handler OnTransDeMuxerHandler) {
impl.handler = handler
}
func (impl *TransDeMuxerImpl) Close() {
panic("implement me")
}

View File

@@ -1,4 +0,0 @@
package stream
type ITransMuxer interface {
}