fix: alias bug

This commit is contained in:
langhuihui
2025-01-14 18:56:58 +08:00
parent 8effa750c5
commit 413b83c215
8 changed files with 571 additions and 5 deletions

View File

@@ -121,7 +121,7 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
} else {
s.Waiting.WakeUp(req.Alias, publisher)
}
} else {
} else if ok {
aliasInfo.Publisher = aliasStream
}
if aliasInfo.Publisher != nil {

1
api.go
View File

@@ -556,6 +556,7 @@ func (s *Server) SetStreamSpeed(ctx context.Context, req *pb.SetStreamSpeedReque
if s, ok := s.Streams.Get(req.StreamPath); ok {
s.Speed = float64(req.Speed)
s.Scale = float64(req.Speed)
s.Info("set stream speed", "speed", req.Speed)
}
return nil
})

47
doc_CN/relay.md Normal file
View File

@@ -0,0 +1,47 @@
# 核心转发流程
## 发布者
发布者Publisher 是用来在服务器上向 RingBuffer 中写入音视频数据的对象。对外暴露 WriteVideo 和 WriteAudio 方法。
在写入 WriteVideo 和 WriteAudio 时会创建 Track解析数据生成 ICodecCtx。启动发布只需要调用 Plugin 的 Publish 方法即可。
### 接受推流
rtmp、rtsp 等插件会监听一个端口用来接受推流。
### 从远端拉流
- 实现了 OnPullProxyAdd 方法的插件,可以从远端拉流。
- 继承自 HTTPFilePuller 的插件,可以从 http 或文件中拉流。
### 从本地录像文件中拉流
继承自 RecordFilePuller 的插件,可以从本地录像文件中拉流。
## 订阅者
订阅者Subscriber 是用来从 RingBuffer 中读取音视频数据的对象。订阅流分两个步骤:
1. 调用 Plugin 的 Subscribe 方法,传入 StreamPath 和 Subscribe 配置。
2. 调用 PlayBlock 方法,开始读取数据,这个方法会阻塞直到订阅结束。
之所以分两个步骤的原因是,第一步可能会失败(超时等),也可能需要等待第一步成功后进行一些交互工作。
第一步会有一定时间的阻塞,会等待发布者(如果开始没有发布者)、会等待发布者的轨道创建完成。
### 接受拉流
例如 rtmp、rtsp 插件,会监听一个端口,来接受播放的请求。
### 向远端推流
- 实现了 OnPushProxyAdd 方法的插件,可以向远端推流。
### 写入本地文件
包含录像功能的插件,需要先订阅流,才能写入本地文件。
## 按需拉流(发布)
由订阅者触发,当调用 plugin 的 OnSubscribe 时,会通知所有插件,有订阅的需求,此时插件可以响应这个需求,发布一个流。例如拉取录像流,就是这一类。此时必须要注意的是需要通过正则表达式配置,防止同时发布。

View File

