修复rtc拉流失败问题

This commit is contained in:
yangjiechina
2024-10-31 15:57:31 +08:00
parent 6841c4725f
commit afd6b6e965
9 changed files with 58 additions and 30 deletions

View File

@@ -79,7 +79,7 @@ func NewTCPSession(conn net.Conn, filter Filter) *TCPSession {
session.Init(source) session.Init(source)
} }
if stream.SessionStateHandshakeDone == session.source.State() { if stream.SessionStateHandshakeSuccess == session.source.State() {
session.source.PreparePublish(conn, packet.SSRC, session.source) session.source.PreparePublish(conn, packet.SSRC, session.source)
} }

View File

@@ -46,7 +46,7 @@ func (U *UDPServer) OnPacket(conn net.Conn, data []byte) []byte {
return nil return nil
} }
if stream.SessionStateHandshakeDone == source.State() { if stream.SessionStateHandshakeSuccess == source.State() {
conn.(*transport.Conn).Data = source conn.(*transport.Conn).Data = source
source.PreparePublish(conn, packet.SSRC, source) source.PreparePublish(conn, packet.SSRC, source)
} }

View File

@@ -25,8 +25,8 @@ type Sink struct {
func (s *Sink) StartStreaming(transStream stream.TransStream) error { func (s *Sink) StartStreaming(transStream stream.TransStream) error {
// 创建PeerConnection // 创建PeerConnection
var videoTrack *webrtc.TrackLocalStaticSample var remoteTrack *webrtc.TrackLocalStaticSample
s.setTrackCount(transStream.TrackCount()) s.tracks = make([]*webrtc.TrackLocalStaticSample, transStream.TrackCount())
connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{}) connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{})
connection.OnICECandidate(func(candidate *webrtc.ICECandidate) { connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
@@ -64,16 +64,16 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
id = "video" id = "video"
} }
videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion") remoteTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mimeType}, id, "pion")
if err != nil { if err != nil {
panic(err)
} else if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
return err return err
} else if _, err = connection.AddTrack(videoTrack); err != nil { } else if _, err := connection.AddTransceiverFromTrack(remoteTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
return err
} else if _, err = connection.AddTrack(remoteTrack); err != nil {
return err return err
} }
s.addTrack(index, videoTrack) s.tracks[index] = remoteTrack
} }
if len(connection.GetTransceivers()) == 0 { if len(connection.GetTransceivers()) == 0 {
@@ -93,26 +93,31 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
<-complete <-complete
connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
s.state = state s.state = state
log.Sugar.Infof("ice state:%v sink:%d source:%s", state.String(), s.GetID(), s.SourceID) log.Sugar.Infof("ice state: %v sink: %d source: %s", state.String(), s.GetID(), s.SourceID)
if state > webrtc.ICEConnectionStateDisconnected { if state > webrtc.ICEConnectionStateDisconnected {
log.Sugar.Errorf("webrtc peer断开连接 sink: %v source :%s", s.GetID(), s.SourceID) log.Sugar.Errorf("webrtc peer断开连接 sink: %v source: %s", s.GetID(), s.SourceID)
s.Close() s.Close()
} }
}) })
s.peer = connection s.peer = connection
// offer的sdp, 应答给http请求 // offer的sdp, 应答给http请求
s.cb(connection.LocalDescription().SDP) if s.cb != nil {
s.cb(connection.LocalDescription().SDP)
s.cb = nil
}
return nil return nil
} }
func (s *Sink) setTrackCount(count int) {
s.tracks = make([]*webrtc.TrackLocalStaticSample, count)
}
func (s *Sink) addTrack(index int, track *webrtc.TrackLocalStaticSample) error { func (s *Sink) Close() {
s.tracks[index] = track if s.peer != nil {
return nil s.peer.Close()
s.peer = nil
}
s.BaseSink.Close()
} }
func (s *Sink) Write(index int, data [][]byte, ts int64) error { func (s *Sink) Write(index int, data [][]byte, ts int64) error {

View File

@@ -27,7 +27,7 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
t.AppendOutStreamBuffer(extra) t.AppendOutStreamBuffer(extra)
} }
t.AppendOutStreamBuffer(packet.Data()) t.AppendOutStreamBuffer(packet.AnnexBPacketData(t.BaseTransStream.Tracks[packet.Index()]))
} }
return t.OutBuffer[:t.OutBufferSize], int64(uint32(packet.Duration(1000))), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil return t.OutBuffer[:t.OutBufferSize], int64(uint32(packet.Duration(1000))), utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame(), nil

View File

@@ -27,7 +27,7 @@ func (s *Sink) Close() {
func NewSink(id stream.SinkID, sourceId string, conn net.Conn, stack *librtmp.Stack) stream.Sink { func NewSink(id stream.SinkID, sourceId string, conn net.Conn, stack *librtmp.Stack) stream.Sink {
return &Sink{ return &Sink{
BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreate, Protocol: stream.TransStreamRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE, TCPStreaming: true}, BaseSink: stream.BaseSink{ID: id, SourceID: sourceId, State: stream.SessionStateCreated, Protocol: stream.TransStreamRtmp, Conn: conn, DesiredAudioCodecId_: utils.AVCodecIdNONE, DesiredVideoCodecId_: utils.AVCodecIdNONE, TCPStreaming: true},
stack: stack, stack: stack,
} }
} }

View File

@@ -37,7 +37,7 @@ func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookSta
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.GetID()) log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.GetID())
return response, utils.HookStateFailure return response, utils.HookStateFailure
} else { } else {
sink.SetState(SessionStateWait) sink.SetState(SessionStateWaiting)
AddSinkToWaitingQueue(sink.GetSourceID(), sink) AddSinkToWaitingQueue(sink.GetSourceID(), sink)
} }
} }

