测试hook拉流

This commit is contained in:
yangjiechina
2024-06-12 23:05:19 +08:00
parent e6d7001bdc
commit 17973b3e9e
19 changed files with 283 additions and 125 deletions

4
api.go
View File

@@ -105,6 +105,7 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) {
//返回监听的端口
response := &struct {
IP string `json:"ip"`
Port uint16 `json:"port,omitempty"`
}{}
@@ -124,7 +125,7 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) {
source := stream.SourceManager.Find(v.Source)
if source != nil {
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gbsource 已经存在"}
err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("创建GB28181 Source失败 %s 已经存在", v.Source)}
return
}
@@ -152,6 +153,7 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) {
return
}
response.IP = stream.AppConfig.PublicIP
response.Port = port
httpResponseOk(w, response)
}

View File

@@ -2,16 +2,22 @@ package gb28181
import (
"github.com/pion/rtp"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
)
type Filter interface {
AddSource(ssrc uint32, source GBSource) bool
RemoveSource(ssrc uint32)
Input(conn net.Conn, data []byte) GBSource
ParseRtpPacket(conn net.Conn, data []byte) (*rtp.Packet, error)
PreparePublishSource(conn net.Conn, ssrc uint32, source GBSource)
}
type BaseFilter struct {
@@ -28,3 +34,23 @@ func (r BaseFilter) ParseRtpPacket(conn net.Conn, data []byte) (*rtp.Packet, err
return &packet, err
}
func (r BaseFilter) PreparePublishSource(conn net.Conn, ssrc uint32, source GBSource) {
source.SetConn(conn)
source.SetSSRC(ssrc)
source.SetState(stream.SessionStateTransferring)
if stream.AppConfig.Hook.EnablePublishEvent() {
go func() {
_, state := stream.HookPublishEvent(source)
if utils.HookStateOK != state {
log.Sugar.Errorf("GB28181 推流失败")
if conn != nil {
conn.Close()
}
}
}()
}
}

View File

@@ -1,6 +1,7 @@
package gb28181
import (
"github.com/yangjiechina/lkm/stream"
"net"
)
@@ -16,9 +17,10 @@ func NewSingleFilter(source GBSource) *SingleFilter {
func (s *SingleFilter) AddSource(ssrc uint32, source GBSource) bool {
panic("implement me")
/* utils.Assert(s.source == nil)
s.source = source
return true*/
}
func (s *SingleFilter) RemoveSource(ssrc uint32) {
panic("implement me")
}
func (s *SingleFilter) Input(conn net.Conn, data []byte) GBSource {
@@ -31,6 +33,10 @@ func (s *SingleFilter) Input(conn net.Conn, data []byte) GBSource {
return nil
}
if stream.SessionStateHandshakeDone == s.source.State() {
s.PreparePublishSource(conn, packet.SSRC, s.source)
}
s.source.InputRtp(packet)
return s.source
}

View File

@@ -1,13 +1,16 @@
package gb28181
import (
"github.com/yangjiechina/lkm/stream"
"net"
"sync"
)
type SSRCFilter struct {
BaseFilter
sources map[uint32]GBSource
mute sync.RWMutex
}
func NewSharedFilter(guestCount int) *SSRCFilter {
@@ -15,13 +18,21 @@ func NewSharedFilter(guestCount int) *SSRCFilter {
}
func (r SSRCFilter) AddSource(ssrc uint32, source GBSource) bool {
_, ok := r.sources[ssrc]
if ok {
r.mute.Lock()
defer r.mute.Lock()
if _, ok := r.sources[ssrc]; !ok {
r.sources[ssrc] = source
return true
}
return false
}
r.sources[ssrc] = source
return true
func (r SSRCFilter) RemoveSource(ssrc uint32) {
r.mute.Lock()
defer r.mute.Lock()
delete(r.sources, ssrc)
}
func (r SSRCFilter) Input(conn net.Conn, data []byte) GBSource {
@@ -30,11 +41,22 @@ func (r SSRCFilter) Input(conn net.Conn, data []byte) GBSource {
return nil
}
source, ok := r.sources[packet.SSRC]
var source GBSource
var ok bool
{
r.mute.RLock()
source, ok = r.sources[packet.SSRC]
r.mute.RUnlock()
}
if !ok {
return nil
}
if stream.SessionStateHandshakeDone == source.State() {
r.PreparePublishSource(conn, packet.SSRC, source)
}
source.InputRtp(packet)
return source
}

View File

@@ -137,10 +137,10 @@ func createSource(source, transport, setup string, ssrc uint32) int {
func TestUDPRecv(t *testing.T) {
path := "D:\\GOProjects\\avformat\\gb28181_h264.rtp"
ssrc := 0xBEBC201
ip := "192.168.31.112"
ip := "192.168.2.148"
localAddr := "0.0.0.0:20001"
network := "tcp"
setup := "active"
setup := "passive"
id := "hls_mystream"
port := createSource(id, network, setup, uint32(ssrc))
@@ -190,7 +190,7 @@ func TestUDPRecv(t *testing.T) {
panic(err)
}
connectSource(id, "192.168.31.112:20001")
connectSource(id, "192.168.2.148:20001")
//
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
"net/http"
)
type TransportType int
@@ -32,6 +31,8 @@ var (
SharedTCPServer *TCPServer
)
// GBSource GB28181推流Source, 接收PS流解析生成AVStream和AVPacket, 后续全权交给父类Source处理.
// udp/passive/active 都继承本接口, filter负责解析rtp包, 根据ssrc匹配对应的Source.
type GBSource interface {
stream.Source
@@ -40,20 +41,20 @@ type GBSource interface {
TransportType() TransportType
PrepareTransDeMuxer(id string, ssrc uint32)
SetConn(conn net.Conn)
SetSSRC(ssrc uint32)
}
// BaseGBSource GB28181推流Source
// 负责解析生成AVStream和AVPacket, 后续全权交给父类Source处理.
type BaseGBSource struct {
stream.PublishSource
deMuxerCtx *libmpeg.PSDeMuxerContext
audioStream utils.AVStream
videoStream utils.AVStream
ssrc uint32
transport transport.ITransport
}
@@ -94,7 +95,7 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint1
}
if !success {
return nil, 0, fmt.Errorf("source existing")
return nil, 0, fmt.Errorf("ssrc conflict")
}
port = stream.AppConfig.GB28181.Port[0]
@@ -139,11 +140,12 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint1
source.PrepareTransDeMuxer(id, ssrc)
_, state := stream.PreparePublishSource(source, false)
if http.StatusOK != state {
if utils.HookStateOK != state {
return nil, 0, fmt.Errorf("error code %d", state)
}
source.Init(source.Input)
source.SetType(stream.SourceType28181)
source.Init(source.Input, source.Close)
go source.LoopEvent()
return source, port, err
}
@@ -303,10 +305,32 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT
}
func (source *BaseGBSource) Close() {
log.Sugar.Infof("GB28181推流结束 ssrc:%d %s", source.ssrc, source.PublishSource.PrintInfo())
//释放收流端口
if source.transport != nil {
source.transport.Close()
source.transport = nil
}
//删除ssrc关联
if !stream.AppConfig.GB28181.IsMultiPort() {
if SharedTCPServer != nil {
SharedTCPServer.filter.RemoveSource(source.ssrc)
}
if SharedUDPServer != nil {
SharedUDPServer.filter.RemoveSource(source.ssrc)
}
}
source.PublishSource.Close()
}
func (source *BaseGBSource) SetConn(conn net.Conn) {
source.Conn = conn
}
func (source *BaseGBSource) SetSSRC(ssrc uint32) {
source.ssrc = ssrc
}

View File

@@ -2,7 +2,6 @@ package gb28181
import (
"github.com/pion/rtp"
"github.com/yangjiechina/lkm/stream"
)
type PassiveSource struct {
@@ -18,6 +17,6 @@ func (t PassiveSource) TransportType() TransportType {
}
func (t PassiveSource) InputRtp(pkt *rtp.Packet) error {
t.PublishSource.AddEvent(stream.SourceEventInput, pkt.Payload)
t.PublishSource.Input(pkt.Payload)
return nil
}

View File

@@ -45,6 +45,6 @@ func (u UDPSource) InputRtp(pkt *rtp.Packet) error {
u.rtpBuffer.FreeHead()
u.PublishSource.AddEvent(stream.SourceEventInput, pkt.Payload)
u.PublishSource.Input(pkt.Payload)
}
}

View File

@@ -32,7 +32,7 @@ func NewTCPServer(addr net.Addr, filter Filter) (*TCPServer, error) {
}
func (T *TCPServer) OnConnected(conn net.Conn) {
log.Sugar.Infof("客户端链接 conn:%s", conn.RemoteAddr().String())
log.Sugar.Infof("GB28181连接 conn:%s", conn.RemoteAddr().String())
}
func (T *TCPServer) OnPacket(conn net.Conn, data []byte) {
@@ -53,7 +53,7 @@ func (T *TCPServer) OnPacket(conn net.Conn, data []byte) {
}
func (T *TCPServer) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Infof("客户端断开接 conn:%s", conn.RemoteAddr().String())
log.Sugar.Infof("GB28181断开接 conn:%s", conn.RemoteAddr().String())
con := conn.(*transport.Conn)
if con.Data != nil {

View File

@@ -4,7 +4,6 @@ import (
"github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
)
@@ -26,18 +25,20 @@ func (s jtServer) OnConnected(conn net.Conn) {
log.Sugar.Debugf("jtserver连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
t.Data = NewSession()
t.Data = NewSession(conn)
}
func (s jtServer) OnPacket(conn net.Conn, data []byte) {
conn.(*transport.Conn).Data.(*Session).AddEvent(stream.SourceEventInput, data)
conn.(*transport.Conn).Data.(*Session).Input(data)
}
func (s jtServer) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("jtserver断开连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn)
utils.Assert(t.Data != nil)
t.Data.(*Session).Close()
t.Data = nil
}
func (s jtServer) Start(addr net.Addr) error {

View File

@@ -7,6 +7,7 @@ import (
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/lkm/log"
"github.com/yangjiechina/lkm/stream"
"net"
)
const (
@@ -211,12 +212,16 @@ func (s *Session) OnJtPTPPacket(data []byte) {
s.rtpPacket = &RtpPacket{}
*s.rtpPacket = packet
go func() {
_, state := stream.PreparePublishSource(s, true)
if utils.HookStateOK != state {
log.Sugar.Errorf("1078推流失败 source:%s", s.phone)
}
s.Close()
if s.Conn != nil {
s.Conn.Close()
}
}
}()
}
//完整包/最后一个分包, 创建AVPacket
@@ -271,15 +276,30 @@ func (s *Session) Input(data []byte) error {
}
func (s *Session) Close() {
log.Sugar.Infof("1078推流结束 phone number:%s %s", s.phone, s.PublishSource.PrintInfo())
if s.audioBuffer != nil {
s.audioBuffer.Clear()
}
func NewSession() *Session {
session := Session{}
if s.videoBuffer != nil {
s.videoBuffer.Clear()
}
s.PublishSource.Close()
}
func NewSession(conn net.Conn) *Session {
session := Session{
PublishSource: stream.PublishSource{
Conn: conn,
Type_: stream.SourceType1078,
},
}
delimiter := [4]byte{0x30, 0x31, 0x63, 0x64}
session.decoder = transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:], session.OnJtPTPPacket)
session.Init(session.Input)
session.Init(session.Input, session.Close)
go session.LoopEvent()
return &session
}

16
main.go
View File

@@ -27,7 +27,7 @@ func NewDefaultAppConfig() stream.AppConfig_ {
MergeWriteLatency: 350,
PublicIP: "192.168.2.148",
IdleTimeout: int64(60 * time.Second),
ReceiveTimeout: int64(60 * time.Second),
ReceiveTimeout: int64(10 * time.Second),
Hls: stream.HlsConfig{
Enable: false,
@@ -81,13 +81,13 @@ func NewDefaultAppConfig() stream.AppConfig_ {
Hook: stream.HookConfig{
Enable: true,
Timeout: int64(60 * time.Second),
OnPublishUrl: "http://localhost:8082/api/v1/on_publish",
OnPublishDoneUrl: "http://localhost:8082/api/v1/on_publish_done",
OnPlayUrl: "http://localhost:8082/api/v1/on_play",
OnPlayDoneUrl: "http://localhost:8082/api/on_play_done",
OnRecordUrl: "http://localhost:8082/api/v1/on_reocrd",
OnIdleTimeoutUrl: "http://localhost:8082/api/v1/on_idle_timeout",
OnReceiveTimeoutUrl: "http://localhost:8082/api/v1/on_recv_timeout",
OnPublishUrl: "http://localhost:9000/api/v1/hook/on_publish",
OnPublishDoneUrl: "http://localhost:9000/api/v1/hook/on_publish_done",
OnPlayUrl: "http://localhost:9000/api/v1/hook/on_play",
OnPlayDoneUrl: "http://localhost:9000/api/v1/hook/on_play_done",
OnRecordUrl: "http://localhost:9000/api/v1/hook/on_reocrd",
OnIdleTimeoutUrl: "http://localhost:9000/api/v1/hook/on_idle_timeout",
OnReceiveTimeoutUrl: "http://localhost:9000/api/v1/hook/on_receive_timeout",
},
}
}

View File

@@ -45,7 +45,7 @@ func (s *Session) OnPublish(app, stream_ string, response chan utils.HookState)
s.handle = source
s.isPublisher = true
source.Init(source.Input)
source.Init(source.Input, source.Close)
go source.LoopEvent()
}
@@ -72,7 +72,7 @@ func (s *Session) OnPlay(app, stream_ string, response chan utils.HookState) {
func (s *Session) Input(conn net.Conn, data []byte) error {
//如果是推流并且握手成功后续收到的包都将发送给LoopEvent处理
if s.isPublisher {
s.handle.(*Publisher).AddEvent(stream.SourceEventInput, data)
s.handle.(*Publisher).PublishSource.Input(data)
return nil
} else {
return s.stack.Input(conn, data)
@@ -80,8 +80,6 @@ func (s *Session) Input(conn net.Conn, data []byte) error {
}
func (s *Session) Close() {
log.Sugar.Debugf("释放rtmp session conn:%s", s.conn.RemoteAddr().String())
//释放协议栈
if s.stack != nil {
s.stack.Close()
@@ -92,13 +90,16 @@ func (s *Session) Close() {
return
}
_, ok := s.handle.(*Publisher)
publisher, ok := s.handle.(*Publisher)
if ok {
log.Sugar.Infof("rtmp推流结束 %s", publisher.PrintInfo())
if s.isPublisher {
s.handle.(*Publisher).AddEvent(stream.SourceEventClose, nil)
s.handle.(*Publisher).Close()
}
} else {
sink := s.handle.(stream.Sink)
log.Sugar.Infof("rtmp拉流结束 %s", sink.PrintInfo())
sink.Close()
}
}

View File

@@ -100,8 +100,8 @@ type HookConfig struct {
OnPlayUrl string `json:"on_play"` //拉流回调
OnPlayDoneUrl string `json:"on_play_done"` //拉流结束回调
OnRecordUrl string `json:"on_record"` //录制流回调
OnIdleTimeoutUrl string `json:"on_idle_timeout"` //多久没有sink拉流回调
OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //多久没有推流回调
OnIdleTimeoutUrl string `json:"on_idle_timeout"` //没有sink拉流回调
OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //没有推流回调
}
func (hook *HookConfig) EnablePublishEvent() bool {
@@ -144,8 +144,8 @@ type AppConfig_ struct {
GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
ProbeTimeout int `json:"probe_timeout"`
PublicIP string `json:"public_ip"`
IdleTimeout int64 `json:"idle_timeout"` //多长时间没有sink拉流, 单位秒
ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒
IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
//缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例.

View File

@@ -4,22 +4,23 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/yangjiechina/lkm/log"
"net/http"
"time"
)
// 每个通知的时间都需要携带的字段
// 每个通知事件都需要携带的字段
type eventInfo struct {
stream string //stream id
protocol string //推拉流协议
remoteAddr string //peer地址
Stream string `json:"stream"` //stream id
Protocol string `json:"protocol"` //推拉流协议
RemoteAddr string `json:"remote_addr"` //peer地址
}
func NewHookPlayEventInfo(sink Sink) eventInfo {
return eventInfo{stream: sink.SourceId(), protocol: sink.Protocol().ToString(), remoteAddr: sink.PrintInfo()}
return eventInfo{Stream: sink.SourceId(), Protocol: sink.Protocol().ToString(), RemoteAddr: sink.PrintInfo()}
}
func NewHookPublishEventInfo(source Source) eventInfo {
return eventInfo{stream: source.Id(), protocol: source.Type().ToString(), remoteAddr: source.RemoteAddr()}
return eventInfo{Stream: source.Id(), Protocol: source.Type().ToString(), RemoteAddr: source.RemoteAddr()}
}
func sendHookEvent(url string, body interface{}) (*http.Response, error) {
@@ -36,6 +37,8 @@ func sendHookEvent(url string, body interface{}) (*http.Response, error) {
return nil, err
}
log.Sugar.Infof("发送hook通知 url:%s body:%s", url, marshal)
request.Header.Set("Content-Type", "application/json")
return client.Do(request)
}
@@ -47,8 +50,8 @@ func Hook(event HookEvent, body interface{}) (*http.Response, error) {
}
response, err := sendHookEvent(url, body)
if err != nil && http.StatusOK != response.StatusCode {
return response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status)
if err == nil && http.StatusOK != response.StatusCode {
return response, fmt.Errorf("reason %s", response.Status)
}
return response, err

View File

@@ -47,7 +47,7 @@ func HookPlayDoneEvent(sink Sink) (*http.Response, bool) {
var response *http.Response
if AppConfig.Hook.EnableOnPlayDone() {
hook, err := Hook(HookEventPlay, NewHookPlayEventInfo(sink))
hook, err := Hook(HookEventPlayDone, NewHookPlayEventInfo(sink))
if err != nil {
log.Sugar.Errorf("通知播放结束事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId())
return hook, false

View File

@@ -59,20 +59,34 @@ func HookPublishDoneEvent(source Source) {
}
}
func HookReceiveTimeoutEvent(source Source) {
func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hook.EnableOnReceiveTimeout() {
_, err := Hook(HookEventReceiveTimeout, NewHookPublishEventInfo(source))
resp, err := Hook(HookEventReceiveTimeout, NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知收流超时事件失败 source:%s err:%s", source.Id(), err.Error())
}
}
return resp, utils.HookStateFailure
}
func HookIdleTimeoutEvent(source Source) {
response = resp
}
return response, utils.HookStateOK
}
func HookIdleTimeoutEvent(source Source) (*http.Response, utils.HookState) {
var response *http.Response
if AppConfig.Hook.EnableOnIdleTimeout() {
_, err := Hook(HookEventIdleTimeout, NewHookPublishEventInfo(source))
resp, err := Hook(HookEventIdleTimeout, NewHookPublishEventInfo(source))
if err != nil {
log.Sugar.Errorf("通知空闲超时时间失败 source:%s err:%s", source.Id(), err.Error())
return resp, utils.HookStateFailure
}
response = resp
}
return response, utils.HookStateOK
}

View File

@@ -186,6 +186,7 @@ func (s *BaseSink) Close() {
return
}
var state SessionState
{
s.Lock()
defer s.UnLock()
@@ -193,16 +194,17 @@ func (s *BaseSink) Close() {
return
}
state = s.State_
s.State_ = SessionStateClose
}
if s.State_ == SessionStateTransferring {
if state == SessionStateTransferring {
source := SourceManager.Find(s.SourceId_)
source.AddEvent(SourceEventPlayDone, s)
} else if s.State_ == SessionStateWait {
} else if state == SessionStateWait {
RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_)
//拉流结束事件, 在等待队列直接发送通知, 在拉流由Source负责发送.
HookPlayDoneEvent(s)
go HookPlayDoneEvent(s)
}
}
func (s *BaseSink) PrintInfo() string {

View File

@@ -39,7 +39,6 @@ const (
SourceEventPlayDone = SourceEvent(2)
SourceEventInput = SourceEvent(3)
SourceEventClose = SourceEvent(4)
SourceEventProbeTimeout = SourceEvent(5)
)
const (
@@ -64,6 +63,8 @@ type Source interface {
// Type 推流类型
Type() SourceType
SetType(sourceType SourceType)
// OriginStreams 返回推流的原始Streams
OriginStreams() []utils.AVStream
@@ -107,17 +108,21 @@ type Source interface {
// OnDeMuxDone 所有流解析完毕回调
OnDeMuxDone()
Init(input func(data []byte) error)
Init(inputCB func(data []byte) error, closeCB func())
LoopEvent()
RemoteAddr() string
PrintInfo() string
// StartReceiveDataTimer 启动收流超时计时器
StartReceiveDataTimer()
// StartIdleTimer 启动拉流空闲计时器
StartIdleTimer()
State() SessionState
}
type PublishSource struct {
@@ -140,7 +145,8 @@ type PublishSource struct {
completed bool
probeTimer *time.Timer
Input_ func(data []byte) error //解决多态无法传递给子类的问题
inputCB func(data []byte) error //子类Input回调
closeCB func() //子类Close回调
//所有的输出协议, 持有Sink
transStreams map[TransStreamId]TransStream
@@ -153,26 +159,25 @@ type PublishSource struct {
playingEventQueue chan Sink
playingDoneEventQueue chan Sink
probeTimoutEvent chan bool
receiveDataTimeoutEvent chan byte
idleTimeoutEvent chan byte
lastPacketTime time.Time
removeSinkTime time.Time
receiveDataTimer *time.Timer
idleTimer *time.Timer
sinkCount int
closed bool
}
func (s *PublishSource) Id() string {
return s.Id_
}
func (s *PublishSource) Init(input func(data []byte) error) {
s.Input_ = input
func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func()) {
s.inputCB = inputCB
s.closeCB = closeCB
s.SetState(SessionStateHandshakeDone)
//初始化事件接收缓冲区
s.SetState(SessionStateTransferring)
//收流和网络断开的chan都阻塞执行
s.inputDataEvent = make(chan []byte)
s.dataConsumedEvent = make(chan byte)
@@ -181,13 +186,6 @@ func (s *PublishSource) Init(input func(data []byte) error) {
s.playingDoneEventQueue = make(chan Sink, 128)
s.probeTimoutEvent = make(chan bool)
if AppConfig.ReceiveTimeout > 0 {
s.receiveDataTimeoutEvent = make(chan byte)
}
if AppConfig.IdleTimeout > 0 {
s.idleTimeoutEvent = make(chan byte)
}
if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]TransStream, 10)
}
@@ -235,14 +233,21 @@ func (s *PublishSource) LoopEvent() {
for {
select {
case data := <-s.inputDataEvent:
if !s.closed {
if AppConfig.ReceiveTimeout > 0 {
s.lastPacketTime = time.Now()
}
if err := s.Input_(data); err != nil {
if s.state == SessionStateHandshakeDone {
s.state = SessionStateTransferring
//不在父类处理hook和prepare事情
}
if err := s.inputCB(data); err != nil {
log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", s.Id_, err.Error())
s.Close()
}
}
s.dataConsumedEvent <- 0
break
@@ -259,26 +264,17 @@ func (s *PublishSource) LoopEvent() {
s.RemoveSink(sink)
break
case _ = <-s.closedEvent:
s.Close()
s.doClose()
return
case _ = <-s.probeTimoutEvent:
s.writeHeader()
break
case _ = <-s.receiveDataTimeoutEvent:
log.Sugar.Errorf("收流超时 source:%s", s.Id_)
s.Close()
HookReceiveTimeoutEvent(s)
break
case _ = <-s.idleTimeoutEvent:
log.Sugar.Errorf("空闲超时 source:%s", s.Id_)
s.Close()
HookIdleTimeoutEvent(s)
break
}
}
}
func (s *PublishSource) Input(data []byte) error {
s.AddEvent(SourceEventInput, data)
return nil
}
@@ -352,7 +348,7 @@ func (s *PublishSource) AddSink(sink Sink) bool {
s.transStreams = make(map[TransStreamId]TransStream, 10)
}
//创建一个新的传输流
log.Sugar.Debugf("创建%s-stream", sink.Protocol().ToString())
log.Sugar.Debugf("创建%s-stream source:%s", sink.Protocol().ToString(), s.Id_)
var err error
transStream, err = CreateTransStream(s, sink.Protocol(), streams[:size])
@@ -431,7 +427,18 @@ func (s *PublishSource) SetState(state SessionState) {
s.state = state
}
func (s *PublishSource) Close() {
func (s *PublishSource) doClose() {
if s.closed {
return
}
//清空未写完的buffer
for _, buffer := range s.pktBuffers {
if buffer != nil {
buffer.Reset()
}
}
//释放GOP缓存
if s.gopBuffer != nil {
s.gopBuffer.Clear()
@@ -466,14 +473,20 @@ func (s *PublishSource) Close() {
if SessionStateClose == sink.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.PrintInfo())
} else {
sink.SetState(SessionStateWait)
AddSinkToWaitingQueue(s.Id_, sink)
}
}
})
}
HookPublishDoneEvent(s)
s.closed = true
s.transStreams = nil
go HookPublishDoneEvent(s)
}
func (s *PublishSource) Close() {
s.AddEvent(SourceEventClose, nil)
}
func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {
@@ -573,6 +586,10 @@ func (s *PublishSource) Type() SourceType {
return s.Type_
}
func (s *PublishSource) SetType(sourceType SourceType) {
s.Type_ = sourceType
}
func (s *PublishSource) RemoteAddr() string {
if s.Conn == nil {
return ""
@@ -581,6 +598,10 @@ func (s *PublishSource) RemoteAddr() string {
return s.Conn.RemoteAddr().String()
}
func (s *PublishSource) PrintInfo() string {
return fmt.Sprintf("id:%s type:%s conn:%s ", s.Id_, s.Type_.ToString(), s.RemoteAddr())
}
func (s *PublishSource) StartReceiveDataTimer() {
utils.Assert(s.receiveDataTimer == nil)
utils.Assert(AppConfig.ReceiveTimeout > 0)
@@ -589,11 +610,19 @@ func (s *PublishSource) StartReceiveDataTimer() {
s.receiveDataTimer = time.AfterFunc(time.Duration(AppConfig.ReceiveTimeout), func() {
dis := time.Now().Sub(s.lastPacketTime)
//如果开启Hook通知, 根据响应决定是否关闭Source
//如果通知失败, 或者非200应答, 释放Source
//如果没有开启Hook通知, 直接删除
if dis >= time.Duration(AppConfig.ReceiveTimeout) {
s.receiveDataTimeoutEvent <- 0
} else {
s.receiveDataTimer.Reset(time.Duration(math.Abs(float64(time.Duration(AppConfig.ReceiveTimeout) - dis))))
log.Sugar.Errorf("收流超时 source:%s", s.Id_)
response, state := HookReceiveTimeoutEvent(s)
if utils.HookStateOK != state || response == nil {
s.closeCB()
return
}
}
s.receiveDataTimer.Reset(time.Duration(math.Abs(float64(time.Duration(AppConfig.ReceiveTimeout) - dis))))
})
}
@@ -606,9 +635,18 @@ func (s *PublishSource) StartIdleTimer() {
dis := time.Now().Sub(s.removeSinkTime)
if s.sinkCount < 1 && dis >= time.Duration(AppConfig.IdleTimeout) {
s.idleTimeoutEvent <- 0
} else {
s.idleTimer.Reset(time.Duration(math.Abs(float64(AppConfig.IdleTimeout - int64(dis)))))
log.Sugar.Errorf("空闲超时 source:%s", s.Id_)
response, state := HookIdleTimeoutEvent(s)
if utils.HookStateOK != state || response == nil {
s.closeCB()
return
}
}
s.idleTimer.Reset(time.Duration(math.Abs(float64(AppConfig.IdleTimeout - int64(dis)))))
})
}
func (s *PublishSource) State() SessionState {
return s.state
}