This commit is contained in:
langhuihui
2024-03-22 17:51:15 +08:00
parent 0efbe886c8
commit 5826caddde
18 changed files with 2833 additions and 40 deletions

View File

@@ -13,17 +13,17 @@ import (
type AVFrame struct {
DataFrame
Timestamp time.Duration // 绝对时间戳
Wrap []IAVFrame `json:"-" yaml:"-"` // 封装格式
}
type DataFrame struct {
DeltaTime uint32 // 相对上一帧时间戳,毫秒
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Timestamp time.Duration // 绝对时间戳
Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS
CanRead bool `json:"-" yaml:"-"` // 是否可读取
readerCount atomic.Int32 `json:"-" yaml:"-"` // 读取者数量
Raw any `json:"-" yaml:"-"` // 裸格式
Wrap []IAVFrame `json:"-" yaml:"-"` // 封装格式
DeltaTime uint32 // 相对上一帧时间戳,毫秒
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS
CanRead bool `json:"-" yaml:"-"` // 是否可读取
readerCount atomic.Int32 `json:"-" yaml:"-"` // 读取者数量
Raw any `json:"-" yaml:"-"` // 裸格式
sync.Cond `json:"-" yaml:"-"`
}
@@ -113,6 +113,7 @@ type IAVFrame interface {
DecodeConfig(*AVTrack) error
ToRaw(*AVTrack) (any, error)
FromRaw(*AVTrack, any) error
Recycle()
}
type Nalu [][]byte

View File

@@ -129,3 +129,9 @@ func (buffers *Buffers) ReadBE(n int) (num int, err error) {
}
return
}
func (buffers *Buffers) ToBytes() []byte {
ret := make([]byte, buffers.Length)
buffers.Read(ret)
return ret
}

View File

