增加对releaseStream的支持

This commit is contained in:
langhuihui
2020-12-09 20:29:46 +08:00
parent fb17dd8e87
commit f7721e2f02
3 changed files with 43 additions and 2 deletions

23
msg.go
View File

@@ -266,12 +266,18 @@ func decodeCommandAMF0(chunk *Chunk) {
cmdMsg,
readNumber(amf),
}
case "deleteStream", "closeStream", "releaseStream":
case "deleteStream", "closeStream":
amf.readNull()
chunk.MsgData = &CURDStreamMessage{
cmdMsg,
uint32(readNumber(amf)),
}
case "releaseStream":
amf.readNull()
chunk.MsgData = &ReleaseStreamMessage{
cmdMsg,
readString(amf),
}
case "receiveAudio", "receiveVideo":
amf.readNull()
chunk.MsgData = &ReceiveAVMessage{
@@ -329,6 +335,13 @@ type Commander interface {
func (cmd *CommandMessage) GetCommand() *CommandMessage {
return cmd
}
func (msg *CommandMessage) Encode() (b []byte) {
amf := newAMFEncoder()
amf.writeString(msg.CommandName)
amf.writeNumber(float64(msg.TransactionId))
amf.writeNull()
return amf.Bytes()
}
// Protocol control message 1.
// Set Chunk Size, is used to notify the peer of a new maximum chunk size
@@ -528,6 +541,14 @@ type CURDStreamMessage struct {
func (msg *CURDStreamMessage) Encode0() {
}
type ReleaseStreamMessage struct {
CommandMessage
StreamName string
}
func (msg *ReleaseStreamMessage) Encode0() {
}
// Receive Audio Message
// NetStream sends the receiveAudio message to inform the server whether to send or not to send the audio to the client
type ReceiveAVMessage struct {

View File

@@ -322,6 +322,14 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error {
m.StreamID = streamID
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_UNPUBLISH_RESPONSE_MESSAGE:
data, ok := args.(AMFObjects)
if !ok {
errors.New(SEND_UNPUBLISH_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
}
m := new(CommandMessage)
m.TransactionId = data["tid"].(uint64)
m.CommandName = "releaseStream" + data["level"].(string)
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_FULL_AUDIO_MESSAGE:
audio, ok := args.(*avformat.SendPacket)
if !ok {
@@ -439,7 +447,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
}
msgLen := int(chunkHead.MessageLength)
if !ok {
currentBody = make([]byte,0,msgLen)
currentBody = make([]byte, 0, msgLen)
conn.incompleteRtmpBody[ChunkStreamID] = currentBody
}

View File

@@ -177,6 +177,18 @@ func processRtmp(conn net.Conn) {
stream.Cancel()
delete(streams, cm.StreamId)
}
case "releaseStream":
cm := msg.MsgData.(*ReleaseStreamMessage)
streamPath := nc.appName + "/" + strings.Split(cm.StreamName, "?")[0]
amfobj := newAMFObjects()
if s := FindStream(streamPath); s != nil {
amfobj["level"] = "_result"
s.Close()
} else {
amfobj["level"] = "_error"
}
amfobj["tid"] = cm.TransactionId
err = nc.SendMessage(SEND_UNPUBLISH_RESPONSE_MESSAGE, &amfobj)
}
case RTMP_MSG_AUDIO:
// pkt := avformat.NewAVPacket(RTMP_MSG_AUDIO)