mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-28 03:52:07 +08:00
通过管道收发推拉流事件
This commit is contained in:
@@ -2,6 +2,7 @@ package rtmp
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat/libflv"
|
"github.com/yangjiechina/avformat/libflv"
|
||||||
|
"github.com/yangjiechina/avformat/librtmp"
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"github.com/yangjiechina/live-server/stream"
|
"github.com/yangjiechina/live-server/stream"
|
||||||
)
|
)
|
||||||
@@ -9,29 +10,40 @@ import (
|
|||||||
type Publisher struct {
|
type Publisher struct {
|
||||||
stream.SourceImpl
|
stream.SourceImpl
|
||||||
|
|
||||||
|
stack *librtmp.Stack
|
||||||
audioMemoryPool stream.MemoryPool
|
audioMemoryPool stream.MemoryPool
|
||||||
videoMemoryPool stream.MemoryPool
|
videoMemoryPool stream.MemoryPool
|
||||||
|
|
||||||
audioUnmark bool
|
audioMark bool
|
||||||
videoUnmark bool
|
videoMark bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPublisher(sourceId string) *Publisher {
|
func NewPublisher(sourceId string, stack *librtmp.Stack) *Publisher {
|
||||||
deMuxer := libflv.NewDeMuxer()
|
deMuxer := libflv.NewDeMuxer()
|
||||||
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId, TransDeMuxer: nil}, audioUnmark: false, videoUnmark: false}
|
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId, Type_: stream.SourceTypeRtmp, TransDeMuxer: deMuxer}, stack: stack, audioMark: false, videoMark: false}
|
||||||
//设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
|
//设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
|
||||||
deMuxer.SetHandler(publisher)
|
deMuxer.SetHandler(publisher)
|
||||||
publisher.SourceImpl.SetState(stream.SessionStateTransferring)
|
publisher.Input_ = publisher.Input
|
||||||
|
|
||||||
|
return publisher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) Init() {
|
||||||
//创建内存池
|
//创建内存池
|
||||||
publisher.audioMemoryPool = stream.NewMemoryPool(48000 * (stream.AppConfig.GOPCache + 1))
|
p.audioMemoryPool = stream.NewMemoryPool(48000 * (stream.AppConfig.GOPCache + 1))
|
||||||
if stream.AppConfig.GOPCache > 0 {
|
if stream.AppConfig.GOPCache > 0 {
|
||||||
//以每秒钟4M码率大小创建内存池
|
//以每秒钟4M码率大小创建内存池
|
||||||
publisher.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8 * stream.AppConfig.GOPCache)
|
p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8 * stream.AppConfig.GOPCache)
|
||||||
} else {
|
} else {
|
||||||
publisher.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8)
|
p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8)
|
||||||
}
|
}
|
||||||
return publisher
|
|
||||||
|
p.SourceImpl.Init()
|
||||||
|
go p.SourceImpl.LoopEvent()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) Input(data []byte) {
|
||||||
|
p.stack.Input(nil, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Publisher) OnDiscardPacket(pkt interface{}) {
|
func (p *Publisher) OnDiscardPacket(pkt interface{}) {
|
||||||
@@ -60,15 +72,10 @@ func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) {
|
|||||||
if ret, buffer := p.SourceImpl.OnDeMuxStream(stream_); ret && buffer != nil {
|
if ret, buffer := p.SourceImpl.OnDeMuxStream(stream_); ret && buffer != nil {
|
||||||
buffer.SetDiscardHandler(p.OnDiscardPacket)
|
buffer.SetDiscardHandler(p.OnDiscardPacket)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Publisher) OnDeMuxStreamDone() {
|
func (p *Publisher) OnDeMuxPacket(packet utils.AVPacket) {
|
||||||
|
p.SourceImpl.OnDeMuxPacket(packet)
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Publisher) OnDeMuxPacket(index int, packet utils.AVPacket) {
|
|
||||||
p.SourceImpl.OnDeMuxPacket(index, packet)
|
|
||||||
|
|
||||||
if stream.AppConfig.GOPCache > 0 {
|
if stream.AppConfig.GOPCache > 0 {
|
||||||
return
|
return
|
||||||
@@ -81,44 +88,40 @@ func (p *Publisher) OnDeMuxPacket(index int, packet utils.AVPacket) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Publisher) OnDeMuxDone() {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnVideo 从rtm chunk解析过来的视频包
|
// OnVideo 从rtm chunk解析过来的视频包
|
||||||
func (p *Publisher) OnVideo(data []byte, ts uint32) {
|
func (p *Publisher) OnVideo(data []byte, ts uint32) {
|
||||||
if data == nil {
|
if data == nil {
|
||||||
data = p.videoMemoryPool.Fetch()
|
data = p.videoMemoryPool.Fetch()
|
||||||
p.videoUnmark = false
|
p.videoMark = false
|
||||||
}
|
}
|
||||||
|
|
||||||
//_ = p.SourceImpl.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts)
|
p.SourceImpl.TransDeMuxer.(*libflv.DeMuxer).InputVideo(data, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Publisher) OnAudio(data []byte, ts uint32) {
|
func (p *Publisher) OnAudio(data []byte, ts uint32) {
|
||||||
if data == nil {
|
if data == nil {
|
||||||
data = p.audioMemoryPool.Fetch()
|
data = p.audioMemoryPool.Fetch()
|
||||||
p.audioUnmark = false
|
p.audioMark = false
|
||||||
}
|
}
|
||||||
|
|
||||||
//_ = p.SourceImpl.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts)
|
_ = p.SourceImpl.TransDeMuxer.(*libflv.DeMuxer).InputAudio(data, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnPartPacket 从rtmp解析过来的部分音视频包
|
// OnPartPacket 从rtmp解析过来的部分音视频包
|
||||||
func (p *Publisher) OnPartPacket(index int, data []byte, first bool) {
|
func (p *Publisher) OnPartPacket(index int, data []byte, first bool) {
|
||||||
//audio
|
//audio
|
||||||
if index == 0 {
|
if index == 0 {
|
||||||
if !p.audioUnmark {
|
if !p.audioMark {
|
||||||
p.audioMemoryPool.Mark()
|
p.audioMemoryPool.Mark()
|
||||||
p.audioUnmark = true
|
p.audioMark = true
|
||||||
}
|
}
|
||||||
|
|
||||||
p.audioMemoryPool.Write(data)
|
p.audioMemoryPool.Write(data)
|
||||||
//video
|
//video
|
||||||
} else if index == 1 {
|
} else if index == 1 {
|
||||||
if !p.videoUnmark {
|
if !p.videoMark {
|
||||||
p.videoMemoryPool.Mark()
|
p.videoMemoryPool.Mark()
|
||||||
p.videoUnmark = true
|
p.videoMark = true
|
||||||
}
|
}
|
||||||
|
|
||||||
p.videoMemoryPool.Write(data)
|
p.videoMemoryPool.Write(data)
|
||||||
|
@@ -53,4 +53,5 @@ func (s *serverImpl) OnPacket(conn net.Conn, data []byte) {
|
|||||||
func (s *serverImpl) OnDisConnected(conn net.Conn, err error) {
|
func (s *serverImpl) OnDisConnected(conn net.Conn, err error) {
|
||||||
t := conn.(*transport.Conn)
|
t := conn.(*transport.Conn)
|
||||||
t.Data.(*sessionImpl).Close()
|
t.Data.(*sessionImpl).Close()
|
||||||
|
t.Data = nil
|
||||||
}
|
}
|
||||||
|
@@ -7,7 +7,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Session 负责除RTMP连接和断开以外的所有生命周期处理
|
// Session 负责除连接和断开以外的所有RTMP生命周期处理
|
||||||
type Session interface {
|
type Session interface {
|
||||||
Input(conn net.Conn, data []byte) error //接受网络数据包再交由Stack处理
|
Input(conn net.Conn, data []byte) error //接受网络数据包再交由Stack处理
|
||||||
|
|
||||||
@@ -31,17 +31,23 @@ type sessionImpl struct {
|
|||||||
stack *librtmp.Stack
|
stack *librtmp.Stack
|
||||||
//publisher/sink
|
//publisher/sink
|
||||||
handle interface{}
|
handle interface{}
|
||||||
conn net.Conn
|
|
||||||
|
isPublish bool
|
||||||
|
conn net.Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sessionImpl) OnPublish(app, stream_ string, response chan utils.HookState) {
|
func (s *sessionImpl) OnPublish(app, stream_ string, response chan utils.HookState) {
|
||||||
s.SessionImpl.Stream = app + "/" + stream_
|
s.SessionImpl.Stream = app + "/" + stream_
|
||||||
publisher := NewPublisher(s.SessionImpl.Stream)
|
publisher := NewPublisher(s.SessionImpl.Stream, s.stack)
|
||||||
s.stack.SetOnPublishHandler(publisher)
|
s.stack.SetOnPublishHandler(publisher)
|
||||||
s.stack.SetOnTransDeMuxerHandler(publisher)
|
s.stack.SetOnTransDeMuxerHandler(publisher)
|
||||||
|
|
||||||
//stream.SessionImpl统一处理, Source是否已经存在, Hook回调....
|
//stream.SessionImpl统一处理, Source是否已经存在, Hook回调....
|
||||||
s.SessionImpl.OnPublish(publisher, nil, func() {
|
s.SessionImpl.OnPublish(publisher, nil, func() {
|
||||||
s.handle = publisher
|
s.handle = publisher
|
||||||
|
s.isPublish = true
|
||||||
|
publisher.Init()
|
||||||
|
|
||||||
response <- utils.HookStateOK
|
response <- utils.HookStateOK
|
||||||
}, func(state utils.HookState) {
|
}, func(state utils.HookState) {
|
||||||
response <- state
|
response <- state
|
||||||
@@ -51,7 +57,7 @@ func (s *sessionImpl) OnPublish(app, stream_ string, response chan utils.HookSta
|
|||||||
func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState) {
|
func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState) {
|
||||||
s.SessionImpl.Stream = app + "/" + stream_
|
s.SessionImpl.Stream = app + "/" + stream_
|
||||||
|
|
||||||
sink := NewSink(stream.GenerateSinkId(s.conn), s.conn)
|
sink := NewSink(stream.GenerateSinkId(s.conn), s.SessionImpl.Stream, s.conn)
|
||||||
s.SessionImpl.OnPlay(sink, nil, func() {
|
s.SessionImpl.OnPlay(sink, nil, func() {
|
||||||
s.handle = sink
|
s.handle = sink
|
||||||
response <- utils.HookStateOK
|
response <- utils.HookStateOK
|
||||||
@@ -61,8 +67,27 @@ func (s *sessionImpl) OnPlay(app, stream_ string, response chan utils.HookState)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
|
func (s *sessionImpl) Input(conn net.Conn, data []byte) error {
|
||||||
return s.stack.Input(conn, data)
|
//如果是推流,并且握手成功,后续收到的包,都将发送给LoopEvent处理
|
||||||
|
if s.isPublish {
|
||||||
|
s.handle.(*Publisher).AddEvent(stream.SourceEventInput, data)
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
return s.stack.Input(conn, data)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sessionImpl) Close() {
|
func (s *sessionImpl) Close() {
|
||||||
|
if s.handle == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok := s.handle.(*Publisher)
|
||||||
|
if ok {
|
||||||
|
if s.isPublish {
|
||||||
|
s.handle.(*Publisher).AddEvent(stream.SourceEventClose, nil)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sink := s.handle.(stream.ISink)
|
||||||
|
sink.Close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -6,6 +6,6 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewSink(id stream.SinkId, conn net.Conn) stream.ISink {
|
func NewSink(id stream.SinkId, sourceId string, conn net.Conn) stream.ISink {
|
||||||
return &stream.SinkImpl{Id_: id, Protocol_: stream.ProtocolRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE}
|
return &stream.SinkImpl{Id_: id, SourceId_: sourceId, State_: stream.SessionStateCreate, Protocol_: stream.ProtocolRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE}
|
||||||
}
|
}
|
||||||
|
@@ -89,9 +89,9 @@ func (r *ringBuffer) All() ([]interface{}, []interface{}) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.head <= r.tail {
|
if r.tail <= r.head {
|
||||||
return r.data[r.head:], r.data[:r.tail]
|
return r.data[r.head:], r.data[:r.tail]
|
||||||
} else {
|
} else {
|
||||||
return r.data[r.head:], nil
|
return r.data[r.head:r.tail], nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
@@ -34,6 +35,7 @@ func (s *SessionImpl) OnPublish(source_ ISource, pra map[string]interface{}, suc
|
|||||||
//streamId 已经被占用
|
//streamId 已经被占用
|
||||||
source := SourceManager.Find(s.Stream)
|
source := SourceManager.Find(s.Stream)
|
||||||
if source != nil {
|
if source != nil {
|
||||||
|
fmt.Printf("推流已经占用 Source:%s", source_.Id())
|
||||||
failure(utils.HookStateOccupy)
|
failure(utils.HookStateOccupy)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -42,6 +44,7 @@ func (s *SessionImpl) OnPublish(source_ ISource, pra map[string]interface{}, suc
|
|||||||
if err := SourceManager.Add(source_); err == nil {
|
if err := SourceManager.Add(source_); err == nil {
|
||||||
success()
|
success()
|
||||||
} else {
|
} else {
|
||||||
|
fmt.Printf("添加失败 Source:%s", source_.Id())
|
||||||
failure(utils.HookStateOccupy)
|
failure(utils.HookStateOccupy)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -78,9 +81,11 @@ func (s *SessionImpl) OnPlay(sink ISink, pra map[string]interface{}, success fun
|
|||||||
f := func() {
|
f := func() {
|
||||||
source := SourceManager.Find(s.Stream)
|
source := SourceManager.Find(s.Stream)
|
||||||
if source == nil {
|
if source == nil {
|
||||||
|
fmt.Printf("添加到等待队列 sink:%s", sink.Id())
|
||||||
|
sink.SetState(SessionStateWait)
|
||||||
AddSinkToWaitingQueue(s.Stream, sink)
|
AddSinkToWaitingQueue(s.Stream, sink)
|
||||||
} else {
|
} else {
|
||||||
source.AddSink(sink)
|
source.AddEvent(SourceEventPlay, sink)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -3,6 +3,7 @@ package stream
|
|||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"net"
|
"net"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SinkId interface{}
|
type SinkId interface{}
|
||||||
@@ -14,11 +15,15 @@ type ISink interface {
|
|||||||
|
|
||||||
SourceId() string
|
SourceId() string
|
||||||
|
|
||||||
|
TransStreamId() TransStreamId
|
||||||
|
|
||||||
|
SetTransStreamId(id TransStreamId)
|
||||||
|
|
||||||
Protocol() Protocol
|
Protocol() Protocol
|
||||||
|
|
||||||
State() int
|
State() SessionState
|
||||||
|
|
||||||
SetState(state int)
|
SetState(state SessionState) bool
|
||||||
|
|
||||||
EnableVideo() bool
|
EnableVideo() bool
|
||||||
|
|
||||||
@@ -54,39 +59,17 @@ func GenerateSinkId(conn net.Conn) SinkId {
|
|||||||
return conn.RemoteAddr().String()
|
return conn.RemoteAddr().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
var waitingSinks map[string]map[SinkId]ISink
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
waitingSinks = make(map[string]map[SinkId]ISink, 1024)
|
|
||||||
}
|
|
||||||
|
|
||||||
func AddSinkToWaitingQueue(streamId string, sink ISink) {
|
|
||||||
waitingSinks[streamId][sink.Id()] = sink
|
|
||||||
}
|
|
||||||
|
|
||||||
func RemoveSinkFromWaitingQueue(streamId, sinkId SinkId) ISink {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func PopWaitingSinks(streamId string) []ISink {
|
|
||||||
source, ok := waitingSinks[streamId]
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
sinks := make([]ISink, len(source))
|
|
||||||
var index = 0
|
|
||||||
for _, sink := range source {
|
|
||||||
sinks[index] = sink
|
|
||||||
}
|
|
||||||
return sinks
|
|
||||||
}
|
|
||||||
|
|
||||||
type SinkImpl struct {
|
type SinkImpl struct {
|
||||||
Id_ SinkId
|
Id_ SinkId
|
||||||
sourceId string
|
SourceId_ string
|
||||||
Protocol_ Protocol
|
Protocol_ Protocol
|
||||||
disableVideo bool
|
State_ SessionState
|
||||||
|
TransStreamId_ TransStreamId
|
||||||
|
disableVideo bool
|
||||||
|
//Sink在请求拉流->Source推流->Sink断开整个阶段 是无锁线程安全
|
||||||
|
//如果Sink在等待队列-Sink断开,这个过程是非线程安全的
|
||||||
|
//SetState的时候,如果closed为true,返回false, 调用者自行删除sink
|
||||||
|
closed atomic.Bool
|
||||||
|
|
||||||
DesiredAudioCodecId_ utils.AVCodecID
|
DesiredAudioCodecId_ utils.AVCodecID
|
||||||
DesiredVideoCodecId_ utils.AVCodecID
|
DesiredVideoCodecId_ utils.AVCodecID
|
||||||
@@ -109,21 +92,42 @@ func (s *SinkImpl) Input(data []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) SourceId() string {
|
func (s *SinkImpl) SourceId() string {
|
||||||
return s.sourceId
|
return s.SourceId_
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SinkImpl) TransStreamId() TransStreamId {
|
||||||
|
return s.TransStreamId_
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SinkImpl) SetTransStreamId(id TransStreamId) {
|
||||||
|
s.TransStreamId_ = id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) Protocol() Protocol {
|
func (s *SinkImpl) Protocol() Protocol {
|
||||||
return s.Protocol_
|
return s.Protocol_
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) State() int {
|
func (s *SinkImpl) State() SessionState {
|
||||||
//TODO implement me
|
return s.State_
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) SetState(state int) {
|
func (s *SinkImpl) SetState(state SessionState) bool {
|
||||||
//TODO implement me
|
load := s.closed.Load()
|
||||||
panic("implement me")
|
if load {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.State_ < SessionStateClose {
|
||||||
|
s.State_ = state
|
||||||
|
}
|
||||||
|
|
||||||
|
//更改状态期间,被Close
|
||||||
|
//if s.closed.CompareAndSwap(false, false)
|
||||||
|
//{
|
||||||
|
//
|
||||||
|
//}
|
||||||
|
|
||||||
|
return !s.closed.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) EnableVideo() bool {
|
func (s *SinkImpl) EnableVideo() bool {
|
||||||
@@ -143,5 +147,16 @@ func (s *SinkImpl) DesiredVideoCodecId() utils.AVCodecID {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SinkImpl) Close() {
|
func (s *SinkImpl) Close() {
|
||||||
|
//Source的TransStream中删除sink
|
||||||
|
if s.State_ == SessionStateTransferring {
|
||||||
|
source := SourceManager.Find(s.SourceId_)
|
||||||
|
source.AddEvent(SourceEventPlayDone, s)
|
||||||
|
s.State_ = SessionStateClose
|
||||||
|
} else if s.State_ == SessionStateWait {
|
||||||
|
//非线程安全
|
||||||
|
//从等待队列中删除sink
|
||||||
|
RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_)
|
||||||
|
s.State_ = SessionStateClose
|
||||||
|
s.closed.Store(true)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
53
stream/sink_manager.go
Normal file
53
stream/sink_manager.go
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
package stream
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
var waitingSinks map[string]map[SinkId]ISink
|
||||||
|
|
||||||
|
var mutex sync.Mutex
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
waitingSinks = make(map[string]map[SinkId]ISink, 1024)
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddSinkToWaitingQueue(streamId string, sink ISink) {
|
||||||
|
m, ok := waitingSinks[streamId]
|
||||||
|
if !ok {
|
||||||
|
mutex.Lock()
|
||||||
|
mutex.Unlock()
|
||||||
|
if m, ok = waitingSinks[streamId]; !ok {
|
||||||
|
m = make(map[SinkId]ISink, 64)
|
||||||
|
waitingSinks[streamId] = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m[sink.Id()] = sink
|
||||||
|
}
|
||||||
|
|
||||||
|
func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkId) (ISink, bool) {
|
||||||
|
m, ok := waitingSinks[sourceId]
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
sink, ok := m[sinkId]
|
||||||
|
if ok {
|
||||||
|
delete(m, sinkId)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sink, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func PopWaitingSinks(sourceId string) []ISink {
|
||||||
|
source, ok := waitingSinks[sourceId]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sinks := make([]ISink, len(source))
|
||||||
|
var index = 0
|
||||||
|
for _, sink := range source {
|
||||||
|
sinks[index] = sink
|
||||||
|
}
|
||||||
|
return sinks
|
||||||
|
}
|
127
stream/source.go
127
stream/source.go
@@ -2,9 +2,10 @@ package stream
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/yangjiechina/avformat"
|
"github.com/yangjiechina/avformat/stream"
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
"github.com/yangjiechina/live-server/transcode"
|
"github.com/yangjiechina/live-server/transcode"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -30,9 +31,9 @@ const (
|
|||||||
ProtocolRtmpStr = "rtmp"
|
ProtocolRtmpStr = "rtmp"
|
||||||
|
|
||||||
SourceEventPlay = SourceEvent(1)
|
SourceEventPlay = SourceEvent(1)
|
||||||
SourceEventPlayDone = SourceEvent(1)
|
SourceEventPlayDone = SourceEvent(2)
|
||||||
SourceEventInput = SourceEvent(1)
|
SourceEventInput = SourceEvent(3)
|
||||||
SourceEventClose = SourceEvent(1)
|
SourceEventClose = SourceEvent(4)
|
||||||
)
|
)
|
||||||
|
|
||||||
// SessionState 推拉流Session状态
|
// SessionState 推拉流Session状态
|
||||||
@@ -44,7 +45,9 @@ const (
|
|||||||
SessionStateHandshaking = SessionState(2)
|
SessionStateHandshaking = SessionState(2)
|
||||||
SessionStateHandshakeFailure = SessionState(3)
|
SessionStateHandshakeFailure = SessionState(3)
|
||||||
SessionStateHandshakeDone = SessionState(4)
|
SessionStateHandshakeDone = SessionState(4)
|
||||||
SessionStateTransferring = SessionState(5)
|
SessionStateWait = SessionState(5)
|
||||||
|
SessionStateTransferring = SessionState(6)
|
||||||
|
SessionStateClose = SessionState(7)
|
||||||
)
|
)
|
||||||
|
|
||||||
type ISource interface {
|
type ISource interface {
|
||||||
@@ -54,9 +57,6 @@ type ISource interface {
|
|||||||
// Input 输入推流数据
|
// Input 输入推流数据
|
||||||
Input(data []byte)
|
Input(data []byte)
|
||||||
|
|
||||||
// CreateTranscoder 创建转码器
|
|
||||||
CreateTranscoder(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder
|
|
||||||
|
|
||||||
// OriginStreams 返回推流的原始Streams
|
// OriginStreams 返回推流的原始Streams
|
||||||
OriginStreams() []utils.AVStream
|
OriginStreams() []utils.AVStream
|
||||||
|
|
||||||
@@ -80,14 +80,16 @@ type ISource interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
type CreateSource func(id string, type_ SourceType, handler avformat.OnDeMuxerHandler)
|
type CreateSource func(id string, type_ SourceType, handler stream.OnDeMuxerHandler)
|
||||||
|
|
||||||
|
var TranscoderFactory func(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder
|
||||||
|
|
||||||
type SourceImpl struct {
|
type SourceImpl struct {
|
||||||
Id_ string
|
Id_ string
|
||||||
type_ SourceType
|
Type_ SourceType
|
||||||
state SessionState
|
state SessionState
|
||||||
|
|
||||||
TransDeMuxer avformat.DeMuxer //负责从推流协议中解析出AVStream和AVPacket
|
TransDeMuxer stream.DeMuxer //负责从推流协议中解析出AVStream和AVPacket
|
||||||
recordSink ISink //每个Source唯一的一个录制流
|
recordSink ISink //每个Source唯一的一个录制流
|
||||||
audioTranscoders []transcode.ITranscoder //音频解码器
|
audioTranscoders []transcode.ITranscoder //音频解码器
|
||||||
videoTranscoders []transcode.ITranscoder //视频解码器
|
videoTranscoders []transcode.ITranscoder //视频解码器
|
||||||
@@ -95,7 +97,10 @@ type SourceImpl struct {
|
|||||||
allStreams StreamManager //推流Streams+转码器获得的Streams
|
allStreams StreamManager //推流Streams+转码器获得的Streams
|
||||||
buffers []StreamBuffer
|
buffers []StreamBuffer
|
||||||
|
|
||||||
|
Input_ func(data []byte) //解决无法多态传递给子类的问题
|
||||||
|
|
||||||
completed bool
|
completed bool
|
||||||
|
mutex sync.Mutex //只用作AddStream期间
|
||||||
probeTimer *time.Timer
|
probeTimer *time.Timer
|
||||||
|
|
||||||
//所有的输出协议, 持有Sink
|
//所有的输出协议, 持有Sink
|
||||||
@@ -104,6 +109,7 @@ type SourceImpl struct {
|
|||||||
//sink的拉流和断开拉流事件,都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作
|
//sink的拉流和断开拉流事件,都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作
|
||||||
//golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件
|
//golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件
|
||||||
inputEvent chan []byte
|
inputEvent chan []byte
|
||||||
|
responseEvent chan byte //解析完input的数据后,才能继续从网络io中读取流
|
||||||
closeEvent chan byte
|
closeEvent chan byte
|
||||||
playingEventQueue chan ISink
|
playingEventQueue chan ISink
|
||||||
playingDoneEventQueue chan ISink
|
playingDoneEventQueue chan ISink
|
||||||
@@ -113,37 +119,35 @@ func (s *SourceImpl) Id() string {
|
|||||||
return s.Id_
|
return s.Id_
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) Input(data []byte) {
|
func (s *SourceImpl) Init() {
|
||||||
if SessionStateTransferring == s.state {
|
//初始化事件接收缓冲区
|
||||||
s.inputEvent <- data
|
s.SetState(SessionStateTransferring)
|
||||||
} else {
|
//收流和网络断开的chan都阻塞执行
|
||||||
s.TransDeMuxer.Input(data, nil)
|
s.inputEvent = make(chan []byte)
|
||||||
}
|
s.responseEvent = make(chan byte)
|
||||||
|
s.closeEvent = make(chan byte)
|
||||||
|
s.playingEventQueue = make(chan ISink, 128)
|
||||||
|
s.playingDoneEventQueue = make(chan ISink, 128)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) LoopEvent() {
|
func (s *SourceImpl) LoopEvent() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case data := <-s.inputEvent:
|
case data := <-s.inputEvent:
|
||||||
s.TransDeMuxer.Input(data, nil)
|
s.Input_(data)
|
||||||
|
s.responseEvent <- 0
|
||||||
break
|
break
|
||||||
case sink := <-s.playingEventQueue:
|
case sink := <-s.playingEventQueue:
|
||||||
s.AddSink(sink)
|
s.AddSink(sink)
|
||||||
break
|
break
|
||||||
case sink := <-s.playingDoneEventQueue:
|
case sink := <-s.playingDoneEventQueue:
|
||||||
s.AddSink(sink)
|
s.RemoveSink(sink)
|
||||||
break
|
break
|
||||||
case _ = <-s.closeEvent:
|
case _ = <-s.closeEvent:
|
||||||
s.Close()
|
s.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SourceImpl) CreateTranscoder(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder {
|
|
||||||
//TODO implement me
|
|
||||||
panic("implement me")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) OriginStreams() []utils.AVStream {
|
func (s *SourceImpl) OriginStreams() []utils.AVStream {
|
||||||
@@ -235,8 +239,15 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
|
|||||||
_ = transStream.WriteHeader()
|
_ = transStream.WriteHeader()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sink.SetTransStreamId(transStreamId)
|
||||||
transStream.AddSink(sink)
|
transStream.AddSink(sink)
|
||||||
|
|
||||||
|
state := sink.SetState(SessionStateTransferring)
|
||||||
|
if !state {
|
||||||
|
transStream.RemoveSink(sink.Id())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
if AppConfig.GOPCache > 0 && !ok {
|
if AppConfig.GOPCache > 0 && !ok {
|
||||||
//先交叉发送
|
//先交叉发送
|
||||||
for i := 0; i < bufferCount; i++ {
|
for i := 0; i < bufferCount; i++ {
|
||||||
@@ -262,18 +273,30 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) RemoveSink(sink ISink) bool {
|
func (s *SourceImpl) RemoveSink(sink ISink) bool {
|
||||||
return true
|
id := sink.TransStreamId()
|
||||||
|
if id > 0 {
|
||||||
|
transStream := s.transStreams[id]
|
||||||
|
//如果从传输流没能删除sink, 再从等待队列删除
|
||||||
|
_, b := transStream.RemoveSink(sink.Id())
|
||||||
|
if b {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, b := RemoveSinkFromWaitingQueue(sink.SourceId(), sink.Id())
|
||||||
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) AddEvent(event SourceEvent, data interface{}) {
|
func (s *SourceImpl) AddEvent(event SourceEvent, data interface{}) {
|
||||||
if SourceEventInput == event {
|
if SourceEventInput == event {
|
||||||
|
s.inputEvent <- data.([]byte)
|
||||||
|
<-s.responseEvent
|
||||||
} else if SourceEventPlay == event {
|
} else if SourceEventPlay == event {
|
||||||
|
s.playingEventQueue <- data.(ISink)
|
||||||
} else if SourceEventPlayDone == event {
|
} else if SourceEventPlayDone == event {
|
||||||
|
s.playingDoneEventQueue <- data.(ISink)
|
||||||
} else if SourceEventClose == event {
|
} else if SourceEventClose == event {
|
||||||
|
s.closeEvent <- 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -282,10 +305,28 @@ func (s *SourceImpl) SetState(state SessionState) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) Close() {
|
func (s *SourceImpl) Close() {
|
||||||
|
//释放解复用器
|
||||||
|
//释放转码器
|
||||||
|
//释放每路转协议流, 将所有sink添加到等待队列
|
||||||
|
_, _ = SourceManager.Remove(s.Id_)
|
||||||
|
for _, transStream := range s.transStreams {
|
||||||
|
transStream.PopAllSinks(func(sink ISink) {
|
||||||
|
sink.SetTransStreamId(0)
|
||||||
|
state := sink.SetState(SessionStateWait)
|
||||||
|
if state {
|
||||||
|
AddSinkToWaitingQueue(s.Id_, sink)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
s.transStreams = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) (bool, StreamBuffer) {
|
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) (bool, StreamBuffer) {
|
||||||
|
//整块都受保护 确保Add的Stream 都能WriteHeader
|
||||||
|
s.mutex.Lock()
|
||||||
|
defer s.mutex.Unlock()
|
||||||
|
|
||||||
if s.completed {
|
if s.completed {
|
||||||
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.Id_)
|
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.Id_)
|
||||||
return false, nil
|
return false, nil
|
||||||
@@ -293,15 +334,17 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) (bool, StreamBuffer) {
|
|||||||
|
|
||||||
s.originStreams.Add(stream)
|
s.originStreams.Add(stream)
|
||||||
s.allStreams.Add(stream)
|
s.allStreams.Add(stream)
|
||||||
//if len(s.originStreams.All()) == 1 {
|
|
||||||
// s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, s.writeHeader)
|
|
||||||
//}
|
|
||||||
|
|
||||||
//为每个Stream创建对于的Buffer
|
//启动探测超时计时器
|
||||||
|
if len(s.originStreams.All()) == 1 && AppConfig.ProbeTimeout > 100 {
|
||||||
|
s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, s.writeHeader)
|
||||||
|
}
|
||||||
|
|
||||||
|
//为每个Stream创建对应的Buffer
|
||||||
if AppConfig.GOPCache > 0 {
|
if AppConfig.GOPCache > 0 {
|
||||||
buffer := NewStreamBuffer(int64(AppConfig.GOPCache * 1000))
|
buffer := NewStreamBuffer(int64(AppConfig.GOPCache * 1000))
|
||||||
|
//OnDeMuxStream的调用顺序,就是AVStream和AVPacket的Index的递增顺序
|
||||||
s.buffers = append(s.buffers, buffer)
|
s.buffers = append(s.buffers, buffer)
|
||||||
|
|
||||||
return true, buffer
|
return true, buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -310,11 +353,18 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) (bool, StreamBuffer) {
|
|||||||
|
|
||||||
// 从DeMuxer解析完Stream后, 处理等待Sinks
|
// 从DeMuxer解析完Stream后, 处理等待Sinks
|
||||||
func (s *SourceImpl) writeHeader() {
|
func (s *SourceImpl) writeHeader() {
|
||||||
utils.Assert(!s.completed)
|
{
|
||||||
|
s.mutex.Lock()
|
||||||
|
defer s.mutex.Unlock()
|
||||||
|
if s.completed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.completed = true
|
||||||
|
}
|
||||||
|
|
||||||
if s.probeTimer != nil {
|
if s.probeTimer != nil {
|
||||||
s.probeTimer.Stop()
|
s.probeTimer.Stop()
|
||||||
}
|
}
|
||||||
s.completed = true
|
|
||||||
|
|
||||||
sinks := PopWaitingSinks(s.Id_)
|
sinks := PopWaitingSinks(s.Id_)
|
||||||
for _, sink := range sinks {
|
for _, sink := range sinks {
|
||||||
@@ -326,7 +376,7 @@ func (s *SourceImpl) OnDeMuxStreamDone() {
|
|||||||
s.writeHeader()
|
s.writeHeader()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) OnDeMuxPacket(index int, packet utils.AVPacket) {
|
func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) {
|
||||||
if AppConfig.GOPCache > 0 {
|
if AppConfig.GOPCache > 0 {
|
||||||
buffer := s.buffers[packet.Index()]
|
buffer := s.buffers[packet.Index()]
|
||||||
buffer.AddPacket(packet, packet.KeyFrame(), packet.Dts())
|
buffer.AddPacket(packet, packet.KeyFrame(), packet.Dts())
|
||||||
@@ -338,4 +388,5 @@ func (s *SourceImpl) OnDeMuxPacket(index int, packet utils.AVPacket) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceImpl) OnDeMuxDone() {
|
func (s *SourceImpl) OnDeMuxDone() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -1,7 +1,7 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/yangjiechina/avformat"
|
"github.com/yangjiechina/avformat/stream"
|
||||||
"github.com/yangjiechina/avformat/utils"
|
"github.com/yangjiechina/avformat/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -73,14 +73,16 @@ type ITransStream interface {
|
|||||||
|
|
||||||
AddSink(sink ISink)
|
AddSink(sink ISink)
|
||||||
|
|
||||||
RemoveSink(id SinkId)
|
RemoveSink(id SinkId) (ISink, bool)
|
||||||
|
|
||||||
|
PopAllSinks(handler func(sink ISink))
|
||||||
|
|
||||||
AllSink() []ISink
|
AllSink() []ISink
|
||||||
}
|
}
|
||||||
|
|
||||||
type TransStreamImpl struct {
|
type TransStreamImpl struct {
|
||||||
Sinks map[SinkId]ISink
|
Sinks map[SinkId]ISink
|
||||||
muxer avformat.Muxer
|
muxer stream.Muxer
|
||||||
Tracks []utils.AVStream
|
Tracks []utils.AVStream
|
||||||
transBuffer MemoryPool //每个TransStream也缓存封装后的流
|
transBuffer MemoryPool //每个TransStream也缓存封装后的流
|
||||||
Completed bool
|
Completed bool
|
||||||
@@ -98,8 +100,21 @@ func (t *TransStreamImpl) AddSink(sink ISink) {
|
|||||||
t.Sinks[sink.Id()] = sink
|
t.Sinks[sink.Id()] = sink
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStreamImpl) RemoveSink(id SinkId) {
|
func (t *TransStreamImpl) RemoveSink(id SinkId) (ISink, bool) {
|
||||||
delete(t.Sinks, id)
|
sink, ok := t.Sinks[id]
|
||||||
|
if ok {
|
||||||
|
delete(t.Sinks, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sink, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TransStreamImpl) PopAllSinks(handler func(sink ISink)) {
|
||||||
|
for _, sink := range t.Sinks {
|
||||||
|
handler(sink)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Sinks = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransStreamImpl) AllSink() []ISink {
|
func (t *TransStreamImpl) AllSink() []ISink {
|
||||||
|
Reference in New Issue
Block a user