This commit is contained in:
dexter
2022-03-13 00:10:32 +08:00
parent 618e989a1b
commit 005ec953d8
4 changed files with 62 additions and 61 deletions

View File

@@ -88,6 +88,7 @@ func (pusher *RTMPPusher) Connect() (err error) {
func (pusher *RTMPPusher) Push() {
pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
defer pusher.Stop()
for {
msg, err := pusher.RecvMessage()
if err != nil {
@@ -97,24 +98,27 @@ func (pusher *RTMPPusher) Push() {
case RTMP_MSG_AMF0_COMMAND:
cmd := msg.MsgData.(Commander).GetCommand()
switch cmd.CommandName {
case "_result":
case Response_Result, Response_OnStatus:
if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok {
pusher.StreamID = response.StreamId
URL, _ := url.Parse(pusher.RemoteURL)
ps := strings.Split(URL.Path, "/")
pusher.Args = URL.Query()
m := &PublishMessage{
CURDStreamMessage{
CommandMessage{
"publish",
0,
1,
},
response.StreamId,
},
pusher.Stream.StreamName,
ps[len(ps)-1],
"live",
}
pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
} else if response, ok := msg.MsgData.(*ResponsePublishMessage); ok {
if response.Infomation["code"] == "NetStream.Publish.Start" {
if response.Infomation["code"] == NetStream_Publish_Start {
go pusher.PlayBlock(pusher)
} else {
return
}
@@ -155,6 +159,7 @@ func (puller *RTMPPuller) Pull() {
if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok {
puller.StreamID = response.StreamId
m := &PlayMessage{}
m.StreamId = response.StreamId
m.TransactionId = 1
m.CommandMessage.CommandName = "play"
URL, _ := url.Parse(puller.RemoteURL)

View File

@@ -19,9 +19,9 @@ func (rtmp *RTMPSender) OnEvent(event any) {
case VideoDeConf:
rtmp.sendAVMessage(0, v.AVCC, false, true)
case *AudioFrame:
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false)
rtmp.sendAVMessage(v.DeltaTime, v.GetAVCC(), true, false)
case *VideoFrame:
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false)
rtmp.sendAVMessage(v.DeltaTime, v.GetAVCC(), false, false)
default:
rtmp.Subscriber.OnEvent(event)
}

11
msg.go
View File

@@ -217,7 +217,10 @@ func decodeCommandAMF0(chunk *Chunk) {
case "play":
amf.readNull()
m := &PlayMessage{
cmdMsg,
CURDStreamMessage{
cmdMsg,
chunk.MessageStreamID,
},
amf.readString(),
float64(-2),
float64(-1),
@@ -292,7 +295,7 @@ func decodeCommandAMF0(chunk *Chunk) {
cmdMsg,
amf.readBool(),
}
case "_result", "_error", "onStatus":
case Response_Result, Response_Error, Response_OnStatus:
if cmdMsg.TransactionId == 2 {
chunk.MsgData = &ResponseCreateStreamMessage{
cmdMsg, amf.readObject(), uint32(amf.readNumber()),
@@ -481,7 +484,7 @@ func (msg *CreateStreamMessage) Encode3() {
// Play Message
// The client sends this command to the server to play a stream. A playlist can also be created using this command multiple times
type PlayMessage struct {
CommandMessage
CURDStreamMessage
StreamName string
Start float64
Duration float64
@@ -585,7 +588,7 @@ type PublishMessage struct {
// “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件.
// “live”:发布直播数据而不录制到文件
func (msg *PublishMessage) Encode0() []byte {
func (msg *PublishMessage) Encode() []byte {
var amf AMF
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))

View File

@@ -35,9 +35,10 @@ func (s *RTMPSubscriber) OnEvent(event any) {
s.RTMPSender.OnEvent(event)
}
func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
defer conn.Close()
senders := make(map[uint32]*RTMPSubscriber)
receivers := make(map[uint32]*RTMPReceiver)
nc := NetConnection{
nc := &NetConnection{
TCPConn: conn,
Reader: bufio.NewReader(conn),
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
@@ -48,10 +49,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
tmpBuf: make([]byte, 4),
}
ctx, cancel := context.WithCancel(engine.Engine)
defer func() {
nc.Close()
cancel() //终止所有发布者和订阅者
}()
defer cancel()
/* Handshake */
if err := nc.Handshake(); err != nil {
plugin.Error("handshake", zap.Error(err))
@@ -69,11 +67,10 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
}
cmd := msg.MsgData.(Commander).GetCommand()
plugin.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
switch cmd.CommandName {
case "connect":
connect := msg.MsgData.(*CallMessage)
app := connect.Object["app"] // 客户端要连接到的服务应用名
objectEncoding := connect.Object["objectEncoding"] // AMF编码方法
switch cmd := msg.MsgData.(type) {
case *CallMessage: //connect
app := cmd.Object["app"] // 客户端要连接到的服务应用名
objectEncoding := cmd.Object["objectEncoding"] // AMF编码方法
if objectEncoding != nil {
nc.objectEncoding = objectEncoding.(float64)
}
@@ -102,66 +99,63 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
"objectEncoding": nc.objectEncoding,
}
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case "createStream":
case *CommandMessage: // "createStream"
streamId := atomic.AddUint32(&gstreamid, 1)
plugin.Info("createStream:", zap.Uint32("streamId", streamId))
nc.ResponseCreateStream(cmd.TransactionId, streamId)
case "publish":
pm := msg.MsgData.(*PublishMessage)
case *CURDStreamMessage:
if stream, ok := senders[cmd.StreamId]; ok {
stream.Stop()
delete(senders, cmd.StreamId)
}
case *ReleaseStreamMessage:
m := &CommandMessage{
CommandName: "releaseStream_error",
TransactionId: cmd.TransactionId,
}
s := engine.Streams.Get(nc.appName + "/" + cmd.StreamName)
if s != nil && s.Publisher != nil {
if p, ok := s.Publisher.(*RTMPReceiver); ok {
m.CommandName = "releaseStream_result"
p.Stop()
delete(receivers, p.StreamID)
}
}
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case *PublishMessage:
receiver := &RTMPReceiver{
NetStream: NetStream{
NetConnection: &nc,
StreamID: pm.StreamId,
NetConnection: nc,
StreamID: cmd.StreamId,
},
}
receiver.SetParentCtx(ctx)
if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) == nil {
receivers[receiver.StreamID] = receiver
if plugin.Publish(nc.appName+"/"+cmd.PublishingName, receiver) == nil {
receivers[cmd.StreamId] = receiver
receiver.absTs = make(map[uint32]uint32)
receiver.Begin()
err = receiver.Response(pm.TransactionId, NetStream_Publish_Start, Level_Status)
err = receiver.Response(cmd.TransactionId, NetStream_Publish_Start, Level_Status)
} else {
err = receiver.Response(pm.TransactionId, NetStream_Publish_BadName, Level_Error)
err = receiver.Response(cmd.TransactionId, NetStream_Publish_BadName, Level_Error)
}
case "play":
pm := msg.MsgData.(*PlayMessage)
streamPath := nc.appName + "/" + pm.StreamName
case *PlayMessage:
streamPath := nc.appName + "/" + cmd.StreamName
sender := &RTMPSubscriber{}
sender.NetStream = NetStream{
&nc,
msg.MessageStreamID,
nc,
cmd.StreamId,
}
sender.SetParentCtx(ctx)
sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
if plugin.Subscribe(streamPath, sender) == nil {
if plugin.Subscribe(streamPath, sender) != nil {
sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
} else {
senders[sender.StreamID] = sender
err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED, msg.MessageStreamID)
sender.Begin()
sender.Response(pm.TransactionId, NetStream_Play_Reset, Level_Status)
sender.Response(pm.TransactionId, NetStream_Play_Start, Level_Status)
sender.Response(cmd.TransactionId, NetStream_Play_Reset, Level_Status)
sender.Response(cmd.TransactionId, NetStream_Play_Start, Level_Status)
go sender.PlayBlock(sender)
} else {
sender.Response(pm.TransactionId, NetStream_Play_Failed, Level_Error)
}
case "closeStream":
cm := msg.MsgData.(*CURDStreamMessage)
if stream, ok := senders[cm.StreamId]; ok {
stream.Stop()
delete(senders, cm.StreamId)
}
case "releaseStream":
cm := msg.MsgData.(*ReleaseStreamMessage)
m := &CommandMessage{
CommandName: "releaseStream",
TransactionId: cm.TransactionId,
}
if p, ok := receivers[msg.MessageStreamID]; ok {
m.CommandName += "_result"
p.Stop()
} else {
m.CommandName += "_error"
}
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
case RTMP_MSG_AUDIO:
if r, ok := receivers[msg.MessageStreamID]; ok {
@@ -173,7 +167,6 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
}
}
} else {
plugin.Error("receive", zap.Error(err))
return
}
}