@@ -130,9 +130,9 @@ func (c *Collection[K, T]) Get(key K) (item T, ok bool) {
}
if c.m != nil {
item, ok = c.m[key]
return item, ok
return
}
for _, item = range c.Items {
for _, item := range c.Items {
if item.GetKey() == key {
return item, true
}
@@ -145,7 +145,7 @@ func (c *Collection[K, T]) Find(f func(T) bool) (item T, ok bool) {
c.L.RLock()
defer c.L.RUnlock()
}
for _, item = range c.Items {
for _, item := range c.Items {
if f(item) {
return item, true
}

170
pkg/util/collection_test.go Normal file
View File

@@ -0,0 +1,170 @@
package util
import (
"fmt"
"sync"
"testing"
)
type Class struct {
name string
Id int
}
func (n *Class) GetKey() string {
return n.name
}
var cc Collection[string, *Class]
func TestCollection(t *testing.T) {
for i := 0; i < 10; i++ {
cc.Add(&Class{name: fmt.Sprintf("%d", i), Id: i})
}
cc.RemoveByKey("1")
if item, ok := cc.Get("1"); ok {
fmt.Println(item)
} else {
fmt.Println("not found", item)
}
}
// TestItem 是用于测试的结构体
type TestItem struct {
ID string
Data string
}
func (t TestItem) GetKey() string {
return t.ID
}
func TestCollection_BasicOperations(t *testing.T) {
c := &Collection[string, TestItem]{
L: &sync.RWMutex{},
}
// 测试 Add
item1 := TestItem{ID: "1", Data: "test1"}
c.Add(item1)
if c.Length != 1 {
t.Errorf("Expected length 1, got %d", c.Length)
}
// 测试 Get
got, ok := c.Get("1")
if !ok || got != item1 {
t.Errorf("Expected to get item1, got %v", got)
}
// 测试 AddUnique
ok = c.AddUnique(item1)
if ok || c.Length != 1 {
t.Error("AddUnique should not add duplicate item")
}
// 测试 Set
item1Modified := TestItem{ID: "1", Data: "test1-modified"}
added := c.Set(item1Modified)
if added {
t.Error("Set should not return true for existing item")
}
got, _ = c.Get("1")
if got.Data != "test1-modified" {
t.Errorf("Expected modified data, got %s", got.Data)
}
// 测试 Remove
if !c.Remove(item1) {
t.Error("Remove should return true for existing item")
}
if c.Length != 0 {
t.Errorf("Expected length 0 after remove, got %d", c.Length)
}
}
func TestCollection_Events(t *testing.T) {
c := &Collection[string, TestItem]{}
var addCalled, removeCalled bool
c.OnAdd(func(item TestItem) {
addCalled = true
if item.ID != "1" {
t.Errorf("Expected item ID 1, got %s", item.ID)
}
})
c.OnRemove(func(item TestItem) {
removeCalled = true
if item.ID != "1" {
t.Errorf("Expected item ID 1, got %s", item.ID)
}
})
item := TestItem{ID: "1", Data: "test"}
c.Add(item)
if !addCalled {
t.Error("Add listener was not called")
}
c.Remove(item)
if !removeCalled {
t.Error("Remove listener was not called")
}
}
func TestCollection_Search(t *testing.T) {
c := &Collection[string, TestItem]{}
items := []TestItem{
{ID: "1", Data: "test1"},
{ID: "2", Data: "test2"},
{ID: "3", Data: "test1"},
}
for _, item := range items {
c.Add(item)
}
// 测试 Find
found, ok := c.Find(func(item TestItem) bool {
return item.Data == "test1"
})
if !ok || found.ID != "1" {
t.Error("Find should return first matching item")
}
// 测试 Search
count := 0
search := c.Search(func(item TestItem) bool {
return item.Data == "test1"
})
search(func(item TestItem) bool {
count++
return true
})
if count != 2 {
t.Errorf("Search should find 2 items, found %d", count)
}
}
func TestCollection_ConcurrentAccess(t *testing.T) {
c := &Collection[string, TestItem]{
L: &sync.RWMutex{},
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
item := TestItem{ID: string(rune(id)), Data: "test"}
c.Add(item)
}(i)
}
wg.Wait()
if c.Length != 100 {
t.Errorf("Expected 100 items, got %d", c.Length)
}
}

306
plugin/README.md Normal file
View File

@@ -0,0 +1,306 @@
# Plugin Development Guide
## 1. Prerequisites
### Development Tools
- Visual Studio Code
- Goland
- Cursor
### Install gRPC
```shell
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
```
### Install gRPC-Gateway
```shell
$ go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
```
### Project Setup
- Create a Go project, e.g., `MyPlugin`
- Create a `pkg` directory for exportable code
- Create a `pb` directory for gRPC proto files
- Create an `example` directory for testing the plugin
> You can also create a directory `xxx` directly in the monibuca project's plugin folder to store your plugin code
## 2. Create a Plugin
```go
package plugin_myplugin
import (
"m7s.live/v5"
)
var _ = m7s.InstallPlugin[MyPlugin]()
type MyPlugin struct {
m7s.Plugin
Foo string
}
```
- `MyPlugin` struct is the plugin definition, `Foo` is a plugin property that can be configured in the configuration file
- Must embed `m7s.Plugin` struct to provide basic plugin functionality
- `m7s.InstallPlugin[MyPlugin](...)` registers the plugin so it can be loaded by monibuca
### Provide Default Configuration
Example:
```go
const defaultConfig = m7s.DefaultYaml(`tcp:
listenaddr: :5554`)
var _ = m7s.InstallPlugin[MyPlugin](defaultConfig)
```
## 3. Implement Event Callbacks (Optional)
### Initialization Callback
```go
func (config *MyPlugin) OnInit() (err error) {
// Initialize things
return
}
```
Used for plugin initialization after configuration is loaded. Return an error if initialization fails, and the plugin will be disabled.
### TCP Request Callback
```go
func (config *MyPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
}
```
Called when receiving TCP connection requests if TCP listening port is configured.
### UDP Request Callback
```go
func (config *MyPlugin) OnUDPConnect(conn *net.UDPConn) task.ITask {
}
```
Called when receiving UDP connection requests if UDP listening port is configured.
### QUIC Request Callback
```go
func (config *MyPlugin) OnQUICConnect(quic.Connection) task.ITask {
}
```
Called when receiving QUIC connection requests if QUIC listening port is configured.
## 4. HTTP Interface Callbacks
### Legacy v4 Callback Style
```go
func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) {
// do something
}
```
Accessible via `http://ip:port/myplugin/api/test1`
### Route Mapping Configuration
This method supports parameterized routing:
```go
func (config *MyPlugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{
"/test1/{streamPath...}": config.test1,
}
}
func (config *MyPlugin) test1(rw http.ResponseWriter, r *http.Request) {
streamPath := r.PathValue("streamPath")
// do something
}
```
## 5. Implement Push/Pull Clients
### Implement Push Client
Push client needs to implement IPusher interface and pass the creation method to InstallPlugin.
```go
type Pusher struct {
pullCtx m7s.PullJob
}
func (c *Pusher) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func NewPusher(_ config.Push) m7s.IPusher {
return &Pusher{}
}
var _ = m7s.InstallPlugin[MyPlugin](NewPusher)
```
### Implement Pull Client
Pull client needs to implement IPuller interface and pass the creation method to InstallPlugin.
The following Puller inherits from m7s.HTTPFilePuller for basic file and HTTP pulling:
```go
type Puller struct {
m7s.HTTPFilePuller
}
func NewPuller(_ config.Pull) m7s.IPuller {
return &Puller{}
}
var _ = m7s.InstallPlugin[MyPlugin](NewPuller)
```
## 6. Implement gRPC Service
### Create `myplugin.proto` in `pb` Directory
```proto
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
package myplugin;
option go_package="m7s.live/v5/plugin/myplugin/pb";
service api {
rpc MyMethod (MyRequest) returns (MyResponse) {
option (google.api.http) = {
post: "/myplugin/api/bar"
body: "foo"
};
}
}
message MyRequest {
string foo = 1;
}
message MyResponse {
string bar = 1;
}
```
### Generate gRPC Code
Add to VSCode task.json:
```json
{
"type": "shell",
"label": "build pb myplugin",
"command": "protoc",
"args": [
"-I.",
"-I${workspaceRoot}/pb",
"--go_out=.",
"--go_opt=paths=source_relative",
"--go-grpc_out=.",
"--go-grpc_opt=paths=source_relative",
"--grpc-gateway_out=.",
"--grpc-gateway_opt=paths=source_relative",
"myplugin.proto"
],
"options": {
"cwd": "${workspaceRoot}/plugin/myplugin/pb"
}
}
```
Or run command in pb directory:
```shell
protoc -I. -I$ProjectFileDir$/pb --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative myplugin.proto
```
Replace `$ProjectFileDir$` with the directory containing global pb files.
### Implement gRPC Service
Create api.go:
```go
package plugin_myplugin
import (
"context"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/plugin/myplugin/pb"
)
func (config *MyPlugin) MyMethod(ctx context.Context, req *pb.MyRequest) (*pb.MyResponse, error) {
return &pb.MyResponse{Bar: req.Foo}, nil
}
```
### Register gRPC Service
```go
package plugin_myplugin
import (
"m7s.live/v5"
"m7s.live/v5/plugin/myplugin/pb"
)
var _ = m7s.InstallPlugin[MyPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler)
type MyPlugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Foo string
}
```
### Additional RESTful Endpoints
Same as v4:
```go
func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) {
// do something
}
```
Accessible via GET request to `/myplugin/api/test1`
## 7. Publishing Streams
```go
publisher, err = p.Publish(streamPath, connectInfo)
```
The last two parameters are optional.
After obtaining the `publisher`, you can publish audio/video data using `publisher.WriteAudio` and `publisher.WriteVideo`.
### Define Audio/Video Data
If existing audio/video data formats don't meet your needs, you can define custom formats by implementing this interface:
```go
IAVFrame interface {
GetAllocator() *util.ScalableMemoryAllocator
SetAllocator(*util.ScalableMemoryAllocator)
Parse(*AVTrack) error
ConvertCtx(codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error)
Demux(codec.ICodecCtx) (any, error)
Mux(codec.ICodecCtx, *AVFrame)
GetTimestamp() time.Duration
GetCTS() time.Duration
GetSize() int
Recycle()
String() string
Dump(byte, io.Writer)
}
```
> Define separate types for audio and video
- GetAllocator/SetAllocator: Automatically implemented when embedding RecyclableMemory
- Parse: Identifies key frames, sequence frames, and other important information
- ConvertCtx: Called when protocol conversion is needed
- Demux: Called when audio/video data needs to be demuxed
- Mux: Called when audio/video data needs to be muxed
- Recycle: Automatically implemented when embedding RecyclableMemory
- String: Prints audio/video data information
- GetSize: Gets the size of audio/video data
- GetTimestamp: Gets the timestamp in nanoseconds
- GetCTS: Gets the Composition Time Stamp in nanoseconds (PTS = DTS+CTS)
- Dump: Prints binary audio/video data
## 8. Subscribing to Streams
```go
var suber *m7s.Subscriber
suber, err = p.Subscribe(ctx,streamPath)
go m7s.PlayBlock(suber, handleAudio, handleVideo)
```
Note that handleAudio and handleVideo are callback functions you need to implement. They take an audio/video format type as input and return an error. If the error is not nil, the subscription is terminated.
## 9. Prometheus Integration
Just implement the Collector interface, and the system will automatically collect metrics from all plugins:
```go
func (p *MyPlugin) Describe(ch chan<- *prometheus.Desc) {
}
func (p *MyPlugin) Collect(ch chan<- prometheus.Metric) {
}
```

View File

@@ -58,7 +58,7 @@ func GetVideoFrame(streamPath string, server *m7s.Server) (pkg.AnnexB, *pkg.AVTr
return pkg.AnnexB{}, nil, err
}
if track.ICodecCtx == nil {
return pkg.AnnexB{}, nil, fmt.Errorf("unsupported codec")
return pkg.AnnexB{}, nil, pkg.ErrUnsupportCodec
}
annexb.Mux(track.ICodecCtx, &reader.Value)

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"net/url"
"os"
@@ -14,6 +15,8 @@ import (
"strings"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/shirou/gopsutil/v4/cpu"
"google.golang.org/protobuf/proto"
@@ -268,6 +271,7 @@ func (s *Server) Start() (err error) {
"/api/stream/annexb/{streamPath...}": s.api_Stream_AnnexB_,
"/api/videotrack/sse/{streamPath...}": s.api_VideoTrack_SSE,
"/api/audiotrack/sse/{streamPath...}": s.api_AudioTrack_SSE,
"/annexb/{streamPath...}": s.annexB,
})
if s.config.DSN != "" {
@@ -763,3 +767,41 @@ func (s *Server) AuthInterceptor() grpc.UnaryServerInterceptor {
return handler(newCtx, req)
}
}
func (s *Server) annexB(w http.ResponseWriter, r *http.Request) {
streamPath := r.PathValue("streamPath")
if r.URL.RawQuery != "" {
streamPath += "?" + r.URL.RawQuery
}
var conf = s.config.Subscribe
conf.SubType = SubscribeTypeServer
conf.SubAudio = false
suber, err := s.SubscribeWithConfig(r.Context(), streamPath, conf)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var conn net.Conn
conn, err = suber.CheckWebSocket(w, r)
if err != nil {
return
}
if conn == nil {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Transfer-Encoding", "identity")
w.WriteHeader(http.StatusOK)
}
PlayBlock(suber, func(frame *pkg.AVFrame) (err error) {
return nil
}, func(frame *pkg.AnnexB) (err error) {
if conn != nil {
return wsutil.WriteServerMessage(conn, ws.OpBinary, util.ConcatBuffers(frame.Memory.Buffers))
}
var buf net.Buffers
buf = append(buf, frame.Memory.Buffers...)
buf.WriteTo(w)
return nil
})
}