fix: 因sink推流失败, 删除sink时, 拉流计数为负数问题

This commit is contained in:
ydajiang
2025-01-07 20:53:20 +08:00
parent fdf4ec4786
commit 4e62e7edb2
5 changed files with 14 additions and 14 deletions

View File

@@ -27,5 +27,5 @@ func (s *Sink) Write(index int, data [][]byte, ts int64) error {
}
func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink {
return &Sink{BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn), TCPStreaming: true}}
return &Sink{BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamFlv, Conn: transport.NewConn(conn), TCPStreaming: true}}
}

View File

@@ -75,7 +75,7 @@ func (f *ForwardSink) Close() {
// NewForwardSink 创建国标级联转发流Sink
// 返回监听的端口和Sink
func NewForwardSink(ssrc uint32, serverAddr string, setup SetupType, sinkId stream.SinkID, sourceId string) (stream.Sink, int, error) {
sink := &ForwardSink{BaseSink: stream.BaseSink{ID: sinkId, SourceID: sourceId, Protocol: stream.TransStreamGBStreamForward}, ssrc: ssrc, setup: setup}
sink := &ForwardSink{BaseSink: stream.BaseSink{ID: sinkId, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamGBStreamForward}, ssrc: ssrc, setup: setup}
if SetupUDP == setup {
remoteAddr, err := net.ResolveUDPAddr("udp", serverAddr)

View File

@@ -144,5 +144,5 @@ func (s *Sink) Write(index int, data [][]byte, ts int64) error {
}
func NewSink(id stream.SinkID, sourceId string, offer string, cb func(sdp string)) stream.Sink {
return &Sink{stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtc, TCPStreaming: false}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
return &Sink{stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamRtc, TCPStreaming: false}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
}

View File

@@ -147,7 +147,7 @@ func (s *Sink) Close() {
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, cb func(sdp string)) stream.Sink {
return &Sink{
stream.BaseSink{ID: id, SourceID: sourceId, Protocol: stream.TransStreamRtsp, Conn: conn},
stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamRtsp, Conn: conn},
nil,
cb,
}

View File

@@ -432,7 +432,7 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
var err error
transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), tracks)
if err != nil {
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
log.Sugar.Errorf("添加sink失败,创建传输流发生err: %s source: %s", err.Error(), s.ID)
return false
}
@@ -441,23 +441,23 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
sink.SetTransStreamID(transStreamId)
err := sink.StartStreaming(transStream)
if err != nil {
log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), s.ID)
return false
}
{
sink.Lock()
defer sink.UnLock()
if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("AddSink失败, sink已经断开连接 %s", sink.String())
log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String())
} else {
sink.SetState(SessionStateTransferring)
}
}
err := sink.StartStreaming(transStream)
if err != nil {
log.Sugar.Errorf("开始推流失败 err: %s", err.Error())
return false
}
// 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流
if !sink.IsReady() {
return true
@@ -699,10 +699,10 @@ func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket) {
func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) {
if s.completed {
log.Sugar.Warnf("添加track失败,已经WriteHeader. source: %s", s.ID)
log.Sugar.Warnf("添加%s track失败,已经WriteHeader. source: %s", stream.Type().ToString(), s.ID)
return
} else if !s.NotTrackAdded(stream.Index()) {
log.Sugar.Warnf("添加track失败,已经添加索引为%d的track. source: %s", stream.Index(), s.ID)
log.Sugar.Warnf("添加%s track失败,已经添加索引为%d的track. source: %s", stream.Type().ToString(), stream.Index(), s.ID)
return
}