diff --git a/alias.go b/alias.go index f5aff6b..3e9262d 100644 --- a/alias.go +++ b/alias.go @@ -121,7 +121,7 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque } else { s.Waiting.WakeUp(req.Alias, publisher) } - } else { + } else if ok { aliasInfo.Publisher = aliasStream } if aliasInfo.Publisher != nil { diff --git a/api.go b/api.go index 5d7af0d..407a4fc 100644 --- a/api.go +++ b/api.go @@ -556,6 +556,7 @@ func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedReque if s, ok := s.Streams.Get(req.StreamPath); ok { s.Speed = float64(req.Speed) s.Scale = float64(req.Speed) + s.Info("set stream speed", "speed", req.Speed) } return nil }) diff --git a/doc_CN/relay.md b/doc_CN/relay.md new file mode 100644 index 0000000..c1c856c --- /dev/null +++ b/doc_CN/relay.md @@ -0,0 +1,47 @@ +# 核心转发流程 + +## 发布者 + +发布者(Publisher) 是用来在服务器上向 RingBuffer 中写入音视频数据的对象。对外暴露 WriteVideo 和 WriteAudio 方法。 +在写入 WriteVideo 和 WriteAudio 时会创建 Track,解析数据,生成 ICodecCtx。启动发布只需要调用 Plugin 的 Publish 方法即可。 + +### 接受推流 + +rtmp、rtsp 等插件会监听一个端口用来接受推流。 + +### 从远端拉流 + +- 实现了 OnPullProxyAdd 方法的插件,可以从远端拉流。 +- 继承自 HTTPFilePuller 的插件,可以从 http 或文件中拉流。 + +### 从本地录像文件中拉流 + +继承自 RecordFilePuller 的插件,可以从本地录像文件中拉流。 + + +## 订阅者 + +订阅者(Subscriber) 是用来从 RingBuffer 中读取音视频数据的对象。订阅流分两个步骤: + +1. 调用 Plugin 的 Subscribe 方法,传入 StreamPath 和 Subscribe 配置。 +2. 调用 PlayBlock 方法,开始读取数据,这个方法会阻塞直到订阅结束。 + +之所以分两个步骤的原因是,第一步可能会失败(超时等),也可能需要等待第一步成功后进行一些交互工作。 +第一步会有一定时间的阻塞,会等待发布者(如果开始没有发布者)、会等待发布者的轨道创建完成。 + +### 接受拉流 + +例如 rtmp、rtsp 插件,会监听一个端口,来接受播放的请求。 + +### 向远端推流 + +- 实现了 OnPushProxyAdd 方法的插件,可以向远端推流。 + +### 写入本地文件 + +包含录像功能的插件,需要先订阅流,才能写入本地文件。 + +## 按需拉流(发布) + +由订阅者触发,当调用 plugin 的 OnSubscribe 时,会通知所有插件,有订阅的需求,此时插件可以响应这个需求,发布一个流。例如拉取录像流,就是这一类。此时必须要注意的是需要通过正则表达式配置,防止同时发布。 + diff --git a/pkg/util/collection.go b/pkg/util/collection.go index 9833020..7d7130a 100644 --- a/pkg/util/collection.go +++ b/pkg/util/collection.go @@ -130,9 +130,9 @@ func (c *Collection[K, T]) Get(key K) (item T, ok bool) { } if c.m != nil { item, ok = c.m[key] - return item, ok + return } - for _, item = range c.Items { + for _, item := range c.Items { if item.GetKey() == key { return item, true } @@ -145,7 +145,7 @@ func (c *Collection[K, T]) Find(f func(T) bool) (item T, ok bool) { c.L.RLock() defer c.L.RUnlock() } - for _, item = range c.Items { + for _, item := range c.Items { if f(item) { return item, true } diff --git a/pkg/util/collection_test.go b/pkg/util/collection_test.go new file mode 100644 index 0000000..e6fdc30 --- /dev/null +++ b/pkg/util/collection_test.go @@ -0,0 +1,170 @@ +package util + +import ( + "fmt" + "sync" + "testing" +) + +type Class struct { + name string + Id int +} + +func (n *Class) GetKey() string { + return n.name +} + +var cc Collection[string, *Class] + +func TestCollection(t *testing.T) { + + for i := 0; i < 10; i++ { + cc.Add(&Class{name: fmt.Sprintf("%d", i), Id: i}) + } + cc.RemoveByKey("1") + if item, ok := cc.Get("1"); ok { + fmt.Println(item) + } else { + fmt.Println("not found", item) + } +} + +// TestItem 是用于测试的结构体 +type TestItem struct { + ID string + Data string +} + +func (t TestItem) GetKey() string { + return t.ID +} + +func TestCollection_BasicOperations(t *testing.T) { + c := &Collection[string, TestItem]{ + L: &sync.RWMutex{}, + } + + // 测试 Add + item1 := TestItem{ID: "1", Data: "test1"} + c.Add(item1) + if c.Length != 1 { + t.Errorf("Expected length 1, got %d", c.Length) + } + + // 测试 Get + got, ok := c.Get("1") + if !ok || got != item1 { + t.Errorf("Expected to get item1, got %v", got) + } + + // 测试 AddUnique + ok = c.AddUnique(item1) + if ok || c.Length != 1 { + t.Error("AddUnique should not add duplicate item") + } + + // 测试 Set + item1Modified := TestItem{ID: "1", Data: "test1-modified"} + added := c.Set(item1Modified) + if added { + t.Error("Set should not return true for existing item") + } + got, _ = c.Get("1") + if got.Data != "test1-modified" { + t.Errorf("Expected modified data, got %s", got.Data) + } + + // 测试 Remove + if !c.Remove(item1) { + t.Error("Remove should return true for existing item") + } + if c.Length != 0 { + t.Errorf("Expected length 0 after remove, got %d", c.Length) + } +} + +func TestCollection_Events(t *testing.T) { + c := &Collection[string, TestItem]{} + + var addCalled, removeCalled bool + c.OnAdd(func(item TestItem) { + addCalled = true + if item.ID != "1" { + t.Errorf("Expected item ID 1, got %s", item.ID) + } + }) + + c.OnRemove(func(item TestItem) { + removeCalled = true + if item.ID != "1" { + t.Errorf("Expected item ID 1, got %s", item.ID) + } + }) + + item := TestItem{ID: "1", Data: "test"} + c.Add(item) + if !addCalled { + t.Error("Add listener was not called") + } + + c.Remove(item) + if !removeCalled { + t.Error("Remove listener was not called") + } +} + +func TestCollection_Search(t *testing.T) { + c := &Collection[string, TestItem]{} + items := []TestItem{ + {ID: "1", Data: "test1"}, + {ID: "2", Data: "test2"}, + {ID: "3", Data: "test1"}, + } + + for _, item := range items { + c.Add(item) + } + + // 测试 Find + found, ok := c.Find(func(item TestItem) bool { + return item.Data == "test1" + }) + if !ok || found.ID != "1" { + t.Error("Find should return first matching item") + } + + // 测试 Search + count := 0 + search := c.Search(func(item TestItem) bool { + return item.Data == "test1" + }) + search(func(item TestItem) bool { + count++ + return true + }) + if count != 2 { + t.Errorf("Search should find 2 items, found %d", count) + } +} + +func TestCollection_ConcurrentAccess(t *testing.T) { + c := &Collection[string, TestItem]{ + L: &sync.RWMutex{}, + } + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + item := TestItem{ID: string(rune(id)), Data: "test"} + c.Add(item) + }(i) + } + wg.Wait() + + if c.Length != 100 { + t.Errorf("Expected 100 items, got %d", c.Length) + } +} diff --git a/plugin/README.md b/plugin/README.md new file mode 100644 index 0000000..e60c3c3 --- /dev/null +++ b/plugin/README.md @@ -0,0 +1,306 @@ +# Plugin Development Guide + +## 1. Prerequisites + +### Development Tools +- Visual Studio Code +- Goland +- Cursor + +### Install gRPC +```shell +$ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28 +$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2 +``` + +### Install gRPC-Gateway +```shell +$ go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest +$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +``` + +### Project Setup +- Create a Go project, e.g., `MyPlugin` +- Create a `pkg` directory for exportable code +- Create a `pb` directory for gRPC proto files +- Create an `example` directory for testing the plugin + +> You can also create a directory `xxx` directly in the monibuca project's plugin folder to store your plugin code + +## 2. Create a Plugin + +```go +package plugin_myplugin +import ( + "m7s.live/v5" +) + +var _ = m7s.InstallPlugin[MyPlugin]() + +type MyPlugin struct { + m7s.Plugin + Foo string +} +``` +- `MyPlugin` struct is the plugin definition, `Foo` is a plugin property that can be configured in the configuration file +- Must embed `m7s.Plugin` struct to provide basic plugin functionality +- `m7s.InstallPlugin[MyPlugin](...)` registers the plugin so it can be loaded by monibuca + +### Provide Default Configuration +Example: +```go +const defaultConfig = m7s.DefaultYaml(`tcp: + listenaddr: :5554`) + +var _ = m7s.InstallPlugin[MyPlugin](defaultConfig) +``` + +## 3. Implement Event Callbacks (Optional) + +### Initialization Callback +```go +func (config *MyPlugin) OnInit() (err error) { + // Initialize things + return +} +``` +Used for plugin initialization after configuration is loaded. Return an error if initialization fails, and the plugin will be disabled. + +### TCP Request Callback +```go +func (config *MyPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask { + +} +``` +Called when receiving TCP connection requests if TCP listening port is configured. + +### UDP Request Callback +```go +func (config *MyPlugin) OnUDPConnect(conn *net.UDPConn) task.ITask { + +} +``` +Called when receiving UDP connection requests if UDP listening port is configured. + +### QUIC Request Callback +```go +func (config *MyPlugin) OnQUICConnect(quic.Connection) task.ITask { + +} +``` +Called when receiving QUIC connection requests if QUIC listening port is configured. + +## 4. HTTP Interface Callbacks + +### Legacy v4 Callback Style +```go +func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) { + // do something +} +``` +Accessible via `http://ip:port/myplugin/api/test1` + +### Route Mapping Configuration +This method supports parameterized routing: +```go +func (config *MyPlugin) RegisterHandler() map[string]http.HandlerFunc { + return map[string]http.HandlerFunc{ + "/test1/{streamPath...}": config.test1, + } +} +func (config *MyPlugin) test1(rw http.ResponseWriter, r *http.Request) { + streamPath := r.PathValue("streamPath") + // do something +} +``` + +## 5. Implement Push/Pull Clients + +### Implement Push Client +Push client needs to implement IPusher interface and pass the creation method to InstallPlugin. +```go +type Pusher struct { + pullCtx m7s.PullJob +} + +func (c *Pusher) GetPullJob() *m7s.PullJob { + return &c.pullCtx +} + +func NewPusher(_ config.Push) m7s.IPusher { + return &Pusher{} +} +var _ = m7s.InstallPlugin[MyPlugin](NewPusher) +``` + +### Implement Pull Client +Pull client needs to implement IPuller interface and pass the creation method to InstallPlugin. +The following Puller inherits from m7s.HTTPFilePuller for basic file and HTTP pulling: +```go +type Puller struct { + m7s.HTTPFilePuller +} + +func NewPuller(_ config.Pull) m7s.IPuller { + return &Puller{} +} +var _ = m7s.InstallPlugin[MyPlugin](NewPuller) +``` + +## 6. Implement gRPC Service + +### Create `myplugin.proto` in `pb` Directory +```proto +syntax = "proto3"; +import "google/api/annotations.proto"; +import "google/protobuf/empty.proto"; +package myplugin; +option go_package="m7s.live/v5/plugin/myplugin/pb"; + +service api { + rpc MyMethod (MyRequest) returns (MyResponse) { + option (google.api.http) = { + post: "/myplugin/api/bar" + body: "foo" + }; + } +} +message MyRequest { + string foo = 1; +} +message MyResponse { + string bar = 1; +} +``` + +### Generate gRPC Code +Add to VSCode task.json: +```json +{ + "type": "shell", + "label": "build pb myplugin", + "command": "protoc", + "args": [ + "-I.", + "-I${workspaceRoot}/pb", + "--go_out=.", + "--go_opt=paths=source_relative", + "--go-grpc_out=.", + "--go-grpc_opt=paths=source_relative", + "--grpc-gateway_out=.", + "--grpc-gateway_opt=paths=source_relative", + "myplugin.proto" + ], + "options": { + "cwd": "${workspaceRoot}/plugin/myplugin/pb" + } +} +``` +Or run command in pb directory: +```shell +protoc -I. -I$ProjectFileDir$/pb --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative myplugin.proto +``` +Replace `$ProjectFileDir$` with the directory containing global pb files. + +### Implement gRPC Service +Create api.go: +```go +package plugin_myplugin +import ( + "context" + "m7s.live/m7s/v5" + "m7s.live/m7s/v5/plugin/myplugin/pb" +) + +func (config *MyPlugin) MyMethod(ctx context.Context, req *pb.MyRequest) (*pb.MyResponse, error) { + return &pb.MyResponse{Bar: req.Foo}, nil +} +``` + +### Register gRPC Service +```go +package plugin_myplugin +import ( + "m7s.live/v5" + "m7s.live/v5/plugin/myplugin/pb" +) + +var _ = m7s.InstallPlugin[MyPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler) + +type MyPlugin struct { + pb.UnimplementedApiServer + m7s.Plugin + Foo string +} +``` + +### Additional RESTful Endpoints +Same as v4: +```go +func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) { + // do something +} +``` +Accessible via GET request to `/myplugin/api/test1` + +## 7. Publishing Streams + +```go +publisher, err = p.Publish(streamPath, connectInfo) +``` +The last two parameters are optional. + +After obtaining the `publisher`, you can publish audio/video data using `publisher.WriteAudio` and `publisher.WriteVideo`. + +### Define Audio/Video Data +If existing audio/video data formats don't meet your needs, you can define custom formats by implementing this interface: +```go +IAVFrame interface { + GetAllocator() *util.ScalableMemoryAllocator + SetAllocator(*util.ScalableMemoryAllocator) + Parse(*AVTrack) error + ConvertCtx(codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) + Demux(codec.ICodecCtx) (any, error) + Mux(codec.ICodecCtx, *AVFrame) + GetTimestamp() time.Duration + GetCTS() time.Duration + GetSize() int + Recycle() + String() string + Dump(byte, io.Writer) +} +``` +> Define separate types for audio and video + +- GetAllocator/SetAllocator: Automatically implemented when embedding RecyclableMemory +- Parse: Identifies key frames, sequence frames, and other important information +- ConvertCtx: Called when protocol conversion is needed +- Demux: Called when audio/video data needs to be demuxed +- Mux: Called when audio/video data needs to be muxed +- Recycle: Automatically implemented when embedding RecyclableMemory +- String: Prints audio/video data information +- GetSize: Gets the size of audio/video data +- GetTimestamp: Gets the timestamp in nanoseconds +- GetCTS: Gets the Composition Time Stamp in nanoseconds (PTS = DTS+CTS) +- Dump: Prints binary audio/video data + +## 8. Subscribing to Streams +```go +var suber *m7s.Subscriber +suber, err = p.Subscribe(ctx,streamPath) +go m7s.PlayBlock(suber, handleAudio, handleVideo) +``` +Note that handleAudio and handleVideo are callback functions you need to implement. They take an audio/video format type as input and return an error. If the error is not nil, the subscription is terminated. + +## 9. Prometheus Integration +Just implement the Collector interface, and the system will automatically collect metrics from all plugins: +```go +func (p *MyPlugin) Describe(ch chan<- *prometheus.Desc) { + +} + +func (p *MyPlugin) Collect(ch chan<- prometheus.Metric) { + +} +``` \ No newline at end of file diff --git a/plugin/snap/pkg/transform.go b/plugin/snap/pkg/transform.go index 1b2914c..a1e733d 100644 --- a/plugin/snap/pkg/transform.go +++ b/plugin/snap/pkg/transform.go @@ -58,7 +58,7 @@ func GetVideoFrame(streamPath string, server *m7s.Server) (pkg.AnnexB, *pkg.AVTr return pkg.AnnexB{}, nil, err } if track.ICodecCtx == nil { - return pkg.AnnexB{}, nil, fmt.Errorf("unsupported codec") + return pkg.AnnexB{}, nil, pkg.ErrUnsupportCodec } annexb.Mux(track.ICodecCtx, &reader.Value) diff --git a/server.go b/server.go index b128971..f0cf565 100644 --- a/server.go +++ b/server.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log/slog" + "net" "net/http" "net/url" "os" @@ -14,6 +15,8 @@ import ( "strings" "time" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" "github.com/shirou/gopsutil/v4/cpu" "google.golang.org/protobuf/proto" @@ -268,6 +271,7 @@ func (s *Server) Start() (err error) { "/api/stream/annexb/{streamPath...}": s.api_Stream_AnnexB_, "/api/videotrack/sse/{streamPath...}": s.api_VideoTrack_SSE, "/api/audiotrack/sse/{streamPath...}": s.api_AudioTrack_SSE, + "/annexb/{streamPath...}": s.annexB, }) if s.config.DSN != "" { @@ -763,3 +767,41 @@ func (s *Server) AuthInterceptor() grpc.UnaryServerInterceptor { return handler(newCtx, req) } } + +func (s *Server) annexB(w http.ResponseWriter, r *http.Request) { + streamPath := r.PathValue("streamPath") + + if r.URL.RawQuery != "" { + streamPath += "?" + r.URL.RawQuery + } + var conf = s.config.Subscribe + conf.SubType = SubscribeTypeServer + conf.SubAudio = false + suber, err := s.SubscribeWithConfig(r.Context(), streamPath, conf) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + var conn net.Conn + conn, err = suber.CheckWebSocket(w, r) + if err != nil { + return + } + if conn == nil { + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Transfer-Encoding", "identity") + w.WriteHeader(http.StatusOK) + } + + PlayBlock(suber, func(frame *pkg.AVFrame) (err error) { + return nil + }, func(frame *pkg.AnnexB) (err error) { + if conn != nil { + return wsutil.WriteServerMessage(conn, ws.OpBinary, util.ConcatBuffers(frame.Memory.Buffers)) + } + var buf net.Buffers + buf = append(buf, frame.Memory.Buffers...) + buf.WriteTo(w) + return nil + }) +}