diff --git a/api.go b/api.go index 5ad2c4b..16e056c 100644 --- a/api.go +++ b/api.go @@ -105,6 +105,7 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) { //返回监听的端口 response := &struct { + IP string `json:"ip"` Port uint16 `json:"port,omitempty"` }{} @@ -124,7 +125,7 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) { source := stream.SourceManager.Find(v.Source) if source != nil { - err = &MalformedRequest{Code: http.StatusBadRequest, Msg: "gbsource 已经存在"} + err = &MalformedRequest{Code: http.StatusBadRequest, Msg: fmt.Sprintf("创建GB28181 Source失败 %s 已经存在", v.Source)} return } @@ -152,6 +153,7 @@ func (api *ApiServer) createGBSource(w http.ResponseWriter, r *http.Request) { return } + response.IP = stream.AppConfig.PublicIP response.Port = port httpResponseOk(w, response) } diff --git a/gb28181/filter.go b/gb28181/filter.go index 185ef35..d2ae5a8 100644 --- a/gb28181/filter.go +++ b/gb28181/filter.go @@ -2,16 +2,22 @@ package gb28181 import ( "github.com/pion/rtp" + "github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/lkm/log" + "github.com/yangjiechina/lkm/stream" "net" ) type Filter interface { AddSource(ssrc uint32, source GBSource) bool + RemoveSource(ssrc uint32) + Input(conn net.Conn, data []byte) GBSource ParseRtpPacket(conn net.Conn, data []byte) (*rtp.Packet, error) + + PreparePublishSource(conn net.Conn, ssrc uint32, source GBSource) } type BaseFilter struct { @@ -28,3 +34,23 @@ func (r BaseFilter) ParseRtpPacket(conn net.Conn, data []byte) (*rtp.Packet, err return &packet, err } + +func (r BaseFilter) PreparePublishSource(conn net.Conn, ssrc uint32, source GBSource) { + source.SetConn(conn) + source.SetSSRC(ssrc) + + source.SetState(stream.SessionStateTransferring) + + if stream.AppConfig.Hook.EnablePublishEvent() { + go func() { + _, state := stream.HookPublishEvent(source) + if utils.HookStateOK != state { + log.Sugar.Errorf("GB28181 推流失败") + + if conn != nil { + conn.Close() + } + } + }() + } +} diff --git a/gb28181/filter_single.go b/gb28181/filter_single.go index 2ed5d7e..e57ae4d 100644 --- a/gb28181/filter_single.go +++ b/gb28181/filter_single.go @@ -1,6 +1,7 @@ package gb28181 import ( + "github.com/yangjiechina/lkm/stream" "net" ) @@ -16,9 +17,10 @@ func NewSingleFilter(source GBSource) *SingleFilter { func (s *SingleFilter) AddSource(ssrc uint32, source GBSource) bool { panic("implement me") - /* utils.Assert(s.source == nil) - s.source = source - return true*/ +} + +func (s *SingleFilter) RemoveSource(ssrc uint32) { + panic("implement me") } func (s *SingleFilter) Input(conn net.Conn, data []byte) GBSource { @@ -31,6 +33,10 @@ func (s *SingleFilter) Input(conn net.Conn, data []byte) GBSource { return nil } + if stream.SessionStateHandshakeDone == s.source.State() { + s.PreparePublishSource(conn, packet.SSRC, s.source) + } + s.source.InputRtp(packet) return s.source } diff --git a/gb28181/filter_ssrc.go b/gb28181/filter_ssrc.go index 14d7836..fb3c22c 100644 --- a/gb28181/filter_ssrc.go +++ b/gb28181/filter_ssrc.go @@ -1,13 +1,16 @@ package gb28181 import ( + "github.com/yangjiechina/lkm/stream" "net" + "sync" ) type SSRCFilter struct { BaseFilter sources map[uint32]GBSource + mute sync.RWMutex } func NewSharedFilter(guestCount int) *SSRCFilter { @@ -15,13 +18,21 @@ func NewSharedFilter(guestCount int) *SSRCFilter { } func (r SSRCFilter) AddSource(ssrc uint32, source GBSource) bool { - _, ok := r.sources[ssrc] - if ok { - return false + r.mute.Lock() + defer r.mute.Lock() + + if _, ok := r.sources[ssrc]; !ok { + r.sources[ssrc] = source + return true } - r.sources[ssrc] = source - return true + return false +} + +func (r SSRCFilter) RemoveSource(ssrc uint32) { + r.mute.Lock() + defer r.mute.Lock() + delete(r.sources, ssrc) } func (r SSRCFilter) Input(conn net.Conn, data []byte) GBSource { @@ -30,11 +41,22 @@ func (r SSRCFilter) Input(conn net.Conn, data []byte) GBSource { return nil } - source, ok := r.sources[packet.SSRC] + var source GBSource + var ok bool + { + r.mute.RLock() + source, ok = r.sources[packet.SSRC] + r.mute.RUnlock() + } + if !ok { return nil } + if stream.SessionStateHandshakeDone == source.State() { + r.PreparePublishSource(conn, packet.SSRC, source) + } + source.InputRtp(packet) return source } diff --git a/gb28181/gb28181_test.go b/gb28181/gb28181_test.go index 90bc18d..1a72ad2 100644 --- a/gb28181/gb28181_test.go +++ b/gb28181/gb28181_test.go @@ -137,10 +137,10 @@ func createSource(source, transport, setup string, ssrc uint32) int { func TestUDPRecv(t *testing.T) { path := "D:\\GOProjects\\avformat\\gb28181_h264.rtp" ssrc := 0xBEBC201 - ip := "192.168.31.112" + ip := "192.168.2.148" localAddr := "0.0.0.0:20001" network := "tcp" - setup := "active" + setup := "passive" id := "hls_mystream" port := createSource(id, network, setup, uint32(ssrc)) @@ -190,7 +190,7 @@ func TestUDPRecv(t *testing.T) { panic(err) } - connectSource(id, "192.168.31.112:20001") + connectSource(id, "192.168.2.148:20001") // } diff --git a/gb28181/source.go b/gb28181/source.go index 6810e9e..adef263 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -12,7 +12,6 @@ import ( "github.com/yangjiechina/lkm/log" "github.com/yangjiechina/lkm/stream" "net" - "net/http" ) type TransportType int @@ -32,6 +31,8 @@ var ( SharedTCPServer *TCPServer ) +// GBSource GB28181推流Source, 接收PS流解析生成AVStream和AVPacket, 后续全权交给父类Source处理. +// udp/passive/active 都继承本接口, filter负责解析rtp包, 根据ssrc匹配对应的Source. type GBSource interface { stream.Source @@ -40,20 +41,20 @@ type GBSource interface { TransportType() TransportType PrepareTransDeMuxer(id string, ssrc uint32) + + SetConn(conn net.Conn) + + SetSSRC(ssrc uint32) } -// BaseGBSource GB28181推流Source -// 负责解析生成AVStream和AVPacket, 后续全权交给父类Source处理. type BaseGBSource struct { stream.PublishSource - deMuxerCtx *libmpeg.PSDeMuxerContext - + deMuxerCtx *libmpeg.PSDeMuxerContext audioStream utils.AVStream videoStream utils.AVStream - ssrc uint32 - + ssrc uint32 transport transport.ITransport } @@ -94,7 +95,7 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint1 } if !success { - return nil, 0, fmt.Errorf("source existing") + return nil, 0, fmt.Errorf("ssrc conflict") } port = stream.AppConfig.GB28181.Port[0] @@ -139,11 +140,12 @@ func NewGBSource(id string, ssrc uint32, tcp bool, active bool) (GBSource, uint1 source.PrepareTransDeMuxer(id, ssrc) _, state := stream.PreparePublishSource(source, false) - if http.StatusOK != state { + if utils.HookStateOK != state { return nil, 0, fmt.Errorf("error code %d", state) } - source.Init(source.Input) + source.SetType(stream.SourceType28181) + source.Init(source.Input, source.Close) go source.LoopEvent() return source, port, err } @@ -303,10 +305,32 @@ func (source *BaseGBSource) OnCompletePacket(index int, mediaType utils.AVMediaT } func (source *BaseGBSource) Close() { + log.Sugar.Infof("GB28181推流结束 ssrc:%d %s", source.ssrc, source.PublishSource.PrintInfo()) + + //释放收流端口 if source.transport != nil { source.transport.Close() source.transport = nil } + //删除ssrc关联 + if !stream.AppConfig.GB28181.IsMultiPort() { + if SharedTCPServer != nil { + SharedTCPServer.filter.RemoveSource(source.ssrc) + } + + if SharedUDPServer != nil { + SharedUDPServer.filter.RemoveSource(source.ssrc) + } + } + source.PublishSource.Close() } + +func (source *BaseGBSource) SetConn(conn net.Conn) { + source.Conn = conn +} + +func (source *BaseGBSource) SetSSRC(ssrc uint32) { + source.ssrc = ssrc +} diff --git a/gb28181/source_passive.go b/gb28181/source_passive.go index 1aaa6a3..a5829a6 100644 --- a/gb28181/source_passive.go +++ b/gb28181/source_passive.go @@ -2,7 +2,6 @@ package gb28181 import ( "github.com/pion/rtp" - "github.com/yangjiechina/lkm/stream" ) type PassiveSource struct { @@ -18,6 +17,6 @@ func (t PassiveSource) TransportType() TransportType { } func (t PassiveSource) InputRtp(pkt *rtp.Packet) error { - t.PublishSource.AddEvent(stream.SourceEventInput, pkt.Payload) + t.PublishSource.Input(pkt.Payload) return nil } diff --git a/gb28181/source_udp.go b/gb28181/source_udp.go index dc19998..b236b1c 100644 --- a/gb28181/source_udp.go +++ b/gb28181/source_udp.go @@ -45,6 +45,6 @@ func (u UDPSource) InputRtp(pkt *rtp.Packet) error { u.rtpBuffer.FreeHead() - u.PublishSource.AddEvent(stream.SourceEventInput, pkt.Payload) + u.PublishSource.Input(pkt.Payload) } } diff --git a/gb28181/tcp_server.go b/gb28181/tcp_server.go index d866dd1..09d9831 100644 --- a/gb28181/tcp_server.go +++ b/gb28181/tcp_server.go @@ -32,7 +32,7 @@ func NewTCPServer(addr net.Addr, filter Filter) (*TCPServer, error) { } func (T *TCPServer) OnConnected(conn net.Conn) { - log.Sugar.Infof("客户端链接 conn:%s", conn.RemoteAddr().String()) + log.Sugar.Infof("GB28181连接 conn:%s", conn.RemoteAddr().String()) } func (T *TCPServer) OnPacket(conn net.Conn, data []byte) { @@ -53,7 +53,7 @@ func (T *TCPServer) OnPacket(conn net.Conn, data []byte) { } func (T *TCPServer) OnDisConnected(conn net.Conn, err error) { - log.Sugar.Infof("客户端断开链接 conn:%s", conn.RemoteAddr().String()) + log.Sugar.Infof("GB28181断开连接 conn:%s", conn.RemoteAddr().String()) con := conn.(*transport.Conn) if con.Data != nil { diff --git a/jt1078/jt_server.go b/jt1078/jt_server.go index c635f58..7b59ea8 100644 --- a/jt1078/jt_server.go +++ b/jt1078/jt_server.go @@ -4,7 +4,6 @@ import ( "github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/lkm/log" - "github.com/yangjiechina/lkm/stream" "net" ) @@ -26,18 +25,20 @@ func (s jtServer) OnConnected(conn net.Conn) { log.Sugar.Debugf("jtserver连接 conn:%s", conn.RemoteAddr().String()) t := conn.(*transport.Conn) - t.Data = NewSession() + t.Data = NewSession(conn) } func (s jtServer) OnPacket(conn net.Conn, data []byte) { - conn.(*transport.Conn).Data.(*Session).AddEvent(stream.SourceEventInput, data) + conn.(*transport.Conn).Data.(*Session).Input(data) } func (s jtServer) OnDisConnected(conn net.Conn, err error) { log.Sugar.Debugf("jtserver断开连接 conn:%s", conn.RemoteAddr().String()) t := conn.(*transport.Conn) + utils.Assert(t.Data != nil) t.Data.(*Session).Close() + t.Data = nil } func (s jtServer) Start(addr net.Addr) error { diff --git a/jt1078/jt_session.go b/jt1078/jt_session.go index f5e6cd1..467ebaa 100644 --- a/jt1078/jt_session.go +++ b/jt1078/jt_session.go @@ -7,6 +7,7 @@ import ( "github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/lkm/log" "github.com/yangjiechina/lkm/stream" + "net" ) const ( @@ -211,12 +212,16 @@ func (s *Session) OnJtPTPPacket(data []byte) { s.rtpPacket = &RtpPacket{} *s.rtpPacket = packet - _, state := stream.PreparePublishSource(s, true) - if utils.HookStateOK != state { - log.Sugar.Errorf("1078推流失败 source:%s", s.phone) - } + go func() { + _, state := stream.PreparePublishSource(s, true) + if utils.HookStateOK != state { + log.Sugar.Errorf("1078推流失败 source:%s", s.phone) - s.Close() + if s.Conn != nil { + s.Conn.Close() + } + } + }() } //完整包/最后一个分包, 创建AVPacket @@ -271,15 +276,30 @@ func (s *Session) Input(data []byte) error { } func (s *Session) Close() { + log.Sugar.Infof("1078推流结束 phone number:%s %s", s.phone, s.PublishSource.PrintInfo()) + if s.audioBuffer != nil { + s.audioBuffer.Clear() + } + + if s.videoBuffer != nil { + s.videoBuffer.Clear() + } + + s.PublishSource.Close() } -func NewSession() *Session { - session := Session{} +func NewSession(conn net.Conn) *Session { + session := Session{ + PublishSource: stream.PublishSource{ + Conn: conn, + Type_: stream.SourceType1078, + }, + } delimiter := [4]byte{0x30, 0x31, 0x63, 0x64} session.decoder = transport.NewDelimiterFrameDecoder(1024*1024*2, delimiter[:], session.OnJtPTPPacket) - session.Init(session.Input) + session.Init(session.Input, session.Close) go session.LoopEvent() return &session } diff --git a/main.go b/main.go index 2454810..171330c 100644 --- a/main.go +++ b/main.go @@ -27,7 +27,7 @@ func NewDefaultAppConfig() stream.AppConfig_ { MergeWriteLatency: 350, PublicIP: "192.168.2.148", IdleTimeout: int64(60 * time.Second), - ReceiveTimeout: int64(60 * time.Second), + ReceiveTimeout: int64(10 * time.Second), Hls: stream.HlsConfig{ Enable: false, @@ -81,13 +81,13 @@ func NewDefaultAppConfig() stream.AppConfig_ { Hook: stream.HookConfig{ Enable: true, Timeout: int64(60 * time.Second), - OnPublishUrl: "http://localhost:8082/api/v1/on_publish", - OnPublishDoneUrl: "http://localhost:8082/api/v1/on_publish_done", - OnPlayUrl: "http://localhost:8082/api/v1/on_play", - OnPlayDoneUrl: "http://localhost:8082/api/on_play_done", - OnRecordUrl: "http://localhost:8082/api/v1/on_reocrd", - OnIdleTimeoutUrl: "http://localhost:8082/api/v1/on_idle_timeout", - OnReceiveTimeoutUrl: "http://localhost:8082/api/v1/on_recv_timeout", + OnPublishUrl: "http://localhost:9000/api/v1/hook/on_publish", + OnPublishDoneUrl: "http://localhost:9000/api/v1/hook/on_publish_done", + OnPlayUrl: "http://localhost:9000/api/v1/hook/on_play", + OnPlayDoneUrl: "http://localhost:9000/api/v1/hook/on_play_done", + OnRecordUrl: "http://localhost:9000/api/v1/hook/on_reocrd", + OnIdleTimeoutUrl: "http://localhost:9000/api/v1/hook/on_idle_timeout", + OnReceiveTimeoutUrl: "http://localhost:9000/api/v1/hook/on_receive_timeout", }, } } diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 9807b93..1eb6c9b 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -45,7 +45,7 @@ func (s *Session) OnPublish(app, stream_ string, response chan utils.HookState) s.handle = source s.isPublisher = true - source.Init(source.Input) + source.Init(source.Input, source.Close) go source.LoopEvent() } @@ -72,7 +72,7 @@ func (s *Session) OnPlay(app, stream_ string, response chan utils.HookState) { func (s *Session) Input(conn net.Conn, data []byte) error { //如果是推流,并且握手成功,后续收到的包,都将发送给LoopEvent处理 if s.isPublisher { - s.handle.(*Publisher).AddEvent(stream.SourceEventInput, data) + s.handle.(*Publisher).PublishSource.Input(data) return nil } else { return s.stack.Input(conn, data) @@ -80,8 +80,6 @@ func (s *Session) Input(conn net.Conn, data []byte) error { } func (s *Session) Close() { - log.Sugar.Debugf("释放rtmp session conn:%s", s.conn.RemoteAddr().String()) - //释放协议栈 if s.stack != nil { s.stack.Close() @@ -92,13 +90,16 @@ func (s *Session) Close() { return } - _, ok := s.handle.(*Publisher) + publisher, ok := s.handle.(*Publisher) if ok { + log.Sugar.Infof("rtmp推流结束 %s", publisher.PrintInfo()) + if s.isPublisher { - s.handle.(*Publisher).AddEvent(stream.SourceEventClose, nil) + s.handle.(*Publisher).Close() } } else { sink := s.handle.(stream.Sink) + log.Sugar.Infof("rtmp拉流结束 %s", sink.PrintInfo()) sink.Close() } } diff --git a/stream/config.go b/stream/config.go index e293aa8..e29e964 100644 --- a/stream/config.go +++ b/stream/config.go @@ -100,8 +100,8 @@ type HookConfig struct { OnPlayUrl string `json:"on_play"` //拉流回调 OnPlayDoneUrl string `json:"on_play_done"` //拉流结束回调 OnRecordUrl string `json:"on_record"` //录制流回调 - OnIdleTimeoutUrl string `json:"on_idle_timeout"` //多久没有sink拉流回调 - OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //多久没有推流回调 + OnIdleTimeoutUrl string `json:"on_idle_timeout"` //没有sink拉流回调 + OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //没有推流回调 } func (hook *HookConfig) EnablePublishEvent() bool { @@ -144,8 +144,8 @@ type AppConfig_ struct { GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小 ProbeTimeout int `json:"probe_timeout"` PublicIP string `json:"public_ip"` - IdleTimeout int64 `json:"idle_timeout"` //多长时间没有sink拉流, 单位秒 - ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒 + IdleTimeout int64 `json:"idle_timeout"` //多长时间没有拉流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. + ReceiveTimeout int64 `json:"receive_timeout"` //多长时间没有收到流, 单位秒. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. //合并写的大小范围,应当大于一帧的时长,不超过一组GOP的时长,在实际发送流的时候也会遵循此条例. diff --git a/stream/hook.go b/stream/hook.go index 0a64266..5a6237d 100644 --- a/stream/hook.go +++ b/stream/hook.go @@ -4,22 +4,23 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/yangjiechina/lkm/log" "net/http" "time" ) -// 每个通知的时间都需要携带的字段 +// 每个通知事件都需要携带的字段 type eventInfo struct { - stream string //stream id - protocol string //推拉流协议 - remoteAddr string //peer地址 + Stream string `json:"stream"` //stream id + Protocol string `json:"protocol"` //推拉流协议 + RemoteAddr string `json:"remote_addr"` //peer地址 } func NewHookPlayEventInfo(sink Sink) eventInfo { - return eventInfo{stream: sink.SourceId(), protocol: sink.Protocol().ToString(), remoteAddr: sink.PrintInfo()} + return eventInfo{Stream: sink.SourceId(), Protocol: sink.Protocol().ToString(), RemoteAddr: sink.PrintInfo()} } func NewHookPublishEventInfo(source Source) eventInfo { - return eventInfo{stream: source.Id(), protocol: source.Type().ToString(), remoteAddr: source.RemoteAddr()} + return eventInfo{Stream: source.Id(), Protocol: source.Type().ToString(), RemoteAddr: source.RemoteAddr()} } func sendHookEvent(url string, body interface{}) (*http.Response, error) { @@ -36,6 +37,8 @@ func sendHookEvent(url string, body interface{}) (*http.Response, error) { return nil, err } + log.Sugar.Infof("发送hook通知 url:%s body:%s", url, marshal) + request.Header.Set("Content-Type", "application/json") return client.Do(request) } @@ -47,8 +50,8 @@ func Hook(event HookEvent, body interface{}) (*http.Response, error) { } response, err := sendHookEvent(url, body) - if err != nil && http.StatusOK != response.StatusCode { - return response, fmt.Errorf("code:%d reason:%s", response.StatusCode, response.Status) + if err == nil && http.StatusOK != response.StatusCode { + return response, fmt.Errorf("reason %s", response.Status) } return response, err diff --git a/stream/hook_sink.go b/stream/hook_sink.go index 0c69767..a54d9a5 100644 --- a/stream/hook_sink.go +++ b/stream/hook_sink.go @@ -47,7 +47,7 @@ func HookPlayDoneEvent(sink Sink) (*http.Response, bool) { var response *http.Response if AppConfig.Hook.EnableOnPlayDone() { - hook, err := Hook(HookEventPlay, NewHookPlayEventInfo(sink)) + hook, err := Hook(HookEventPlayDone, NewHookPlayEventInfo(sink)) if err != nil { log.Sugar.Errorf("通知播放结束事件失败 err:%s sink:%s-%v source:%s", err.Error(), sink.Protocol().ToString(), sink.Id(), sink.SourceId()) return hook, false diff --git a/stream/hook_source.go b/stream/hook_source.go index 91ca761..c052fd6 100644 --- a/stream/hook_source.go +++ b/stream/hook_source.go @@ -59,20 +59,34 @@ func HookPublishDoneEvent(source Source) { } } -func HookReceiveTimeoutEvent(source Source) { +func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState) { + var response *http.Response + if AppConfig.Hook.EnableOnReceiveTimeout() { - _, err := Hook(HookEventReceiveTimeout, NewHookPublishEventInfo(source)) + resp, err := Hook(HookEventReceiveTimeout, NewHookPublishEventInfo(source)) if err != nil { log.Sugar.Errorf("通知收流超时事件失败 source:%s err:%s", source.Id(), err.Error()) + return resp, utils.HookStateFailure } + + response = resp } + + return response, utils.HookStateOK } -func HookIdleTimeoutEvent(source Source) { +func HookIdleTimeoutEvent(source Source) (*http.Response, utils.HookState) { + var response *http.Response + if AppConfig.Hook.EnableOnIdleTimeout() { - _, err := Hook(HookEventIdleTimeout, NewHookPublishEventInfo(source)) + resp, err := Hook(HookEventIdleTimeout, NewHookPublishEventInfo(source)) if err != nil { log.Sugar.Errorf("通知空闲超时时间失败 source:%s err:%s", source.Id(), err.Error()) + return resp, utils.HookStateFailure } + + response = resp } + + return response, utils.HookStateOK } diff --git a/stream/sink.go b/stream/sink.go index bd4e24f..b40f6ae 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -186,6 +186,7 @@ func (s *BaseSink) Close() { return } + var state SessionState { s.Lock() defer s.UnLock() @@ -193,16 +194,17 @@ func (s *BaseSink) Close() { return } + state = s.State_ s.State_ = SessionStateClose } - if s.State_ == SessionStateTransferring { + if state == SessionStateTransferring { source := SourceManager.Find(s.SourceId_) source.AddEvent(SourceEventPlayDone, s) - } else if s.State_ == SessionStateWait { + } else if state == SessionStateWait { RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_) //拉流结束事件, 在等待队列直接发送通知, 在拉流由Source负责发送. - HookPlayDoneEvent(s) + go HookPlayDoneEvent(s) } } func (s *BaseSink) PrintInfo() string { diff --git a/stream/source.go b/stream/source.go index f526b1d..4099d32 100644 --- a/stream/source.go +++ b/stream/source.go @@ -35,11 +35,10 @@ const ( ProtocolHls = Protocol(4) ProtocolRtc = Protocol(5) - SourceEventPlay = SourceEvent(1) - SourceEventPlayDone = SourceEvent(2) - SourceEventInput = SourceEvent(3) - SourceEventClose = SourceEvent(4) - SourceEventProbeTimeout = SourceEvent(5) + SourceEventPlay = SourceEvent(1) + SourceEventPlayDone = SourceEvent(2) + SourceEventInput = SourceEvent(3) + SourceEventClose = SourceEvent(4) ) const ( @@ -64,6 +63,8 @@ type Source interface { // Type 推流类型 Type() SourceType + SetType(sourceType SourceType) + // OriginStreams 返回推流的原始Streams OriginStreams() []utils.AVStream @@ -107,17 +108,21 @@ type Source interface { // OnDeMuxDone 所有流解析完毕回调 OnDeMuxDone() - Init(input func(data []byte) error) + Init(inputCB func(data []byte) error, closeCB func()) LoopEvent() RemoteAddr() string + PrintInfo() string + // StartReceiveDataTimer 启动收流超时计时器 StartReceiveDataTimer() // StartIdleTimer 启动拉流空闲计时器 StartIdleTimer() + + State() SessionState } type PublishSource struct { @@ -140,39 +145,39 @@ type PublishSource struct { completed bool probeTimer *time.Timer - Input_ func(data []byte) error //解决多态无法传递给子类的问题 + inputCB func(data []byte) error //子类Input回调 + closeCB func() //子类Close回调 //所有的输出协议, 持有Sink transStreams map[TransStreamId]TransStream //sink的拉流和断开拉流事件,都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作 //golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件 - inputDataEvent chan []byte - dataConsumedEvent chan byte //解析完input的数据后,才能继续从网络io中读取流 - closedEvent chan byte - playingEventQueue chan Sink - playingDoneEventQueue chan Sink - probeTimoutEvent chan bool - receiveDataTimeoutEvent chan byte - idleTimeoutEvent chan byte + inputDataEvent chan []byte + dataConsumedEvent chan byte //解析完input的数据后,才能继续从网络io中读取流 + closedEvent chan byte + playingEventQueue chan Sink + playingDoneEventQueue chan Sink + probeTimoutEvent chan bool lastPacketTime time.Time removeSinkTime time.Time receiveDataTimer *time.Timer idleTimer *time.Timer sinkCount int + closed bool } func (s *PublishSource) Id() string { return s.Id_ } -func (s *PublishSource) Init(input func(data []byte) error) { - s.Input_ = input +func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func()) { + s.inputCB = inputCB + s.closeCB = closeCB + s.SetState(SessionStateHandshakeDone) //初始化事件接收缓冲区 - s.SetState(SessionStateTransferring) - //收流和网络断开的chan都阻塞执行 s.inputDataEvent = make(chan []byte) s.dataConsumedEvent = make(chan byte) @@ -181,13 +186,6 @@ func (s *PublishSource) Init(input func(data []byte) error) { s.playingDoneEventQueue = make(chan Sink, 128) s.probeTimoutEvent = make(chan bool) - if AppConfig.ReceiveTimeout > 0 { - s.receiveDataTimeoutEvent = make(chan byte) - } - if AppConfig.IdleTimeout > 0 { - s.idleTimeoutEvent = make(chan byte) - } - if s.transStreams == nil { s.transStreams = make(map[TransStreamId]TransStream, 10) } @@ -235,13 +233,20 @@ func (s *PublishSource) LoopEvent() { for { select { case data := <-s.inputDataEvent: - if AppConfig.ReceiveTimeout > 0 { - s.lastPacketTime = time.Now() - } + if !s.closed { + if AppConfig.ReceiveTimeout > 0 { + s.lastPacketTime = time.Now() + } - if err := s.Input_(data); err != nil { - log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", s.Id_, err.Error()) - s.Close() + if s.state == SessionStateHandshakeDone { + s.state = SessionStateTransferring + //不在父类处理hook和prepare事情 + } + + if err := s.inputCB(data); err != nil { + log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", s.Id_, err.Error()) + s.Close() + } } s.dataConsumedEvent <- 0 @@ -259,26 +264,17 @@ func (s *PublishSource) LoopEvent() { s.RemoveSink(sink) break case _ = <-s.closedEvent: - s.Close() + s.doClose() return case _ = <-s.probeTimoutEvent: s.writeHeader() break - case _ = <-s.receiveDataTimeoutEvent: - log.Sugar.Errorf("收流超时 source:%s", s.Id_) - s.Close() - HookReceiveTimeoutEvent(s) - break - case _ = <-s.idleTimeoutEvent: - log.Sugar.Errorf("空闲超时 source:%s", s.Id_) - s.Close() - HookIdleTimeoutEvent(s) - break } } } func (s *PublishSource) Input(data []byte) error { + s.AddEvent(SourceEventInput, data) return nil } @@ -352,7 +348,7 @@ func (s *PublishSource) AddSink(sink Sink) bool { s.transStreams = make(map[TransStreamId]TransStream, 10) } //创建一个新的传输流 - log.Sugar.Debugf("创建%s-stream", sink.Protocol().ToString()) + log.Sugar.Debugf("创建%s-stream source:%s", sink.Protocol().ToString(), s.Id_) var err error transStream, err = CreateTransStream(s, sink.Protocol(), streams[:size]) @@ -431,7 +427,18 @@ func (s *PublishSource) SetState(state SessionState) { s.state = state } -func (s *PublishSource) Close() { +func (s *PublishSource) doClose() { + if s.closed { + return + } + + //清空未写完的buffer + for _, buffer := range s.pktBuffers { + if buffer != nil { + buffer.Reset() + } + } + //释放GOP缓存 if s.gopBuffer != nil { s.gopBuffer.Clear() @@ -466,14 +473,20 @@ func (s *PublishSource) Close() { if SessionStateClose == sink.State() { log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.PrintInfo()) } else { + sink.SetState(SessionStateWait) AddSinkToWaitingQueue(s.Id_, sink) } } }) } - HookPublishDoneEvent(s) + s.closed = true s.transStreams = nil + go HookPublishDoneEvent(s) +} + +func (s *PublishSource) Close() { + s.AddEvent(SourceEventClose, nil) } func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) { @@ -573,6 +586,10 @@ func (s *PublishSource) Type() SourceType { return s.Type_ } +func (s *PublishSource) SetType(sourceType SourceType) { + s.Type_ = sourceType +} + func (s *PublishSource) RemoteAddr() string { if s.Conn == nil { return "" @@ -581,6 +598,10 @@ func (s *PublishSource) RemoteAddr() string { return s.Conn.RemoteAddr().String() } +func (s *PublishSource) PrintInfo() string { + return fmt.Sprintf("id:%s type:%s conn:%s ", s.Id_, s.Type_.ToString(), s.RemoteAddr()) +} + func (s *PublishSource) StartReceiveDataTimer() { utils.Assert(s.receiveDataTimer == nil) utils.Assert(AppConfig.ReceiveTimeout > 0) @@ -589,11 +610,19 @@ func (s *PublishSource) StartReceiveDataTimer() { s.receiveDataTimer = time.AfterFunc(time.Duration(AppConfig.ReceiveTimeout), func() { dis := time.Now().Sub(s.lastPacketTime) + //如果开启Hook通知, 根据响应决定是否关闭Source + //如果通知失败, 或者非200应答, 释放Source + //如果没有开启Hook通知, 直接删除 if dis >= time.Duration(AppConfig.ReceiveTimeout) { - s.receiveDataTimeoutEvent <- 0 - } else { - s.receiveDataTimer.Reset(time.Duration(math.Abs(float64(time.Duration(AppConfig.ReceiveTimeout) - dis)))) + log.Sugar.Errorf("收流超时 source:%s", s.Id_) + response, state := HookReceiveTimeoutEvent(s) + if utils.HookStateOK != state || response == nil { + s.closeCB() + return + } } + + s.receiveDataTimer.Reset(time.Duration(math.Abs(float64(time.Duration(AppConfig.ReceiveTimeout) - dis)))) }) } @@ -606,9 +635,18 @@ func (s *PublishSource) StartIdleTimer() { dis := time.Now().Sub(s.removeSinkTime) if s.sinkCount < 1 && dis >= time.Duration(AppConfig.IdleTimeout) { - s.idleTimeoutEvent <- 0 - } else { - s.idleTimer.Reset(time.Duration(math.Abs(float64(AppConfig.IdleTimeout - int64(dis))))) + log.Sugar.Errorf("空闲超时 source:%s", s.Id_) + response, state := HookIdleTimeoutEvent(s) + if utils.HookStateOK != state || response == nil { + s.closeCB() + return + } } + + s.idleTimer.Reset(time.Duration(math.Abs(float64(AppConfig.IdleTimeout - int64(dis))))) }) } + +func (s *PublishSource) State() SessionState { + return s.state +}