diff --git a/flv/flv_sink.go b/flv/flv_sink.go index d25e5a3..a8874c4 100644 --- a/flv/flv_sink.go +++ b/flv/flv_sink.go @@ -27,5 +27,5 @@ func (s *Sink) Write(index int, data [][]byte, ts int64) error { } func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink { - return &Sink{BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn), TCPStreaming: true}} + return &Sink{BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn), TCPStreaming: true}} } diff --git a/gb28181/forward_sink.go b/gb28181/forward_sink.go index 2fea04a..a11dc83 100644 --- a/gb28181/forward_sink.go +++ b/gb28181/forward_sink.go @@ -75,7 +75,7 @@ func (f *ForwardSink) Close() { // NewForwardSink 创建国标级联转发流Sink // 返回监听的端口和Sink func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stream.SinkID, sourceId string) (stream.Sink, int, error) { - sink := &ForwardSink{BaseSink: stream.BaseSink{ID: sinkId, SourceID: sourceId, Protocol: stream.TransStreamGBStreamForward}, ssrc: ssrc, setup: setup} + sink := &ForwardSink{BaseSink: stream.BaseSink{ID: sinkId, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamGBStreamForward}, ssrc: ssrc, setup: setup} if SetupUDP == setup { remoteAddr, err := net.ResolveUDPAddr("udp", serverAddr) diff --git a/rtc/rtc_sink.go b/rtc/rtc_sink.go index ff4678a..4763b65 100644 --- a/rtc/rtc_sink.go +++ b/rtc/rtc_sink.go @@ -144,5 +144,5 @@ func (s *Sink) Write(index int, data [][]byte, ts int64) error { } func NewSink(id stream.SinkID, sourceId string, offer string, cb func(sdp string)) stream.Sink { - return &Sink{stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtc, TCPStreaming: false}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb} + return &Sink{stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamRtc, TCPStreaming: false}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb} } diff --git a/rtsp/rtsp_sink.go b/rtsp/rtsp_sink.go index 1638fb8..f8442ab 100644 --- a/rtsp/rtsp_sink.go +++ b/rtsp/rtsp_sink.go @@ -147,7 +147,7 @@ func (s *Sink) Close() { func NewSink(id stream.SinkID, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink { return &Sink{ - stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtsp, Conn: conn}, + stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamRtsp, Conn: conn}, nil, cb, } diff --git a/stream/source.go b/stream/source.go index 8484d66..05a9c38 100644 --- a/stream/source.go +++ b/stream/source.go @@ -432,7 +432,7 @@ func (s *PublishSource) doAddSink(sink Sink) bool { var err error transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), tracks) if err != nil { - log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID) + log.Sugar.Errorf("添加sink失败,创建传输流发生err: %s source: %s", err.Error(), s.ID) return false } @@ -441,23 +441,23 @@ func (s *PublishSource) doAddSink(sink Sink) bool { sink.SetTransStreamID(transStreamId) + err := sink.StartStreaming(transStream) + if err != nil { + log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), s.ID) + return false + } + { sink.Lock() defer sink.UnLock() if SessionStateClosed == sink.GetState() { - log.Sugar.Warnf("AddSink失败, sink已经断开连接 %s", sink.String()) + log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String()) } else { sink.SetState(SessionStateTransferring) } } - err := sink.StartStreaming(transStream) - if err != nil { - log.Sugar.Errorf("开始推流失败 err: %s", err.Error()) - return false - } - // 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流 if !sink.IsReady() { return true @@ -699,10 +699,10 @@ func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) { func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) { if s.completed { - log.Sugar.Warnf("添加track失败,已经WriteHeader. source: %s", s.ID) + log.Sugar.Warnf("添加%s track失败,已经WriteHeader. source: %s", stream.Type().ToString(), s.ID) return } else if !s.NotTrackAdded(stream.Index()) { - log.Sugar.Warnf("添加track失败,已经添加索引为%d的track. source: %s", stream.Index(), s.ID) + log.Sugar.Warnf("添加%s track失败,已经添加索引为%d的track. source: %s", stream.Type().ToString(), stream.Index(), s.ID) return }