@@ -1,19 +1,40 @@
package util
import "net"
type Pool[T any] struct {
pool []*T
pool []T
}
func (p *Pool[T]) Get() *T {
func (p *Pool[T]) Get() T {
l := len(p.pool)
if l == 0 {
return new(T)
var t T
return t
}
t := p.pool[l-1]
p.pool = p.pool[:l-1]
return t
}
func (p *Pool[T]) Put(t *T) {
func (p *Pool[T]) Put(t T) {
p.pool = append(p.pool, t)
}
type IPool[T any] interface {
Get() T
Put(T)
}
type RecyclebleMemory struct {
IPool[[]byte]
Data net.Buffers
}
func (r *RecyclebleMemory) Recycle() {
if r.IPool != nil {
for _, b := range r.Data {
r.Put(b)
}
}
}

View File

@@ -1,10 +1,7 @@
package demo
import (
"net"
"m7s.live/m7s/v5"
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
)
type DemoPlugin struct {
@@ -12,14 +9,14 @@ type DemoPlugin struct {
}
func (p *DemoPlugin) OnInit() {
puber, err := p.Publish("live/demo")
if err != nil {
return
}
puber.WriteVideo(&rtmp.RTMPVideo{
Timestamp: 0,
Buffers: net.Buffers{[]byte{0x17, 0x00, 0x67, 0x42, 0x00, 0x0a, 0x8f, 0x14, 0x01, 0x00, 0x00, 0x03, 0x00, 0x80, 0x00, 0x00, 0x00, 0x01, 0x68, 0xce, 0x3c, 0x80}},
})
// puber, err := p.Publish("live/demo")
// if err != nil {
// return
// }
// puber.WriteVideo(&rtmp.RTMPVideo{
// Timestamp: 0,
// Buffers: net.Buffers{[]byte{0x17, 0x00, 0x67, 0x42, 0x00, 0x0a, 0x8f, 0x14, 0x01, 0x00, 0x00, 0x03, 0x00, 0x80, 0x00, 0x00, 0x00, 0x01, 0x68, 0xce, 0x3c, 0x80}},
// })
}
func (p *DemoPlugin) OnStopPublish(puber *m7s.Publisher, err error) {

View File

@@ -1,9 +1,17 @@
package rtmp
import "m7s.live/m7s/v5"
import (
"io"
"net"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/plugin/rtmp/pkg"
)
type RTMPPlugin struct {
m7s.Plugin
ChunkSize int
KeepAlive bool
}
func (p *RTMPPlugin) OnInit() {
@@ -14,8 +22,166 @@ func (p *RTMPPlugin) OnStopPublish(puber *m7s.Publisher, err error) {
}
func (p *RTMPPlugin) OnEvent(event any) {
// ...
}
var _ = m7s.InstallPlugin[*RTMPPlugin]()
func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
defer conn.Close()
// logger := RTMPPlugin.Logger.With(zap.String("remote", conn.RemoteAddr().String()))
// senders := make(map[uint32]*RTMPSubscriber)
receivers := make(map[uint32]*pkg.RTMPReceiver)
var err error
// logger.Info("conn")
defer func() {
// ze := zap.Error(err)
// logger.Info("conn close", ze)
// for _, sender := range senders {
// sender.Stop(ze)
// }
// for _, receiver := range receivers {
// receiver.Stop(ze)
// }
}()
nc := pkg.NewNetConnection(conn)
// ctx, cancel := context.WithCancel(p)
// defer cancel()
/* Handshake */
if err = nc.Handshake(); err != nil {
// logger.Error("handshake", zap.Error(err))
return
}
var msg *pkg.Chunk
var gstreamid uint32
for {
if msg, err = nc.RecvMessage(); err == nil {
if msg.MessageLength <= 0 {
continue
}
switch msg.MessageTypeID {
case pkg.RTMP_MSG_AMF0_COMMAND:
if msg.MsgData == nil {
break
}
// cmd := msg.MsgData.(pkg.Commander).GetCommand()
// logger.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
switch cmd := msg.MsgData.(type) {
case *pkg.CallMessage: //connect
app := cmd.Object["app"] // 客户端要连接到的服务应用名
objectEncoding := cmd.Object["objectEncoding"] // AMF编码方法
switch v := objectEncoding.(type) {
case float64:
nc.ObjectEncoding = v
default:
nc.ObjectEncoding = 0
}
nc.AppName = app.(string)
// logger.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding))
err = nc.SendMessage(pkg.RTMP_MSG_ACK_SIZE, pkg.Uint32Message(512<<10))
nc.WriteChunkSize = p.ChunkSize
err = nc.SendMessage(pkg.RTMP_MSG_CHUNK_SIZE, pkg.Uint32Message(p.ChunkSize))
err = nc.SendMessage(pkg.RTMP_MSG_BANDWIDTH, &pkg.SetPeerBandwidthMessage{
AcknowledgementWindowsize: uint32(512 << 10),
LimitType: byte(2),
})
err = nc.SendStreamID(pkg.RTMP_USER_STREAM_BEGIN, 0)
m := new(pkg.ResponseConnectMessage)
m.CommandName = pkg.Response_Result
m.TransactionId = 1
m.Properties = map[string]any{
"fmsVer": "monibuca/" + m7s.Version,
"capabilities": 31,
"mode": 1,
"Author": "dexter",
}
m.Infomation = map[string]any{
"level": pkg.Level_Status,
"code": pkg.NetConnection_Connect_Success,
"objectEncoding": nc.ObjectEncoding,
}
err = nc.SendMessage(pkg.RTMP_MSG_AMF0_COMMAND, m)
case *pkg.CommandMessage: // "createStream"
gstreamid++
// logger.Info("createStream:", zap.Uint32("streamId", gstreamid))
nc.ResponseCreateStream(cmd.TransactionId, gstreamid)
case *pkg.CURDStreamMessage:
// if stream, ok := receivers[cmd.StreamId]; ok {
// stream.Stop()
// delete(senders, cmd.StreamId)
// }
case *pkg.ReleaseStreamMessage:
// m := &CommandMessage{
// CommandName: "releaseStream_error",
// TransactionId: cmd.TransactionId,
// }
// s := engine.Streams.Get(nc.appName + "/" + cmd.StreamName)
// if s != nil && s.Publisher != nil {
// if p, ok := s.Publisher.(*RTMPReceiver); ok {
// // m.CommandName = "releaseStream_result"
// p.Stop()
// delete(receivers, p.StreamID)
// }
// }
// err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case *pkg.PublishMessage:
receiver := &pkg.RTMPReceiver{
NetStream: pkg.NetStream{
NetConnection: nc,
StreamID: cmd.StreamId,
},
}
// receiver.SetParentCtx(ctx)
if !p.KeepAlive {
// receiver.SetIO(conn)
}
receiver.Publisher, err = p.Publish(nc.AppName + "/" + cmd.PublishingName)
if err != nil {
delete(receivers, cmd.StreamId)
err = receiver.Response(cmd.TransactionId, pkg.NetStream_Publish_BadName, pkg.Level_Error)
} else {
receivers[cmd.StreamId] = receiver
receiver.Begin()
err = receiver.Response(cmd.TransactionId, pkg.NetStream_Publish_Start, pkg.Level_Status)
}
case *pkg.PlayMessage:
// streamPath := nc.appName + "/" + cmd.StreamName
// sender := &RTMPSubscriber{}
// sender.NetStream = NetStream{
// nc,
// cmd.StreamId,
// }
// sender.SetParentCtx(ctx)
// if !config.KeepAlive {
// sender.SetIO(conn)
// }
// sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
// if RTMPPlugin.Subscribe(streamPath, sender) != nil {
// sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
// } else {
// senders[sender.StreamID] = sender
// sender.Begin()
// sender.Response(cmd.TransactionId, NetStream_Play_Reset, Level_Status)
// sender.Response(cmd.TransactionId, NetStream_Play_Start, Level_Status)
// go sender.PlayRaw()
// }
}
case pkg.RTMP_MSG_AUDIO:
if r, ok := receivers[msg.MessageStreamID]; ok {
r.ReceiveAudio(msg)
} else {
// logger.Warn("ReceiveAudio", zap.Uint32("MessageStreamID", msg.MessageStreamID))
}
case pkg.RTMP_MSG_VIDEO:
if r, ok := receivers[msg.MessageStreamID]; ok {
r.ReceiveVideo(msg)
} else {
// logger.Warn("ReceiveVideo", zap.Uint32("MessageStreamID", msg.MessageStreamID))
}
}
} else if err == io.EOF || err == io.ErrUnexpectedEOF {
// logger.Info("rtmp client closed")
return
} else {
// logger.Warn("ReadMessage", zap.Error(err))
return
}
}
}

319
plugin/rtmp/pkg/amf.go Normal file
View File

@@ -0,0 +1,319 @@
package pkg
import (
"fmt"
"io"
"reflect"
"m7s.live/m7s/v5/pkg/util"
)
// Action Message Format -- AMF 0
// Action Message Format -- AMF 3
// http://download.macromedia.com/pub/labs/amf/amf0_spec_121207.pdf
// http://wwwimages.adobe.com/www.adobe.com/content/dam/Adobe/en/devnet/amf/pdf/amf-file-format-spec.pdf
// AMF Object == AMF Object Type(1 byte) + AMF Object Value
//
// AMF Object Value :
// AMF0_STRING : 2 bytes(datasize,记录string的长度) + data(string)
// AMF0_OBJECT : AMF0_STRING + AMF Object
// AMF0_NULL : 0 byte
// AMF0_NUMBER : 8 bytes
// AMF0_DATE : 10 bytes
// AMF0_BOOLEAN : 1 byte
// AMF0_ECMA_ARRAY : 4 bytes(arraysize,记录数组的长度) + AMF0_OBJECT
// AMF0_STRICT_ARRAY : 4 bytes(arraysize,记录数组的长度) + AMF Object
// 实际测试时,AMF0_ECMA_ARRAY数据如下:
// 8 0 0 0 13 0 8 100 117 114 97 116 105 111 110 0 0 0 0 0 0 0 0 0 0 5 119 105 100 116 104 0 64 158 0 0 0 0 0 0 0 6 104 101 105 103 104 116 0 64 144 224 0 0 0 0 0
// 8 0 0 0 13 | { 0 8 100 117 114 97 116 105 111 110 --- 0 0 0 0 0 0 0 0 0 } | { 0 5 119 105 100 116 104 --- 0 64 158 0 0 0 0 0 0 } | { 0 6 104 101 105 103 104 116 --- 0 64 144 224 0 0 0 0 0 } |...
// 13 | {AMF0_STRING --- AMF0_NUMBER} | {AMF0_STRING --- AMF0_NUMBER} | {AMF0_STRING --- AMF0_NUMBER} | ...
// 13 | {AMF0_OBJECT} | {AMF0_OBJECT} | {AMF0_OBJECT} | ...
// 13 | {duration --- 0} | {width --- 1920} | {height --- 1080} | ...
const (
AMF0_NUMBER = iota // 浮点数
AMF0_BOOLEAN
AMF0_STRING
AMF0_OBJECT
AMF0_MOVIECLIP
AMF0_NULL
AMF0_UNDEFINED
AMF0_REFERENCE
AMF0_ECMA_ARRAY
AMF0_END_OBJECT
AMF0_STRICT_ARRAY
AMF0_DATE
AMF0_LONG_STRING
AMF0_UNSUPPORTED
AMF0_RECORDSET
AMF0_XML_DOCUMENT
AMF0_TYPED_OBJECT
AMF0_AVMPLUS_OBJECT
)
var (
END_OBJ = []byte{0, 0, AMF0_END_OBJECT}
ObjectEnd = &struct{}{}
Undefined = &struct{}{}
)
type IAMF interface {
util.IBuffer
Unmarshal() (any, error)
Marshal(any) []byte
Marshals(...any) []byte
}
type EcmaArray map[string]any
type AMF struct {
util.Buffer
}
func ReadAMF[T string | float64 | bool | map[string]any](amf *AMF) (result T) {
value, err := amf.Unmarshal()
if err != nil {
return
}
result, _ = value.(T)
return
}
func (amf *AMF) ReadShortString() (result string) {
return ReadAMF[string](amf)
}
func (amf *AMF) ReadNumber() (result float64) {
return ReadAMF[float64](amf)
}
func (amf *AMF) ReadObject() (result map[string]any) {
return ReadAMF[map[string]any](amf)
}
func (amf *AMF) ReadBool() (result bool) {
return ReadAMF[bool](amf)
}
func (amf *AMF) readKey() (string, error) {
if !amf.CanReadN(2) {
return "", io.ErrUnexpectedEOF
}
l := int(amf.ReadUint16())
if !amf.CanReadN(l) {
return "", io.ErrUnexpectedEOF
}
return string(amf.ReadN(l)), nil
}
func (amf *AMF) readProperty(m map[string]any) (obj any, err error) {
var k string
var v any
if k, err = amf.readKey(); err == nil {
if v, err = amf.Unmarshal(); k == "" && v == ObjectEnd {
obj = m
} else if err == nil {
m[k] = v
}
}
return
}
func (amf *AMF) Unmarshal() (obj any, err error) {
if !amf.CanRead() {
return nil, io.ErrUnexpectedEOF
}
defer func(b util.Buffer) {
if err != nil {
amf.Buffer = b
}
}(amf.Buffer)
switch t := amf.ReadByte(); t {
case AMF0_NUMBER:
if !amf.CanReadN(8) {
return 0, io.ErrUnexpectedEOF
}
obj = amf.ReadFloat64()
case AMF0_BOOLEAN:
if !amf.CanRead() {
return false, io.ErrUnexpectedEOF
}
obj = amf.ReadByte() == 1
case AMF0_STRING:
obj, err = amf.readKey()
case AMF0_OBJECT:
m := make(map[string]any)
for err == nil && obj == nil {
obj, err = amf.readProperty(m)
}
case AMF0_NULL:
return nil, nil
case AMF0_UNDEFINED:
return Undefined, nil
case AMF0_ECMA_ARRAY:
size := amf.ReadUint32()
m := make(EcmaArray)
for i := uint32(0); i < size && err == nil && obj == nil; i++ {
obj, err = amf.readProperty(m)
}
case AMF0_END_OBJECT:
return ObjectEnd, nil
case AMF0_STRICT_ARRAY:
size := amf.ReadUint32()
var list []any
for i := uint32(0); i < size; i++ {
v, err := amf.Unmarshal()
if err != nil {
return nil, err
}
list = append(list, v)
}
obj = list
case AMF0_DATE:
if !amf.CanReadN(10) {
return 0, io.ErrUnexpectedEOF
}
obj = amf.ReadFloat64()
amf.ReadN(2)
case AMF0_LONG_STRING,
AMF0_XML_DOCUMENT:
if !amf.CanReadN(4) {
return "", io.ErrUnexpectedEOF
}
l := int(amf.ReadUint32())
if !amf.CanReadN(l) {
return "", io.ErrUnexpectedEOF
}
obj = string(amf.ReadN(l))
default:
err = fmt.Errorf("unsupported type:%d", t)
}
return
}
func (amf *AMF) writeProperty(key string, v any) {
amf.WriteUint16(uint16(len(key)))
amf.WriteString(key)
amf.Marshal(v)
}
func MarshalAMFs(v ...any) []byte {
var amf AMF
return amf.Marshals(v...)
}
func (amf *AMF) Marshals(v ...any) []byte {
for _, vv := range v {
amf.Marshal(vv)
}
return amf.Buffer
}
func (amf *AMF) Marshal(v any) []byte {
if v == nil {
amf.WriteByte(AMF0_NULL)
return amf.Buffer
}
switch vv := v.(type) {
case string:
if l := len(vv); l > 0xFFFF {
amf.WriteByte(AMF0_LONG_STRING)
amf.WriteUint32(uint32(l))
} else {
amf.WriteByte(AMF0_STRING)
amf.WriteUint16(uint16(l))
}
amf.WriteString(vv)
case float64, uint, float32, int, int16, int32, int64, uint16, uint32, uint64, uint8, int8:
amf.WriteByte(AMF0_NUMBER)
amf.WriteFloat64(ToFloat64(vv))
case bool:
amf.WriteByte(AMF0_BOOLEAN)
if vv {
amf.WriteByte(1)
} else {
amf.WriteByte(0)
}
case EcmaArray:
if vv == nil {
amf.WriteByte(AMF0_NULL)
return amf.Buffer
}
amf.WriteByte(AMF0_ECMA_ARRAY)
amf.WriteUint32(uint32(len(vv)))
for k, v := range vv {
amf.writeProperty(k, v)
}
amf.Write(END_OBJ)
case map[string]any:
if vv == nil {
amf.WriteByte(AMF0_NULL)
return amf.Buffer
}
amf.WriteByte(AMF0_OBJECT)
for k, v := range vv {
amf.writeProperty(k, v)
}
amf.Write(END_OBJ)
default:
v := reflect.ValueOf(vv)
if !v.IsValid() {
amf.WriteByte(AMF0_NULL)
return amf.Buffer
}
switch v.Kind() {
case reflect.Slice, reflect.Array:
amf.WriteByte(AMF0_STRICT_ARRAY)
size := v.Len()
amf.WriteUint32(uint32(size))
for i := 0; i < size; i++ {
amf.Marshal(v.Index(i).Interface())
}
amf.Write(END_OBJ)
case reflect.Ptr:
vv := reflect.Indirect(v)
if vv.Kind() == reflect.Struct {
amf.WriteByte(AMF0_OBJECT)
for i := 0; i < vv.NumField(); i++ {
amf.writeProperty(vv.Type().Field(i).Name, vv.Field(i).Interface())
}
amf.Write(END_OBJ)
}
default:
panic("amf Marshal faild")
}
}
return amf.Buffer
}
func ToFloat64(num any) float64 {
switch v := num.(type) {
case uint:
return float64(v)
case int:
return float64(v)
case uint8:
return float64(v)
case uint16:
return float64(v)
case uint32:
return float64(v)
case uint64:
return float64(v)
case int8:
return float64(v)
case int16:
return float64(v)
case int32:
return float64(v)
case int64:
return float64(v)
case float64:
return v
case float32:
return float64(v)
}
return 0
}

308
plugin/rtmp/pkg/amf3.go Normal file
View File

@@ -0,0 +1,308 @@
package pkg
import (
"errors"
"reflect"
"strconv"
"unicode"
)
const (
AMF3_UNDEFINED = iota
AMF3_NULL
AMF3_FALSE
AMF3_TRUE
AMF3_INTEGER
AMF3_DOUBLE
AMF3_STRING
AMF3_XML_DOC
AMF3_DATE
AMF3_ARRAY
AMF3_OBJECT
AMF3_XML
AMF3_BYTE_ARRAY
AMF3_VECTOR_INT
AMF3_VECTOR_UINT
AMF3_VECTOR_DOUBLE
AMF3_VECTOR_OBJECT
AMF3_DICTIONARY
)
type AMF3 struct {
AMF
scEnc map[string]int
scDec []string
ocEnc map[uintptr]int
ocDec []any
reservStruct bool
}
func (amf *AMF3) readString() (string, error) {
index, err := amf.readU29()
if err != nil {
return "", err
}
ret := ""
if (index & 0x01) == 0 {
ret = amf.scDec[int(index>>1)]
} else {
index >>= 1
ret = string(amf.ReadN(int(index)))
}
if ret != "" {
amf.scDec = append(amf.scDec, ret)
}
return ret, nil
}
func (amf *AMF3) Unmarshal() (obj any, err error) {
defer func() {
if e := recover(); e != nil {
err = errors.New("amf3 unmarshal error")
}
}()
switch amf.ReadByte() {
case AMF3_NULL:
return nil, nil
case AMF3_FALSE:
return false, nil
case AMF3_TRUE:
return true, nil
case AMF3_INTEGER:
return amf.readU29()
case AMF3_DOUBLE:
return amf.ReadFloat64(), nil
case AMF3_STRING:
return amf.readString()
case AMF3_OBJECT:
index, err := amf.readU29()
if err != nil {
return nil, err
}
if (index & 0x01) == 0 {
return amf.ocDec[int(index>>1)], nil
}
if index != 0x0b {
return nil, errors.New("invalid object type")
}
if amf.ReadByte() != 0x01 {
return nil, errors.New("type object not allowed")
}
ret := make(map[string]any)
amf.ocDec = append(amf.ocDec, ret)
for {
key, err := amf.readString()
if err != nil {
return nil, err
}
if key == "" {
break
}
ret[key], err = amf.Unmarshal()
if err != nil {
return nil, err
}
}
return ret, nil
}
return nil, errors.New("amf3 unmarshal error")
}
func (amf *AMF3) writeString(s string) error {
index, ok := amf.scEnc[s]
if ok {
amf.writeU29(uint32(index << 1))
return nil
}
err := amf.writeU29(uint32((len(s) << 1) | 0x01))
if err != nil {
return err
}
if s != "" {
amf.scEnc[s] = len(amf.scEnc)
}
amf.WriteString(s)
return nil
}
func (amf *AMF3) readU29() (uint32, error) {
var ret uint32 = 0
for i := 0; i < 4; i++ {
b := amf.ReadByte()
if i != 3 {
ret = (ret << 7) | uint32(b&0x7f)
if (b & 0x80) == 0 {
break
}
} else {
ret = (ret << 8) | uint32(b)
}
}
return ret, nil
}
func (amf *AMF3) writeU29(value uint32) error {
switch {
case value < 0x80:
amf.WriteByte(byte(value))
case value < 0x4000:
amf.Write([]byte{byte((value >> 7) | 0x80), byte(value & 0x7f)})
case value < 0x200000:
amf.Write([]byte{byte((value >> 14) | 0x80), byte((value >> 7) | 0x80), byte(value & 0x7f)})
case value < 0x20000000:
amf.Write([]byte{byte((value >> 22) | 0x80), byte((value >> 15) | 0x80), byte((value >> 7) | 0x80), byte(value & 0xff)})
default:
return errors.New("u29 over flow")
}
return nil
}
func (amf *AMF3) Marshals(v ...any) []byte {
for _, vv := range v {
amf.Marshal(vv)
}
return amf.Buffer
}
func MarshalAMF3s(v ...any) []byte {
var amf AMF3
amf.ocEnc = make(map[uintptr]int)
amf.scEnc = make(map[string]int)
return amf.Marshals(v...)
}
func (amf *AMF3) Marshal(v any) []byte {
if v == nil {
amf.WriteByte(AMF3_NULL)
return amf.Buffer
}
switch vv := v.(type) {
case string:
amf.WriteByte(AMF3_STRING)
amf.writeString(vv)
case bool:
if vv {
amf.WriteByte(AMF3_TRUE)
} else {
amf.WriteByte(AMF3_FALSE)
}
case int, int8, int16, int32, int64:
var value int64
reflect.ValueOf(&value).Elem().Set(reflect.ValueOf(vv).Convert(reflect.TypeOf(value)))
if value < -0xfffffff {
if value > -0x7fffffff {
return amf.Marshal(float64(value))
}
return amf.Marshal(strconv.FormatInt(value, 10))
}
amf.WriteByte(AMF3_INTEGER)
amf.writeU29(uint32(value))
case uint, uint8, uint16, uint32, uint64:
var value uint64
reflect.ValueOf(&value).Elem().Set(reflect.ValueOf(vv).Convert(reflect.TypeOf(value)))
if value >= 0x20000000 {
if value <= 0xffffffff {
return amf.Marshal(float64(value))
}
return amf.Marshal(strconv.FormatUint(value, 10))
}
amf.WriteByte(AMF3_INTEGER)
amf.writeU29(uint32(value))
case float32:
amf.Marshal(float64(vv))
case float64:
amf.WriteByte(AMF3_DOUBLE)
amf.WriteFloat64(vv)
case map[string]any:
amf.WriteByte(AMF3_OBJECT)
index, ok := amf.ocEnc[reflect.ValueOf(vv).Pointer()]
if ok {
index <<= 1
amf.writeU29(uint32(index << 1))
return nil
}
amf.WriteByte(0x0b)
err := amf.writeString("")
if err != nil {
return nil
}
for k, v := range vv {
err = amf.writeString(k)
if err != nil {
return nil
}
amf.Marshal(v)
}
amf.writeString("")
default:
v := reflect.ValueOf(vv)
if !v.IsValid() {
amf.WriteByte(AMF3_NULL)
return amf.Buffer
}
switch v.Kind() {
case reflect.Ptr:
if v.IsNil() {
amf.WriteByte(AMF3_NULL)
return amf.Buffer
}
vv := reflect.Indirect(v)
if vv.Kind() == reflect.Struct {
amf.WriteByte(AMF3_OBJECT)
index, ok := amf.ocEnc[v.Pointer()]
if ok {
index <<= 1
amf.writeU29(uint32(index << 1))
return nil
}
amf.WriteByte(0x0b)
err := amf.writeString("")
if err != nil {
return nil
}
t := vv.Type()
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
key := amf.getFieldName(f)
if key == "" {
continue
}
err = amf.writeString(key)
if err != nil {
return nil
}
fv := v.FieldByName(f.Name)
if fv.Kind() == reflect.Struct {
fv = fv.Addr()
}
amf.Marshal(fv.Interface())
}
amf.writeString("")
}
}
}
return amf.Buffer
}
func (amf *AMF3) getFieldName(f reflect.StructField) string {
chars := []rune(f.Name)
if unicode.IsLower(chars[0]) {
return ""
}
name := f.Tag.Get("amf.name")
if name != "" {
return name
}
if !amf.reservStruct {
chars[0] = unicode.ToLower(chars[0])
return string(chars)
}
return f.Name
}

82
plugin/rtmp/pkg/chunk.go Normal file
View File

@@ -0,0 +1,82 @@
package pkg
import (
"encoding/binary"
"m7s.live/m7s/v5/pkg/util"
)
// RTMP协议中基本的数据单元称为消息(Message).
// 当RTMP协议在互联网中传输数据的时候,消息会被拆分成更小的单元,称为消息块(Chunk).
// 在网络上传输数据时,消息需要被拆分成较小的数据块,才适合在相应的网络环境上传输.
// 理论上Type 0, 1, 2的Chunk都可以使用Extended Timestamp来传递时间
// Type 3由于严禁携带Extened Timestamp字段.但实际上只有Type 0才需要带此字段.
// 这是因为,对Type 1, 2来说,其时间为一个差值,一般肯定小于0x00FFFFF
// 对于除Audio,Video以外的基它Message,其时间字段都可以是置为0的似乎没有被用到.
// 只有在发送视频和音频数据时,才需要特别的考虑TimeStamp字段.基本依据是,要以HandShake时为起始点0来计算时间.
// 一般来说,建立一个相对时间,把一个视频帧的TimeStamp特意的在当前时间的基础上延迟3秒,则可以达到缓存的效果
const (
RTMP_CHUNK_HEAD_12 = 0 << 6 // Chunk Basic Header = (Chunk Type << 6) | Chunk Stream ID.
RTMP_CHUNK_HEAD_8 = 1 << 6
RTMP_CHUNK_HEAD_4 = 2 << 6
RTMP_CHUNK_HEAD_1 = 3 << 6
)
type Chunk struct {
ChunkHeader
AVData RTMPData
MsgData RtmpMessage
}
type ChunkHeader struct {
ChunkStreamID uint32 `json:""`
Timestamp uint32 `json:""` // 3 byte
MessageLength uint32 `json:""` // 3 byte
MessageTypeID byte `json:""` // 1 byte
MessageStreamID uint32 `json:""` // 4 byte
// Extended Timestamp (0 or 4 bytes): This field is present in certain
// circumstances depending on the encoded timestamp or timestamp
// delta field in the Chunk Message header. See Section 5.3.1.3 for
// more information
ExtendTimestamp uint32 `json:",omitempty"` // 标识该字段的数据可忽略
}
func (c *ChunkHeader) SetTimestamp(timestamp uint32) {
if timestamp >= 0xFFFFFF {
c.ExtendTimestamp = timestamp
c.Timestamp = 0xFFFFFF
} else {
c.ExtendTimestamp = 0
c.Timestamp = timestamp
}
}
// ChunkBasicHeader会决定ChunkMessgaeHeader,ChunkMessgaeHeader有4种(0,3,7,11 Bytes),因此可能有4种头.
// 1 -> ChunkBasicHeader(1) + ChunkMessageHeader(0)
// 4 -> ChunkBasicHeader(1) + ChunkMessageHeader(3)
// 8 -> ChunkBasicHeader(1) + ChunkMessageHeader(7)
// 12 -> ChunkBasicHeader(1) + ChunkMessageHeader(11)
func (h *ChunkHeader) WriteTo(t byte, b *util.Buffer) {
b.Reset()
csid := byte(h.ChunkStreamID)
b.WriteByte(t + csid)
if t < RTMP_CHUNK_HEAD_1 {
b.WriteUint24(h.Timestamp)
if t < RTMP_CHUNK_HEAD_4 {
b.WriteUint24(h.MessageLength)
b.WriteByte(h.MessageTypeID)
if t < RTMP_CHUNK_HEAD_8 {
binary.LittleEndian.PutUint32(b.Malloc(4), h.MessageStreamID)
}
}
}
if h.Timestamp == 0xffffff {
b.WriteUint32(h.ExtendTimestamp)
}
}

View File

@@ -9,8 +9,7 @@ import (
"m7s.live/m7s/v5/pkg/util"
)
var FourCC_H265 = [4]byte{'H', '2', '6', '5'}
var FourCC_AV1 = [4]byte{'a', 'v', '0', '1'}
type AVCDecoderConfigurationRecord struct {
ConfigurationVersion byte // 8 bits Version

View File

@@ -1,5 +1,9 @@
package pkg
import (
"m7s.live/m7s/v5/pkg/util"
)
const (
PacketTypeSequenceStart = iota
PacketTypeCodedFrames
@@ -7,4 +11,13 @@ const (
PacketTypeCodedFramesX
PacketTypeMetadata
PacketTypeMPEG2TSSequenceStart
)
)
var FourCC_H265 = [4]byte{'H', '2', '6', '5'}
var FourCC_AV1 = [4]byte{'a', 'v', '0', '1'}
type RTMPData struct {
Timestamp uint32
util.Buffers
util.RecyclebleMemory
}

78
plugin/rtmp/pkg/event.go Normal file
View File

@@ -0,0 +1,78 @@
package pkg
// http://help.adobe.com/zh_CN/AIR/1.5/jslr/flash/events/NetStatusEvent.html
const (
Response_OnStatus = "onStatus"
Response_Result = "_result"
Response_Error = "_error"
/* Level */
Level_Status = "status"
Level_Error = "error"
Level_Warning = "warning"
/* Code */
/* NetStream */
NetStream_Play_Reset = "NetStream.Play.Reset" // "status" 由播放列表重置导致
NetStream_Play_Start = "NetStream.Play.Start" // "status" 播放已开始
NetStream_Play_StreamNotFound = "NetStream.Play.StreamNotFound" // "error" 无法找到传递给 play()方法的 FLV
NetStream_Play_Stop = "NetStream.Play.Stop" // "status" 播放已结束
NetStream_Play_Failed = "NetStream.Play.Failed" // "error" 出于此表中列出的原因之外的某一原因(例如订阅者没有读取权限),播放发生了错误
NetStream_Play_PublishNotify = "NetStream.Play.PublishNotify" // "status" 发布者已经发布了流
NetStream_Play_UnpublishNotify = "NetStream.Play.UnpublishNotify" // "status" 发布者已经取消发布了流
NetStream_Play_Switch = "NetStream.Play.Switch"
NetStream_Play_Complete = "NetStream.Play.Complete"
NetStream_Data_Start = "NetStream.Data.Start"
NetStream_Publish_Start = "NetStream.Publish.Start" // "status" 已经成功发布.
NetStream_Publish_BadName = "NetStream.Publish.BadName" // "error" 试图发布已经被他人发布的流.
NetStream_Publish_Idle = "NetStream.Publish.Idle" // "status" 流发布者空闲而没有在传输数据.
NetStream_Unpublish_Success = "NetStream.Unpublish.Success" // "status" 已成功执行取消发布操作.
NetStream_Buffer_Empty = "NetStream.Buffer.Empty" // "status" 数据的接收速度不足以填充缓冲区.数据流将在缓冲区重新填充前中断,此时将发送 NetStream.Buffer.Full 消息,并且该流将重新开始播放
NetStream_Buffer_Full = "NetStream.Buffer.Full" // "status" 缓冲区已满并且流将开始播放
NetStream_Buffe_Flush = "NetStream.Buffer.Flush" // "status" 数据已完成流式处理,剩余的缓冲区将被清空
NetStream_Pause_Notify = "NetStream.Pause.Notify" // "status" 流已暂停
NetStream_Unpause_Notify = "NetStream.Unpause.Notify" // "status" 流已恢复
NetStream_Record_Start = "NetStream.Record.Start" // "status" 录制已开始.
NetStream_Record_NoAccess = "NetStream.Record.NoAccess" // "error" 试图录制仍处于播放状态的流或客户端没有访问权限的流.
NetStream_Record_Stop = "NetStream.Record.Stop" // "status" 录制已停止.
NetStream_Record_Failed = "NetStream.Record.Failed" // "error" 尝试录制流失败.
NetStream_Seek_Failed = "NetStream.Seek.Failed" // "error" 搜索失败,如果流处于不可搜索状态,则会发生搜索失败.
NetStream_Seek_InvalidTime = "NetStream.Seek.InvalidTime" // "error" 对于使用渐进式下载方式下载的视频,用户已尝试跳过到目前为止已下载的视频数据的结尾或在整个文件已下载后跳过视频的结尾进行搜寻或播放. message.details 属性包含一个时间代码,该代码指出用户可以搜寻的最后一个有效位置.
NetStream_Seek_Notify = "NetStream.Seek.Notify" // "status" 搜寻操作完成.
/* NetConnect */
NetConnection_Call_BadVersion = "NetConnection.Call.BadVersion" // "error" 以不能识别的格式编码的数据包.
NetConnection_Call_Failed = "NetConnection.Call.Failed" // "error" NetConnection.call 方法无法调用服务器端的方法或命令.
NetConnection_Call_Prohibited = "NetConnection.Call.Prohibited" // "error" Action Message Format (AMF) 操作因安全原因而被阻止. 或者是 AMF URL 与 SWF 不在同一个域,或者是 AMF 服务器没有信任 SWF 文件的域的策略文件.
NetConnection_Connect_AppShutdown = "NetConnection.Connect.AppShutdown" // "error" 正在关闭指定的应用程序.
NetConnection_Connect_InvalidApp = "NetConnection.Connect.InvalidApp" // "error" 连接时指定的应用程序名无效.
NetConnection_Connect_Success = "NetConnection.Connect.Success" // "status" 连接尝试成功.
NetConnection_Connect_Closed = "NetConnection.Connect.Closed" // "status" 成功关闭连接.
NetConnection_Connect_Failed = "NetConnection.Connect.Failed" // "error" 连接尝试失败.
NetConnection_Connect_Rejected = "NetConnection.Connect.Rejected" // "error" 连接尝试没有访问应用程序的权限.
/* SharedObject */
SharedObject_Flush_Success = "SharedObject.Flush.Success" //"status" "待定"状态已解析并且 SharedObject.flush() 调用成功.
SharedObject_Flush_Failed = "SharedObject.Flush.Failed" //"error" "待定"状态已解析,但 SharedObject.flush() 失败.
SharedObject_BadPersistence = "SharedObject.BadPersistence" //"error" 使用永久性标志对共享对象进行了请求,但请求无法被批准,因为已经使用其它标记创建了该对象.
SharedObject_UriMismatch = "SharedObject.UriMismatch" //"error" 试图连接到拥有与共享对象不同的 URI (URL) 的 NetConnection 对象.
)
// NetConnection、NetStream 或 SharedObject 对象报告其状态时,将调度 NetStatusEvent 对象
type NetStatusEvent struct {
Code string
Level string
}
func newNetStatusEvent(code, level string) (e *NetStatusEvent) {
e.Code = code
e.Level = level
return e
}

View File

@@ -0,0 +1,341 @@
package pkg
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"errors"
"fmt"
"io"
"math/rand"
"net"
"time"
"m7s.live/m7s/v5/pkg/util"
)
const (
C1S1_SIZE = 1536
C1S1_TIME_SIZE = 4
C1S1_VERSION_SIZE = 4
C1S1_DIGEST_SIZE = 764
C1S1_DIGEST_OFFSET_SIZE = 4
C1S1_DIGEST_OFFSET_MAX = 764 - 32 - 4
C1S1_DIGEST_DATA_SIZE = 32
C1S1_KEY_SIZE = 764
C1S1_KEY_OFFSET_SIZE = 4
C1S1_KEY_OFFSET_MAX = 764 - 128 - 4
C1S1_KEY_DATA_SIZE = 128
RTMP_HANDSHAKE_VERSION = 0x03
)
var (
FMS_KEY = []byte{
0x47, 0x65, 0x6e, 0x75, 0x69, 0x6e, 0x65, 0x20,
0x41, 0x64, 0x6f, 0x62, 0x65, 0x20, 0x46, 0x6c,
0x61, 0x73, 0x68, 0x20, 0x4d, 0x65, 0x64, 0x69,
0x61, 0x20, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72,
0x20, 0x30, 0x30, 0x31, // Genuine Adobe Flash Media Server 001
0xf0, 0xee, 0xc2, 0x4a, 0x80, 0x68, 0xbe, 0xe8,
0x2e, 0x00, 0xd0, 0xd1, 0x02, 0x9e, 0x7e, 0x57,
0x6e, 0xec, 0x5d, 0x2d, 0x29, 0x80, 0x6f, 0xab,
0x93, 0xb8, 0xe6, 0x36, 0xcf, 0xeb, 0x31, 0xae,
} // 68
FP_KEY = []byte{
0x47, 0x65, 0x6E, 0x75, 0x69, 0x6E, 0x65, 0x20,
0x41, 0x64, 0x6F, 0x62, 0x65, 0x20, 0x46, 0x6C,
0x61, 0x73, 0x68, 0x20, 0x50, 0x6C, 0x61, 0x79,
0x65, 0x72, 0x20, 0x30, 0x30, 0x31, /* Genuine Adobe Flash Player 001 */
0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8,
0x2E, 0x00, 0xD0, 0xD1, 0x02, 0x9E, 0x7E, 0x57,
0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB,
0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE,
} // 62
)
// C0 S0 (1 byte) : 版本号
// C1 S1 :
// Time (4 bytes)
// Zero (4 bytes) -> 这个字段必须都是0.如果不是0,代表要使用complex handshack
// Random data (128 Bytes)
// C2 S2 : 参考C1 S1
func ReadBuf(r io.Reader, length int) (buf []byte) {
buf = make([]byte, length)
io.ReadFull(r, buf)
return
}
func (nc *NetConnection) Handshake() error {
C0C1 := ReadBuf(nc.Reader, C1S1_SIZE+1)
if C0C1[0] != RTMP_HANDSHAKE_VERSION {
return errors.New("C0 Error")
}
var C1 = C0C1[1:]
if len(C1) != C1S1_SIZE {
return errors.New("C1 Error")
}
var ts int
util.GetBE(C1[4:8], &ts)
if ts == 0 {
return nc.simple_handshake(C1)
}
return nc.complex_handshake(C1)
}
func (client *NetConnection) ClientHandshake() (err error) {
C0C1 := make([]byte, C1S1_SIZE+1)
C0C1[0] = RTMP_HANDSHAKE_VERSION
if _, err = client.Write(C0C1); err == nil {
// read S0 S1
if _, err = io.ReadFull(client.Reader, C0C1); err == nil {
if C0C1[0] != RTMP_HANDSHAKE_VERSION {
err = errors.New("S1 C1 Error")
// C2
} else if _, err = client.Write(C0C1[1:]); err == nil {
_, err = io.ReadFull(client.Reader, C0C1[1:]) // S2
}
}
}
return
}
func (nc *NetConnection) simple_handshake(C1 []byte) error {
S0S1 := make([]byte, C1S1_SIZE+1)
S0S1[0] = RTMP_HANDSHAKE_VERSION
util.PutBE(S0S1[1:5], time.Now().Unix()&0xFFFFFFFF)
copy(S0S1[5:], "Monibuca")
nc.Write(S0S1)
nc.Write(C1) // S2
if C2 := ReadBuf(nc.Reader, C1S1_SIZE); bytes.Compare(C2[8:], S0S1[9:]) != 0 {
return errors.New("C2 Error")
}
return nil
}
func (nc *NetConnection) complex_handshake(C1 []byte) error {
// 验证客户端,digest偏移位置和scheme由客户端定.
scheme, challenge, digest, ok, err := validateClient(C1)
if err != nil {
return err
}
if !ok {
fmt.Printf("digested handshake, scheme : %v\nchallenge : %v\ndigest : %v\nok : %v\nerr : %v\n", scheme, challenge, digest, ok, err)
return errors.New("validateClient failed")
}
// s1
S1 := create_S1()
S1_Digest_Offset := scheme_Digest_Offset(S1, scheme)
S1_Part1 := S1[:S1_Digest_Offset]
S1_Part2 := S1[S1_Digest_Offset+C1S1_DIGEST_DATA_SIZE:]
// s1 part1 + part2
buf := new(bytes.Buffer)
buf.Write(S1_Part1)
buf.Write(S1_Part2)
S1_Part1_Part2 := buf.Bytes()
// s1 digest
tmp_Hash, err := HMAC_SHA256(S1_Part1_Part2, FMS_KEY[:36])
if err != nil {
return err
}
// incomplete s1
copy(S1[S1_Digest_Offset:], tmp_Hash)
// s2
S2_Random := cerate_S2()
tmp_Hash, err = HMAC_SHA256(digest, FMS_KEY[:68])
if err != nil {
return err
}
// s2 digest
S2_Digest, err := HMAC_SHA256(S2_Random, tmp_Hash)
if err != nil {
return err
}
buffer := net.Buffers{[]byte{RTMP_HANDSHAKE_VERSION}, S1, S2_Random, S2_Digest}
buffer.WriteTo(nc)
ReadBuf(nc.Reader, 1536)
return nil
}
func validateClient(C1 []byte) (scheme int, challenge []byte, digest []byte, ok bool, err error) {
scheme, challenge, digest, ok, err = clientScheme(C1, 1)
if ok {
return scheme, challenge, digest, ok, nil
}
scheme, challenge, digest, ok, err = clientScheme(C1, 0)
if ok {
return scheme, challenge, digest, ok, nil
}
return scheme, challenge, digest, ok, errors.New("Client scheme error")
}
func clientScheme(C1 []byte, schem int) (scheme int, challenge []byte, digest []byte, ok bool, err error) {
digest_offset := -1
key_offset := -1
if schem == 0 {
digest_offset = scheme0_Digest_Offset(C1)
key_offset = scheme0_Key_Offset(C1)
} else if schem == 1 {
digest_offset = scheme1_Digest_Offset(C1)
key_offset = scheme1_Key_Offset(C1)
}
// digest
c1_Part1 := C1[:digest_offset]
c1_Part2 := C1[digest_offset+C1S1_DIGEST_DATA_SIZE:]
digest = C1[digest_offset : digest_offset+C1S1_DIGEST_DATA_SIZE]
// part1 + part2
buf := new(bytes.Buffer)
buf.Write(c1_Part1)
buf.Write(c1_Part2)
c1_Part1_Part2 := buf.Bytes()
tmp_Hash, err := HMAC_SHA256(c1_Part1_Part2, FP_KEY[:30])
if err != nil {
return 0, nil, nil, false, err
}
// ok
if bytes.Compare(digest, tmp_Hash) == 0 {
ok = true
} else {
ok = false
}
// challenge scheme
challenge = C1[key_offset : key_offset+C1S1_KEY_DATA_SIZE]
scheme = schem
return
}
func scheme_Digest_Offset(C1S1 []byte, scheme int) int {
if scheme == 0 {
return scheme0_Digest_Offset(C1S1)
} else if scheme == 1 {
return scheme1_Digest_Offset(C1S1)
}
return -1
}
// scheme0:
// time + version + digest + key
// time + version + [offset + random + digest-data + random-data] + key
// 4 + 4 + [4 + offset + 32 + 728-offset ] + 764
// 4 + 4 + 764 + 764
// 0 <= scheme0_digest_offset <= 728 == 764 - 32 - 4
// 如果digest.offset == 3,那么digest[7~38]为digest.digest-data,如果offset == 728, 那么digest[732~763]为digest-data)
func scheme0_Digest_Offset(C1S1 []byte) int {
scheme0_digest_offset := int(C1S1[8]&0xff) + int(C1S1[9]&0xff) + int(C1S1[10]&0xff) + int(C1S1[11]&0xff)
scheme0_digest_offset = (scheme0_digest_offset % C1S1_DIGEST_OFFSET_MAX) + C1S1_TIME_SIZE + C1S1_VERSION_SIZE + C1S1_DIGEST_OFFSET_SIZE
if scheme0_digest_offset+32 >= C1S1_SIZE {
// digest error
// digest 数据超出1536.
}
return scheme0_digest_offset
}
// key:
// random-data + key-data + random-data + offset
// offset + 128 + 764-offset-128-4 + 4
// 0 <= scheme0_key_offset <= 632 == 764 - 128 - 4
// 如果key.offset == 3, 那么key[3~130]为key-data,这个位置是相对于key结构的第0个字节开始
func scheme0_Key_Offset(C1S1 []byte) int {
scheme0_key_offset := int(C1S1[1532]) + int(C1S1[1533]) + int(C1S1[1534]) + int(C1S1[1535])
scheme0_key_offset = (scheme0_key_offset % C1S1_KEY_OFFSET_MAX) + C1S1_TIME_SIZE + C1S1_VERSION_SIZE + C1S1_DIGEST_SIZE
if scheme0_key_offset+128 >= C1S1_SIZE {
// key error
}
return scheme0_key_offset
}
// scheme1:
// time + version + key + digest
// 0 <= scheme1_digest_offset <= 728 == 764 - 32 - 4
func scheme1_Digest_Offset(C1S1 []byte) int {
scheme1_digest_offset := int(C1S1[772]&0xff) + int(C1S1[773]&0xff) + int(C1S1[774]&0xff) + int(C1S1[775]&0xff)
scheme1_digest_offset = (scheme1_digest_offset % C1S1_DIGEST_OFFSET_MAX) + C1S1_TIME_SIZE + C1S1_VERSION_SIZE + C1S1_KEY_SIZE + C1S1_DIGEST_OFFSET_SIZE
if scheme1_digest_offset+32 >= C1S1_SIZE {
// digest error
}
return scheme1_digest_offset
}
// time + version + key + digest
// 0 <= scheme1_key_offset <= 632 == 764 - 128 - 4
func scheme1_Key_Offset(C1S1 []byte) int {
scheme1_key_offset := int(C1S1[768]) + int(C1S1[769]) + int(C1S1[770]) + int(C1S1[771])
scheme1_key_offset = (scheme1_key_offset % C1S1_KEY_OFFSET_MAX) + C1S1_TIME_SIZE + C1S1_VERSION_SIZE + C1S1_DIGEST_SIZE
if scheme1_key_offset+128 >= C1S1_SIZE {
// key error
}
return scheme1_key_offset
}
// HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出
// 哈希算法sha256.New, 密钥 key, 消息 message.
func HMAC_SHA256(message []byte, key []byte) ([]byte, error) {
mac := hmac.New(sha256.New, key)
_, err := mac.Write(message)
if err != nil {
return nil, err
}
return mac.Sum(nil), nil
}
func create_S1() []byte {
s1_Time := []byte{0, 0, 0, 0}
s1_Version := []byte{1, 1, 1, 1}
s1_key_Digest := make([]byte, 1536-8)
for i, _ := range s1_key_Digest {
s1_key_Digest[i] = byte(rand.Int() % 256)
}
buf := new(bytes.Buffer)
buf.Write(s1_Time)
buf.Write(s1_Version)
buf.Write(s1_key_Digest)
return buf.Bytes()
}
func cerate_S2() []byte {
s2_Random := make([]byte, 1536-32)
for i, _ := range s2_Random {
s2_Random[i] = byte(rand.Int() % 256)
}
return s2_Random
}

150
plugin/rtmp/pkg/media.go Normal file
View File

@@ -0,0 +1,150 @@
package pkg
import "m7s.live/m7s/v5"
// type AVSender struct {
// *RTMPSender
// ChunkHeader
// firstSent bool
// }
// func (av *AVSender) sendSequenceHead(seqHead []byte) {
// av.SetTimestamp(0)
// av.MessageLength = uint32(len(seqHead))
// for !av.writing.CompareAndSwap(false, true) {
// runtime.Gosched()
// }
// defer av.writing.Store(false)
// if av.firstSent {
// av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
// } else {
// av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
// }
// av.sendChunk(seqHead)
// }
// func (av *AVSender) sendFrame(frame *common.AVFrame, absTime uint32) (err error) {
// seq := frame.Sequence
// payloadLen := frame.AVCC.ByteLength
// if payloadLen == 0 {
// err := errors.New("payload is empty")
// av.Error("payload is empty", zap.Error(err))
// return err
// }
// if av.writeSeqNum > av.bandwidth {
// av.totalWrite += av.writeSeqNum
// av.writeSeqNum = 0
// av.SendMessage(RTMP_MSG_ACK, Uint32Message(av.totalWrite))
// av.SendStreamID(RTMP_USER_PING_REQUEST, 0)
// }
// av.MessageLength = uint32(payloadLen)
// for !av.writing.CompareAndSwap(false, true) {
// runtime.Gosched()
// }
// defer av.writing.Store(false)
// // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// // 当Chunk Type为0时(即Chunk12),
// if !av.firstSent {
// av.firstSent = true
// av.SetTimestamp(absTime)
// av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
// } else {
// av.SetTimestamp(frame.DeltaTime)
// av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
// }
// //数据被覆盖导致序号变了
// if seq != frame.Sequence {
// return errors.New("sequence is not equal")
// }
// r := frame.AVCC.NewReader()
// chunk := net.Buffers{av.chunkHeader}
// av.writeSeqNum += uint32(av.chunkHeader.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
// for r.CanRead() {
// item := av.bytePool.Get(16)
// defer item.Recycle()
// av.WriteTo(RTMP_CHUNK_HEAD_1, &item.Value)
// // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
// chunk = append(chunk, item.Value)
// av.writeSeqNum += uint32(item.Value.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
// }
// _, err = chunk.WriteTo(av.Conn)
// return nil
// }
// type RTMPSender struct {
// Subscriber
// NetStream
// audio, video AVSender
// }
// func (rtmp *RTMPSender) OnEvent(event any) {
// switch v := event.(type) {
// case SEwaitPublish:
// rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus)
// case SEpublish:
// rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus)
// case ISubscriber:
// rtmp.audio.RTMPSender = rtmp
// rtmp.video.RTMPSender = rtmp
// rtmp.audio.ChunkStreamID = RTMP_CSID_AUDIO
// rtmp.video.ChunkStreamID = RTMP_CSID_VIDEO
// rtmp.audio.MessageTypeID = RTMP_MSG_AUDIO
// rtmp.video.MessageTypeID = RTMP_MSG_VIDEO
// rtmp.audio.MessageStreamID = rtmp.StreamID
// rtmp.video.MessageStreamID = rtmp.StreamID
// case AudioDeConf:
// rtmp.audio.sendSequenceHead(v)
// case VideoDeConf:
// rtmp.video.sendSequenceHead(v)
// case AudioFrame:
// if err := rtmp.audio.sendFrame(v.AVFrame, v.AbsTime); err != nil {
// rtmp.Stop(zap.Error(err))
// }
// case VideoFrame:
// if err := rtmp.video.sendFrame(v.AVFrame, v.AbsTime); err != nil {
// rtmp.Stop(zap.Error(err))
// }
// default:
// rtmp.Subscriber.OnEvent(event)
// }
// }
// func (r *RTMPSender) Response(tid uint64, code, level string) error {
// m := new(ResponsePlayMessage)
// m.CommandName = Response_OnStatus
// m.TransactionId = tid
// m.Infomation = map[string]any{
// "code": code,
// "level": level,
// "description": "",
// }
// m.StreamID = r.StreamID
// return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// }
type RTMPReceiver struct {
*m7s.Publisher
NetStream
}
func (r *RTMPReceiver) Response(tid uint64, code, level string) error {
m := new(ResponsePublishMessage)
m.CommandName = Response_OnStatus
m.TransactionId = tid
m.Infomation = map[string]any{
"code": code,
"level": level,
"description": "",
}
m.StreamID = r.StreamID
return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) {
// r.WriteAudio(nil)
}
func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) {
r.WriteVideo(&RTMPVideo{msg.AVData})
}

788
plugin/rtmp/pkg/msg.go Normal file
View File

@@ -0,0 +1,788 @@
package pkg
import (
"encoding/binary"
"errors"
"strings"
"m7s.live/m7s/v5/pkg/util"
)
// https://zhuanlan.zhihu.com/p/196743129
const (
/* RTMP Message ID*/
// Protocal Control Messgae(1-7)
// Chunk
RTMP_MSG_CHUNK_SIZE = 1
RTMP_MSG_ABORT = 2
// RTMP
RTMP_MSG_ACK = 3
RTMP_MSG_USER_CONTROL = 4
RTMP_MSG_ACK_SIZE = 5
RTMP_MSG_BANDWIDTH = 6
RTMP_MSG_EDGE = 7
RTMP_MSG_AUDIO = 8
RTMP_MSG_VIDEO = 9
RTMP_MSG_AMF3_METADATA = 15
RTMP_MSG_AMF3_SHARED = 16
RTMP_MSG_AMF3_COMMAND = 17
RTMP_MSG_AMF0_METADATA = 18
RTMP_MSG_AMF0_SHARED = 19
RTMP_MSG_AMF0_COMMAND = 20
RTMP_MSG_AGGREGATE = 22
RTMP_DEFAULT_CHUNK_SIZE = 128
RTMP_MAX_CHUNK_SIZE = 65536
RTMP_MAX_CHUNK_HEADER = 18
// User Control Event
RTMP_USER_STREAM_BEGIN = 0
RTMP_USER_STREAM_EOF = 1
RTMP_USER_STREAM_DRY = 2
RTMP_USER_SET_BUFFLEN = 3
RTMP_USER_STREAM_IS_RECORDED = 4
RTMP_USER_PING_REQUEST = 6
RTMP_USER_PING_RESPONSE = 7
RTMP_USER_EMPTY = 31
// StreamID == (ChannelID-4)/5+1
// ChannelID == Chunk Stream ID
// StreamID == Message Stream ID
// Chunk Stream ID == 0, 第二个byte + 64
// Chunk Stream ID == 1, (第三个byte) * 256 + 第二个byte + 64
// Chunk Stream ID == 2.
// 2 < Chunk Stream ID < 64(2的6次方)
RTMP_CSID_CONTROL = 0x02
RTMP_CSID_COMMAND = 0x03
RTMP_CSID_AUDIO = 0x06
RTMP_CSID_DATA = 0x05
RTMP_CSID_VIDEO = 0x05
)
func newChunkHeader(messageType byte) *ChunkHeader {
head := new(ChunkHeader)
head.ChunkStreamID = RTMP_CSID_CONTROL
if messageType == RTMP_MSG_AMF0_COMMAND {
head.ChunkStreamID = RTMP_CSID_COMMAND
}
head.MessageTypeID = messageType
return head
}
func (h ChunkHeader) Clone() *ChunkHeader {
return &h
}
type RtmpMessage interface {
Encode(IAMF)
}
type HaveStreamID interface {
GetStreamID() uint32
}
func GetRtmpMessage(chunk *Chunk, body util.Buffer) error {
switch chunk.MessageTypeID {
case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE:
if body.Len() < 4 {
return errors.New("chunk.Body < 4")
}
chunk.MsgData = Uint32Message(body.ReadUint32())
case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件.
{
if body.Len() < 2 {
return errors.New("UserControlMessage.Body < 2")
}
base := UserControlMessage{
EventType: body.ReadUint16(),
EventData: body,
}
switch base.EventType {
case RTMP_USER_STREAM_BEGIN: // 服务端向客户端发送本事件通知对方一个流开始起作用可以用于通讯.在默认情况下,服务端在成功地从客户端接收连接命令之后发送本事件,事件ID为0.事件数据是表示开始起作用的流的ID.
m := &StreamIDMessage{
UserControlMessage: base,
StreamID: 0,
}
if len(base.EventData) >= 4 {
//服务端在成功地从客户端接收连接命令之后发送本事件,事件ID为0.事件数据是表示开始起作用的流的ID.
m.StreamID = body.ReadUint32()
}
chunk.MsgData = m
case RTMP_USER_STREAM_EOF, RTMP_USER_STREAM_DRY, RTMP_USER_STREAM_IS_RECORDED: // 服务端向客户端发送本事件通知客户端,数据回放完成.果没有发行额外的命令,就不再发送数据.客户端丢弃从流中接收的消息.4字节的事件数据表示,回放结束的流的ID.
chunk.MsgData = &StreamIDMessage{
UserControlMessage: base,
StreamID: body.ReadUint32(),
}
case RTMP_USER_SET_BUFFLEN: // 客户端向服务端发送本事件,告知对方自己存储一个流的数据的缓存的长度(毫秒单位).当服务端开始处理一个流得时候发送本事件.事件数据的头四个字节表示流ID,后4个字节表示缓存长度(毫秒单位).
chunk.MsgData = &SetBufferMessage{
StreamIDMessage: StreamIDMessage{
UserControlMessage: base,
StreamID: body.ReadUint32(),
},
Millisecond: body.ReadUint32(),
}
case RTMP_USER_PING_REQUEST: // 服务端通过本事件测试客户端是否可达.事件数据是4个字节的事件戳.代表服务调用本命令的本地时间.客户端在接收到kMsgPingRequest之后返回kMsgPingResponse事件
chunk.MsgData = &PingRequestMessage{
UserControlMessage: base,
Timestamp: body.ReadUint32(),
}
case RTMP_USER_PING_RESPONSE, RTMP_USER_EMPTY: // 客户端向服务端发送本消息响应ping请求.事件数据是接kMsgPingRequest请求的时间.
chunk.MsgData = &base
default:
chunk.MsgData = &base
}
}
case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽.
if body.Len() < 4 {
return errors.New("chunk.Body < 4")
}
m := &SetPeerBandwidthMessage{
AcknowledgementWindowsize: body.ReadUint32(),
}
if body.Len() > 0 {
m.LimitType = body[0]
}
chunk.MsgData = m
case RTMP_MSG_EDGE: // RTMP消息类型ID=7, 用于边缘服务与源服务器.
case RTMP_MSG_AUDIO: // RTMP消息类型ID=8, 音频数据.客户端或服务端发送本消息用于发送音频数据.
case RTMP_MSG_VIDEO: // RTMP消息类型ID=9, 视频数据.客户端或服务端发送本消息用于发送视频数据.
case RTMP_MSG_AMF3_METADATA: // RTMP消息类型ID=15, 数据消息.用AMF3编码.
case RTMP_MSG_AMF3_SHARED: // RTMP消息类型ID=16, 共享对象消息.用AMF3编码.
case RTMP_MSG_AMF3_COMMAND: // RTMP消息类型ID=17, 命令消息.用AMF3编码.
decodeCommandAMF0(chunk, body[1:])
case RTMP_MSG_AMF0_METADATA: // RTMP消息类型ID=18, 数据消息.用AMF0编码.
case RTMP_MSG_AMF0_SHARED: // RTMP消息类型ID=19, 共享对象消息.用AMF0编码.
case RTMP_MSG_AMF0_COMMAND: // RTMP消息类型ID=20, 命令消息.用AMF0编码.
decodeCommandAMF0(chunk, body) // 解析具体的命令消息
case RTMP_MSG_AGGREGATE:
default:
}
return nil
}
// 03 00 00 00 00 01 02 14 00 00 00 00 02 00 07 63 6F 6E 6E 65 63 74 00 3F F0 00 00 00 00 00 00 08
//
// 这个函数解析的是从02(第13个字节)开始,前面12个字节是Header,后面的是Payload,即解析Payload.
//
// 解析用AMF0编码的命令消息.(Payload)
// 第一个字节(Byte)为此数据的类型.例如:string,int,bool...
// string就是字符类型,一个byte的amf类型,两个bytes的字符长度,和N个bytes的数据.
// 比如: 02 00 02 33 22,第一个byte为amf类型,其后两个bytes为长度,注意这里的00 02是大端模式,33 22是字符数据
// umber类型其实就是double,占8bytes.
// 比如: 00 00 00 00 00 00 00 00,第一个byte为amf类型,其后8bytes为double值0.0
// boolean就是布尔类型,占用1byte.
// 比如:01 00,第一个byte为amf类型,其后1byte是值,false.
// object类型要复杂点.
// 第一个byte是03表示object,其后跟的是N个(key+value).最后以00 00 09表示object结束
func decodeCommandAMF0(chunk *Chunk, body []byte) {
amf := AMF{body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去.
cmd := amf.ReadShortString() // rtmp_amf.go, 将payload的bytes类型转换成string类型.
cmdMsg := CommandMessage{
cmd,
uint64(amf.ReadNumber()),
}
switch cmd {
case "connect", "call":
chunk.MsgData = &CallMessage{
cmdMsg,
amf.ReadObject(),
amf.ReadObject(),
}
case "createStream":
amf.Unmarshal()
chunk.MsgData = &cmdMsg
case "play":
amf.Unmarshal()
m := &PlayMessage{
CURDStreamMessage{
cmdMsg,
chunk.MessageStreamID,
},
amf.ReadShortString(),
float64(-2),
float64(-1),
true,
}
for i := 0; i < 3; i++ {
if v, _ := amf.Unmarshal(); v != nil {
switch vv := v.(type) {
case float64:
if i == 0 {
m.Start = vv
} else {
m.Duration = vv
}
case bool:
m.Reset = vv
i = 2
}
} else {
break
}
}
chunk.MsgData = m
case "play2":
amf.Unmarshal()
chunk.MsgData = &Play2Message{
cmdMsg,
uint64(amf.ReadNumber()),
amf.ReadShortString(),
amf.ReadShortString(),
uint64(amf.ReadNumber()),
amf.ReadShortString(),
}
case "publish":
amf.Unmarshal()
chunk.MsgData = &PublishMessage{
CURDStreamMessage{
cmdMsg,
chunk.MessageStreamID,
},
amf.ReadShortString(),
amf.ReadShortString(),
}
case "pause":
amf.Unmarshal()
chunk.MsgData = &PauseMessage{
cmdMsg,
amf.ReadBool(),
uint64(amf.ReadNumber()),
}
case "seek":
amf.Unmarshal()
chunk.MsgData = &SeekMessage{
cmdMsg,
uint64(amf.ReadNumber()),
}
case "deleteStream", "closeStream":
amf.Unmarshal()
chunk.MsgData = &CURDStreamMessage{
cmdMsg,
uint32(amf.ReadNumber()),
}
case "releaseStream":
amf.Unmarshal()
chunk.MsgData = &ReleaseStreamMessage{
cmdMsg,
amf.ReadShortString(),
}
case "receiveAudio", "receiveVideo":
amf.Unmarshal()
chunk.MsgData = &ReceiveAVMessage{
cmdMsg,
amf.ReadBool(),
}
case Response_Result, Response_Error, Response_OnStatus:
if cmdMsg.TransactionId == 2 {
chunk.MsgData = &ResponseCreateStreamMessage{
cmdMsg, amf.ReadObject(), uint32(amf.ReadNumber()),
}
return
}
response := &ResponseMessage{
cmdMsg,
amf.ReadObject(),
amf.ReadObject(), "",
}
if response.Infomation == nil && response.Properties != nil {
response.Infomation = response.Properties
}
// codef := zap.String("code", response.Infomation["code"].(string))
switch response.Infomation["level"] {
case Level_Status:
// RTMPPlugin.Info("_result :", codef)
case Level_Warning:
// RTMPPlugin.Warn("_result :", codef)
case Level_Error:
// RTMPPlugin.Error("_result :", codef)
}
if strings.HasPrefix(response.Infomation["code"].(string), "NetStream.Publish") {
chunk.MsgData = &ResponsePublishMessage{
cmdMsg,
response.Properties,
response.Infomation,
chunk.MessageStreamID,
}
} else if strings.HasPrefix(response.Infomation["code"].(string), "NetStream.Play") {
chunk.MsgData = &ResponsePlayMessage{
cmdMsg,
response.Infomation,
chunk.MessageStreamID,
}
} else {
chunk.MsgData = response
}
case "FCPublish", "FCUnpublish":
fallthrough
default:
chunk.MsgData = &struct{ CommandMessage }{cmdMsg}
// RTMPPlugin.Info("decode command amf0 ", zap.String("cmd", cmd))
}
}
/* Command Message */
type CommandMessage struct {
CommandName string // 命令名. 字符串. 命令名.设置为"connect"
TransactionId uint64 // 传输ID. 数字. 总是设为1
}
type Commander interface {
GetCommand() *CommandMessage
}
func (cmd *CommandMessage) GetCommand() *CommandMessage {
return cmd
}
func (msg *CommandMessage) Encode(buf IAMF) {
buf.Marshals(msg.CommandName, msg.TransactionId, nil)
}
// Protocol control message 1.
// Set Chunk Size, is used to notify the peer of a new maximum chunk size
// chunk size (31 bits): This field holds the new maximum chunk size,in bytes, which will be used for all of the senders subsequent chunks until further notice
type Uint32Message uint32
func (msg Uint32Message) Encode(buf IAMF) {
binary.BigEndian.PutUint32(buf.Malloc(4), uint32(msg))
}
// Protocol control message 4, User Control Messages.
// User Control messages SHOULD use message stream ID 0 (known as the control stream) and, when sent over RTMP Chunk Stream,
// be sent on chunk stream ID 2. User Control messages are effective at the point they are received in the stream; their timestamps are ignored.
// Event Type (16 bits) : The first 2 bytes of the message data are used to identify the Event type. Event type is followed by Event data.
// Event Data
type UserControlMessage struct {
EventType uint16
EventData []byte
}
// Protocol control message 6, Set Peer Bandwidth Message.
// The client or the server sends this message to limit the output bandwidth of its peer.
// AcknowledgementWindowsize (4 bytes)
// LimitType : The Limit Type is one of the following values: 0 - Hard, 1 - Soft, 2- Dynamic.
type SetPeerBandwidthMessage struct {
AcknowledgementWindowsize uint32 // 4 bytes
LimitType byte
}
func (msg *SetPeerBandwidthMessage) Encode(buf IAMF) {
buf.WriteUint32(msg.AcknowledgementWindowsize)
buf.WriteByte(msg.LimitType)
}
// Message 15, 18. Data Message. The client or the server sends this message to send Metadata or any
// user data to the peer. Metadata includes details about the data(audio, video etc.) like creation time, duration,
// theme and so on. These messages have been assigned message type value of 18 for AMF0 and message type value of 15 for AMF3
type MetadataMessage struct {
Proterties map[string]interface{} `json:",omitempty"`
}
// Object 可选值:
// App 客户端要连接到的服务应用名 Testapp
// Flashver Flash播放器版本.和应用文档中getversion()函数返回的字符串相同. FMSc/1.0
// SwfUrl 发起连接的swf文件的url file://C:/ FlvPlayer.swf
// TcUrl 服务url.有下列的格式.protocol://servername:port/appName/appInstance rtmp://localhost::1935/testapp/instance1
// fpad 是否使用代理 true or false
// audioCodecs 指示客户端支持的音频编解码器 SUPPORT_SND_MP3
// videoCodecs 指示支持的视频编解码器 SUPPORT_VID_SORENSON
// pageUrl SWF文件被加载的页面的Url http:// somehost/sample.html
// objectEncoding AMF编码方法 AMF编码方法 kAMF3
// Call Message.
// The call method of the NetConnection object runs remote procedure calls (RPC) at the receiving end.
// The called RPC name is passed as a parameter to the call command.
type CallMessage struct {
CommandMessage
Object map[string]any `json:",omitempty"`
Optional map[string]any `json:",omitempty"`
}
func (msg *CallMessage) Encode(buf IAMF) {
buf.Marshals(msg.CommandName, msg.TransactionId, msg.Object)
if msg.Optional != nil {
buf.Marshals(msg.Optional)
}
}
// func (msg *CallMessage) Encode3() []byte {
// var amf util.AMF
// amf.WriteByte(0)
// return amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object, msg.Optional)
// }
// Create Stream Message.
// The client sends this command to the server to create a logical channel for message communication The publishing of audio,
// video, and metadata is carried out over stream channel created using the createStream command.
/*
func (msg *CreateStreamMessage) Encode3() {
msg.Encode0()
buf := new(bytes.Buffer)
buf.WriteByte(0)
buf.Write(msg.RtmpBody)
msg.RtmpBody = buf.Bytes()
}*/
// The following commands can be sent on the NetStream by the client to the server:
// Play
// Play2
// DeleteStream
// CloseStream
// ReceiveAudio
// ReceiveVideo
// Publish
// Seek
// Pause
// Release(37)
// FCPublish
// Play Message
// The client sends this command to the server to play a stream. A playlist can also be created using this command multiple times
type PlayMessage struct {
CURDStreamMessage
StreamName string
Start float64
Duration float64
Reset bool
}
// 命令名 -> 命令名,设置为”play”
// 传输ID -> 0
// 命令对象
// 流名字 -> 要播放流的名字
// start -> 可选的参数,以秒为单位定义开始时间.默认值为 -2,表示用户首先尝试播放流名字段中定义的直播流.
// Duration -> 可选的参数,以秒为单位定义了回放的持续时间.默认值为 -1.-1 值意味着一个直播流会一直播放直到它不再可用或者一个录制流一直播放直到结束
// Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush
func (msg *PlayMessage) Encode(buf IAMF) {
// if msg.Start > 0 {
// amf.writeNumber(msg.Start)
// }
// if msg.Duration > 0 {
// amf.writeNumber(msg.Duration)
// }
// amf.writeBool(msg.Reset)
buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000)
}
/*
func (msg *PlayMessage) Encode3() {
}*/
// Play2 Message
// Unlike the play command, play2 can switch to a different bit rate stream without changing the timeline of the content played. The
// server maintains multiple files for all supported bitrates that the client can request in play2.
type Play2Message struct {
CommandMessage
StartTime uint64
OldStreamName string
StreamName string
Duration uint64
Transition string
}
func (msg *Play2Message) Encode0() {
}
// Delete Stream Message
// NetStream sends the deleteStream command when the NetStream object is getting destroyed
type CURDStreamMessage struct {
CommandMessage
StreamId uint32
}
func (msg *CURDStreamMessage) GetStreamID() uint32 {
return msg.StreamId
}
func (msg *CURDStreamMessage) Encode0() {
}
type ReleaseStreamMessage struct {
CommandMessage
StreamName string
}
func (msg *ReleaseStreamMessage) Encode0() {
}
// Receive Audio Message
// NetStream sends the receiveAudio message to inform the server whether to send or not to send the audio to the client
type ReceiveAVMessage struct {
CommandMessage
BoolFlag bool
}
func (msg *ReceiveAVMessage) Encode0() {
}
// Publish Message
// The client sends the publish command to publish a named stream to the server. Using this name,
// any client can play this stream and receive the published audio, video, and data messages
type PublishMessage struct {
CURDStreamMessage
PublishingName string
PublishingType string
}
// 命令名 -> 命令名,设置为”publish”
// 传输ID -> 0
// 命令对象
// 发布名 -> 流发布的名字
// 发布类型 -> 设置为”live””record”或”append”.
// “record”:流被发布,并且数据被录制到一个新的文件,文件被存储到服务端的服务应用的目录的一个子目录下.如果文件已经存在则重写文件.
// “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件.
// “live”:发布直播数据而不录制到文件
func (msg *PublishMessage) Encode(buf IAMF) {
buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType)
}
// Seek Message
// The client sends the seek command to seek the offset (in milliseconds) within a media file or playlist.
type SeekMessage struct {
CommandMessage
Milliseconds uint64
}
func (msg *SeekMessage) Encode0() {
}
// Pause Message
// The client sends the pause command to tell the server to pause or start playing.
type PauseMessage struct {
CommandMessage
Pause bool
Milliseconds uint64
}
// 命令名 -> 命令名,设置为”pause”
// 传输ID -> 0
// 命令对象 -> null
// Pause/Unpause Flag -> true 或者 false来指示暂停或者重新播放
// milliSeconds -> 流暂停或者重新开始所在的毫秒数.这个是客户端暂停的当前流时间.当回放已恢复时,服务器端值发送带有比这个值大的 timestamp 消息
func (msg *PauseMessage) Encode0() {
}
//
// Response Message. Server -> Response -> Client
//
// Response Connect Message
type ResponseConnectMessage struct {
CommandMessage
Properties map[string]any `json:",omitempty"`
Infomation map[string]any `json:",omitempty"`
}
func (msg *ResponseConnectMessage) Encode(buf IAMF) {
buf.Marshals(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation)
}
/*
func (msg *ResponseConnectMessage) Encode3() {
}*/
// Response Call Message
type ResponseCallMessage struct {
CommandMessage
Object map[string]any
Response map[string]any
}
// func (msg *ResponseCallMessage) Encode0() []byte {
// return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object, msg.Response)
// }
// Response Create Stream Message
type ResponseCreateStreamMessage struct {
CommandMessage
Object any `json:",omitempty"`
StreamId uint32
}
func (msg *ResponseCreateStreamMessage) Encode(buf IAMF) {
buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamId)
}
/*
func (msg *ResponseCreateStreamMessage) Encode3() {
}*/
// func (msg *ResponseCreateStreamMessage) Decode0(chunk *Chunk) {
// amf := util.AMF{chunk.Body}
// msg.CommandName = amf.ReadShortString()
// msg.TransactionId = uint64(amf.ReadNumber())
// amf.Unmarshal()
// msg.StreamId = uint32(amf.ReadNumber())
// }
// func (msg *ResponseCreateStreamMessage) Decode3(chunk *Chunk) {
// chunk.Body = chunk.Body[1:]
// msg.Decode0(chunk)
// }
// Response Play Message
type ResponsePlayMessage struct {
CommandMessage
Infomation map[string]any `json:",omitempty"`
StreamID uint32
}
func (msg *ResponsePlayMessage) GetStreamID() uint32 {
return msg.StreamID
}
func (msg *ResponsePlayMessage) Encode(buf IAMF) {
buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.Infomation)
}
/*
func (msg *ResponsePlayMessage) Encode3() {
}*/
// func (msg *ResponsePlayMessage) Decode0(chunk *Chunk) {
// amf := util.AMF{chunk.Body}
// msg.CommandName = amf.ReadShortString()
// msg.TransactionId = uint64(amf.ReadNumber())
// msg.Infomation = amf.ReadObject()
// }
// func (msg *ResponsePlayMessage) Decode3(chunk *Chunk) {
// chunk.Body = chunk.Body[1:]
// msg.Decode0(chunk)
// }
// Response Publish Message
type ResponsePublishMessage struct {
CommandMessage
Properties map[string]any `json:",omitempty"`
Infomation map[string]any `json:",omitempty"`
StreamID uint32
}
func (msg *ResponsePublishMessage) GetStreamID() uint32 {
return msg.StreamID
}
// 命令名 -> 命令名,设置为"OnStatus"
// 传输ID -> 0
// 属性 -> null
// 信息 -> level, code, description
func (msg *ResponsePublishMessage) Encode(buf IAMF) {
buf.Marshals(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation)
}
/*
func (msg *ResponsePublishMessage) Encode3() {
}*/
// Response Seek Message
type ResponseSeekMessage struct {
CommandMessage
Description string
}
func (msg *ResponseSeekMessage) Encode0() {
}
//func (msg *ResponseSeekMessage) Encode3() {
//}
// Response Pause Message
type ResponsePauseMessage struct {
CommandMessage
Description string
}
// 命令名 -> 命令名,设置为"OnStatus"
// 传输ID -> 0
// 描述
func (msg *ResponsePauseMessage) Encode0() {
}
//func (msg *ResponsePauseMessage) Encode3() {
//}
// Response Message
type ResponseMessage struct {
CommandMessage
Properties map[string]any `json:",omitempty"`
Infomation map[string]any `json:",omitempty"`
Description string
}
// User Control Message 4.
// The client or the server sends this message to notify the peer about the user control events.
// For information about the message format, see Section 6.2.
// The following user control event types are supported:
// Stream Begin (=0)
// The server sends this event to notify the client that a stream has become functional and can be
// used for communication. By default, this event is sent on ID 0 after the application connect
// command is successfully received from the client. The event data is 4-byte and represents
// the stream ID of the stream that became functional.
type StreamIDMessage struct {
UserControlMessage
StreamID uint32
}
func (msg *StreamIDMessage) Encode(buffer IAMF) {
buffer.WriteUint16(msg.EventType)
msg.EventData = buffer.Malloc(4)
binary.BigEndian.PutUint32(msg.EventData, msg.StreamID)
}
// SetBuffer Length (=3)
// The client sends this event to inform the server of the buffer size (in milliseconds) that is
// used to buffer any data coming over a stream. This event is sent before the server starts |
// processing the stream. The first 4 bytes of the event data represent the stream ID and the next |
// 4 bytes represent the buffer length, in milliseconds.
type SetBufferMessage struct {
StreamIDMessage
Millisecond uint32
}
func (msg *SetBufferMessage) Encode(buf IAMF) {
buf.WriteUint16(msg.EventType)
msg.EventData = buf.Malloc(8)
binary.BigEndian.PutUint32(msg.EventData, msg.StreamID)
binary.BigEndian.PutUint32(msg.EventData[4:], msg.Millisecond)
}
// PingRequest (=6)
// The server sends this event to test whether the client is reachable. Event data is a 4-byte
// timestamp, representing the local server time when the server dispatched the command.
// The client responds with PingResponse on receiving MsgPingRequest.
type PingRequestMessage struct {
UserControlMessage
Timestamp uint32
}
func (msg *PingRequestMessage) Encode(buf IAMF) {
buf.WriteUint16(msg.EventType)
msg.EventData = buf.Malloc(4)
binary.BigEndian.PutUint32(msg.EventData, msg.Timestamp)
}
func (msg *UserControlMessage) Encode(buf IAMF) {
buf.WriteUint16(msg.EventType)
}

