接口不使用“I”开头, 实现类不使用"impl"结尾

This commit is contained in:
yangjiechina
2024-06-06 19:27:56 +08:00
parent 39c787fbee
commit 2ae2622945
37 changed files with 458 additions and 481 deletions

2
api.go
View File

@@ -253,7 +253,7 @@ func (api *ApiServer) generateSourceId(remoteAddr string) stream.SinkId {
return stream.GenerateSinkId(tcpAddr)
}
func (api *ApiServer) doPlay(sink stream.ISink) utils.HookState {
func (api *ApiServer) doPlay(sink stream.Sink) utils.HookState {
ok := utils.HookStateOK
stream.HookPlaying(sink, func() {

View File

@@ -5,6 +5,6 @@ import (
"net"
)
func NewFLVSink(id stream.SinkId, sourceId string, conn net.Conn) stream.ISink {
return &stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: conn}
func NewFLVSink(id stream.SinkId, sourceId string, conn net.Conn) stream.Sink {
return &stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolFlv, Conn: conn}
}

View File

@@ -28,7 +28,7 @@ func init() {
}
type httpTransStream struct {
stream.TransStreamImpl
stream.BaseTransStream
muxer libflv.Muxer
mwBuffer stream.MergeWritingBuffer
@@ -101,7 +101,7 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
}
func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
if err := t.TransStreamImpl.AddTrack(stream); err != nil {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
return err
}
@@ -124,7 +124,7 @@ func (t *httpTransStream) sendUnpackedSegment(segment []byte) {
}
// 为单个sink发送flv切片, 切片已经添加分隔符
func (t *httpTransStream) sendSegment(sink stream.ISink, data []byte) error {
func (t *httpTransStream) sendSegment(sink stream.Sink, data []byte) error {
return sink.Input(data[t.computeSkipCount(data):])
}
@@ -132,10 +132,10 @@ func (t *httpTransStream) computeSkipCount(data []byte) int {
return int(6 + binary.BigEndian.Uint16(data[4:]))
}
func (t *httpTransStream) AddSink(sink stream.ISink) error {
func (t *httpTransStream) AddSink(sink stream.Sink) error {
utils.Assert(t.headerSize > 0)
t.TransStreamImpl.AddSink(sink)
t.BaseTransStream.AddSink(sink)
//发送sequence header
t.sendSegment(sink, t.header[:t.headerSize])
@@ -190,7 +190,7 @@ func (t *httpTransStream) WriteHeader() error {
t.headerSize += t.muxer.WriteHeader(t.header[HttpFlvBlockLengthSize:])
for _, track := range t.TransStreamImpl.Tracks {
for _, track := range t.BaseTransStream.Tracks {
var data []byte
if utils.AVMediaTypeAudio == track.Type() {
data = track.Extra()
@@ -223,7 +223,7 @@ func (t *httpTransStream) Close() error {
return nil
}
func NewHttpTransStream() stream.ITransStream {
func NewHttpTransStream() stream.TransStream {
return &httpTransStream{
muxer: libflv.NewMuxer(),
header: make([]byte, 1024),
@@ -231,6 +231,6 @@ func NewHttpTransStream() stream.ITransStream {
}
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewHttpTransStream(), nil
}

View File

@@ -14,10 +14,10 @@ type Filter interface {
ParseRtpPacket(conn net.Conn, data []byte) (*rtp.Packet, error)
}
type FilterImpl struct {
type BaseFilter struct {
}
func (r FilterImpl) ParseRtpPacket(conn net.Conn, data []byte) (*rtp.Packet, error) {
func (r BaseFilter) ParseRtpPacket(conn net.Conn, data []byte) (*rtp.Packet, error) {
packet := rtp.Packet{}
err := packet.Unmarshal(data)

View File

@@ -5,7 +5,7 @@ import (
)
type SingleFilter struct {
FilterImpl
BaseFilter
source GBSource
}

View File

@@ -5,7 +5,7 @@ import (
)
type SSRCFilter struct {
FilterImpl
BaseFilter
sources map[uint32]GBSource
}

View File

@@ -14,12 +14,12 @@ import (
"net"
)
type Transport int
type TransportType int
const (
TransportUDP = Transport(0)
TransportTCPPassive = Transport(1)
TransportTCPActive = Transport(2)
TransportTypeUDP = TransportType(0)
TransportTypeTCPPassive = TransportType(1)
TransportTypeTCPActive = TransportType(2)
PsProbeBufferSize = 1024 * 1024 * 2
JitterBufferSize = 1024 * 1024
@@ -32,19 +32,19 @@ var (
)
type GBSource interface {
stream.ISource
stream.Source
InputRtp(pkt *rtp.Packet) error
Transport() Transport
TransportType() TransportType
PrepareTransDeMuxer(id string, ssrc uint32)
}
// GBSourceImpl GB28181推流Source
// BaseGBSource GB28181推流Source
// 负责解析生成AVStream和AVPacket, 后续全权交给父类Source处理.
type GBSourceImpl struct {
stream.SourceImpl
type BaseGBSource struct {
stream.PublishSource
deMuxerCtx *libmpeg.PSDeMuxerContext
@@ -148,15 +148,15 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint1
return source, port, err
}
func (source *GBSourceImpl) InputRtp(pkt *rtp.Packet) error {
func (source *BaseGBSource) InputRtp(pkt *rtp.Packet) error {
panic("implement me")
}
func (source *GBSourceImpl) Transport() Transport {
func (source *BaseGBSource) Transport() TransportType {
panic("implement me")
}
func (source *GBSourceImpl) PrepareTransDeMuxer(id string, ssrc uint32) {
func (source *BaseGBSource) PrepareTransDeMuxer(id string, ssrc uint32) {
source.Id_ = id
source.ssrc = ssrc
source.deMuxerCtx = libmpeg.NewPSDeMuxerContext(make([]byte, PsProbeBufferSize))
@@ -164,12 +164,12 @@ func (source *GBSourceImpl) PrepareTransDeMuxer(id string, ssrc uint32) {
}
// Input 输入PS流
func (source *GBSourceImpl) Input(data []byte) error {
func (source *BaseGBSource) Input(data []byte) error {
return source.deMuxerCtx.Input(data)
}
// OnPartPacket 部分es流回调
func (source *GBSourceImpl) OnPartPacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, data []byte, first bool) {
func (source *BaseGBSource) OnPartPacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, data []byte, first bool) {
buffer := source.FindOrCreatePacketBuffer(index, mediaType)
//第一个es包, 标记内存起始位置
@@ -181,7 +181,7 @@ func (source *GBSourceImpl) OnPartPacket(index int, mediaType utils.AVMediaType,
}
// OnLossPacket 非完整es包丢弃回调, 直接释放内存块
func (source *GBSourceImpl) OnLossPacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID) {
func (source *BaseGBSource) OnLossPacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID) {
buffer := source.FindOrCreatePacketBuffer(index, mediaType)
buffer.Fetch()
@@ -189,7 +189,7 @@ func (source *GBSourceImpl) OnLossPacket(index int, mediaType utils.AVMediaType,
}
// OnCompletePacket 完整帧回调
func (source *GBSourceImpl) OnCompletePacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, dts int64, pts int64, key bool) error {
func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaType, codec utils.AVCodecID, dts int64, pts int64, key bool) error {
buffer := source.FindOrCreatePacketBuffer(index, mediaType)
data := buffer.Fetch()
@@ -302,11 +302,11 @@ func (source *GBSourceImpl) OnCompletePacket(index int, mediaType utils.AVMediaT
return nil
}
func (source *GBSourceImpl) Close() {
func (source *BaseGBSource) Close() {
if source.transport != nil {
source.transport.Close()
source.transport = nil
}
source.SourceImpl.Close()
source.PublishSource.Close()
}

View File

@@ -32,6 +32,6 @@ func (a ActiveSource) Connect(remoteAddr *net.TCPAddr) error {
return nil
}
func (a ActiveSource) Transport() Transport {
return TransportTCPActive
func (a ActiveSource) TransportType() TransportType {
return TransportTypeTCPActive
}

View File

@@ -6,18 +6,18 @@ import (
)
type PassiveSource struct {
GBSourceImpl
BaseGBSource
}
func NewPassiveSource() *PassiveSource {
return &PassiveSource{}
}
func (t PassiveSource) Transport() Transport {
return TransportTCPPassive
func (t PassiveSource) TransportType() TransportType {
return TransportTypeTCPPassive
}
func (t PassiveSource) InputRtp(pkt *rtp.Packet) error {
t.SourceImpl.AddEvent(stream.SourceEventInput, pkt.Payload)
t.PublishSource.AddEvent(stream.SourceEventInput, pkt.Payload)
return nil
}

View File

@@ -8,7 +8,7 @@ import (
)
type UDPSource struct {
GBSourceImpl
BaseGBSource
rtpDeMuxer *jitterbuffer.JitterBuffer
@@ -22,8 +22,8 @@ func NewUDPSource() *UDPSource {
}
}
func (u UDPSource) Transport() Transport {
return TransportUDP
func (u UDPSource) TransportType() TransportType {
return TransportTypeUDP
}
func (u UDPSource) InputRtp(pkt *rtp.Packet) error {
@@ -45,6 +45,6 @@ func (u UDPSource) InputRtp(pkt *rtp.Packet) error {
u.rtpBuffer.FreeHead()
u.SourceImpl.AddEvent(stream.SourceEventInput, pkt.Payload)
u.PublishSource.AddEvent(stream.SourceEventInput, pkt.Payload)
}
}

View File

@@ -5,11 +5,11 @@ import (
)
type tsSink struct {
stream.SinkImpl
stream.BaseSink
}
func NewTSSink(id stream.SinkId, sourceId string) stream.ISink {
return &tsSink{stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}}
func NewTSSink(id stream.SinkId, sourceId string) stream.Sink {
return &tsSink{stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}}
}
func (s *tsSink) Input(data []byte) error {
@@ -17,7 +17,7 @@ func (s *tsSink) Input(data []byte) error {
}
type m3u8Sink struct {
stream.SinkImpl
stream.BaseSink
cb func(m3u8 []byte)
}
@@ -26,6 +26,6 @@ func (s *m3u8Sink) Input(data []byte) error {
return nil
}
func NewM3U8Sink(id stream.SinkId, sourceId string, cb func(m3u8 []byte)) stream.ISink {
return &m3u8Sink{stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}, cb}
func NewM3U8Sink(id stream.SinkId, sourceId string, cb func(m3u8 []byte)) stream.Sink {
return &m3u8Sink{stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}, cb}
}

View File

@@ -20,7 +20,7 @@ type tsContext struct {
}
type transStream struct {
stream.TransStreamImpl
stream.BaseTransStream
muxer libmpeg.TSMuxer
context *tsContext
@@ -33,7 +33,7 @@ type transStream struct {
playlistLength int //最大切片文件个数
m3u8File *os.File
m3u8Sinks map[stream.SinkId]stream.ISink
m3u8Sinks map[stream.SinkId]stream.Sink
}
// NewTransStream 创建HLS传输流
@@ -43,7 +43,7 @@ type transStream struct {
// @parentDir 保存切片的绝对路径. mu38和ts切片放在同一目录下, 目录地址使用parentDir+urlPrefix
// @segmentDuration 单个切片时长
// @playlistLength 缓存多少个切片
func NewTransStream(url, m3u8Name, tsFormat, dir string, segmentDuration, playlistLength int) (stream.ITransStream, error) {
func NewTransStream(url, m3u8Name, tsFormat, dir string, segmentDuration, playlistLength int) (stream.TransStream, error) {
//创建文件夹
if err := os.MkdirAll(dir, 0666); err != nil {
return nil, err
@@ -80,11 +80,11 @@ func NewTransStream(url, m3u8Name, tsFormat, dir string, segmentDuration, playli
stream_.m3u8 = NewM3U8Writer(playlistLength)
stream_.m3u8File = file
stream_.m3u8Sinks = make(map[stream.SinkId]stream.ISink, 24)
stream_.m3u8Sinks = make(map[stream.SinkId]stream.Sink, 24)
return stream_, nil
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
id := source.Id()
return NewTransStream("", stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id, "%d"), stream.AppConfig.Hls.Dir, stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength)
}
@@ -115,14 +115,14 @@ func (t *transStream) Input(packet utils.AVPacket) error {
pts := packet.ConvertPts(90000)
dts := packet.ConvertDts(90000)
if utils.AVMediaTypeVideo == packet.MediaType() {
return t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()]), pts, dts, packet.KeyFrame())
return t.muxer.Input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), pts, dts, packet.KeyFrame())
} else {
return t.muxer.Input(packet.Index(), packet.Data(), pts, dts, packet.KeyFrame())
}
}
func (t *transStream) AddTrack(stream utils.AVStream) error {
err := t.TransStreamImpl.AddTrack(stream)
err := t.BaseTransStream.AddTrack(stream)
if err != nil {
return err
}
@@ -142,7 +142,7 @@ func (t *transStream) WriteHeader() error {
return t.createSegment()
}
func (t *transStream) AddSink(sink stream.ISink) error {
func (t *transStream) AddSink(sink stream.Sink) error {
if sink_, ok := sink.(*m3u8Sink); ok {
if t.m3u8.Size() > 0 {
return sink.Input([]byte(t.m3u8.ToString()))
@@ -152,7 +152,7 @@ func (t *transStream) AddSink(sink stream.ISink) error {
}
}
return t.TransStreamImpl.AddSink(sink)
return t.BaseTransStream.AddSink(sink)
}
func (t *transStream) onTSWrite(data []byte) {
@@ -206,7 +206,7 @@ func (t *transStream) flushSegment() error {
for _, sink := range t.m3u8Sinks {
sink.Input([]byte(m3u8Txt))
}
t.m3u8Sinks = make(map[stream.SinkId]stream.ISink, 0)
t.m3u8Sinks = make(map[stream.SinkId]stream.Sink, 0)
}
return nil
}

View File

@@ -31,7 +31,7 @@ const (
)
type Session struct {
stream.SourceImpl
stream.PublishSource
phone string
decoder *transport.DelimiterFrameDecoder

View File

@@ -8,7 +8,7 @@ import (
)
type sink struct {
stream.SinkImpl
stream.BaseSink
offer string
answer string
@@ -20,8 +20,8 @@ type sink struct {
cb func(sdp string)
}
func NewSink(id stream.SinkId, sourceId string, offer string, cb func(sdp string)) stream.ISink {
return &sink{stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtc}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
func NewSink(id stream.SinkId, sourceId string, offer string, cb func(sdp string)) stream.Sink {
return &sink{stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtc}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
}
func (s *sink) setTrackCount(count int) {

View File

@@ -7,16 +7,16 @@ import (
)
type transStream struct {
stream.TransStreamImpl
stream.BaseTransStream
}
func NewTransStream() stream.ITransStream {
func NewTransStream() stream.TransStream {
t := &transStream{}
t.Init()
return t
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewTransStream(), nil
}
@@ -32,18 +32,18 @@ func (t *transStream) Input(packet utils.AVPacket) error {
}
if packet.KeyFrame() {
extra := t.TransStreamImpl.Tracks[packet.Index()].CodecParameters().DecoderConfRecord().ToAnnexB()
extra := t.BaseTransStream.Tracks[packet.Index()].CodecParameters().DecoderConfRecord().ToAnnexB()
sink_.input(packet.Index(), extra, 0)
}
sink_.input(packet.Index(), packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()]), uint32(packet.Duration(1000)))
sink_.input(packet.Index(), packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]), uint32(packet.Duration(1000)))
}
}
return nil
}
func (t *transStream) AddSink(sink_ stream.ISink) error {
func (t *transStream) AddSink(sink_ stream.Sink) error {
//创建PeerConnection
var videoTrack *webrtc.TrackLocalStaticSample
rtcSink := sink_.(*sink)
@@ -97,7 +97,7 @@ func (t *transStream) AddSink(sink_ stream.ISink) error {
rtcSink.peer = connection
rtcSink.SendHeader([]byte(connection.LocalDescription().SDP))
return t.TransStreamImpl.AddSink(sink_)
return t.BaseTransStream.AddSink(sink_)
}
func (t *transStream) WriteHeader() error {

View File

@@ -11,15 +11,15 @@ import (
// Publisher RTMP推流Source
type Publisher struct {
stream.SourceImpl
stream.PublishSource
stack *librtmp.Stack
}
func NewPublisher(sourceId string, stack *librtmp.Stack, conn net.Conn) *Publisher {
deMuxer := libflv.NewDeMuxer(libflv.TSModeRelative)
publisher_ := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId, Type_: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, stack: stack}
//设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
publisher_ := &Publisher{PublishSource: stream.PublishSource{Id_: sourceId, Type_: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, stack: stack}
//设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.PublishSource
deMuxer.SetHandler(publisher_)
//为推流方分配足够多的缓冲区
conn.(*transport.Conn).ReallocateRecvBuffer(1024 * 1024)
@@ -33,7 +33,7 @@ func (p *Publisher) Input(data []byte) error {
func (p *Publisher) OnDeMuxStream(stream utils.AVStream) {
//AVStream的ExtraData已经拷贝, 释放掉内存池中最新分配的内存
p.FindOrCreatePacketBuffer(stream.Index(), stream.Type()).FreeTail()
p.SourceImpl.OnDeMuxStream(stream)
p.PublishSource.OnDeMuxStream(stream)
}
// OnVideo 解析出来的完整视频包
@@ -41,12 +41,12 @@ func (p *Publisher) OnDeMuxStream(stream utils.AVStream) {
func (p *Publisher) OnVideo(index int, data []byte, ts uint32) {
data = p.FindOrCreatePacketBuffer(index, utils.AVMediaTypeVideo).Fetch()
//交给flv解复用器, 解析回调出AVPacket
p.SourceImpl.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts)
p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts)
}
func (p *Publisher) OnAudio(index int, data []byte, ts uint32) {
data = p.FindOrCreatePacketBuffer(index, utils.AVMediaTypeAudio).Fetch()
p.SourceImpl.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts)
p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts)
}
func (p *Publisher) OnPartPacket(index int, mediaType utils.AVMediaType, data []byte, first bool) {

View File

@@ -8,65 +8,65 @@ import (
"github.com/yangjiechina/avformat/utils"
)
type IServer interface {
type Server interface {
Start(addr net.Addr) error
Close()
}
func NewServer() IServer {
return &serverImpl{}
func NewServer() Server {
return &server{}
}
type serverImpl struct {
type server struct {
tcp *transport.TCPServer
}
func (s *serverImpl) Start(addr net.Addr) error {
func (s *server) Start(addr net.Addr) error {
utils.Assert(s.tcp == nil)
server := &transport.TCPServer{}
server.SetHandler(s)
err := server.Bind(addr)
tcp := &transport.TCPServer{}
tcp.SetHandler(s)
err := tcp.Bind(addr)
if err != nil {
return err
}
s.tcp = server
s.tcp = tcp
return nil
}
func (s *serverImpl) Close() {
func (s *server) Close() {
panic("implement me")
}
func (s *serverImpl) OnConnected(conn net.Conn) {
func (s *server) OnConnected(conn net.Conn) {
log.Sugar.Debugf("rtmp连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
t.Data = NewSession(conn)
}
func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
func (s *server) OnPacket(conn net.Conn, data []byte) {
t := conn.(*transport.Conn)
err := t.Data.(*sessionImpl).Input(conn, data)
err := t.Data.(*Session).Input(conn, data)
if err != nil {
log.Sugar.Errorf("处理rtmp包失败 err:%s conn:%s", err.Error(), conn.RemoteAddr().String())
_ = conn.Close()
t.Data.(*sessionImpl).Close()
t.Data.(*Session).Close()
t.Data = nil
}
}
func (s *serverImpl) OnDisConnected(conn net.Conn, err error) {
func (s *server) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("rtmp断开连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
if t.Data != nil {
t.Data.(*sessionImpl).Close()
t.Data.(*Session).Close()
t.Data = nil
}
}

View File

@@ -8,7 +8,7 @@ import (
"testing"
)
func CreateTransStream(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) stream.ITransStream {
func CreateTransStream(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) stream.TransStream {
if stream.ProtocolRtmp == protocol {
return NewTransStream(librtmp.ChunkSize)
}
@@ -23,7 +23,7 @@ func init() {
func TestServer(t *testing.T) {
stream.AppConfig.GOPCache = true
stream.AppConfig.MergeWriteLatency = 350
impl := serverImpl{}
impl := server{}
addr := "0.0.0.0:1935"
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {

View File

@@ -9,22 +9,7 @@ import (
)
// Session 负责除连接和断开以外的所有RTMP生命周期处理
type Session interface {
Input(conn net.Conn, data []byte) error //接受网络数据包再交由Stack处理
Close()
}
func NewSession(conn net.Conn) Session {
impl := &sessionImpl{}
stack := librtmp.NewStack(impl)
impl.stack = stack
impl.conn = conn
return impl
}
type sessionImpl struct {
type Session struct {
//解析rtmp协议栈
stack *librtmp.Stack
//Publisher/sink, 在publish或play成功后赋值
@@ -34,7 +19,7 @@ type sessionImpl struct {
conn net.Conn
}
func (s *sessionImpl) generateSourceId(app, stream_ string) string {
func (s *Session) generateSourceId(app, stream_ string) string {
if len(app) == 0 {
return stream_
} else if len(stream_) == 0 {
@@ -44,7 +29,7 @@ func (s *sessionImpl) generateSourceId(app, stream_ string) string {
}
}
func (s *sessionImpl) OnPublish(app, stream_ string, response chan utils.HookState) {
func (s *Session) OnPublish(app, stream_ string, response chan utils.HookState) {
log.Sugar.Infof("rtmp onpublish app:%s stream:%s conn:%s", app, stream_, s.conn.RemoteAddr().String())
sourceId := s.generateSourceId(app, stream_)
@@ -66,7 +51,7 @@ func (s *sessionImpl) OnPublish(app, stream_ string, response chan utils.HookSta
})
}
func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState) {
func (s *Session) OnPlay(app, stream_ string, response chan utils.HookState) {
sourceId := s.generateSourceId(app, stream_)
//拉流事件Sink统一处理
sink := NewSink(stream.GenerateSinkId(s.conn.RemoteAddr()), sourceId, s.conn)
@@ -81,7 +66,7 @@ func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState)
})
}
func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
func (s *Session) Input(conn net.Conn, data []byte) error {
//如果是推流并且握手成功后续收到的包都将发送给LoopEvent处理
if s.isPublisher {
s.handle.(*Publisher).AddEvent(stream.SourceEventInput, data)
@@ -91,7 +76,7 @@ func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
}
}
func (s *sessionImpl) Close() {
func (s *Session) Close() {
log.Sugar.Debugf("释放rtmp session conn:%s", s.conn.RemoteAddr().String())
//释放协议栈
@@ -110,7 +95,16 @@ func (s *sessionImpl) Close() {
s.handle.(*Publisher).AddEvent(stream.SourceEventClose, nil)
}
} else {
sink := s.handle.(stream.ISink)
sink := s.handle.(stream.Sink)
sink.Close()
}
}
func NewSession(conn net.Conn) *Session {
session := &Session{}
stack := librtmp.NewStack(session)
session.stack = stack
session.conn = conn
return session
}

View File

@@ -6,6 +6,6 @@ import (
"net"
)
func NewSink(id stream.SinkId, sourceId string, conn net.Conn) stream.ISink {
return &stream.SinkImpl{Id_: id, SourceId_: sourceId, State_: stream.SessionStateCreate, Protocol_: stream.ProtocolRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE}
func NewSink(id stream.SinkId, sourceId string, conn net.Conn) stream.Sink {
return &stream.BaseSink{Id_: id, SourceId_: sourceId, State_: stream.SessionStateCreate, Protocol_: stream.ProtocolRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE}
}

View File

@@ -8,7 +8,7 @@ import (
)
type TransStream struct {
stream.TransStreamImpl
stream.BaseTransStream
chunkSize int
@@ -22,7 +22,7 @@ type TransStream struct {
}
func (t *TransStream) Input(packet utils.AVPacket) error {
utils.Assert(t.TransStreamImpl.Completed)
utils.Assert(t.BaseTransStream.Completed)
var data []byte
var chunk *librtmp.Chunk
@@ -96,10 +96,10 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
return nil
}
func (t *TransStream) AddSink(sink stream.ISink) error {
func (t *TransStream) AddSink(sink stream.Sink) error {
utils.Assert(t.headerSize > 0)
t.TransStreamImpl.AddSink(sink)
t.BaseTransStream.AddSink(sink)
//发送sequence header
sink.Input(t.header[:t.headerSize])
@@ -115,7 +115,7 @@ func (t *TransStream) AddSink(sink stream.ISink) error {
func (t *TransStream) WriteHeader() error {
utils.Assert(t.Tracks != nil)
utils.Assert(!t.TransStreamImpl.Completed)
utils.Assert(!t.BaseTransStream.Completed)
t.Init()
@@ -139,7 +139,7 @@ func (t *TransStream) WriteHeader() error {
utils.Assert(audioStream != nil || videoStream != nil)
//初始化
t.TransStreamImpl.Completed = true
t.BaseTransStream.Completed = true
t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer()
if utils.AVCodecIdNONE != audioCodecId {
@@ -190,10 +190,10 @@ func (t *TransStream) Close() error {
return nil
}
func NewTransStream(chunkSize int) stream.ITransStream {
func NewTransStream(chunkSize int) stream.TransStream {
return &TransStream{chunkSize: chunkSize}
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
return NewTransStream(librtmp.ChunkSize), nil
}

View File

@@ -7,39 +7,39 @@ import (
"net"
)
type IServer interface {
type Server interface {
Start(addr net.Addr) error
Close()
}
func NewServer(password string) IServer {
return &serverImpl{
func NewServer(password string) Server {
return &server{
handler: newHandler(password),
}
}
type serverImpl struct {
type server struct {
tcp *transport.TCPServer
handler *handler
}
func (s *serverImpl) Start(addr net.Addr) error {
func (s *server) Start(addr net.Addr) error {
utils.Assert(s.tcp == nil)
//监听TCP端口
server := &transport.TCPServer{}
server.SetHandler(s)
err := server.Bind(addr)
tcp := &transport.TCPServer{}
tcp.SetHandler(s)
err := tcp.Bind(addr)
if err != nil {
return err
}
s.tcp = server
s.tcp = tcp
return nil
}
func (s *serverImpl) closeSession(conn net.Conn) {
func (s *server) closeSession(conn net.Conn) {
t := conn.(*transport.Conn)
if t.Data != nil {
t.Data.(*session).close()
@@ -47,18 +47,18 @@ func (s *serverImpl) closeSession(conn net.Conn) {
}
}
func (s *serverImpl) Close() {
func (s *server) Close() {
}
func (s *serverImpl) OnConnected(conn net.Conn) {
func (s *server) OnConnected(conn net.Conn) {
log.Sugar.Debugf("rtsp连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
t.Data = NewSession(conn)
}
func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
func (s *server) OnPacket(conn net.Conn, data []byte) {
t := conn.(*transport.Conn)
method, url, header, err := parseMessage(data)
@@ -75,7 +75,7 @@ func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
}
}
func (s *serverImpl) OnDisConnected(conn net.Conn, err error) {
func (s *server) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("rtsp断开连接 conn:%s", conn.RemoteAddr().String())
s.closeSession(conn)

View File

@@ -20,7 +20,7 @@ var (
// 对于udp而言, 每个sink维护多个transport
// tcp直接单端口传输
type sink struct {
stream.SinkImpl
stream.BaseSink
senders []*librtp.RtpSender //一个rtsp源可能存在多个流, 每个流都需要拉取
sdpCb func(sdp string) //rtsp_stream生成sdp后使用该回调给rtsp_session, 响应describe
@@ -29,9 +29,9 @@ type sink struct {
playing bool //是否已经收到play请求
}
func NewSink(id stream.SinkId, sourceId string, conn net.Conn, cb func(sdp string)) stream.ISink {
func NewSink(id stream.SinkId, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
return &sink{
stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtsp, Conn: conn},
stream.BaseSink{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtsp, Conn: conn},
nil,
cb,
false,
@@ -157,7 +157,7 @@ func (s *sink) SendHeader(data []byte) error {
}
func (s *sink) Close() {
s.SinkImpl.Close()
s.BaseSink.Close()
for _, sender := range s.senders {
if sender.Rtp != nil {

View File

@@ -21,7 +21,7 @@ const (
// rtsp传输流封装
// 低延迟是rtsp特性, 所以不考虑实现GOP缓存
type tranStream struct {
stream.TransStreamImpl
stream.BaseTransStream
addr net.IPAddr
addrType string
urlFormat string
@@ -30,7 +30,7 @@ type tranStream struct {
sdp string
}
func NewTransStream(addr net.IPAddr, urlFormat string) stream.ITransStream {
func NewTransStream(addr net.IPAddr, urlFormat string) stream.TransStream {
t := &tranStream{
addr: addr,
urlFormat: urlFormat,
@@ -46,7 +46,7 @@ func NewTransStream(addr net.IPAddr, urlFormat string) stream.ITransStream {
return t
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
trackFormat := source.Id() + "?track=%d"
return NewTransStream(net.IPAddr{
IP: net.ParseIP(stream.AppConfig.PublicIP),
@@ -130,7 +130,7 @@ func (t *tranStream) Input(packet utils.AVPacket) error {
}
stream_.cache = true
parameters := t.TransStreamImpl.Tracks[packet.Index()].CodecParameters()
parameters := t.BaseTransStream.Tracks[packet.Index()].CodecParameters()
if utils.AVCodecIdH265 == packet.CodecId() {
bytes := parameters.DecoderConfRecord().(*libhevc.HEVCDecoderConfRecord).VPS
@@ -145,24 +145,24 @@ func (t *tranStream) Input(packet utils.AVPacket) error {
stream_.extraDataBuffer = stream_.tmpExtraDataBuffer
}
data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.TransStreamImpl.Tracks[packet.Index()]))
data := libavc.RemoveStartCode(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]))
stream_.muxer.Input(data, uint32(packet.ConvertPts(stream_.rate)))
}
return nil
}
func (t *tranStream) AddSink(sink_ stream.ISink) error {
sink_.(*sink).setSenderCount(len(t.TransStreamImpl.Tracks))
func (t *tranStream) AddSink(sink_ stream.Sink) error {
sink_.(*sink).setSenderCount(len(t.BaseTransStream.Tracks))
if err := sink_.(*sink).SendHeader([]byte(t.sdp)); err != nil {
return err
}
return t.TransStreamImpl.AddSink(sink_)
return t.BaseTransStream.AddSink(sink_)
}
func (t *tranStream) AddTrack(stream utils.AVStream) error {
if err := t.TransStreamImpl.AddTrack(stream); err != nil {
if err := t.BaseTransStream.AddTrack(stream); err != nil {
return err
}

View File

@@ -100,10 +100,10 @@ func hookEvent(event HookEvent, body interface{}, success func(response *http.Re
return sendHookEvent(url, body, success, failure)
}
type hookSessionImpl struct {
type hookSession struct {
}
func (h *hookSessionImpl) send(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
func (h *hookSession) send(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
marshal, err := json.Marshal(body)
if err != nil {
return err
@@ -130,6 +130,6 @@ func (h *hookSessionImpl) send(url string, body interface{}, success func(respon
return sendHookEvent(url, body, success, failure)
}
func (h *hookSessionImpl) Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
func (h *hookSession) Hook(event HookEvent, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error {
return hookEvent(event, body, success, failure)
}

View File

@@ -5,13 +5,13 @@ import (
)
type SourceHook interface {
Publish(source ISource, success func(), failure func(state utils.HookState))
Publish(source Source, success func(), failure func(state utils.HookState))
PublishDone(source ISource, success func(), failure func(state utils.HookState))
PublishDone(source Source, success func(), failure func(state utils.HookState))
}
type SinkHook interface {
Play(sink ISink, success func(), failure func(state utils.HookState))
Play(sink Sink, success func(), failure func(state utils.HookState))
PlayDone(source ISink, success func(), failure func(state utils.HookState))
PlayDone(source Sink, success func(), failure func(state utils.HookState))
}

View File

@@ -10,7 +10,7 @@ import (
type SinkId interface{}
type ISink interface {
type Sink interface {
Id() SinkId
Input(data []byte) error
@@ -75,8 +75,8 @@ func GenerateSinkId(addr net.Addr) SinkId {
return addr.String()
}
type SinkImpl struct {
hookSessionImpl
type BaseSink struct {
hookSession
Id_ SinkId
SourceId_ string
@@ -97,11 +97,11 @@ type SinkImpl struct {
Conn net.Conn
}
func (s *SinkImpl) Id() SinkId {
func (s *BaseSink) Id() SinkId {
return s.Id_
}
func (s *SinkImpl) Input(data []byte) error {
func (s *BaseSink) Input(data []byte) error {
if s.Conn != nil {
_, err := s.Conn.Write(data)
@@ -111,59 +111,59 @@ func (s *SinkImpl) Input(data []byte) error {
return nil
}
func (s *SinkImpl) SendHeader(data []byte) error {
func (s *BaseSink) SendHeader(data []byte) error {
return s.Input(data)
}
func (s *SinkImpl) SourceId() string {
func (s *BaseSink) SourceId() string {
return s.SourceId_
}
func (s *SinkImpl) TransStreamId() TransStreamId {
func (s *BaseSink) TransStreamId() TransStreamId {
return s.TransStreamId_
}
func (s *SinkImpl) SetTransStreamId(id TransStreamId) {
func (s *BaseSink) SetTransStreamId(id TransStreamId) {
s.TransStreamId_ = id
}
func (s *SinkImpl) Protocol() Protocol {
func (s *BaseSink) Protocol() Protocol {
return s.Protocol_
}
func (s *SinkImpl) Lock() {
func (s *BaseSink) Lock() {
s.lock.Lock()
}
func (s *SinkImpl) UnLock() {
func (s *BaseSink) UnLock() {
s.lock.Unlock()
}
func (s *SinkImpl) State() SessionState {
func (s *BaseSink) State() SessionState {
utils.Assert(!s.lock.TryLock())
return s.State_
}
func (s *SinkImpl) SetState(state SessionState) {
func (s *BaseSink) SetState(state SessionState) {
utils.Assert(!s.lock.TryLock())
s.State_ = state
}
func (s *SinkImpl) EnableVideo() bool {
func (s *BaseSink) EnableVideo() bool {
return !s.disableVideo
}
func (s *SinkImpl) SetEnableVideo(enable bool) {
func (s *BaseSink) SetEnableVideo(enable bool) {
s.disableVideo = !enable
}
func (s *SinkImpl) DesiredAudioCodecId() utils.AVCodecID {
func (s *BaseSink) DesiredAudioCodecId() utils.AVCodecID {
return s.DesiredAudioCodecId_
}
func (s *SinkImpl) DesiredVideoCodecId() utils.AVCodecID {
func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID {
return s.DesiredVideoCodecId_
}
@@ -173,7 +173,7 @@ func (s *SinkImpl) DesiredVideoCodecId() utils.AVCodecID {
// 什么时候调用Close? 是否考虑线程安全?
// 拉流断开连接,不需要考虑线程安全
// 踢流走source管道删除,并且关闭Conn
func (s *SinkImpl) Close() {
func (s *BaseSink) Close() {
utils.Assert(SessionStateClose != s.State_)
if s.Conn != nil {
@@ -206,6 +206,6 @@ func (s *SinkImpl) Close() {
}
}
func (s *SinkImpl) PrintInfo() string {
func (s *BaseSink) PrintInfo() string {
return fmt.Sprintf("%s-%v source:%s", s.Protocol().ToString(), s.Id_, s.SourceId_)
}

View File

@@ -6,7 +6,7 @@ import (
"net/http"
)
func HookPlaying(s ISink, success func(), failure func(state utils.HookState)) {
func HookPlaying(s Sink, success func(), failure func(state utils.HookState)) {
f := func() {
source := SourceManager.Find(s.SourceId())
if source == nil {
@@ -63,7 +63,7 @@ func HookPlaying(s ISink, success func(), failure func(state utils.HookState)) {
}
}
func HookPlayingDone(s ISink, success func(), failure func(state utils.HookState)) {
func HookPlayingDone(s Sink, success func(), failure func(state utils.HookState)) {
if !AppConfig.Hook.EnableOnPlayDone() {
if success != nil {
success()

View File

@@ -5,112 +5,18 @@ import (
"sync"
)
// 等待队列所有的Sink
var waitingSinks map[string]map[SinkId]ISink
var mutex sync.RWMutex
var SinkManager *sinkManager
func init() {
waitingSinks = make(map[string]map[SinkId]ISink, 1024)
}
func AddSinkToWaitingQueue(streamId string, sink ISink) {
mutex.Lock()
defer mutex.Unlock()
m, ok := waitingSinks[streamId]
if !ok {
if m, ok = waitingSinks[streamId]; !ok {
m = make(map[SinkId]ISink, 64)
waitingSinks[streamId] = m
}
}
m[sink.Id()] = sink
}
func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkId) (ISink, bool) {
mutex.Lock()
defer mutex.Unlock()
m, ok := waitingSinks[sourceId]
if !ok {
return nil, false
}
sink, ok := m[sinkId]
if ok {
delete(m, sinkId)
}
return sink, ok
}
func PopWaitingSinks(sourceId string) []ISink {
mutex.Lock()
defer mutex.Unlock()
source, ok := waitingSinks[sourceId]
if !ok {
return nil
}
sinks := make([]ISink, len(source))
var index = 0
for _, sink := range source {
sinks[index] = sink
index++
}
delete(waitingSinks, sourceId)
return sinks
}
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkId) bool {
mutex.RLock()
defer mutex.RUnlock()
source, ok := waitingSinks[sourceId]
if !ok {
return false
}
_, ok = source[sinkId]
return ok
}
func ExistSink(sourceId string, sinkId SinkId) bool {
if sourceId != "" {
if exist := ExistSinkInWaitingQueue(sourceId, sinkId); exist {
return true
}
}
return SinkManager.Exist(sinkId)
SinkManager = &sinkManager{}
}
// ISinkManager 添加到TransStream的所有Sink
type ISinkManager interface {
Add(sink ISink) error
Find(id SinkId) ISink
Remove(id SinkId) (ISink, error)
Exist(id SinkId) bool
}
var SinkManager ISinkManager
func init() {
SinkManager = &sinkManagerImpl{}
}
type sinkManagerImpl struct {
type sinkManager struct {
m sync.Map
}
func (s *sinkManagerImpl) Add(sink ISink) error {
func (s *sinkManager) Add(sink Sink) error {
_, ok := s.m.LoadOrStore(sink.Id(), sink)
if ok {
return fmt.Errorf("the sink %s has been exist", sink.Id())
@@ -119,25 +25,25 @@ func (s *sinkManagerImpl) Add(sink ISink) error {
return nil
}
func (s *sinkManagerImpl) Find(id SinkId) ISink {
func (s *sinkManager) Find(id SinkId) Sink {
value, ok := s.m.Load(id)
if ok {
return value.(ISink)
return value.(Sink)
}
return nil
}
func (s *sinkManagerImpl) Remove(id SinkId) (ISink, error) {
func (s *sinkManager) Remove(id SinkId) (Sink, error) {
value, loaded := s.m.LoadAndDelete(id)
if loaded {
return value.(ISink), nil
return value.(Sink), nil
}
return nil, fmt.Errorf("source with id %s was not find", id)
}
func (s *sinkManagerImpl) Exist(id SinkId) bool {
func (s *sinkManager) Exist(id SinkId) bool {
_, ok := s.m.Load(id)
return ok
}

87
stream/sink_waitting.go Normal file
View File

@@ -0,0 +1,87 @@
package stream
import "sync"
// 等待队列所有的Sink
var waitingSinks map[string]map[SinkId]Sink
var mutex sync.RWMutex
func init() {
waitingSinks = make(map[string]map[SinkId]Sink, 1024)
}
func AddSinkToWaitingQueue(streamId string, sink Sink) {
mutex.Lock()
defer mutex.Unlock()
m, ok := waitingSinks[streamId]
if !ok {
if m, ok = waitingSinks[streamId]; !ok {
m = make(map[SinkId]Sink, 64)
waitingSinks[streamId] = m
}
}
m[sink.Id()] = sink
}
func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkId) (Sink, bool) {
mutex.Lock()
defer mutex.Unlock()
m, ok := waitingSinks[sourceId]
if !ok {
return nil, false
}
sink, ok := m[sinkId]
if ok {
delete(m, sinkId)
}
return sink, ok
}
func PopWaitingSinks(sourceId string) []Sink {
mutex.Lock()
defer mutex.Unlock()
source, ok := waitingSinks[sourceId]
if !ok {
return nil
}
sinks := make([]Sink, len(source))
var index = 0
for _, sink := range source {
sinks[index] = sink
index++
}
delete(waitingSinks, sourceId)
return sinks
}
func ExistSinkInWaitingQueue(sourceId string, sinkId SinkId) bool {
mutex.RLock()
defer mutex.RUnlock()
source, ok := waitingSinks[sourceId]
if !ok {
return false
}
_, ok = source[sinkId]
return ok
}
func ExistSink(sourceId string, sinkId SinkId) bool {
if sourceId != "" {
if exist := ExistSinkInWaitingQueue(sourceId, sinkId); exist {
return true
}
}
return SinkManager.Exist(sinkId)
}

View File

@@ -52,8 +52,8 @@ const (
SessionStateClose = SessionState(7) //关闭状态
)
// ISource 父类Source负责, 除解析流以外的所有事情
type ISource interface {
// Source 父类Source负责, 除解析流以外的所有事情
type Source interface {
// Id Source的唯一ID/**
Id() string
@@ -72,10 +72,10 @@ type ISource interface {
// AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader先将Sink添加到等待队列.
// 匹配拉流的编码器, 创建TransStream或向存在TransStream添加Sink
AddSink(sink ISink) bool
AddSink(sink Sink) bool
// RemoveSink 删除Sink/**
RemoveSink(sink ISink) bool
RemoveSink(sink Sink) bool
AddEvent(event SourceEvent, data interface{})
@@ -112,23 +112,23 @@ type ISource interface {
Init(input func(data []byte) error)
}
type SourceImpl struct {
hookSessionImpl
type PublishSource struct {
hookSession
Id_ string
Type_ SourceType
state SessionState
Conn net.Conn
TransDeMuxer stream.DeMuxer //负责从推流协议中解析出AVStream和AVPacket
recordSink ISink //每个Source的录制流
hlsStream ITransStream //如果开开启HLS传输流, 不等拉流时, 创建直接生成
audioTranscoders []transcode.ITranscoder //音频解码器
videoTranscoders []transcode.ITranscoder //视频解码器
originStreams StreamManager //推流的音视频Streams
allStreams StreamManager //推流Streams+转码器获得的Stream
pktBuffers [8]MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
TransDeMuxer stream.DeMuxer //负责从推流协议中解析出AVStream和AVPacket
recordSink Sink //每个Source的录制流
hlsStream TransStream //如果开开启HLS传输流, 不等拉流时, 创建直接生成
audioTranscoders []transcode.Transcoder //音频解码器
videoTranscoders []transcode.Transcoder //视频解码器
originStreams StreamManager //推流的音视频Streams
allStreams StreamManager //推流Streams+转码器获得的Stream
pktBuffers [8]MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
existVideo bool //是否存在视频
completed bool
@@ -137,23 +137,23 @@ type SourceImpl struct {
Input_ func(data []byte) error //解决多态无法传递给子类的问题
//所有的输出协议, 持有Sink
transStreams map[TransStreamId]ITransStream
transStreams map[TransStreamId]TransStream
//sink的拉流和断开拉流事件都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作
//golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件
inputEvent chan []byte
responseEvent chan byte //解析完input的数据后才能继续从网络io中读取流
closeEvent chan byte
playingEventQueue chan ISink
playingDoneEventQueue chan ISink
playingEventQueue chan Sink
playingDoneEventQueue chan Sink
probeTimoutEvent chan bool
}
func (s *SourceImpl) Id() string {
func (s *PublishSource) Id() string {
return s.Id_
}
func (s *SourceImpl) Init(input func(data []byte) error) {
func (s *PublishSource) Init(input func(data []byte) error) {
s.Input_ = input
//初始化事件接收缓冲区
@@ -163,12 +163,12 @@ func (s *SourceImpl) Init(input func(data []byte) error) {
s.inputEvent = make(chan []byte)
s.responseEvent = make(chan byte)
s.closeEvent = make(chan byte)
s.playingEventQueue = make(chan ISink, 128)
s.playingDoneEventQueue = make(chan ISink, 128)
s.playingEventQueue = make(chan Sink, 128)
s.playingDoneEventQueue = make(chan Sink, 128)
s.probeTimoutEvent = make(chan bool)
if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]ITransStream, 10)
s.transStreams = make(map[TransStreamId]TransStream, 10)
}
//创建录制流
@@ -189,7 +189,7 @@ func (s *SourceImpl) Init(input func(data []byte) error) {
}
// FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池
func (s *SourceImpl) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool {
func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool {
if index >= cap(s.pktBuffers) {
panic("流路数过多...")
}
@@ -211,7 +211,7 @@ func (s *SourceImpl) FindOrCreatePacketBuffer(index int, mediaType utils.AVMedia
return s.pktBuffers[index]
}
func (s *SourceImpl) LoopEvent() {
func (s *PublishSource) LoopEvent() {
for {
select {
case data := <-s.inputEvent:
@@ -244,15 +244,15 @@ func (s *SourceImpl) LoopEvent() {
}
}
func (s *SourceImpl) Input(data []byte) error {
func (s *PublishSource) Input(data []byte) error {
return nil
}
func (s *SourceImpl) OriginStreams() []utils.AVStream {
func (s *PublishSource) OriginStreams() []utils.AVStream {
return s.originStreams.All()
}
func (s *SourceImpl) TranscodeStreams() []utils.AVStream {
func (s *PublishSource) TranscodeStreams() []utils.AVStream {
return s.allStreams.All()
}
@@ -264,7 +264,7 @@ func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID)
return true
}
func (s *SourceImpl) AddSink(sink ISink) bool {
func (s *PublishSource) AddSink(sink Sink) bool {
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
audioStream := s.originStreams.FindStreamWithType(utils.AVMediaTypeAudio)
@@ -315,7 +315,7 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
transStream, ok := s.transStreams[transStreamId]
if !ok {
if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]ITransStream, 10)
s.transStreams = make(map[TransStreamId]TransStream, 10)
}
//创建一个新的传输流
log.Sugar.Debugf("创建%s-stream", sink.Protocol().ToString())
@@ -360,7 +360,7 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
return true
}
func (s *SourceImpl) RemoveSink(sink ISink) bool {
func (s *PublishSource) RemoveSink(sink Sink) bool {
id := sink.TransStreamId()
if id > 0 {
transStream := s.transStreams[id]
@@ -376,24 +376,24 @@ func (s *SourceImpl) RemoveSink(sink ISink) bool {
return b
}
func (s *SourceImpl) AddEvent(event SourceEvent, data interface{}) {
func (s *PublishSource) AddEvent(event SourceEvent, data interface{}) {
if SourceEventInput == event {
s.inputEvent <- data.([]byte)
<-s.responseEvent
} else if SourceEventPlay == event {
s.playingEventQueue <- data.(ISink)
s.playingEventQueue <- data.(Sink)
} else if SourceEventPlayDone == event {
s.playingDoneEventQueue <- data.(ISink)
s.playingDoneEventQueue <- data.(Sink)
} else if SourceEventClose == event {
s.closeEvent <- 0
}
}
func (s *SourceImpl) SetState(state SessionState) {
func (s *PublishSource) SetState(state SessionState) {
s.state = state
}
func (s *SourceImpl) Close() {
func (s *PublishSource) Close() {
//释放GOP缓存
if s.gopBuffer != nil {
s.gopBuffer.Clear()
@@ -406,7 +406,7 @@ func (s *SourceImpl) Close() {
for _, transStream := range s.transStreams {
transStream.Close()
transStream.PopAllSink(func(sink ISink) {
transStream.PopAllSink(func(sink Sink) {
sink.SetTransStreamId(0)
{
sink.Lock()
@@ -424,11 +424,11 @@ func (s *SourceImpl) Close() {
s.transStreams = nil
}
func (s *SourceImpl) OnDiscardPacket(packet utils.AVPacket) {
func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeHead()
}
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
if s.completed {
log.Sugar.Warnf("添加Stream失败 Source: %s已经WriteHeader", s.Id_)
return
@@ -461,7 +461,7 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
}
// 从DeMuxer解析完Stream后, 处理等待Sinks
func (s *SourceImpl) writeHeader() {
func (s *PublishSource) writeHeader() {
if s.completed {
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.Id_)
return
@@ -489,15 +489,15 @@ func (s *SourceImpl) writeHeader() {
}
}
func (s *SourceImpl) IsCompleted() bool {
func (s *PublishSource) IsCompleted() bool {
return s.completed
}
func (s *SourceImpl) OnDeMuxStreamDone() {
func (s *PublishSource) OnDeMuxStreamDone() {
s.writeHeader()
}
func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) {
func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
if AppConfig.GOPCache && s.existVideo {
s.gopBuffer.AddPacket(packet)
}
@@ -513,11 +513,11 @@ func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) {
}
}
func (s *SourceImpl) OnDeMuxDone() {
func (s *PublishSource) OnDeMuxDone() {
}
func (s *SourceImpl) Publish(source ISource, success func(), failure func(state utils.HookState)) {
func (s *PublishSource) Publish(source Source, success func(), failure func(state utils.HookState)) {
//streamId 已经被占用
if source_ := SourceManager.Find(source.Id()); source_ != nil {
fmt.Printf("推流已经占用 Source:%s", source.Id())
@@ -553,10 +553,10 @@ func (s *SourceImpl) Publish(source ISource, success func(), failure func(state
}
}
func (s *SourceImpl) PublishDone(source ISource, success func(), failure func(state utils.HookState)) {
func (s *PublishSource) PublishDone(source Source, success func(), failure func(state utils.HookState)) {
}
func (s *SourceImpl) Type() SourceType {
func (s *PublishSource) Type() SourceType {
return s.Type_
}

View File

@@ -5,25 +5,17 @@ import (
"sync"
)
type ISourceManager interface {
Add(source ISource) error
Find(id string) ISource
Remove(id string) (ISource, error)
}
var SourceManager ISourceManager
var SourceManager *sourceManger
func init() {
SourceManager = &sourceMangerImpl{}
SourceManager = &sourceManger{}
}
type sourceMangerImpl struct {
type sourceManger struct {
m sync.Map
}
func (s *sourceMangerImpl) Add(source ISource) error {
func (s *sourceManger) Add(source Source) error {
_, ok := s.m.LoadOrStore(source.Id(), source)
if ok {
return fmt.Errorf("the source %s has been exist", source.Id())
@@ -32,19 +24,19 @@ func (s *sourceMangerImpl) Add(source ISource) error {
return nil
}
func (s *sourceMangerImpl) Find(id string) ISource {
func (s *sourceManger) Find(id string) Source {
value, ok := s.m.Load(id)
if ok {
return value.(ISource)
return value.(Source)
}
return nil
}
func (s *sourceMangerImpl) Remove(id string) (ISource, error) {
func (s *sourceManger) Remove(id string) (Source, error) {
value, loaded := s.m.LoadAndDelete(id)
if loaded {
return value.(ISource), nil
return value.(Source), nil
}
return nil, fmt.Errorf("source with id %s was not find", id)

View File

@@ -4,20 +4,6 @@ import (
"github.com/yangjiechina/avformat/utils"
)
type IStreamManager interface {
Add(stream utils.AVStream)
FindStream(id utils.AVCodecID) utils.AVStream
FindStreamWithType(mediaType utils.AVMediaType) utils.AVStream
FindStreams(id utils.AVCodecID) []utils.AVStream
FindStreamsWithType(mediaType utils.AVMediaType) []utils.AVStream
All() []utils.AVStream
}
type StreamManager struct {
streams []utils.AVStream
}

43
stream/trans_factory.go Normal file
View File

@@ -0,0 +1,43 @@
package stream
import (
"fmt"
"github.com/yangjiechina/avformat/utils"
)
type TransStreamFactory func(source Source, protocol Protocol, streams []utils.AVStream) (TransStream, error)
var (
transStreamFactories map[Protocol]TransStreamFactory
)
func init() {
transStreamFactories = make(map[Protocol]TransStreamFactory, 8)
}
func RegisterTransStreamFactory(protocol Protocol, streamFunc TransStreamFactory) {
_, ok := transStreamFactories[protocol]
if ok {
panic(fmt.Sprintf("%s has been registered", protocol.ToString()))
}
transStreamFactories[protocol] = streamFunc
}
func FindTransStreamFactory(protocol Protocol) (TransStreamFactory, error) {
f, ok := transStreamFactories[protocol]
if !ok {
return nil, fmt.Errorf("unknown protocol %s", protocol.ToString())
}
return f, nil
}
func CreateTransStream(source Source, protocol Protocol, streams []utils.AVStream) (TransStream, error) {
factory, err := FindTransStreamFactory(protocol)
if err != nil {
return nil, err
}
return factory(source, protocol, streams)
}

View File

@@ -1,106 +1,12 @@
package stream
import (
"fmt"
"github.com/yangjiechina/avformat/stream"
"github.com/yangjiechina/avformat/utils"
)
// TransStreamId 每个传输流的唯一Id由协议+流Id组成
type TransStreamId uint64
type TransStreamFactory func(source ISource, protocol Protocol, streams []utils.AVStream) (ITransStream, error)
var (
// AVCodecID转为byte的对应关系
narrowCodecIds map[int]byte
transStreamFactories map[Protocol]TransStreamFactory
)
func init() {
narrowCodecIds = map[int]byte{
int(utils.AVCodecIdH263): 0x1,
int(utils.AVCodecIdH264): 0x2,
int(utils.AVCodecIdH265): 0x3,
int(utils.AVCodecIdAV1): 0x4,
int(utils.AVCodecIdVP8): 0x5,
int(utils.AVCodecIdVP9): 0x6,
int(utils.AVCodecIdAAC): 101,
int(utils.AVCodecIdMP3): 102,
int(utils.AVCodecIdOPUS): 103,
int(utils.AVCodecIdPCMALAW): 104,
int(utils.AVCodecIdPCMMULAW): 105,
}
transStreamFactories = make(map[Protocol]TransStreamFactory, 8)
}
func RegisterTransStreamFactory(protocol Protocol, streamFunc TransStreamFactory) {
_, ok := transStreamFactories[protocol]
if ok {
panic(fmt.Sprintf("%s has been registered", protocol.ToString()))
}
transStreamFactories[protocol] = streamFunc
}
func FindTransStreamFactory(protocol Protocol) (TransStreamFactory, error) {
f, ok := transStreamFactories[protocol]
if !ok {
return nil, fmt.Errorf("unknown protocol %s", protocol.ToString())
}
return f, nil
}
func CreateTransStream(source ISource, protocol Protocol, streams []utils.AVStream) (ITransStream, error) {
factory, err := FindTransStreamFactory(protocol)
if err != nil {
return nil, err
}
return factory(source, protocol, streams)
}
// GenerateTransStreamId 根据传入的推拉流协议和编码器ID生成StreamId
// 请确保ids根据值升序排序传参
/*func GenerateTransStreamId(protocol Protocol, ids ...utils.AVCodecID) TransStreamId {
len_ := len(ids)
utils.Assert(len_ > 0 && len_ < 8)
var streamId uint64
streamId = uint64(protocol) << 56
for i, id := range ids {
bId, ok := narrowCodecIds[int(id)]
utils.Assert(ok)
streamId |= uint64(bId) << (48 - i*8)
}
return TransStreamId(streamId)
}*/
func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId {
len_ := len(ids)
utils.Assert(len_ > 0 && len_ < 8)
var streamId uint64
streamId = uint64(protocol) << 56
for i, id := range ids {
bId, ok := narrowCodecIds[int(id.CodecId())]
utils.Assert(ok)
streamId |= uint64(bId) << (48 - i*8)
}
return TransStreamId(streamId)
}
// ITransStream 讲AVPacket封装成传输流转发给各个Sink
type ITransStream interface {
// TransStream 将AVPacket封装成传输流转发给各个Sink
type TransStream interface {
Init()
Input(packet utils.AVPacket) error
@@ -109,39 +15,38 @@ type ITransStream interface {
WriteHeader() error
AddSink(sink ISink) error
AddSink(sink Sink) error
ExistSink(id SinkId) bool
RemoveSink(id SinkId) (ISink, bool)
RemoveSink(id SinkId) (Sink, bool)
PopAllSink(handler func(sink ISink))
PopAllSink(handler func(sink Sink))
AllSink() []ISink
AllSink() []Sink
Close() error
SendPacket(data []byte) error
}
type TransStreamImpl struct {
Sinks map[SinkId]ISink
muxer stream.Muxer
Tracks []utils.AVStream
transBuffer MemoryPool //每个TransStream也缓存封装后的流
Completed bool
ExistVideo bool
type BaseTransStream struct {
Sinks map[SinkId]Sink
muxer stream.Muxer
Tracks []utils.AVStream
Completed bool
ExistVideo bool
}
func (t *TransStreamImpl) Init() {
t.Sinks = make(map[SinkId]ISink, 64)
func (t *BaseTransStream) Init() {
t.Sinks = make(map[SinkId]Sink, 64)
}
func (t *TransStreamImpl) Input(packet utils.AVPacket) error {
func (t *BaseTransStream) Input(packet utils.AVPacket) error {
return nil
}
func (t *TransStreamImpl) AddTrack(stream utils.AVStream) error {
func (t *BaseTransStream) AddTrack(stream utils.AVStream) error {
t.Tracks = append(t.Tracks, stream)
if utils.AVMediaTypeVideo == stream.Type() {
t.ExistVideo = true
@@ -149,17 +54,17 @@ func (t *TransStreamImpl) AddTrack(stream utils.AVStream) error {
return nil
}
func (t *TransStreamImpl) AddSink(sink ISink) error {
func (t *BaseTransStream) AddSink(sink Sink) error {
t.Sinks[sink.Id()] = sink
return nil
}
func (t *TransStreamImpl) ExistSink(id SinkId) bool {
func (t *BaseTransStream) ExistSink(id SinkId) bool {
_, ok := t.Sinks[id]
return ok
}
func (t *TransStreamImpl) RemoveSink(id SinkId) (ISink, bool) {
func (t *BaseTransStream) RemoveSink(id SinkId) (Sink, bool) {
sink, ok := t.Sinks[id]
if ok {
delete(t.Sinks, id)
@@ -168,7 +73,7 @@ func (t *TransStreamImpl) RemoveSink(id SinkId) (ISink, bool) {
return sink, ok
}
func (t *TransStreamImpl) PopAllSink(handler func(sink ISink)) {
func (t *BaseTransStream) PopAllSink(handler func(sink Sink)) {
for _, sink := range t.Sinks {
handler(sink)
}
@@ -176,16 +81,16 @@ func (t *TransStreamImpl) PopAllSink(handler func(sink ISink)) {
t.Sinks = nil
}
func (t *TransStreamImpl) AllSink() []ISink {
func (t *BaseTransStream) AllSink() []Sink {
//TODO implement me
panic("implement me")
}
func (t *TransStreamImpl) Close() error {
func (t *BaseTransStream) Close() error {
return nil
}
func (t *TransStreamImpl) SendPacket(data []byte) error {
func (t *BaseTransStream) SendPacket(data []byte) error {
for _, sink := range t.Sinks {
sink.Input(data)
}

64
stream/trans_utils.go Normal file
View File

@@ -0,0 +1,64 @@
package stream
import "github.com/yangjiechina/avformat/utils"
// TransStreamId 每个传输流的唯一Id由协议+流Id组成
type TransStreamId uint64
var (
// AVCodecID转为byte的对应关系
narrowCodecIds map[int]byte
)
func init() {
narrowCodecIds = map[int]byte{
int(utils.AVCodecIdH263): 0x1,
int(utils.AVCodecIdH264): 0x2,
int(utils.AVCodecIdH265): 0x3,
int(utils.AVCodecIdAV1): 0x4,
int(utils.AVCodecIdVP8): 0x5,
int(utils.AVCodecIdVP9): 0x6,
int(utils.AVCodecIdAAC): 101,
int(utils.AVCodecIdMP3): 102,
int(utils.AVCodecIdOPUS): 103,
int(utils.AVCodecIdPCMALAW): 104,
int(utils.AVCodecIdPCMMULAW): 105,
}
}
// GenerateTransStreamId 根据传入的推拉流协议和编码器ID生成StreamId
// 请确保ids根据值升序排序传参
/*func GenerateTransStreamId(protocol Protocol, ids ...utils.AVCodecID) TransStreamId {
len_ := len(ids)
utils.Assert(len_ > 0 && len_ < 8)
var streamId uint64
streamId = uint64(protocol) << 56
for i, id := range ids {
bId, ok := narrowCodecIds[int(id)]
utils.Assert(ok)
streamId |= uint64(bId) << (48 - i*8)
}
return TransStreamId(streamId)
}*/
func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId {
len_ := len(ids)
utils.Assert(len_ > 0 && len_ < 8)
var streamId uint64
streamId = uint64(protocol) << 56
for i, id := range ids {
bId, ok := narrowCodecIds[int(id.CodecId())]
utils.Assert(ok)
streamId |= uint64(bId) << (48 - i*8)
}
return TransStreamId(streamId)
}

View File

@@ -1,4 +1,4 @@
package transcode
type ITranscoder interface {
type Transcoder interface {
}