View File

@@ -3,6 +3,7 @@ package stream
import ( import (
"fmt" "fmt"
"github.com/lkmio/avformat/utils" "github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"net" "net"
"net/url" "net/url"
"sync" "sync"
@@ -184,6 +185,8 @@ func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID {
// 1. Sink如果正在拉流, 删除任务交给Source处理, 否则直接从等待队列删除Sink. // 1. Sink如果正在拉流, 删除任务交给Source处理, 否则直接从等待队列删除Sink.
// 2. 发送PlayDoneHook事件 // 2. 发送PlayDoneHook事件
func (s *BaseSink) Close() { func (s *BaseSink) Close() {
log.Sugar.Debugf("closing the %s sink. id: %s. current session state: %s", s.Protocol, SinkId2String(s.ID), s.State)
if SessionStateClosed == s.State { if SessionStateClosed == s.State {
return return
} }
@@ -194,7 +197,7 @@ func (s *BaseSink) Close() {
} }
// Sink未添加到任何队列, 不做处理 // Sink未添加到任何队列, 不做处理
if s.State < SessionStateWait { if s.State < SessionStateWaiting {
return return
} }
@@ -216,7 +219,7 @@ func (s *BaseSink) Close() {
if source := SourceManager.Find(s.SourceID); source != nil { if source := SourceManager.Find(s.SourceID); source != nil {
source.RemoveSink(s) source.RemoveSink(s)
} }
} else if state == SessionStateWait { } else if state == SessionStateWaiting {
// 从等待队列中删除Sink // 从等待队列中删除Sink
RemoveSinkFromWaitingQueue(s.SourceID, s.ID) RemoveSinkFromWaitingQueue(s.SourceID, s.ID)
go HookPlayDoneEvent(s) go HookPlayDoneEvent(s)

View File

@@ -207,7 +207,7 @@ func (s *PublishSource) SetID(id string) {
} }
func (s *PublishSource) Init(receiveQueueSize int) { func (s *PublishSource) Init(receiveQueueSize int) {
s.SetState(SessionStateHandshakeDone) s.SetState(SessionStateHandshakeSuccess)
// 初始化事件接收管道 // 初始化事件接收管道
// -2是为了保证从管道取到流, 到处理完流整个过程安全的, 不会被覆盖 // -2是为了保证从管道取到流, 到处理完流整个过程安全的, 不会被覆盖
@@ -557,14 +557,14 @@ func (s *PublishSource) SetState(state SessionState) {
} }
func (s *PublishSource) DoClose() { func (s *PublishSource) DoClose() {
log.Sugar.Debugf("closing the %s source. id: %s. closed flag: %t", s.Type, s.ID, s.closed)
if s.closed { if s.closed {
return return
} }
s.closed = true s.closed = true
log.Sugar.Infof("关闭推流源: %s", s.ID)
if s.TransDeMuxer != nil { if s.TransDeMuxer != nil {
s.TransDeMuxer.Close() s.TransDeMuxer.Close()
s.TransDeMuxer = nil s.TransDeMuxer = nil
@@ -633,7 +633,7 @@ func (s *PublishSource) DoClose() {
if SessionStateClosed == sink.GetState() { if SessionStateClosed == sink.GetState() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String()) log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String())
} else { } else {
sink.SetState(SessionStateWait) sink.SetState(SessionStateWaiting)
AddSinkToWaitingQueue(s.ID, sink) AddSinkToWaitingQueue(s.ID, sink)
} }

View File

@@ -36,11 +36,11 @@ const (
) )
const ( const (
SessionStateCreate = SessionState(1) // 新建状态 SessionStateCreated = SessionState(1) // 新建状态
SessionStateHandshaking = SessionState(2) // 握手中 SessionStateHandshaking = SessionState(2) // 握手中
SessionStateHandshakeFailure = SessionState(3) // 握手失败 SessionStateHandshakeFailure = SessionState(3) // 握手失败
SessionStateHandshakeDone = SessionState(4) // 握手完成 SessionStateHandshakeSuccess = SessionState(4) // 握手完成
SessionStateWait = SessionState(5) // 位于等待队列中 SessionStateWaiting = SessionState(5) // 位于等待队列中
SessionStateTransferring = SessionState(6) // 推拉流中 SessionStateTransferring = SessionState(6) // 推拉流中
SessionStateClosed = SessionState(7) // 关闭状态 SessionStateClosed = SessionState(7) // 关闭状态
) )
@@ -75,6 +75,26 @@ func (p TransStreamProtocol) String() string {
panic(fmt.Sprintf("unknown stream protocol %d", p)) panic(fmt.Sprintf("unknown stream protocol %d", p))
} }
func (s SessionState) String() string {
if SessionStateCreated == s {
return "create"
} else if SessionStateHandshaking == s {
return "handshaking"
} else if SessionStateHandshakeFailure == s {
return "handshake failure"
} else if SessionStateHandshakeSuccess == s {
return "handshake success"
} else if SessionStateWaiting == s {
return "waiting"
} else if SessionStateTransferring == s {
return "transferring"
} else if SessionStateClosed == s {
return "closed"
}
panic(fmt.Sprintf("unknown session state %d", s))
}
func Path2SourceId(path string, suffix string) (string, error) { func Path2SourceId(path string, suffix string) (string, error) {
source := strings.TrimSpace(path) source := strings.TrimSpace(path)
if strings.HasPrefix(source, "/") { if strings.HasPrefix(source, "/") {