实现client逻辑

This commit is contained in:
dexter
2022-02-13 17:12:29 +08:00
parent 9109fc1744
commit b46cf00f65
8 changed files with 343 additions and 97 deletions

View File

@@ -7,7 +7,6 @@ import (
"sync/atomic"
"github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/codec"
"github.com/Monibuca/engine/v4/util"
)
@@ -31,28 +30,12 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
plugin.Error("handshake", err)
return
}
var rec_audio, rec_video func(*Chunk)
for {
if msg, err := nc.RecvMessage(); err == nil {
if msg.MessageLength <= 0 {
continue
}
switch msg.MessageTypeID {
case RTMP_MSG_CHUNK_SIZE:
nc.readChunkSize = int(msg.MsgData.(Uint32Message))
case RTMP_MSG_ABORT:
delete(nc.incompleteRtmpBody, uint32(msg.MsgData.(Uint32Message)))
case RTMP_MSG_ACK, RTMP_MSG_EDGE:
case RTMP_MSG_USER_CONTROL:
if _, ok := msg.MsgData.(*PingRequestMessage); ok {
nc.SendUserControl(RTMP_USER_PING_RESPONSE)
}
case RTMP_MSG_ACK_SIZE:
nc.bandwidth = uint32(msg.MsgData.(Uint32Message))
case RTMP_MSG_BANDWIDTH:
m := msg.MsgData.(*SetPeerBandwidthMessage)
nc.bandwidth = m.AcknowledgementWindowsize
case RTMP_MSG_AMF0_COMMAND:
if msg.MsgData == nil {
break
@@ -88,34 +71,9 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
}
case "publish":
pm := msg.MsgData.(*PublishMessage)
if nc.Publish(nc.appName+"/"+pm.PublishingName, &nc, config.Publish) {
absTs := make(map[uint32]uint32)
vt := nc.Stream.NewVideoTrack()
at := nc.Stream.NewAudioTrack()
rec_audio = func(msg *Chunk) {
plugin.Tracef("rec_audio chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp)
if msg.ChunkType == 0 {
absTs[msg.ChunkStreamID] = 0
}
if msg.Timestamp == 0xffffff {
absTs[msg.ChunkStreamID] += msg.ExtendTimestamp
} else {
absTs[msg.ChunkStreamID] += msg.Timestamp
}
at.WriteAVCC(absTs[msg.ChunkStreamID], msg.Body)
}
rec_video = func(msg *Chunk) {
plugin.Tracef("rev_video chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp)
if msg.ChunkType == 0 {
absTs[msg.ChunkStreamID] = 0
}
if msg.Timestamp == 0xffffff {
absTs[msg.ChunkStreamID] += msg.ExtendTimestamp
} else {
absTs[msg.ChunkStreamID] += msg.Timestamp
}
vt.WriteAVCC(absTs[msg.ChunkStreamID], msg.Body)
}
var puber engine.Publisher
if puber.Publish(nc.appName+"/"+pm.PublishingName, &nc, config.Publish) {
nc.MediaReceiver = NewMediaReceiver(&puber)
nc.SendStreamID(RTMP_USER_STREAM_BEGIN)
err = nc.SendCommand(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
} else {
@@ -124,39 +82,21 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
case "play":
pm := msg.MsgData.(*PlayMessage)
streamPath := nc.appName + "/" + pm.StreamName
subscriber := engine.Subscriber{
subscriber := &engine.Subscriber{
Type: "RTMP",
ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID),
}
if subscriber.Subscribe(streamPath, config.Subscribe) {
nc.subscribers[nc.streamID] = &subscriber
nc.subscribers[nc.streamID] = subscriber
err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED)
err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN)
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status))
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status))
vt, at := subscriber.WaitVideoTrack(), subscriber.WaitAudioTrack()
if vt != nil {
frame := vt.DecoderConfiguration
err = nc.sendAVMessage(0, net.Buffers(frame.AVCC), false, true)
subscriber.OnVideo = func(frame *engine.VideoFrame) error {
return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, false, false)
}
}
if at != nil {
subscriber.OnAudio = func(frame *engine.AudioFrame) (err error) {
if at.CodecID == codec.CodecID_AAC {
frame := at.DecoderConfiguration
err = nc.sendAVMessage(0, net.Buffers{frame.AVCC}, true, true)
} else {
err = nc.sendAVMessage(0, frame.AVCC, true, true)
}
subscriber.OnAudio = func(frame *engine.AudioFrame) error {
return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, true, false)
}
return
}
}
go subscriber.Play(at, vt)
go func() {
SendMedia(&nc, subscriber)
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Stop, Level_Status))
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Complete, Level_Status))
}()
} else {
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Failed, Level_Error))
}
@@ -179,9 +119,9 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
err = nc.SendCommand(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj)
}
case RTMP_MSG_AUDIO:
rec_audio(msg)
nc.ReceiveAudio(msg)
case RTMP_MSG_VIDEO:
rec_video(msg)
nc.ReceiveVideo(msg)
}
} else {
plugin.Error(err)