diff --git a/msg.go b/msg.go index d24678c..3ed2991 100644 --- a/msg.go +++ b/msg.go @@ -266,12 +266,18 @@ func decodeCommandAMF0(chunk *Chunk) { cmdMsg, readNumber(amf), } - case "deleteStream", "closeStream", "releaseStream": + case "deleteStream", "closeStream": amf.readNull() chunk.MsgData = &CURDStreamMessage{ cmdMsg, uint32(readNumber(amf)), } + case "releaseStream": + amf.readNull() + chunk.MsgData = &ReleaseStreamMessage{ + cmdMsg, + readString(amf), + } case "receiveAudio", "receiveVideo": amf.readNull() chunk.MsgData = &ReceiveAVMessage{ @@ -329,6 +335,13 @@ type Commander interface { func (cmd *CommandMessage) GetCommand() *CommandMessage { return cmd } +func (msg *CommandMessage) Encode() (b []byte) { + amf := newAMFEncoder() + amf.writeString(msg.CommandName) + amf.writeNumber(float64(msg.TransactionId)) + amf.writeNull() + return amf.Bytes() +} // Protocol control message 1. // Set Chunk Size, is used to notify the peer of a new maximum chunk size @@ -528,6 +541,14 @@ type CURDStreamMessage struct { func (msg *CURDStreamMessage) Encode0() { } +type ReleaseStreamMessage struct { + CommandMessage + StreamName string +} + +func (msg *ReleaseStreamMessage) Encode0() { +} + // Receive Audio Message // NetStream sends the receiveAudio message to inform the server whether to send or not to send the audio to the client type ReceiveAVMessage struct { diff --git a/netConnection.go b/netConnection.go index 4f2e964..24ce158 100644 --- a/netConnection.go +++ b/netConnection.go @@ -322,6 +322,14 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error { m.StreamID = streamID return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m) case SEND_UNPUBLISH_RESPONSE_MESSAGE: + data, ok := args.(AMFObjects) + if !ok { + errors.New(SEND_UNPUBLISH_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") + } + m := new(CommandMessage) + m.TransactionId = data["tid"].(uint64) + m.CommandName = "releaseStream" + data["level"].(string) + return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m) case SEND_FULL_AUDIO_MESSAGE: audio, ok := args.(*avformat.SendPacket) if !ok { @@ -439,7 +447,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { } msgLen := int(chunkHead.MessageLength) if !ok { - currentBody = make([]byte,0,msgLen) + currentBody = make([]byte, 0, msgLen) conn.incompleteRtmpBody[ChunkStreamID] = currentBody } diff --git a/netStream.go b/netStream.go index eb9bebb..1315a21 100644 --- a/netStream.go +++ b/netStream.go @@ -177,6 +177,18 @@ func processRtmp(conn net.Conn) { stream.Cancel() delete(streams, cm.StreamId) } + case "releaseStream": + cm := msg.MsgData.(*ReleaseStreamMessage) + streamPath := nc.appName + "/" + strings.Split(cm.StreamName, "?")[0] + amfobj := newAMFObjects() + if s := FindStream(streamPath); s != nil { + amfobj["level"] = "_result" + s.Close() + } else { + amfobj["level"] = "_error" + } + amfobj["tid"] = cm.TransactionId + err = nc.SendMessage(SEND_UNPUBLISH_RESPONSE_MESSAGE, &amfobj) } case RTMP_MSG_AUDIO: // pkt := avformat.NewAVPacket(RTMP_MSG_AUDIO)