View File

@@ -0,0 +1,345 @@
package pkg
import (
"bufio"
"encoding/binary"
"errors"
"io"
"net"
"runtime"
"sync/atomic"
"m7s.live/m7s/v5/pkg/util"
)
const (
SEND_CHUNK_SIZE_MESSAGE = "Send Chunk Size Message"
SEND_ACK_MESSAGE = "Send Acknowledgement Message"
SEND_ACK_WINDOW_SIZE_MESSAGE = "Send Window Acknowledgement Size Message"
SEND_SET_PEER_BANDWIDTH_MESSAGE = "Send Set Peer Bandwidth Message"
SEND_STREAM_BEGIN_MESSAGE = "Send Stream Begin Message"
SEND_SET_BUFFER_LENGTH_MESSAGE = "Send Set Buffer Lengh Message"
SEND_STREAM_IS_RECORDED_MESSAGE = "Send Stream Is Recorded Message"
SEND_PING_REQUEST_MESSAGE = "Send Ping Request Message"
SEND_PING_RESPONSE_MESSAGE = "Send Ping Response Message"
SEND_CONNECT_MESSAGE = "Send Connect Message"
SEND_CONNECT_RESPONSE_MESSAGE = "Send Connect Response Message"
SEND_CREATE_STREAM_MESSAGE = "Send Create Stream Message"
SEND_PLAY_MESSAGE = "Send Play Message"
SEND_PLAY_RESPONSE_MESSAGE = "Send Play Response Message"
SEND_PUBLISH_RESPONSE_MESSAGE = "Send Publish Response Message"
SEND_PUBLISH_START_MESSAGE = "Send Publish Start Message"
SEND_UNPUBLISH_RESPONSE_MESSAGE = "Send Unpublish Response Message"
SEND_AUDIO_MESSAGE = "Send Audio Message"
SEND_FULL_AUDIO_MESSAGE = "Send Full Audio Message"
SEND_VIDEO_MESSAGE = "Send Video Message"
SEND_FULL_VDIEO_MESSAGE = "Send Full Video Message"
)
type BytesPool struct {
*util.Pool[[]byte]
ItemSize int
}
func (bp *BytesPool) Get(size int) []byte {
if size != bp.ItemSize {
bp.Pool = nil
return make([]byte, size)
}
ret := bp.Pool.Get()
if ret == nil {
return make([]byte, size)
}
return ret
}
type NetConnection struct {
*bufio.Reader `json:"-" yaml:"-"`
net.Conn `json:"-" yaml:"-"`
bandwidth uint32
readSeqNum uint32 // 当前读的字节
writeSeqNum uint32 // 当前写的字节
totalWrite uint32 // 总共写了多少字节
totalRead uint32 // 总共读了多少字节
WriteChunkSize int
readChunkSize int
incommingChunks map[uint32]*Chunk
ObjectEncoding float64
AppName string
tmpBuf util.Buffer //用来接收/发送小数据,复用内存
chunkHeader util.Buffer
bytePool BytesPool
writing atomic.Bool // false 可写true 不可写
}
func NewNetConnection(conn net.Conn) *NetConnection {
return &NetConnection{
Conn: conn,
Reader: bufio.NewReader(conn),
WriteChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
incommingChunks: make(map[uint32]*Chunk),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
tmpBuf: make(util.Buffer, 4),
chunkHeader: make(util.Buffer, 0, 16),
// bytePool: make(util.BytesPool, 17),
}
}
func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
n, err = io.ReadFull(conn.Reader, buf)
if err == nil {
conn.readSeqNum += uint32(n)
}
return
}
func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) {
return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID})
}
func (conn *NetConnection) SendUserControl(eventType uint16) error {
return conn.SendMessage(RTMP_MSG_USER_CONTROL, &UserControlMessage{EventType: eventType})
}
func (conn *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) error {
m := &ResponseCreateStreamMessage{}
m.CommandName = Response_Result
m.TransactionId = tid
m.StreamId = streamID
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
// func (conn *NetConnection) SendCommand(message string, args any) error {
// switch message {
// // case SEND_SET_BUFFER_LENGTH_MESSAGE:
// // if args != nil {
// // return errors.New(SEND_SET_BUFFER_LENGTH_MESSAGE + ", The parameter is nil")
// // }
// // m := new(SetBufferMessage)
// // m.EventType = RTMP_USER_SET_BUFFLEN
// // m.Millisecond = 100
// // m.StreamID = conn.streamID
// // return conn.writeMessage(RTMP_MSG_USER_CONTROL, m)
// }
// return errors.New("send message no exist")
// }
func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
head, err := conn.ReadByte()
if err != nil {
return nil, err
}
conn.readSeqNum++
ChunkStreamID := uint32(head & 0x3f) // 0011 1111
ChunkType := head >> 6 // 1100 0000
// 如果块流ID为0,1的话,就需要计算.
ChunkStreamID, err = conn.readChunkStreamID(ChunkStreamID)
if err != nil {
return nil, errors.New("get chunk stream id error :" + err.Error())
}
// println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType)
chunk, ok := conn.incommingChunks[ChunkStreamID]
if ChunkType != 3 && ok && chunk.AVData.Length > 0 {
// 如果块类型不为3,那么这个rtmp的body应该为空.
return nil, errors.New("incompleteRtmpBody error")
}
if !ok {
chunk = &Chunk{}
conn.incommingChunks[ChunkStreamID] = chunk
}
if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil {
return nil, errors.New("get chunk type error :" + err.Error())
}
msgLen := int(chunk.MessageLength)
needRead := conn.readChunkSize
if unRead := msgLen - chunk.AVData.Length; unRead < needRead {
needRead = unRead
}
mem := conn.bytePool.Get(needRead)
if n, err := conn.ReadFull(mem); err != nil {
conn.bytePool.Put(mem)
return nil, err
} else {
conn.readSeqNum += uint32(n)
}
chunk.AVData.Data = append(chunk.AVData.Data, mem)
if chunk.AVData.ReadFromBytes(mem); chunk.AVData.Length == msgLen {
chunk.ChunkHeader.ExtendTimestamp += chunk.ChunkHeader.Timestamp
msg = chunk
switch chunk.MessageTypeID {
case RTMP_MSG_AUDIO, RTMP_MSG_VIDEO:
msg.AVData.Timestamp = chunk.ChunkHeader.ExtendTimestamp
default:
err = GetRtmpMessage(msg, msg.AVData.ToBytes())
msg.AVData.Recycle()
}
conn.incommingChunks[ChunkStreamID] = &Chunk{
ChunkHeader: chunk.ChunkHeader,
}
}
return
}
func (conn *NetConnection) readChunkStreamID(csid uint32) (chunkStreamID uint32, err error) {
chunkStreamID = csid
switch csid {
case 0:
{
u8, err := conn.ReadByte()
conn.readSeqNum++
if err != nil {
return 0, err
}
chunkStreamID = 64 + uint32(u8)
}
case 1:
{
u16_0, err1 := conn.ReadByte()
if err1 != nil {
return 0, err1
}
u16_1, err1 := conn.ReadByte()
if err1 != nil {
return 0, err1
}
conn.readSeqNum += 2
chunkStreamID = 64 + uint32(u16_0) + (uint32(u16_1) << 8)
}
}
return chunkStreamID, nil
}
func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err error) {
conn.tmpBuf.Reset()
b4 := conn.tmpBuf.Malloc(4)
b3 := b4[:3]
if chunkType == 3 {
// 3个字节的时间戳
} else {
// Timestamp 3 bytes
if _, err = conn.ReadFull(b3); err != nil {
return err
}
util.GetBE(b3, &h.Timestamp)
if chunkType != 2 {
if _, err = conn.ReadFull(b3); err != nil {
return err
}
util.GetBE(b3, &h.MessageLength)
// Message Type ID 1 bytes
if h.MessageTypeID, err = conn.ReadByte(); err != nil {
return err
}
conn.readSeqNum++
if chunkType == 0 {
// Message Stream ID 4bytes
if _, err = conn.ReadFull(b4); err != nil { // 读取Message Stream ID
return err
}
h.MessageStreamID = binary.LittleEndian.Uint32(b4)
}
}
}
// ExtendTimestamp 4 bytes
if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求
if _, err = conn.ReadFull(b4); err != nil {
return err
}
util.GetBE(b4, &h.Timestamp)
}
if chunkType == 0 {
h.ExtendTimestamp = h.Timestamp
h.Timestamp = 0
}
return nil
}
func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) {
if conn.readSeqNum >= conn.bandwidth {
conn.totalRead += conn.readSeqNum
conn.readSeqNum = 0
err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalRead))
}
for msg == nil && err == nil {
if msg, err = conn.readChunk(); msg != nil && err == nil {
switch msg.MessageTypeID {
case RTMP_MSG_CHUNK_SIZE:
conn.readChunkSize = int(msg.MsgData.(Uint32Message))
// RTMPPlugin.Info("msg read chunk size", zap.Int("readChunkSize", conn.readChunkSize))
case RTMP_MSG_ABORT:
delete(conn.incommingChunks, uint32(msg.MsgData.(Uint32Message)))
case RTMP_MSG_ACK, RTMP_MSG_EDGE:
case RTMP_MSG_USER_CONTROL:
if _, ok := msg.MsgData.(*PingRequestMessage); ok {
conn.SendUserControl(RTMP_USER_PING_RESPONSE)
}
case RTMP_MSG_ACK_SIZE:
conn.bandwidth = uint32(msg.MsgData.(Uint32Message))
case RTMP_MSG_BANDWIDTH:
conn.bandwidth = msg.MsgData.(*SetPeerBandwidthMessage).AcknowledgementWindowsize
case RTMP_MSG_AMF0_COMMAND, RTMP_MSG_AUDIO, RTMP_MSG_VIDEO:
return msg, err
}
}
}
return
}
func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
if conn == nil {
return errors.New("connection is nil")
}
if conn.writeSeqNum > conn.bandwidth {
conn.totalWrite += conn.writeSeqNum
conn.writeSeqNum = 0
err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite))
err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0)
}
for !conn.writing.CompareAndSwap(false, true) {
runtime.Gosched()
}
defer conn.writing.Store(false)
conn.tmpBuf.Reset()
amf := AMF{conn.tmpBuf}
if conn.ObjectEncoding == 0 {
msg.Encode(&amf)
} else {
amf := AMF3{AMF: amf}
msg.Encode(&amf)
}
conn.tmpBuf = amf.Buffer
head := newChunkHeader(t)
head.MessageLength = uint32(conn.tmpBuf.Len())
if sid, ok := msg.(HaveStreamID); ok {
head.MessageStreamID = sid.GetStreamID()
}
head.WriteTo(RTMP_CHUNK_HEAD_12, &conn.chunkHeader)
for _, chunk := range conn.tmpBuf.Split(conn.WriteChunkSize) {
conn.sendChunk(chunk)
}
return nil
}
func (conn *NetConnection) sendChunk(writeBuffer ...[]byte) error {
if n, err := conn.Write(conn.chunkHeader); err != nil {
return err
} else {
conn.writeSeqNum += uint32(n)
}
buf := net.Buffers(writeBuffer)
n, err := buf.WriteTo(conn)
conn.writeSeqNum += uint32(n)
return err
}

