初步调通

This commit is contained in:
dexter
2022-02-06 08:50:41 +08:00
parent a5e5d31fda
commit c1fedbc529
10 changed files with 608 additions and 1093 deletions

View File

@@ -5,43 +5,30 @@ import (
"fmt"
"log"
"net"
"strings"
"sync/atomic"
"github.com/Monibuca/engine/v3"
"github.com/Monibuca/utils/v3"
"github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/codec"
"github.com/Monibuca/engine/v4/util"
)
var gstreamid = uint32(64)
func processRtmp(conn net.Conn) {
var stream *engine.Stream
streams := make(map[uint32]*engine.Subscriber)
defer func() {
conn.Close()
if stream != nil {
stream.Close()
}
for _, s := range streams {
s.Close()
}
}()
func (cfg *RTMPConfig) Process(conn *net.TCPConn) {
nc := NetConnection{
ReadWriter: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
TCPConn: conn,
Reader: bufio.NewReader(conn),
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
rtmpHeader: make(map[uint32]*ChunkHeader),
incompleteRtmpBody: make(map[uint32][]byte),
incompleteRtmpBody: make(map[uint32]util.Buffer),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
nextStreamID: func() uint32 {
return atomic.AddUint32(&gstreamid, 1)
},
tmpBuf: make([]byte, 4),
subscribers: make(map[uint32]*engine.Subscriber),
}
defer nc.Close()
/* Handshake */
if utils.MayBeError(Handshake(nc.ReadWriter)) {
return
}
if utils.MayBeError(nc.OnConnect()) {
if util.MayBeError(nc.Handshake()) {
return
}
var rec_audio, rec_video func(*Chunk)
@@ -52,27 +39,59 @@ func processRtmp(conn net.Conn) {
continue
}
switch msg.MessageTypeID {
case RTMP_MSG_CHUNK_SIZE:
nc.readChunkSize = int(msg.MsgData.(Uint32Message))
case RTMP_MSG_ABORT:
delete(nc.incompleteRtmpBody, uint32(msg.MsgData.(Uint32Message)))
case RTMP_MSG_ACK, RTMP_MSG_EDGE:
case RTMP_MSG_USER_CONTROL:
if _, ok := msg.MsgData.(*PingRequestMessage); ok {
nc.SendUserControl(RTMP_USER_PING_RESPONSE)
}
case RTMP_MSG_ACK_SIZE:
nc.bandwidth = uint32(msg.MsgData.(Uint32Message))
case RTMP_MSG_BANDWIDTH:
m := msg.MsgData.(*SetPeerBandwidthMessage)
nc.bandwidth = m.AcknowledgementWindowsize
case RTMP_MSG_AMF0_COMMAND:
if msg.MsgData == nil {
break
}
cmd := msg.MsgData.(Commander).GetCommand()
util.Println(cmd.CommandName)
switch cmd.CommandName {
case "connect":
connect := msg.MsgData.(*CallMessage)
app := connect.Object["app"] // 客户端要连接到的服务应用名
objectEncoding := connect.Object["objectEncoding"] // AMF编码方法
if objectEncoding != nil {
nc.objectEncoding = objectEncoding.(float64)
}
nc.appName = app.(string)
log.Printf("app:%v,objectEncoding:%v", app, objectEncoding)
err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10))
nc.writeChunkSize = config.ChunkSize
err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize))
err = nc.SendMessage(RTMP_MSG_BANDWIDTH, &SetPeerBandwidthMessage{
AcknowledgementWindowsize: uint32(512 << 10),
LimitType: byte(2),
})
err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN)
err = nc.SendCommand(SEND_CONNECT_RESPONSE_MESSAGE, nc.objectEncoding)
case "createStream":
nc.streamID = nc.nextStreamID()
nc.streamID = atomic.AddUint32(&gstreamid, 1)
log.Println("createStream:", nc.streamID)
err = nc.SendMessage(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId)
if utils.MayBeError(err) {
err = nc.SendCommand(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId)
if util.MayBeError(err) {
return
}
case "publish":
pm := msg.MsgData.(*PublishMessage)
streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0]
stream = &engine.Stream{Type: "RTMP", StreamPath: streamPath}
if stream.Publish() {
nc.Config = config.Publish
if nc.Publish(nc.appName+"/"+pm.PublishingName, &nc) {
absTs := make(map[uint32]uint32)
vt := stream.NewVideoTrack(0)
at := stream.NewAudioTrack(0)
vt := nc.Stream.NewVideoTrack()
at := nc.Stream.NewAudioTrack()
rec_audio = func(msg *Chunk) {
if msg.ChunkType == 0 {
absTs[msg.ChunkStreamID] = 0
@@ -82,7 +101,7 @@ func processRtmp(conn net.Conn) {
} else {
absTs[msg.ChunkStreamID] += msg.Timestamp
}
at.PushByteStream(absTs[msg.ChunkStreamID], msg.Body)
at.WriteAVCC(absTs[msg.ChunkStreamID], msg.Body)
}
rec_video = func(msg *Chunk) {
if msg.ChunkType == 0 {
@@ -93,99 +112,79 @@ func processRtmp(conn net.Conn) {
} else {
absTs[msg.ChunkStreamID] += msg.Timestamp
}
vt.PushByteStream(absTs[msg.ChunkStreamID], msg.Body)
vt.WriteAVCC(absTs[msg.ChunkStreamID], msg.Body)
}
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
nc.SendStreamID(RTMP_USER_STREAM_BEGIN)
err = nc.SendCommand(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
} else {
err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error))
err = nc.SendCommand(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error))
}
case "play":
pm := msg.MsgData.(*PlayMessage)
streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0]
nc.writeChunkSize = config.ChunkSize
streamPath := nc.appName + "/" + pm.StreamName
subscriber := engine.Subscriber{
Type: "RTMP",
ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID),
}
if err = subscriber.Subscribe(streamPath); err == nil {
streams[nc.streamID] = &subscriber
err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize))
err = nc.SendMessage(SEND_STREAM_IS_RECORDED_MESSAGE, nil)
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status))
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status))
vt, at := subscriber.WaitVideoTrack(), subscriber.WaitAudioTrack()
if subscriber.Subscribe(streamPath, config.Subscribe) {
nc.subscribers[nc.streamID] = &subscriber
err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED)
err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN)
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status))
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status))
vt, at := subscriber.WaitVideoTrack("h264", "h265"), subscriber.WaitAudioTrack("aac", "pcma", "pcmu")
if vt != nil {
var lastTimeStamp uint32
var getDeltaTime func(uint32) uint32
getDeltaTime = func(ts uint32) (t uint32) {
lastTimeStamp = ts
getDeltaTime = func(ts uint32) (t uint32) {
t = ts - lastTimeStamp
lastTimeStamp = ts
return
}
return
}
err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.ExtraData.Payload})
subscriber.OnVideo = func(ts uint32, pack *engine.VideoPack) {
err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Timestamp: 0, Payload: pack.Payload})
subscriber.OnVideo = func(ts uint32, pack *engine.VideoPack) {
err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: getDeltaTime(ts), Payload: pack.Payload})
}
frame := vt.DecoderConfiguration
err = nc.sendAVMessage(0, frame.AVCC, false, true)
subscriber.OnVideo = func(frame *engine.VideoFrame) bool {
err = nc.sendAVMessage(frame.DeltaTime, frame.AVCC, false, false)
return err == nil
}
}
if at != nil {
var lastTimeStamp uint32
var getDeltaTime func(uint32) uint32
getDeltaTime = func(ts uint32) (t uint32) {
lastTimeStamp = ts
getDeltaTime = func(ts uint32) (t uint32) {
t = ts - lastTimeStamp
lastTimeStamp = ts
return
subscriber.OnAudio = func(frame *engine.AudioFrame) bool {
if at.CodecID == codec.CodecID_AAC {
frame := at.DecoderConfiguration
err = nc.sendAVMessage(0, frame.AVCC, true, true)
} else {
err = nc.sendAVMessage(0, frame.AVCC, true, true)
}
return
}
subscriber.OnAudio = func(ts uint32, pack *engine.AudioPack) {
if at.CodecID == 10 {
err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.ExtraData})
subscriber.OnAudio = func(frame *engine.AudioFrame) bool {
err = nc.sendAVMessage(frame.DeltaTime, frame.AVCC, true, false)
return err == nil
}
subscriber.OnAudio = func(ts uint32, pack *engine.AudioPack) {
err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: getDeltaTime(ts), Payload: pack.Payload})
}
subscriber.OnAudio(ts, pack)
return err == nil
}
}
go subscriber.Play(at, vt)
} else {
err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Failed, Level_Error))
}
case "closeStream":
cm := msg.MsgData.(*CURDStreamMessage)
if stream, ok := streams[cm.StreamId]; ok {
if stream, ok := nc.subscribers[cm.StreamId]; ok {
stream.Close()
delete(streams, cm.StreamId)
delete(nc.subscribers, cm.StreamId)
}
case "releaseStream":
cm := msg.MsgData.(*ReleaseStreamMessage)
streamPath := nc.appName + "/" + strings.Split(cm.StreamName, "?")[0]
amfobj := newAMFObjects()
if s := engine.FindStream(streamPath); s != nil {
amfobj := make(AMFObject)
if nc.Stream != nil && nc.Stream.AppName == nc.appName && nc.Stream.StreamName == cm.StreamName {
amfobj["level"] = "_result"
s.Close()
nc.Stream.UnPublish()
} else {
amfobj["level"] = "_error"
}
amfobj["tid"] = cm.TransactionId
err = nc.SendMessage(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj)
err = nc.SendCommand(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj)
}
case RTMP_MSG_AUDIO:
rec_audio(msg)
case RTMP_MSG_VIDEO:
rec_video(msg)
}
msg.Recycle()
} else {
util.Println(err)
return
}
}