推拉异常时关闭远程连接,发送首个音视频包采用绝对时间戳

This commit is contained in:
dexter
2023-01-18 21:14:54 +08:00
parent a0b55d5e1b
commit 95125b5560
3 changed files with 37 additions and 41 deletions

View File

@@ -44,6 +44,11 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
RTMPPlugin.Error("dial tcp", zap.String("host", u.Host), zap.Error(err)) RTMPPlugin.Error("dial tcp", zap.String("host", u.Host), zap.Error(err))
return nil, err return nil, err
} }
defer func() {
if err != nil || client == nil {
conn.Close()
}
}()
client = &NetConnection{ client = &NetConnection{
Conn: conn, Conn: conn,
Reader: bufio.NewReader(conn), Reader: bufio.NewReader(conn),
@@ -61,7 +66,10 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
} }
client.appName = ps[1] client.appName = ps[1]
err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(conf.ChunkSize)) err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(conf.ChunkSize))
client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{ if err != nil {
return
}
err = client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{
CommandMessage{"connect", 1}, CommandMessage{"connect", 1},
map[string]any{ map[string]any{
"app": client.appName, "app": client.appName,
@@ -71,6 +79,9 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
}, },
nil, nil,
}) })
if err != nil {
return
}
for { for {
msg, err := client.RecvMessage() msg, err := client.RecvMessage()
if err != nil { if err != nil {
@@ -98,8 +109,10 @@ type RTMPPusher struct {
} }
func (pusher *RTMPPusher) Connect() (err error) { func (pusher *RTMPPusher) Connect() (err error) {
pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL) if pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL); err == nil {
RTMPPlugin.Info("connect", zap.String("remoteURL", pusher.RemoteURL)) pusher.SetIO(pusher.NetConnection.Conn)
RTMPPlugin.Info("connect", zap.String("remoteURL", pusher.RemoteURL))
}
return return
} }
@@ -151,15 +164,18 @@ type RTMPPuller struct {
} }
func (puller *RTMPPuller) Connect() (err error) { func (puller *RTMPPuller) Connect() (err error) {
puller.NetConnection, err = NewRTMPClient(puller.RemoteURL) if puller.NetConnection, err = NewRTMPClient(puller.RemoteURL); err == nil {
RTMPPlugin.Info("connect", zap.String("remoteURL", puller.RemoteURL)) puller.SetIO(puller.NetConnection.Conn)
RTMPPlugin.Info("connect", zap.String("remoteURL", puller.RemoteURL))
}
return return
} }
func (puller *RTMPPuller) Pull() (err error) { func (puller *RTMPPuller) Pull() (err error) {
puller.absTs = make(map[uint32]uint32) puller.absTs = make(map[uint32]uint32)
puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2}) defer puller.Stop()
for { err = puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
for err == nil {
msg, err := puller.RecvMessage() msg, err := puller.RecvMessage()
if err != nil { if err != nil {
break break

View File

@@ -12,6 +12,8 @@ import (
type RTMPSender struct { type RTMPSender struct {
Subscriber Subscriber
NetStream NetStream
firstAudioSent bool
firstVideoSent bool
} }
func (rtmp *RTMPSender) OnEvent(event any) { func (rtmp *RTMPSender) OnEvent(event any) {
@@ -25,9 +27,19 @@ func (rtmp *RTMPSender) OnEvent(event any) {
case VideoDeConf: case VideoDeConf:
rtmp.sendAVMessage(0, v.AVCC, false, true) rtmp.sendAVMessage(0, v.AVCC, false, true)
case *AudioFrame: case *AudioFrame:
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false) if rtmp.firstAudioSent {
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false)
} else {
rtmp.firstAudioSent = true
rtmp.sendAVMessage(v.AbsTime, v.AVCC, true, true)
}
case *VideoFrame: case *VideoFrame:
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false) if rtmp.firstVideoSent {
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false)
} else {
rtmp.firstVideoSent = true
rtmp.sendAVMessage(v.AbsTime, v.AVCC, false, true)
}
default: default:
rtmp.Subscriber.OnEvent(event) rtmp.Subscriber.OnEvent(event)
} }

View File

@@ -7,7 +7,6 @@ import (
"io" "io"
"net" "net"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
@@ -43,37 +42,6 @@ const (
SEND_FULL_VDIEO_MESSAGE = "Send Full Video Message" SEND_FULL_VDIEO_MESSAGE = "Send Full Video Message"
) )
func newConnectResponseMessageData(objectEncoding float64) (amfobj map[string]any) {
amfobj = make(map[string]any)
amfobj["fmsVer"] = "monibuca/" + Engine.Version
amfobj["capabilities"] = 31
amfobj["mode"] = 1
amfobj["Author"] = "dexter"
amfobj["level"] = Level_Status
amfobj["code"] = NetConnection_Connect_Success
amfobj["objectEncoding"] = uint64(objectEncoding)
return
}
func newPublishResponseMessageData(streamid uint32, code, level string) (amfobj map[string]any) {
amfobj = make(map[string]any)
amfobj["code"] = code
amfobj["level"] = level
amfobj["streamid"] = streamid
return
}
func newPlayResponseMessageData(streamid uint32, code, level string) (amfobj map[string]any) {
amfobj = make(map[string]any)
amfobj["code"] = code
amfobj["level"] = level
amfobj["streamid"] = streamid
return
}
type NetConnection struct { type NetConnection struct {
*bufio.Reader `json:"-"` *bufio.Reader `json:"-"`
net.Conn `json:"-"` net.Conn `json:"-"`