182
plugin/rtmp/pkg/server.go Normal file
View File

@@ -0,0 +1,182 @@
package pkg
type NetStream struct {
*NetConnection
StreamID uint32
}
func (ns *NetStream) Begin() {
ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID)
}
// type RTMPSubscriber struct {
// RTMPSender
// }
// func (s *RTMPSubscriber) OnEvent(event any) {
// switch event.(type) {
// case engine.SEclose:
// s.Response(0, NetStream_Play_Stop, Level_Status)
// }
// s.RTMPSender.OnEvent(event)
// }
// func (config *RTMPConfig) ServeTCP(conn net.Conn) {
// defer conn.Close()
// logger := RTMPPlugin.Logger.With(zap.String("remote", conn.RemoteAddr().String()))
// senders := make(map[uint32]*RTMPSubscriber)
// receivers := make(map[uint32]*RTMPReceiver)
// var err error
// logger.Info("conn")
// defer func() {
// ze := zap.Error(err)
// logger.Info("conn close", ze)
// for _, sender := range senders {
// sender.Stop(ze)
// }
// for _, receiver := range receivers {
// receiver.Stop(ze)
// }
// }()
// nc := NewNetConnection(conn)
// ctx, cancel := context.WithCancel(engine.Engine)
// defer cancel()
// /* Handshake */
// if err = nc.Handshake(); err != nil {
// logger.Error("handshake", zap.Error(err))
// return
// }
// var msg *Chunk
// var gstreamid uint32
// for {
// if msg, err = nc.RecvMessage(); err == nil {
// if msg.MessageLength <= 0 {
// continue
// }
// switch msg.MessageTypeID {
// case RTMP_MSG_AMF0_COMMAND:
// if msg.MsgData == nil {
// break
// }
// cmd := msg.MsgData.(Commander).GetCommand()
// logger.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
// switch cmd := msg.MsgData.(type) {
// case *CallMessage: //connect
// app := cmd.Object["app"] // 客户端要连接到的服务应用名
// objectEncoding := cmd.Object["objectEncoding"] // AMF编码方法
// switch v := objectEncoding.(type) {
// case float64:
// nc.objectEncoding = v
// default:
// nc.objectEncoding = 0
// }
// nc.appName = app.(string)
// logger.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding))
// err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10))
// nc.writeChunkSize = config.ChunkSize
// err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize))
// err = nc.SendMessage(RTMP_MSG_BANDWIDTH, &SetPeerBandwidthMessage{
// AcknowledgementWindowsize: uint32(512 << 10),
// LimitType: byte(2),
// })
// err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN, 0)
// m := new(ResponseConnectMessage)
// m.CommandName = Response_Result
// m.TransactionId = 1
// m.Properties = map[string]any{
// "fmsVer": "monibuca/" + engine.Engine.Version,
// "capabilities": 31,
// "mode": 1,
// "Author": "dexter",
// }
// m.Infomation = map[string]any{
// "level": Level_Status,
// "code": NetConnection_Connect_Success,
// "objectEncoding": nc.objectEncoding,
// }
// err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// case *CommandMessage: // "createStream"
// gstreamid++
// logger.Info("createStream:", zap.Uint32("streamId", gstreamid))
// nc.ResponseCreateStream(cmd.TransactionId, gstreamid)
// case *CURDStreamMessage:
// if stream, ok := receivers[cmd.StreamId]; ok {
// stream.Stop()
// delete(senders, cmd.StreamId)
// }
// case *ReleaseStreamMessage:
// // m := &CommandMessage{
// // CommandName: "releaseStream_error",
// // TransactionId: cmd.TransactionId,
// // }
// // s := engine.Streams.Get(nc.appName + "/" + cmd.StreamName)
// // if s != nil && s.Publisher != nil {
// // if p, ok := s.Publisher.(*RTMPReceiver); ok {
// // // m.CommandName = "releaseStream_result"
// // p.Stop()
// // delete(receivers, p.StreamID)
// // }
// // }
// // err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// case *PublishMessage:
// receiver := &RTMPReceiver{
// NetStream: NetStream{
// NetConnection: nc,
// StreamID: cmd.StreamId,
// },
// }
// receiver.SetParentCtx(ctx)
// if !config.KeepAlive {
// receiver.SetIO(conn)
// }
// if RTMPPlugin.Publish(nc.appName+"/"+cmd.PublishingName, receiver) == nil {
// receivers[cmd.StreamId] = receiver
// receiver.Begin()
// err = receiver.Response(cmd.TransactionId, NetStream_Publish_Start, Level_Status)
// } else {
// delete(receivers, cmd.StreamId)
// err = receiver.Response(cmd.TransactionId, NetStream_Publish_BadName, Level_Error)
// }
// case *PlayMessage:
// streamPath := nc.appName + "/" + cmd.StreamName
// sender := &RTMPSubscriber{}
// sender.NetStream = NetStream{
// nc,
// cmd.StreamId,
// }
// sender.SetParentCtx(ctx)
// if !config.KeepAlive {
// sender.SetIO(conn)
// }
// sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
// if RTMPPlugin.Subscribe(streamPath, sender) != nil {
// sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
// } else {
// senders[sender.StreamID] = sender
// sender.Begin()
// sender.Response(cmd.TransactionId, NetStream_Play_Reset, Level_Status)
// sender.Response(cmd.TransactionId, NetStream_Play_Start, Level_Status)
// go sender.PlayRaw()
// }
// }
// case RTMP_MSG_AUDIO:
// if r, ok := receivers[msg.MessageStreamID]; ok {
// r.ReceiveAudio(msg)
// } else {
// logger.Warn("ReceiveAudio", zap.Uint32("MessageStreamID", msg.MessageStreamID))
// }
// case RTMP_MSG_VIDEO:
// if r, ok := receivers[msg.MessageStreamID]; ok {
// r.ReceiveVideo(msg)
// } else {
// logger.Warn("ReceiveVideo", zap.Uint32("MessageStreamID", msg.MessageStreamID))
// }
// }
// } else if err == io.EOF || err == io.ErrUnexpectedEOF {
// logger.Info("rtmp client closed")
// return
// } else {
// logger.Warn("ReadMessage", zap.Error(err))
// return
// }
// }
// }

View File

@@ -1,7 +1,6 @@
package pkg
import (
"net"
"time"
. "m7s.live/m7s/v5/pkg"
@@ -10,13 +9,11 @@ import (
)
type RTMPVideo struct {
Timestamp uint32
net.Buffers
RTMPData
}
func (avcc *RTMPVideo) DecodeConfig(track *AVTrack) error {
var reader util.Buffers
reader.Buffers = avcc.Buffers
reader := avcc.Buffers
b0, err := reader.ReadByte()
if err != nil {
return err
@@ -120,9 +117,9 @@ func (avcc *RTMPVideo) parseAV1(track *AVTrack, reader *util.Buffers) (any, erro
return obus, nil
}
func (avcc *RTMPVideo) ToRaw(track *AVTrack) (any, error) {
var reader util.Buffers
reader.Buffers = avcc.Buffers
reader := avcc.Buffers
b0, err := reader.ReadByte()
if err != nil {
return nil, err

View File

@@ -13,7 +13,7 @@ import (
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
)
var Version = "v5.0.0"
type Server struct {
StartTime time.Time
context.Context