
- Refactor frame converter implementation - Update mp4 track to use ICodex - General refactoring and code improvements 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
15 KiB
Plugin Development Guide
1. Prerequisites
Development Tools
- Visual Studio Code
- Goland
- Cursor
- CodeBuddy
- Trae
- Qoder
- Claude Code
- Kiro
- Windsurf
Install gRPC
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Install gRPC-Gateway
$ go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Project Setup
- Create a Go project, e.g.,
MyPlugin
- Create a
pkg
directory for exportable code - Create a
pb
directory for gRPC proto files - Create an
example
directory for testing the plugin
You can also create a directory
xxx
directly in the monibuca project's plugin folder to store your plugin code
2. Create a Plugin
package plugin_myplugin
import (
"m7s.live/v5"
)
var _ = m7s.InstallPlugin[MyPlugin]()
type MyPlugin struct {
m7s.Plugin
Foo string
}
MyPlugin
struct is the plugin definition,Foo
is a plugin property that can be configured in the configuration file- Must embed
m7s.Plugin
struct to provide basic plugin functionality m7s.InstallPlugin[MyPlugin](...)
registers the plugin so it can be loaded by monibuca
Provide Default Configuration
Example:
const defaultConfig = m7s.DefaultYaml(`tcp:
listenaddr: :5554`)
var _ = m7s.InstallPlugin[MyPlugin](m7s.PluginMeta{
DefaultYaml: defaultConfig,
})
3. Implement Event Callbacks (Optional)
Initialization Callback
func (config *MyPlugin) Start() (err error) {
// Initialize things
return
}
Used for plugin initialization after configuration is loaded. Return an error if initialization fails, and the plugin will be disabled.
TCP Request Callback
func (config *MyPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
}
Called when receiving TCP connection requests if TCP listening port is configured.
UDP Request Callback
func (config *MyPlugin) OnUDPConnect(conn *net.UDPConn) task.ITask {
}
Called when receiving UDP connection requests if UDP listening port is configured.
QUIC Request Callback
func (config *MyPlugin) OnQUICConnect(quic.Connection) task.ITask {
}
Called when receiving QUIC connection requests if QUIC listening port is configured.
4. HTTP Interface Callbacks
Legacy v4 Callback Style
func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) {
// do something
}
Accessible via http://ip:port/myplugin/api/test1
Route Mapping Configuration
This method supports parameterized routing:
func (config *MyPlugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{
"/test1/{streamPath...}": config.test1,
}
}
func (config *MyPlugin) test1(rw http.ResponseWriter, r *http.Request) {
streamPath := r.PathValue("streamPath")
// do something
}
5. Implement Push/Pull Clients
Implement Push Client
Push client needs to implement IPusher interface and pass the creation method to InstallPlugin.
type Pusher struct {
task.Task
pushJob m7s.PushJob
}
func (c *Pusher) GetPushJob() *m7s.PushJob {
return &c.pushJob
}
func NewPusher(_ config.Push) m7s.IPusher {
return &Pusher{}
}
var _ = m7s.InstallPlugin[MyPlugin](m7s.PluginMeta{
NewPusher: NewPusher,
})
Implement Pull Client
Pull client needs to implement IPuller interface and pass the creation method to InstallPlugin. The following Puller inherits from m7s.HTTPFilePuller for basic file and HTTP pulling. You need to override the Start method for specific pulling logic:
type Puller struct {
m7s.HTTPFilePuller
}
func NewPuller(_ config.Pull) m7s.IPuller {
return &Puller{}
}
var _ = m7s.InstallPlugin[MyPlugin](m7s.PluginMeta{
NewPuller: NewPuller,
})
6. Implement gRPC Service
Create myplugin.proto
in pb
Directory
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
package myplugin;
option go_package="m7s.live/v5/plugin/myplugin/pb";
service api {
rpc MyMethod (MyRequest) returns (MyResponse) {
option (google.api.http) = {
post: "/myplugin/api/bar"
body: "foo"
};
}
}
message MyRequest {
string foo = 1;
}
message MyResponse {
string bar = 1;
}
Generate gRPC Code
Add to VSCode task.json:
{
"type": "shell",
"label": "build pb myplugin",
"command": "protoc",
"args": [
"-I.",
"-I${workspaceRoot}/pb",
"--go_out=.",
"--go_opt=paths=source_relative",
"--go-grpc_out=.",
"--go-grpc_opt=paths=source_relative",
"--grpc-gateway_out=.",
"--grpc-gateway_opt=paths=source_relative",
"myplugin.proto"
],
"options": {
"cwd": "${workspaceRoot}/plugin/myplugin/pb"
}
}
Or run command in pb directory:
protoc -I. -I$ProjectFileDir$/pb --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative myplugin.proto
Replace $ProjectFileDir$
with the directory containing global pb files.
Implement gRPC Service
Create api.go:
package plugin_myplugin
import (
"context"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/plugin/myplugin/pb"
)
func (config *MyPlugin) MyMethod(ctx context.Context, req *pb.MyRequest) (*pb.MyResponse, error) {
return &pb.MyResponse{Bar: req.Foo}, nil
}
Register gRPC Service
package plugin_myplugin
import (
"m7s.live/v5"
"m7s.live/v5/plugin/myplugin/pb"
)
var _ = m7s.InstallPlugin[MyPlugin](m7s.PluginMeta{
ServiceDesc: &pb.Api_ServiceDesc,
RegisterGRPCHandler: pb.RegisterApiHandler,
})
type MyPlugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Foo string
}
Additional RESTful Endpoints
Same as v4:
func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) {
// do something
}
Accessible via GET request to /myplugin/api/test1
7. Publishing Streams
publisher, err := p.Publish(ctx, streamPath)
The ctx
parameter is required, streamPath
parameter is required.
Writing Audio/Video Data
The old WriteAudio
and WriteVideo
methods have been replaced with a more structured writer pattern using generics:
Create Writers
// Audio writer
audioWriter := m7s.NewPublishAudioWriter[*AudioFrame](publisher, allocator)
// Video writer
videoWriter := m7s.NewPublishVideoWriter[*VideoFrame](publisher, allocator)
// Combined audio/video writer
writer := m7s.NewPublisherWriter[*AudioFrame, *VideoFrame](publisher, allocator)
Write Frames
// Set timestamp and write audio frame
writer.AudioFrame.SetTS32(timestamp)
err := writer.NextAudio()
// Set timestamp and write video frame
writer.VideoFrame.SetTS32(timestamp)
err := writer.NextVideo()
Write Custom Data
// For custom data frames
err := publisher.WriteData(data IDataFrame)
Define Audio/Video Data
If existing audio/video data formats don't meet your needs, you can define custom formats by implementing this interface:
IAVFrame interface {
GetSample() *Sample
GetSize() int
CheckCodecChange() error
Demux() error // demux to raw format
Mux(*Sample) error // mux from origin format
Recycle()
String() string
}
Define separate types for audio and video
The methods serve the following purposes:
- GetSample: Gets the Sample object containing codec context and raw data
- GetSize: Gets the size of audio/video data
- CheckCodecChange: Checks if the codec has changed
- Demux: Demuxes audio/video data to raw format for use by other formats
- Mux: Muxes from original format to custom audio/video data format
- Recycle: Recycles resources, automatically implemented when embedding RecyclableMemory
- String: Prints audio/video data information
Memory Management
The new pattern includes built-in memory management:
util.ScalableMemoryAllocator
- For efficient memory allocation- Frame recycling through
Recycle()
method - Automatic memory pool management
8. Subscribing to Streams
var suber *m7s.Subscriber
suber, err = p.Subscribe(ctx,streamPath)
go m7s.PlayBlock(suber, handleAudio, handleVideo)
Note that handleAudio and handleVideo are callback functions you need to implement. They take an audio/video format type as input and return an error. If the error is not nil, the subscription is terminated.
9. Working with H26xFrame for Raw Stream Data
9.1 Understanding H26xFrame Structure
The H26xFrame
struct is used for handling H.264/H.265 raw stream data:
type H26xFrame struct {
pkg.Sample
}
Key characteristics:
- Inherits from
pkg.Sample
- contains codec context, memory management, and timing - Uses
Raw.(*pkg.Nalus)
to store NALU (Network Abstraction Layer Unit) data - Supports both H.264 (AVC) and H.265 (HEVC) formats
- Uses efficient memory allocators for zero-copy operations
9.2 Creating H26xFrame for Publishing
import (
"m7s.live/v5"
"m7s.live/v5/pkg/format"
"m7s.live/v5/pkg/util"
"time"
)
// Create publisher with H26xFrame support
func publishRawH264Stream(streamPath string, h264Frames [][]byte) error {
// Get publisher
publisher, err := p.Publish(streamPath)
if err != nil {
return err
}
// Create memory allocator
allocator := util.NewScalableMemoryAllocator(1 << util.MinPowerOf2)
defer allocator.Recycle()
// Create writer for H26xFrame
writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](publisher, allocator)
// Set up H264 codec context
writer.VideoFrame.ICodecCtx = &format.H264{}
// Publish multiple frames
// Note: This is a demonstration of multi-frame writing. In actual scenarios,
// frames should be written gradually as they are received from the video source.
startTime := time.Now()
for i, frameData := range h264Frames {
// Create H26xFrame for each frame
frame := writer.VideoFrame
// Set timestamp with proper interval
frame.Timestamp = startTime.Add(time.Duration(i) * time.Second / 30) // 30 FPS
// Write NALU data
nalus := frame.GetNalus()
// if frameData is a single NALU, otherwise need to loop
p := nalus.GetNextPointer()
mem := frame.NextN(len(frameData))
copy(mem, frameData)
p.PushOne(mem)
// Publish frame
if err := writer.NextVideo(); err != nil {
return err
}
}
return nil
}
// Example usage with continuous streaming
func continuousH264Publishing(streamPath string, frameSource <-chan []byte, stopChan <-chan struct{}) error {
// Get publisher
publisher, err := p.Publish(streamPath)
if err != nil {
return err
}
defer publisher.Dispose()
// Create memory allocator
allocator := util.NewScalableMemoryAllocator(1 << util.MinPowerOf2)
defer allocator.Recycle()
// Create writer for H26xFrame
writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](publisher, allocator)
// Set up H264 codec context
writer.VideoFrame.ICodecCtx = &format.H264{}
startTime := time.Now()
frameCount := 0
for {
select {
case frameData := <-frameSource:
// Create H26xFrame for each frame
frame := writer.VideoFrame
// Set timestamp with proper interval
frame.Timestamp = startTime.Add(time.Duration(frameCount) * time.Second / 30) // 30 FPS
// Write NALU data
nalus := frame.GetNalus()
mem := frame.NextN(len(frameData))
copy(mem, frameData)
// Publish frame
if err := writer.NextVideo(); err != nil {
return err
}
frameCount++
case <-stopChan:
// Stop publishing
return nil
}
}
}
9.3 Processing H26xFrame (Transform Pattern)
type MyTransform struct {
m7s.DefaultTransformer
Writer *m7s.PublishWriter[*format.RawAudio, *format.H26xFrame]
}
func (t *MyTransform) Go() {
defer t.Dispose()
for video := range t.Video {
if err := t.processH26xFrame(video); err != nil {
t.Error("process frame failed", "error", err)
break
}
}
}
func (t *MyTransform) processH26xFrame(video *format.H26xFrame) error {
// Copy frame metadata
copyVideo := t.Writer.VideoFrame
copyVideo.ICodecCtx = video.ICodecCtx
*copyVideo.BaseSample = *video.BaseSample
nalus := copyVideo.GetNalus()
// Process each NALU unit
for nalu := range video.Raw.(*pkg.Nalus).RangePoint {
p := nalus.GetNextPointer()
mem := copyVideo.NextN(nalu.Size)
nalu.CopyTo(mem)
// Example: Filter or modify specific NALU types
if video.FourCC() == codec.FourCC_H264 {
switch codec.ParseH264NALUType(mem[0]) {
case codec.NALU_IDR_Picture, codec.NALU_Non_IDR_Picture:
// Process video frame NALUs
// Example: Apply transformations, filters, etc.
case codec.NALU_SPS, codec.NALU_PPS:
// Process parameter set NALUs
}
} else if video.FourCC() == codec.FourCC_H265 {
switch codec.ParseH265NALUType(mem[0]) {
case h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL:
// Process H.265 IDR frames
}
}
// Push processed NALU
p.PushOne(mem)
}
return t.Writer.NextVideo()
}
9.4 Common NALU Types for H.264/H.265
H.264 NALU Types
const (
NALU_Non_IDR_Picture = 1 // Non-IDR picture (P-frames)
NALU_IDR_Picture = 5 // IDR picture (I-frames)
NALU_SEI = 6 // Supplemental enhancement information
NALU_SPS = 7 // Sequence parameter set
NALU_PPS = 8 // Picture parameter set
)
// Parse NALU type from first byte
naluType := codec.ParseH264NALUType(mem[0])
H.265 NALU Types
// Parse H.265 NALU type from first byte
naluType := codec.ParseH265NALUType(mem[0])
9.5 Memory Management Best Practices
// Use memory allocators for efficient operations
allocator := util.NewScalableMemoryAllocator(1 << 20) // 1MB initial size
defer allocator.Recycle()
// When processing multiple frames, reuse the same allocator
writer := m7s.NewPublisherWriter[*format.RawAudio, *format.H26xFrame](publisher, allocator)
9.6 Error Handling and Validation
func processFrame(video *format.H26xFrame) error {
// Check codec changes
if err := video.CheckCodecChange(); err != nil {
return err
}
// Validate frame data
if video.Raw == nil {
return fmt.Errorf("empty frame data")
}
// Process NALUs safely
nalus, ok := video.Raw.(*pkg.Nalus)
if !ok {
return fmt.Errorf("invalid NALUs format")
}
// Process frame...
return nil
}
10. Prometheus Integration
Just implement the Collector interface, and the system will automatically collect metrics from all plugins:
func (p *MyPlugin) Describe(ch chan<- *prometheus.Desc) {
}
func (p *MyPlugin) Collect(ch chan<- prometheus.Metric) {
}