Compare commits

...

4 Commits

Author SHA1 Message Date
langhuihui
fe3ac4e3c5 feat: add ffmpeg plugin 2025-09-08 13:39:59 +08:00
langhuihui
438a8ddee1 fix: rtmp play write timeout 2025-09-08 13:28:52 +08:00
langhuihui
4e68cfccba fix: reorder udp 2025-09-08 13:00:04 +08:00
百川8488
21b3bd053a feat: 海康SDK插件 (#331) 2025-09-08 12:42:57 +08:00
78 changed files with 5340 additions and 9 deletions

View File

@@ -13,7 +13,6 @@ import (
_ "m7s.live/v5/plugin/flv"
_ "m7s.live/v5/plugin/gb28181"
_ "m7s.live/v5/plugin/logrotate"
_ "m7s.live/v5/plugin/monitor"
_ "m7s.live/v5/plugin/mp4"
mp4 "m7s.live/v5/plugin/mp4/pkg"
_ "m7s.live/v5/plugin/preview"

View File

@@ -9,7 +9,6 @@ import (
_ "m7s.live/v5/plugin/debug"
_ "m7s.live/v5/plugin/flv"
_ "m7s.live/v5/plugin/logrotate"
_ "m7s.live/v5/plugin/monitor"
_ "m7s.live/v5/plugin/rtmp"
_ "m7s.live/v5/plugin/rtsp"
_ "m7s.live/v5/plugin/test"

80
plugin/ffmpeg/README.md Normal file
View File

@@ -0,0 +1,80 @@
# FFmpeg 插件
FFmpeg 插件用于在 Monibuca 中创建和管理 FFmpeg 进程。该插件提供 gRPC 接口,允许用户创建、更新、查询、重启和关闭 FFmpeg 进程。所有 FFmpeg 进程配置将存储在数据库中,以便反复使用。
## 功能特性
- 创建 FFmpeg 进程配置并持久化存储
- 更新现有 FFmpeg 进程配置
- 查询 FFmpeg 进程(包括运行中和未运行的)
- 启动/重启 FFmpeg 进程
- 停止 FFmpeg 进程
- 在 task 框架中运行 FFmpeg 进程
## API 接口
### 创建 FFmpeg 进程
```
POST /ffmpeg/api/process
```
### 更新 FFmpeg 进程
```
PUT /ffmpeg/api/process/{id}
```
### 删除 FFmpeg 进程
```
DELETE /ffmpeg/api/process/{id}
```
### 获取 FFmpeg 进程列表
```
GET /ffmpeg/api/processes
```
### 获取单个 FFmpeg 进程详情
```
GET /ffmpeg/api/process/{id}
```
### 启动 FFmpeg 进程
```
POST /ffmpeg/api/process/{id}/start
```
### 停止 FFmpeg 进程
```
POST /ffmpeg/api/process/{id}/stop
```
### 重启 FFmpeg 进程
```
POST /ffmpeg/api/process/{id}/restart
```
## 配置选项
```yaml
ffmpeg:
# 数据库配置
db:
dsn: "ffmpeg.db"
dbtype: "sqlite"
# 默认 FFmpeg 路径
ffmpegPath: "ffmpeg"
# 进程管理配置
maxProcesses: 100
autoRestart: true
restartInterval: 5s
```

195
plugin/ffmpeg/api.go Normal file
View File

@@ -0,0 +1,195 @@
package ffmpeg
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/ffmpeg/pb"
)
// CreateProcess 创建 FFmpeg 进程
func (p *FFmpegPlugin) CreateProcess(ctx context.Context, req *pb.CreateProcessRequest) (*pb.ProcessResponse, error) {
process := &FFmpegProcess{
Description: req.Description,
Arguments: req.Arguments,
AutoStart: req.AutoStart,
Status: "stopped",
}
// 保存到数据库
if err := p.DB.Create(process).Error; err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create process: %v", err)
}
// 如果设置了自动启动,则启动进程
if process.AutoStart {
p.startProcess(process)
}
return &pb.ProcessResponse{
Code: 0,
Message: "success",
Data: &pb.FFmpegProcess{
Id: uint32(process.ID),
Description: process.Description,
Arguments: process.Arguments,
Status: process.Status,
Pid: int32(process.PID),
AutoStart: process.AutoStart,
CreatedAt: timestamppb.New(process.CreatedAt),
UpdatedAt: timestamppb.New(process.UpdatedAt),
},
}, nil
}
// UpdateProcess 更新 FFmpeg 进程
func (p *FFmpegPlugin) UpdateProcess(ctx context.Context, req *pb.UpdateProcessRequest) (*pb.ProcessResponse, error) {
process := &FFmpegProcess{}
if err := p.DB.First(process, req.Id).Error; err != nil {
return nil, status.Errorf(codes.NotFound, "Process not found: %v", err)
}
process.Description = req.Description
process.Arguments = req.Arguments
process.AutoStart = req.AutoStart
// 更新数据库
if err := p.DB.Save(process).Error; err != nil {
return nil, status.Errorf(codes.Internal, "Failed to update process: %v", err)
}
return &pb.ProcessResponse{
Code: 0,
Message: "success",
Data: &pb.FFmpegProcess{
Id: uint32(process.ID),
Description: process.Description,
Arguments: process.Arguments,
Status: process.Status,
Pid: int32(process.PID),
AutoStart: process.AutoStart,
CreatedAt: timestamppb.New(process.CreatedAt),
UpdatedAt: timestamppb.New(process.UpdatedAt),
},
}, nil
}
// DeleteProcess 删除 FFmpeg 进程
func (p *FFmpegPlugin) DeleteProcess(ctx context.Context, req *pb.ProcessIdRequest) (*pb.ProcessResponse, error) {
process := &FFmpegProcess{}
if err := p.DB.First(process, req.Id).Error; err != nil {
return nil, status.Errorf(codes.NotFound, "Process not found: %v", err)
}
if process, exists := p.processes.Get(uint(req.Id)); exists {
process.Stop(task.ErrStopByUser)
process.WaitStopped()
}
if err := p.DB.Delete(process).Error; err != nil {
return nil, status.Errorf(codes.Internal, "Failed to delete process: %v", err)
}
return &pb.ProcessResponse{
Code: 0,
Message: "success",
}, nil
}
// ListProcesses 获取 FFmpeg 进程列表
func (p *FFmpegPlugin) ListProcesses(ctx context.Context, req *emptypb.Empty) (*pb.ListProcessesResponse, error) {
var processes []FFmpegProcess
query := p.DB.Model(&FFmpegProcess{})
if err := query.Find(&processes).Error; err != nil {
return nil, status.Errorf(codes.Internal, "Failed to list processes: %v", err)
}
data := make([]*pb.FFmpegProcess, len(processes))
for i, process := range processes {
runtimeProcess, ok := p.processes.Get(process.ID)
if ok {
process = *runtimeProcess.process
}
data[i] = &pb.FFmpegProcess{
Id: uint32(process.ID),
Description: process.Description,
Arguments: process.Arguments,
AutoStart: process.AutoStart,
Status: process.Status,
Pid: int32(process.PID),
CreatedAt: timestamppb.New(process.CreatedAt),
UpdatedAt: timestamppb.New(process.UpdatedAt),
}
}
return &pb.ListProcessesResponse{
Code: 0,
Message: "success",
Data: data,
}, nil
}
// StartProcess 启动 FFmpeg 进程
func (p *FFmpegPlugin) StartProcess(ctx context.Context, req *pb.ProcessIdRequest) (*pb.ProcessResponse, error) {
process := &FFmpegProcess{}
if err := p.DB.First(process, req.Id).Error; err != nil {
return nil, status.Errorf(codes.NotFound, "Process not found: %v", err)
}
// 启动进程
if err := p.startProcess(process); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to start process: %v", err)
}
return &pb.ProcessResponse{
Code: 0,
Message: "success",
}, nil
}
// StopProcess 停止 FFmpeg 进程
func (p *FFmpegPlugin) StopProcess(ctx context.Context, req *pb.ProcessIdRequest) (*pb.ProcessResponse, error) {
// 停止进程
if process, exists := p.processes.Get(uint(req.Id)); exists {
process.Stop(task.ErrStopByUser)
process.WaitStopped()
} else {
return nil, pkg.ErrNotFound
}
return &pb.ProcessResponse{
Code: 0,
Message: "success",
}, nil
}
// RestartProcess 重启 FFmpeg 进程
func (p *FFmpegPlugin) RestartProcess(ctx context.Context, req *pb.ProcessIdRequest) (*pb.ProcessResponse, error) {
process := &FFmpegProcess{}
if err := p.DB.First(process, req.Id).Error; err != nil {
return nil, status.Errorf(codes.NotFound, "Process not found: %v", err)
}
// 如果进程正在运行,先停止它
if task, exists := p.processes.Get(uint(req.Id)); exists {
task.Stop(pkg.ErrRestart)
task.WaitStopped()
}
err := p.startProcess(process).WaitStarted()
if err != nil {
return nil, err
}
return &pb.ProcessResponse{
Code: 0,
Message: "success",
}, nil
}
// startProcess 启动进程的内部方法
func (p *FFmpegPlugin) startProcess(process *FFmpegProcess) *task.Task {
return p.AddTask(NewFFmpegTask(process))
}

35
plugin/ffmpeg/index.go Normal file
View File

@@ -0,0 +1,35 @@
package ffmpeg
import (
"m7s.live/v5"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/ffmpeg/pb"
)
var _ = m7s.InstallPlugin[FFmpegPlugin](m7s.PluginMeta{
ServiceDesc: &pb.Api_ServiceDesc,
RegisterGRPCHandler: pb.RegisterApiHandler,
})
type FFmpegPlugin struct {
pb.UnimplementedApiServer
m7s.Plugin
AutoRestart bool `default:"true"`
RestartInterval string `default:"5s"`
processes task.WorkCollection[uint, *FFmpegTask]
}
func (p *FFmpegPlugin) OnInit() error {
// 注册数据模型
p.DB.AutoMigrate(&FFmpegProcess{})
// 启动时加载自动启动的进程
var processes []*FFmpegProcess
p.DB.Where("auto_start = ?", true).Find(&processes)
for _, process := range processes {
p.startProcess(process)
}
return nil
}

19
plugin/ffmpeg/model.go Normal file
View File

@@ -0,0 +1,19 @@
package ffmpeg
import (
"gorm.io/gorm"
)
type FFmpegProcess struct {
gorm.Model
Description string `gorm:"size:500"` // 描述
Arguments string `gorm:"type:text"` // 参数 JSON
Status string `gorm:"-"` // 状态: stopped, running, error
PID int `gorm:"-"` // 进程 ID
AutoStart bool `gorm:"default:false"` // 是否自动启动
}
// TableName 设置表名
func (FFmpegProcess) TableName() string {
return "ffmpeg_processes"
}

View File

@@ -0,0 +1,534 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: ffmpeg.proto
package pb
import (
_ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
pb "m7s.live/v5/pb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type FFmpegProcess struct {
state protoimpl.MessageState `protogen:"open.v1"`
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"`
Arguments string `protobuf:"bytes,3,opt,name=arguments,proto3" json:"arguments,omitempty"`
Status string `protobuf:"bytes,4,opt,name=status,proto3" json:"status,omitempty"`
Pid int32 `protobuf:"varint,5,opt,name=pid,proto3" json:"pid,omitempty"`
AutoStart bool `protobuf:"varint,6,opt,name=autoStart,proto3" json:"autoStart,omitempty"`
CreatedAt *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=createdAt,proto3" json:"createdAt,omitempty"`
UpdatedAt *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=updatedAt,proto3" json:"updatedAt,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *FFmpegProcess) Reset() {
*x = FFmpegProcess{}
mi := &file_ffmpeg_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *FFmpegProcess) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FFmpegProcess) ProtoMessage() {}
func (x *FFmpegProcess) ProtoReflect() protoreflect.Message {
mi := &file_ffmpeg_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FFmpegProcess.ProtoReflect.Descriptor instead.
func (*FFmpegProcess) Descriptor() ([]byte, []int) {
return file_ffmpeg_proto_rawDescGZIP(), []int{0}
}
func (x *FFmpegProcess) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
func (x *FFmpegProcess) GetDescription() string {
if x != nil {
return x.Description
}
return ""
}
func (x *FFmpegProcess) GetArguments() string {
if x != nil {
return x.Arguments
}
return ""
}
func (x *FFmpegProcess) GetStatus() string {
if x != nil {
return x.Status
}
return ""
}
func (x *FFmpegProcess) GetPid() int32 {
if x != nil {
return x.Pid
}
return 0
}
func (x *FFmpegProcess) GetAutoStart() bool {
if x != nil {
return x.AutoStart
}
return false
}
func (x *FFmpegProcess) GetCreatedAt() *timestamppb.Timestamp {
if x != nil {
return x.CreatedAt
}
return nil
}
func (x *FFmpegProcess) GetUpdatedAt() *timestamppb.Timestamp {
if x != nil {
return x.UpdatedAt
}
return nil
}
type CreateProcessRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Description string `protobuf:"bytes,1,opt,name=description,proto3" json:"description,omitempty"`
Arguments string `protobuf:"bytes,2,opt,name=arguments,proto3" json:"arguments,omitempty"`
AutoStart bool `protobuf:"varint,3,opt,name=autoStart,proto3" json:"autoStart,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *CreateProcessRequest) Reset() {
*x = CreateProcessRequest{}
mi := &file_ffmpeg_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *CreateProcessRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CreateProcessRequest) ProtoMessage() {}
func (x *CreateProcessRequest) ProtoReflect() protoreflect.Message {
mi := &file_ffmpeg_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CreateProcessRequest.ProtoReflect.Descriptor instead.
func (*CreateProcessRequest) Descriptor() ([]byte, []int) {
return file_ffmpeg_proto_rawDescGZIP(), []int{1}
}
func (x *CreateProcessRequest) GetDescription() string {
if x != nil {
return x.Description
}
return ""
}
func (x *CreateProcessRequest) GetArguments() string {
if x != nil {
return x.Arguments
}
return ""
}
func (x *CreateProcessRequest) GetAutoStart() bool {
if x != nil {
return x.AutoStart
}
return false
}
type UpdateProcessRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"`
Arguments string `protobuf:"bytes,3,opt,name=arguments,proto3" json:"arguments,omitempty"`
AutoStart bool `protobuf:"varint,4,opt,name=autoStart,proto3" json:"autoStart,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *UpdateProcessRequest) Reset() {
*x = UpdateProcessRequest{}
mi := &file_ffmpeg_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *UpdateProcessRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*UpdateProcessRequest) ProtoMessage() {}
func (x *UpdateProcessRequest) ProtoReflect() protoreflect.Message {
mi := &file_ffmpeg_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use UpdateProcessRequest.ProtoReflect.Descriptor instead.
func (*UpdateProcessRequest) Descriptor() ([]byte, []int) {
return file_ffmpeg_proto_rawDescGZIP(), []int{2}
}
func (x *UpdateProcessRequest) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
func (x *UpdateProcessRequest) GetDescription() string {
if x != nil {
return x.Description
}
return ""
}
func (x *UpdateProcessRequest) GetArguments() string {
if x != nil {
return x.Arguments
}
return ""
}
func (x *UpdateProcessRequest) GetAutoStart() bool {
if x != nil {
return x.AutoStart
}
return false
}
type ProcessIdRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ProcessIdRequest) Reset() {
*x = ProcessIdRequest{}
mi := &file_ffmpeg_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ProcessIdRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ProcessIdRequest) ProtoMessage() {}
func (x *ProcessIdRequest) ProtoReflect() protoreflect.Message {
mi := &file_ffmpeg_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ProcessIdRequest.ProtoReflect.Descriptor instead.
func (*ProcessIdRequest) Descriptor() ([]byte, []int) {
return file_ffmpeg_proto_rawDescGZIP(), []int{3}
}
func (x *ProcessIdRequest) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
type ProcessResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Data *FFmpegProcess `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ProcessResponse) Reset() {
*x = ProcessResponse{}
mi := &file_ffmpeg_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ProcessResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ProcessResponse) ProtoMessage() {}
func (x *ProcessResponse) ProtoReflect() protoreflect.Message {
mi := &file_ffmpeg_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ProcessResponse.ProtoReflect.Descriptor instead.
func (*ProcessResponse) Descriptor() ([]byte, []int) {
return file_ffmpeg_proto_rawDescGZIP(), []int{4}
}
func (x *ProcessResponse) GetCode() uint32 {
if x != nil {
return x.Code
}
return 0
}
func (x *ProcessResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *ProcessResponse) GetData() *FFmpegProcess {
if x != nil {
return x.Data
}
return nil
}
type ListProcessesResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Data []*FFmpegProcess `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ListProcessesResponse) Reset() {
*x = ListProcessesResponse{}
mi := &file_ffmpeg_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ListProcessesResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListProcessesResponse) ProtoMessage() {}
func (x *ListProcessesResponse) ProtoReflect() protoreflect.Message {
mi := &file_ffmpeg_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListProcessesResponse.ProtoReflect.Descriptor instead.
func (*ListProcessesResponse) Descriptor() ([]byte, []int) {
return file_ffmpeg_proto_rawDescGZIP(), []int{5}
}
func (x *ListProcessesResponse) GetCode() uint32 {
if x != nil {
return x.Code
}
return 0
}
func (x *ListProcessesResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *ListProcessesResponse) GetData() []*FFmpegProcess {
if x != nil {
return x.Data
}
return nil
}
var File_ffmpeg_proto protoreflect.FileDescriptor
const file_ffmpeg_proto_rawDesc = "" +
"\n" +
"\fffmpeg.proto\x12\x06ffmpeg\x1a\x1cgoogle/api/annotations.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a\fglobal.proto\"\x9b\x02\n" +
"\rFFmpegProcess\x12\x0e\n" +
"\x02id\x18\x01 \x01(\rR\x02id\x12 \n" +
"\vdescription\x18\x02 \x01(\tR\vdescription\x12\x1c\n" +
"\targuments\x18\x03 \x01(\tR\targuments\x12\x16\n" +
"\x06status\x18\x04 \x01(\tR\x06status\x12\x10\n" +
"\x03pid\x18\x05 \x01(\x05R\x03pid\x12\x1c\n" +
"\tautoStart\x18\x06 \x01(\bR\tautoStart\x128\n" +
"\tcreatedAt\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\tcreatedAt\x128\n" +
"\tupdatedAt\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tupdatedAt\"t\n" +
"\x14CreateProcessRequest\x12 \n" +
"\vdescription\x18\x01 \x01(\tR\vdescription\x12\x1c\n" +
"\targuments\x18\x02 \x01(\tR\targuments\x12\x1c\n" +
"\tautoStart\x18\x03 \x01(\bR\tautoStart\"\x84\x01\n" +
"\x14UpdateProcessRequest\x12\x0e\n" +
"\x02id\x18\x01 \x01(\rR\x02id\x12 \n" +
"\vdescription\x18\x02 \x01(\tR\vdescription\x12\x1c\n" +
"\targuments\x18\x03 \x01(\tR\targuments\x12\x1c\n" +
"\tautoStart\x18\x04 \x01(\bR\tautoStart\"\"\n" +
"\x10ProcessIdRequest\x12\x0e\n" +
"\x02id\x18\x01 \x01(\rR\x02id\"j\n" +
"\x0fProcessResponse\x12\x12\n" +
"\x04code\x18\x01 \x01(\rR\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12)\n" +
"\x04data\x18\x03 \x01(\v2\x15.ffmpeg.FFmpegProcessR\x04data\"p\n" +
"\x15ListProcessesResponse\x12\x12\n" +
"\x04code\x18\x01 \x01(\rR\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12)\n" +
"\x04data\x18\x03 \x03(\v2\x15.ffmpeg.FFmpegProcessR\x04data2\xca\x05\n" +
"\x03Api\x12e\n" +
"\rCreateProcess\x12\x1c.ffmpeg.CreateProcessRequest\x1a\x17.ffmpeg.ProcessResponse\"\x1d\x82\xd3\xe4\x93\x02\x17:\x01*\"\x12/ffmpeg/api/create\x12j\n" +
"\rUpdateProcess\x12\x1c.ffmpeg.UpdateProcessRequest\x1a\x17.ffmpeg.ProcessResponse\"\"\x82\xd3\xe4\x93\x02\x1c:\x01*\"\x17/ffmpeg/api/update/{id}\x12c\n" +
"\rDeleteProcess\x12\x18.ffmpeg.ProcessIdRequest\x1a\x17.global.SuccessResponse\"\x1f\x82\xd3\xe4\x93\x02\x19\"\x17/ffmpeg/api/delete/{id}\x12`\n" +
"\rListProcesses\x12\x16.google.protobuf.Empty\x1a\x1d.ffmpeg.ListProcessesResponse\"\x18\x82\xd3\xe4\x93\x02\x12\x12\x10/ffmpeg/api/list\x12a\n" +
"\fStartProcess\x12\x18.ffmpeg.ProcessIdRequest\x1a\x17.global.SuccessResponse\"\x1e\x82\xd3\xe4\x93\x02\x18\"\x16/ffmpeg/api/start/{id}\x12_\n" +
"\vStopProcess\x12\x18.ffmpeg.ProcessIdRequest\x1a\x17.global.SuccessResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\"\x15/ffmpeg/api/stop/{id}\x12e\n" +
"\x0eRestartProcess\x12\x18.ffmpeg.ProcessIdRequest\x1a\x17.global.SuccessResponse\" \x82\xd3\xe4\x93\x02\x1a\"\x18/ffmpeg/api/restart/{id}B\x1eZ\x1cm7s.live/v5/plugin/ffmpeg/pbb\x06proto3"
var (
file_ffmpeg_proto_rawDescOnce sync.Once
file_ffmpeg_proto_rawDescData []byte
)
func file_ffmpeg_proto_rawDescGZIP() []byte {
file_ffmpeg_proto_rawDescOnce.Do(func() {
file_ffmpeg_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_ffmpeg_proto_rawDesc), len(file_ffmpeg_proto_rawDesc)))
})
return file_ffmpeg_proto_rawDescData
}
var file_ffmpeg_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_ffmpeg_proto_goTypes = []any{
(*FFmpegProcess)(nil), // 0: ffmpeg.FFmpegProcess
(*CreateProcessRequest)(nil), // 1: ffmpeg.CreateProcessRequest
(*UpdateProcessRequest)(nil), // 2: ffmpeg.UpdateProcessRequest
(*ProcessIdRequest)(nil), // 3: ffmpeg.ProcessIdRequest
(*ProcessResponse)(nil), // 4: ffmpeg.ProcessResponse
(*ListProcessesResponse)(nil), // 5: ffmpeg.ListProcessesResponse
(*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp
(*emptypb.Empty)(nil), // 7: google.protobuf.Empty
(*pb.SuccessResponse)(nil), // 8: global.SuccessResponse
}
var file_ffmpeg_proto_depIdxs = []int32{
6, // 0: ffmpeg.FFmpegProcess.createdAt:type_name -> google.protobuf.Timestamp
6, // 1: ffmpeg.FFmpegProcess.updatedAt:type_name -> google.protobuf.Timestamp
0, // 2: ffmpeg.ProcessResponse.data:type_name -> ffmpeg.FFmpegProcess
0, // 3: ffmpeg.ListProcessesResponse.data:type_name -> ffmpeg.FFmpegProcess
1, // 4: ffmpeg.Api.CreateProcess:input_type -> ffmpeg.CreateProcessRequest
2, // 5: ffmpeg.Api.UpdateProcess:input_type -> ffmpeg.UpdateProcessRequest
3, // 6: ffmpeg.Api.DeleteProcess:input_type -> ffmpeg.ProcessIdRequest
7, // 7: ffmpeg.Api.ListProcesses:input_type -> google.protobuf.Empty
3, // 8: ffmpeg.Api.StartProcess:input_type -> ffmpeg.ProcessIdRequest
3, // 9: ffmpeg.Api.StopProcess:input_type -> ffmpeg.ProcessIdRequest
3, // 10: ffmpeg.Api.RestartProcess:input_type -> ffmpeg.ProcessIdRequest
4, // 11: ffmpeg.Api.CreateProcess:output_type -> ffmpeg.ProcessResponse
4, // 12: ffmpeg.Api.UpdateProcess:output_type -> ffmpeg.ProcessResponse
8, // 13: ffmpeg.Api.DeleteProcess:output_type -> global.SuccessResponse
5, // 14: ffmpeg.Api.ListProcesses:output_type -> ffmpeg.ListProcessesResponse
8, // 15: ffmpeg.Api.StartProcess:output_type -> global.SuccessResponse
8, // 16: ffmpeg.Api.StopProcess:output_type -> global.SuccessResponse
8, // 17: ffmpeg.Api.RestartProcess:output_type -> global.SuccessResponse
11, // [11:18] is the sub-list for method output_type
4, // [4:11] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_ffmpeg_proto_init() }
func file_ffmpeg_proto_init() {
if File_ffmpeg_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_ffmpeg_proto_rawDesc), len(file_ffmpeg_proto_rawDesc)),
NumEnums: 0,
NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_ffmpeg_proto_goTypes,
DependencyIndexes: file_ffmpeg_proto_depIdxs,
MessageInfos: file_ffmpeg_proto_msgTypes,
}.Build()
File_ffmpeg_proto = out.File
file_ffmpeg_proto_goTypes = nil
file_ffmpeg_proto_depIdxs = nil
}

View File

@@ -0,0 +1,756 @@
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: ffmpeg.proto
/*
Package pb is a reverse proxy.
It translates gRPC into RESTful JSON APIs.
*/
package pb
import (
"context"
"io"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/grpc-ecosystem/grpc-gateway/v2/utilities"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)
// Suppress "imported and not used" errors
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
func request_Api_CreateProcess_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq CreateProcessRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.CreateProcess(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_CreateProcess_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq CreateProcessRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.CreateProcess(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_UpdateProcess_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq UpdateProcessRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := client.UpdateProcess(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_UpdateProcess_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq UpdateProcessRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := server.UpdateProcess(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_DeleteProcess_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ProcessIdRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := client.DeleteProcess(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_DeleteProcess_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ProcessIdRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := server.DeleteProcess(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_ListProcesses_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.ListProcesses(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_ListProcesses_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.ListProcesses(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_StartProcess_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ProcessIdRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := client.StartProcess(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_StartProcess_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ProcessIdRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := server.StartProcess(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_StopProcess_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ProcessIdRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := client.StopProcess(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_StopProcess_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ProcessIdRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := server.StopProcess(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_RestartProcess_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ProcessIdRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := client.RestartProcess(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_RestartProcess_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ProcessIdRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["id"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id")
}
protoReq.Id, err = runtime.Uint32(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err)
}
msg, err := server.RestartProcess(ctx, &protoReq)
return msg, metadata, err
}
// RegisterApiHandlerServer registers the http handlers for service Api to "mux".
// UnaryRPC :call ApiServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterApiHandlerFromEndpoint instead.
func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server ApiServer) error {
mux.Handle("POST", pattern_Api_CreateProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/ffmpeg.Api/CreateProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/create"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_CreateProcess_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_CreateProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_UpdateProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/ffmpeg.Api/UpdateProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/update/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_UpdateProcess_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_UpdateProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_DeleteProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/ffmpeg.Api/DeleteProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/delete/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_DeleteProcess_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_DeleteProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_ListProcesses_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/ffmpeg.Api/ListProcesses", runtime.WithHTTPPathPattern("/ffmpeg/api/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_ListProcesses_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_ListProcesses_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_StartProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/ffmpeg.Api/StartProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/start/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_StartProcess_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_StartProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_StopProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/ffmpeg.Api/StopProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/stop/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_StopProcess_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_StopProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_RestartProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/ffmpeg.Api/RestartProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/restart/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_RestartProcess_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_RestartProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
// RegisterApiHandlerFromEndpoint is same as RegisterApiHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterApiHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
if err != nil {
return err
}
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
return
}
go func() {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
}()
}()
return RegisterApiHandler(ctx, mux, conn)
}
// RegisterApiHandler registers the http handlers for service Api to "mux".
// The handlers forward requests to the grpc endpoint over "conn".
func RegisterApiHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterApiHandlerClient(ctx, mux, NewApiClient(conn))
}
// RegisterApiHandlerClient registers the http handlers for service Api
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "ApiClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "ApiClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "ApiClient" to call the correct interceptors.
func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client ApiClient) error {
mux.Handle("POST", pattern_Api_CreateProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/ffmpeg.Api/CreateProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/create"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_CreateProcess_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_CreateProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_UpdateProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/ffmpeg.Api/UpdateProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/update/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_UpdateProcess_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_UpdateProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_DeleteProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/ffmpeg.Api/DeleteProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/delete/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_DeleteProcess_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_DeleteProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_ListProcesses_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/ffmpeg.Api/ListProcesses", runtime.WithHTTPPathPattern("/ffmpeg/api/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_ListProcesses_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_ListProcesses_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_StartProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/ffmpeg.Api/StartProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/start/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_StartProcess_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_StartProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_StopProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/ffmpeg.Api/StopProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/stop/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_StopProcess_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_StopProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_RestartProcess_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/ffmpeg.Api/RestartProcess", runtime.WithHTTPPathPattern("/ffmpeg/api/restart/{id}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_RestartProcess_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_RestartProcess_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_Api_CreateProcess_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"ffmpeg", "api", "create"}, ""))
pattern_Api_UpdateProcess_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"ffmpeg", "api", "update", "id"}, ""))
pattern_Api_DeleteProcess_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"ffmpeg", "api", "delete", "id"}, ""))
pattern_Api_ListProcesses_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"ffmpeg", "api", "list"}, ""))
pattern_Api_StartProcess_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"ffmpeg", "api", "start", "id"}, ""))
pattern_Api_StopProcess_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"ffmpeg", "api", "stop", "id"}, ""))
pattern_Api_RestartProcess_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"ffmpeg", "api", "restart", "id"}, ""))
)
var (
forward_Api_CreateProcess_0 = runtime.ForwardResponseMessage
forward_Api_UpdateProcess_0 = runtime.ForwardResponseMessage
forward_Api_DeleteProcess_0 = runtime.ForwardResponseMessage
forward_Api_ListProcesses_0 = runtime.ForwardResponseMessage
forward_Api_StartProcess_0 = runtime.ForwardResponseMessage
forward_Api_StopProcess_0 = runtime.ForwardResponseMessage
forward_Api_RestartProcess_0 = runtime.ForwardResponseMessage
)

View File

@@ -0,0 +1,101 @@
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "global.proto";
package ffmpeg;
option go_package="m7s.live/v5/plugin/ffmpeg/pb";
service Api {
// 创建 FFmpeg 进程
rpc CreateProcess (CreateProcessRequest) returns (ProcessResponse) {
option (google.api.http) = {
post: "/ffmpeg/api/create"
body: "*"
};
}
// 更新 FFmpeg 进程
rpc UpdateProcess (UpdateProcessRequest) returns (ProcessResponse) {
option (google.api.http) = {
post: "/ffmpeg/api/update/{id}"
body: "*"
};
}
// 删除 FFmpeg 进程
rpc DeleteProcess (ProcessIdRequest) returns (global.SuccessResponse) {
option (google.api.http) = {
post: "/ffmpeg/api/delete/{id}"
};
}
// 获取 FFmpeg 进程列表
rpc ListProcesses (google.protobuf.Empty) returns (ListProcessesResponse) {
option (google.api.http) = {
get: "/ffmpeg/api/list"
};
}
// 启动 FFmpeg 进程
rpc StartProcess (ProcessIdRequest) returns (global.SuccessResponse) {
option (google.api.http) = {
post: "/ffmpeg/api/start/{id}"
};
}
// 停止 FFmpeg 进程
rpc StopProcess (ProcessIdRequest) returns (global.SuccessResponse) {
option (google.api.http) = {
post: "/ffmpeg/api/stop/{id}"
};
}
// 重启 FFmpeg 进程
rpc RestartProcess (ProcessIdRequest) returns (global.SuccessResponse) {
option (google.api.http) = {
post: "/ffmpeg/api/restart/{id}"
};
}
}
message FFmpegProcess {
uint32 id = 1;
string description = 2;
string arguments = 3;
string status = 4;
int32 pid = 5;
bool autoStart = 6;
google.protobuf.Timestamp createdAt = 7;
google.protobuf.Timestamp updatedAt = 8;
}
message CreateProcessRequest {
string description = 1;
string arguments = 2;
bool autoStart = 3;
}
message UpdateProcessRequest {
uint32 id = 1;
string description = 2;
string arguments = 3;
bool autoStart = 4;
}
message ProcessIdRequest {
uint32 id = 1;
}
message ProcessResponse {
uint32 code = 1;
string message = 2;
FFmpegProcess data = 3;
}
message ListProcessesResponse {
uint32 code = 1;
string message = 2;
repeated FFmpegProcess data = 3;
}

View File

@@ -0,0 +1,365 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v5.29.3
// source: ffmpeg.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
pb "m7s.live/v5/pb"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
Api_CreateProcess_FullMethodName = "/ffmpeg.Api/CreateProcess"
Api_UpdateProcess_FullMethodName = "/ffmpeg.Api/UpdateProcess"
Api_DeleteProcess_FullMethodName = "/ffmpeg.Api/DeleteProcess"
Api_ListProcesses_FullMethodName = "/ffmpeg.Api/ListProcesses"
Api_StartProcess_FullMethodName = "/ffmpeg.Api/StartProcess"
Api_StopProcess_FullMethodName = "/ffmpeg.Api/StopProcess"
Api_RestartProcess_FullMethodName = "/ffmpeg.Api/RestartProcess"
)
// ApiClient is the client API for Api service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ApiClient interface {
// 创建 FFmpeg 进程
CreateProcess(ctx context.Context, in *CreateProcessRequest, opts ...grpc.CallOption) (*ProcessResponse, error)
// 更新 FFmpeg 进程
UpdateProcess(ctx context.Context, in *UpdateProcessRequest, opts ...grpc.CallOption) (*ProcessResponse, error)
// 删除 FFmpeg 进程
DeleteProcess(ctx context.Context, in *ProcessIdRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error)
// 获取 FFmpeg 进程列表
ListProcesses(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ListProcessesResponse, error)
// 启动 FFmpeg 进程
StartProcess(ctx context.Context, in *ProcessIdRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error)
// 停止 FFmpeg 进程
StopProcess(ctx context.Context, in *ProcessIdRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error)
// 重启 FFmpeg 进程
RestartProcess(ctx context.Context, in *ProcessIdRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error)
}
type apiClient struct {
cc grpc.ClientConnInterface
}
func NewApiClient(cc grpc.ClientConnInterface) ApiClient {
return &apiClient{cc}
}
func (c *apiClient) CreateProcess(ctx context.Context, in *CreateProcessRequest, opts ...grpc.CallOption) (*ProcessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ProcessResponse)
err := c.cc.Invoke(ctx, Api_CreateProcess_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) UpdateProcess(ctx context.Context, in *UpdateProcessRequest, opts ...grpc.CallOption) (*ProcessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ProcessResponse)
err := c.cc.Invoke(ctx, Api_UpdateProcess_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) DeleteProcess(ctx context.Context, in *ProcessIdRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, Api_DeleteProcess_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) ListProcesses(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ListProcessesResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListProcessesResponse)
err := c.cc.Invoke(ctx, Api_ListProcesses_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) StartProcess(ctx context.Context, in *ProcessIdRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, Api_StartProcess_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) StopProcess(ctx context.Context, in *ProcessIdRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, Api_StopProcess_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) RestartProcess(ctx context.Context, in *ProcessIdRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, Api_RestartProcess_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility.
type ApiServer interface {
// 创建 FFmpeg 进程
CreateProcess(context.Context, *CreateProcessRequest) (*ProcessResponse, error)
// 更新 FFmpeg 进程
UpdateProcess(context.Context, *UpdateProcessRequest) (*ProcessResponse, error)
// 删除 FFmpeg 进程
DeleteProcess(context.Context, *ProcessIdRequest) (*pb.SuccessResponse, error)
// 获取 FFmpeg 进程列表
ListProcesses(context.Context, *emptypb.Empty) (*ListProcessesResponse, error)
// 启动 FFmpeg 进程
StartProcess(context.Context, *ProcessIdRequest) (*pb.SuccessResponse, error)
// 停止 FFmpeg 进程
StopProcess(context.Context, *ProcessIdRequest) (*pb.SuccessResponse, error)
// 重启 FFmpeg 进程
RestartProcess(context.Context, *ProcessIdRequest) (*pb.SuccessResponse, error)
mustEmbedUnimplementedApiServer()
}
// UnimplementedApiServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedApiServer struct{}
func (UnimplementedApiServer) CreateProcess(context.Context, *CreateProcessRequest) (*ProcessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateProcess not implemented")
}
func (UnimplementedApiServer) UpdateProcess(context.Context, *UpdateProcessRequest) (*ProcessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateProcess not implemented")
}
func (UnimplementedApiServer) DeleteProcess(context.Context, *ProcessIdRequest) (*pb.SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteProcess not implemented")
}
func (UnimplementedApiServer) ListProcesses(context.Context, *emptypb.Empty) (*ListProcessesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListProcesses not implemented")
}
func (UnimplementedApiServer) StartProcess(context.Context, *ProcessIdRequest) (*pb.SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StartProcess not implemented")
}
func (UnimplementedApiServer) StopProcess(context.Context, *ProcessIdRequest) (*pb.SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StopProcess not implemented")
}
func (UnimplementedApiServer) RestartProcess(context.Context, *ProcessIdRequest) (*pb.SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RestartProcess not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
func (UnimplementedApiServer) testEmbeddedByValue() {}
// UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ApiServer will
// result in compilation errors.
type UnsafeApiServer interface {
mustEmbedUnimplementedApiServer()
}
func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) {
// If the following call pancis, it indicates UnimplementedApiServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&Api_ServiceDesc, srv)
}
func _Api_CreateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateProcessRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).CreateProcess(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_CreateProcess_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).CreateProcess(ctx, req.(*CreateProcessRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateProcessRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).UpdateProcess(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_UpdateProcess_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).UpdateProcess(ctx, req.(*UpdateProcessRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_DeleteProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ProcessIdRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).DeleteProcess(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_DeleteProcess_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).DeleteProcess(ctx, req.(*ProcessIdRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_ListProcesses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).ListProcesses(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_ListProcesses_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).ListProcesses(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Api_StartProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ProcessIdRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).StartProcess(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_StartProcess_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).StartProcess(ctx, req.(*ProcessIdRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_StopProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ProcessIdRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).StopProcess(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_StopProcess_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).StopProcess(ctx, req.(*ProcessIdRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_RestartProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ProcessIdRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).RestartProcess(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_RestartProcess_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).RestartProcess(ctx, req.(*ProcessIdRequest))
}
return interceptor(ctx, in, info, handler)
}
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Api_ServiceDesc = grpc.ServiceDesc{
ServiceName: "ffmpeg.Api",
HandlerType: (*ApiServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "CreateProcess",
Handler: _Api_CreateProcess_Handler,
},
{
MethodName: "UpdateProcess",
Handler: _Api_UpdateProcess_Handler,
},
{
MethodName: "DeleteProcess",
Handler: _Api_DeleteProcess_Handler,
},
{
MethodName: "ListProcesses",
Handler: _Api_ListProcesses_Handler,
},
{
MethodName: "StartProcess",
Handler: _Api_StartProcess_Handler,
},
{
MethodName: "StopProcess",
Handler: _Api_StopProcess_Handler,
},
{
MethodName: "RestartProcess",
Handler: _Api_RestartProcess_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "ffmpeg.proto",
}

66
plugin/ffmpeg/process.go Normal file
View File

@@ -0,0 +1,66 @@
package ffmpeg
import (
"fmt"
"os/exec"
"m7s.live/v5/pkg/task"
)
type FFmpegTask struct {
task.Task
process *FFmpegProcess
cmd *exec.Cmd
}
func NewFFmpegTask(process *FFmpegProcess) *FFmpegTask {
t := &FFmpegTask{
process: process,
}
return t
}
func (t *FFmpegTask) GetKey() uint {
return t.process.ID
}
func (t *FFmpegTask) Start() error {
// 解析参数
args := []string{}
if t.process.Arguments != "" {
// 这里应该解析JSON格式的参数
// 为简化实现我们直接将Arguments作为参数传递
args = append(args, t.process.Arguments)
}
// 创建命令
t.cmd = exec.Command("ffmpeg", args...)
// 启动进程
err := t.cmd.Start()
if err != nil {
t.process.Status = "error"
return err
}
// 更新进程信息
t.process.PID = t.cmd.Process.Pid
t.process.Status = "running"
t.OnStop(t.cmd.Process.Kill)
return nil
}
func (t *FFmpegTask) Run() error {
if t.cmd == nil || t.cmd.Process == nil {
return fmt.Errorf("FFmpeg process not started")
}
// 等待进程结束
err := t.cmd.Wait()
if err != nil {
t.process.Status = "error"
return err
}
t.process.Status = "stopped"
return nil
}

45
plugin/hiksdk/client.go Normal file
View File

@@ -0,0 +1,45 @@
package plugin_hiksdk
import (
"m7s.live/v5"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
)
const (
DIRECTION_PULL = "pull"
DIRECTION_PUSH = "push"
)
type ClientPlugin struct {
task.Job
conf *HikPlugin
pullCtx m7s.PullJob
pushCtx m7s.PushJob
direction string
}
func (c *ClientPlugin) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func (c *ClientPlugin) GetPushJob() *m7s.PushJob {
return &c.pushCtx
}
func NewPuller(_ config.Pull) m7s.IPuller {
client := &ClientPlugin{
direction: DIRECTION_PULL,
}
client.SetDescription(task.OwnerTypeKey, "HikPuller")
return client
}
func NewPusher() m7s.IPusher {
client := &ClientPlugin{
direction: DIRECTION_PUSH,
}
client.SetDescription(task.OwnerTypeKey, "HikPusher")
return client
}

80
plugin/hiksdk/device.go Normal file
View File

@@ -0,0 +1,80 @@
package plugin_hiksdk
import (
"fmt"
"strings"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/hiksdk/pkg"
"github.com/prometheus/client_golang/prometheus"
)
type HikDevice struct {
task.Job
IP string
UserName string
Password string
Port int
Device pkg.Device
Conf *HikPlugin
}
func (d *HikDevice) Start() (err error) {
info := pkg.DeviceInfo{
IP: d.IP,
UserName: d.UserName,
Password: d.Password,
Port: d.Port,
}
d.Device = pkg.NewHKDevice(info)
return
}
func (d *HikDevice) Run() (err error) {
if _, err := d.Device.Login(); err != nil {
fmt.Println(err.Error())
} else {
fmt.Println("success login")
}
deviceInfo, err := d.Device.GetDeiceInfo() // 获取设备参数
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println(deviceInfo)
}
channelNames, err := d.Device.GetChannelName() // 获取通道
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println("通道:", channelNames)
}
d.AutoPullStream()
return
}
func (d *HikDevice) AutoPullStream() {
deviceInfo, _ := d.Device.GetDeiceInfo() // 获取设备参数
channelNames, _ := d.Device.GetChannelName() // 获取通道
for i := 1; i <= int(deviceInfo.ByChanNum); i++ {
d.PullStream(deviceInfo.DeviceID, channelNames[i], i)
}
}
func (d *HikDevice) PullStream(ifname string, channelName string, channelId int) error {
// 生成流路径
ifname = strings.ReplaceAll(ifname, "-", "_")
streamPath := fmt.Sprintf("%s/%s", ifname, channelName)
receiver := &pkg.PSReceiver{}
receiver.Device = d.Device
receiver.ChannelId = channelId
receiver.Publisher, _ = d.Conf.Publish(d.Conf, streamPath)
go d.Conf.RunTask(receiver)
return nil
}
func (d *HikDevice) Describe(ch chan<- *prometheus.Desc) {
d.Device.Logout()
}

File diff suppressed because it is too large Load Diff

54
plugin/hiksdk/index.go Normal file
View File

@@ -0,0 +1,54 @@
package plugin_hiksdk
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
m7s "m7s.live/v5"
"m7s.live/v5/plugin/hiksdk/pkg"
)
type HikPlugin struct {
m7s.Plugin
Clients []Client `yaml:"client,omitempty"` //共享的通道格式为GBID流地址
list []HikDevice
}
type Client struct {
IP string `yaml:"ip"`
UserName string `yaml:"username"`
Password string `yaml:"password"`
Port int `yaml:"port"`
}
var _ = m7s.InstallPlugin[HikPlugin](m7s.PluginMeta{
NewPuller: NewPuller,
NewPusher: NewPusher,
})
func init() {
fmt.Println("success 初始化海康SDK")
pkg.InitHikSDK()
}
func (hik *HikPlugin) Start() (err error) {
for i, client := range hik.Clients {
fmt.Printf("Client[%d]: IP=%s, UserName=%s, Password=%s, Port=%d\n", i, client.IP, client.UserName, client.Password, client.Port)
device := HikDevice{
IP: client.IP,
UserName: client.UserName,
Password: client.Password,
Port: client.Port,
Conf: hik,
}
hik.list = append(hik.list, device)
}
for _, device := range hik.list {
go hik.AddTask(&device)
}
return
}
func (hik *HikPlugin) Describe(ch chan<- *prometheus.Desc) {
pkg.HKExit()
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,26 @@
#ȱʡ<C8B1><CAA1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̨
#FATAL<41><4C>ERROR<4F><52>WARN<52><4E>INFO<46><4F>DEBUG <20><><EFBFBD>ȼ<EFBFBD>˳<EFBFBD><CBB3> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ģ<EFBFBD><C4A3><EFBFBD>͸<EFBFBD>ģ<EFBFBD>鶼ƥ<E9B6BC><EFBFBD><E4A3AC>ô<EFBFBD><C3B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
log4j.rootLogger=DEBUG, stdout
#log4j.rootLogger=DEBUG
##hlog.async=false
##hlog.secret.show=true
##hlog.secret.encrypt=false
#log4j.logger<65><72><EFBFBD>ڿ<EFBFBD><DABF><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD>ɼ<EFBFBD><C9BC><EFBFBD><EFBFBD>𼰲ɼ<F0BCB0B2><C9BC><EFBFBD><EFBFBD>ݣ<EFBFBD>Threshold<6C><64><EFBFBD>ڿ<EFBFBD><DABF><EFBFBD><EFBFBD><EFBFBD>־<EFBFBD><D6BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
<><D3A6><EFBFBD>ڿ<EFBFBD><DABF><EFBFBD>̨
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d][%t][%-5p]- %m%n
log4j.logger.NPQ=ERROR, NPQ
log4j.appender.NPQ=org.apache.log4j.RollingFileAppender
log4j.appender.NPQ.File=./NPQLog/NPQ.log
log4j.appender.NPQ.MaxFileSize=80MB
log4j.appender.NPQ.MaxBackupIndex=12
log4j.appender.NPQ.Append=true
log4j.appender.NPQ.Threshold=TRACE
log4j.appender.NPQ.layout=org.apache.log4j.PatternLayout
log4j.appender.NPQ.layout.ConversionPattern=[%d][%t][%-5p]- %m%n
log4j.additivity.NPQ = false
#<23><><EFBFBD><EFBFBD>һλ<D2BB>޸<EFBFBD>Ϊtrue <20>ȿ<EFBFBD><C8BF>Կ<EFBFBD><D4BF><EFBFBD>̨<EFBFBD><CCA8><EFBFBD><EFBFBD><EFBFBD>ֿ<EFBFBD><D6BF><EFBFBD><EFBFBD>ļ<EFBFBD><C4BC><EFBFBD><EFBFBD><EFBFBD>

Binary file not shown.

View File

@@ -0,0 +1,506 @@
package pkg
/*
#cgo CFLAGS: -I../include
// Linux平台的链接配置
#cgo linux LDFLAGS: -L../lib/Linux -lHCCore -lhpr -lhcnetsdk
// Windows平台的链接配置
#cgo windows LDFLAGS: -L../lib/Windows -lHCCore -lHCNetSDK
#include <stdio.h>
#include <stdlib.h>
#include "HCNetSDK.h"
extern void AlarmCallBack(LONG lCommand, NET_DVR_ALARMER *pAlarmer, char *pAlarmInfo, DWORD dwBufLen, void* pUser);
extern void RealDataCallBack_V30(LONG lRealHandle, DWORD dwDataType, BYTE *pBuffer, DWORD dwBufSize, void *pUser);
*/
import "C"
import (
"errors"
"fmt"
"log"
"strings"
"sync"
"unsafe"
"golang.org/x/text/encoding/simplifiedchinese"
)
// 全局指针映射用于安全地在CGO中传递Go指针
var (
pointerMap = make(map[uintptr]*Receiver)
pointerMutex sync.RWMutex
pointerCounter uintptr = 1
)
// 存储Go指针返回一个唯一的标识符
func storePointer(receiver *Receiver) uintptr {
pointerMutex.Lock()
defer pointerMutex.Unlock()
id := pointerCounter
pointerCounter++
pointerMap[id] = receiver
return id
}
// 根据标识符获取Go指针
func getPointer(id uintptr) *Receiver {
pointerMutex.RLock()
defer pointerMutex.RUnlock()
return pointerMap[id]
}
// 删除存储的指针
func removePointer(id uintptr) {
pointerMutex.Lock()
defer pointerMutex.Unlock()
delete(pointerMap, id)
}
/*************************参数配置命令 begin*******************************/
//用于NET_DVR_SetDVRConfig和NET_DVR_GetDVRConfig,注意其对应的配置结构
const (
NET_DVR_GET_DEVICECFG = 100 // 获取设备参数
NET_DVR_SET_DEVICECFG = 101 // 设置设备参数
NET_DVR_GET_NETCFG = 102 // 获取网络参数
NET_DVR_SET_NETCFG = 103 // 设置网络参数
NET_DVR_GET_PICCFG = 104 // 获取图象参数
NET_DVR_SET_PICCFG = 105 // 设置图象参数
NET_DVR_GET_COMPRESSCFG = 106 // 获取压缩参数
NET_DVR_SET_COMPRESSCFG = 107 // 设置压缩参数
NET_DVR_GET_RECORDCFG = 108 // 获取录像时间参数
NET_DVR_SET_RECORDCFG = 109 // 设置录像时间参数
NET_DVR_GET_DECODERCFG = 110 // 获取解码器参数
NET_DVR_SET_DECODERCFG = 111 // 设置解码器参数
NET_DVR_GET_RS232CFG = 112 // 获取232串口参数
NET_DVR_SET_RS232CFG = 113 // 设置232串口参数
NET_DVR_GET_ALARMINCFG = 114 // 获取报警输入参数
NET_DVR_SET_ALARMINCFG = 115 // 设置报警输入参数
NET_DVR_GET_ALARMOUTCFG = 116 // 获取报警输出参数
NET_DVR_SET_ALARMOUTCFG = 117 // 设置报警输出参数
NET_DVR_GET_TIMECFG = 118 // 获取DVR时间
NET_DVR_SET_TIMECFG = 119 // 设置DVR时间
NET_DVR_GET_PREVIEWCFG = 120 // 获取预览参数
NET_DVR_SET_PREVIEWCFG = 121 // 设置预览参数
NET_DVR_GET_VIDEOOUTCFG = 122 // 获取视频输出参数
NET_DVR_SET_VIDEOOUTCFG = 123 // 设置视频输出参数
NET_DVR_GET_USERCFG = 124 // 获取用户参数
NET_DVR_SET_USERCFG = 125 // 设置用户参数
NET_DVR_GET_EXCEPTIONCFG = 126 // 获取异常参数
NET_DVR_SET_EXCEPTIONCFG = 127 // 设置异常参数
NET_DVR_GET_ZONEANDDST = 128 // 获取时区和夏时制参数
NET_DVR_SET_ZONEANDDST = 129 // 设置时区和夏时制参数
NET_DVR_GET_DEVICECFG_V40 = 1100 // 获取设备参数
NET_DVR_SET_PTZPOS = 292 //云台设置PTZ位置
NET_DVR_GET_PTZPOS = 293 //云台获取PTZ位置
NET_DVR_GET_PTZSCOPE = 294 //云台获取PTZ范围
)
// strcpy safely copies a Go string to a C character array
func strcpy(dst unsafe.Pointer, src string, maxLen int) {
if len(src) >= maxLen {
copy((*[1 << 30]byte)(dst)[:maxLen-1], src)
(*[1 << 30]byte)(dst)[maxLen-1] = 0
} else {
copy((*[1 << 30]byte)(dst)[:len(src)], src)
(*[1 << 30]byte)(dst)[len(src)] = 0
}
}
// GBK → UTF-8
func GBKToUTF8(b []byte) (string, error) {
r, err := simplifiedchinese.GBK.NewDecoder().Bytes(b)
return string(r), err
}
// UTF-8 → GBK
func UTF8ToGBK(s string) ([]byte, error) {
return simplifiedchinese.GBK.NewEncoder().Bytes([]byte(s))
}
//export AlarmCallBack
func AlarmCallBack(command C.LONG, alarm *C.NET_DVR_ALARMER, info *C.char, len C.DWORD, user unsafe.Pointer) {
fmt.Println("receive alarm")
}
//export RealDataCallBack_V30
func RealDataCallBack_V30(lRealHandle C.LONG, dwDataType C.DWORD, pBuffer *C.BYTE, dwBufSize C.DWORD, pUser unsafe.Pointer) {
// 从指针映射中获取Receiver
receiverID := uintptr(pUser)
receiver := getPointer(receiverID)
if receiver == nil {
fmt.Println("Error: receiver not found for ID", receiverID)
return
}
size := int(dwBufSize)
if size > 0 && pBuffer != nil {
// 将C指针转换为Go的byte切片
buffer := (*[1 << 30]C.BYTE)(unsafe.Pointer(pBuffer))[:size:size]
// 使用unsafe.Pointer高效转换[]C.BYTE为[]byte
goBuffer := (*[1 << 30]byte)(unsafe.Pointer(&buffer[0]))[:len(buffer):len(buffer)]
receiver.ReadPSData(goBuffer)
}
}
type HKDevice struct {
ip string
port int
username string
password string
loginId int
alarmHandle int
lRealHandle int
byChanNum int
receiverID uintptr // 存储指针映射ID
}
// InitHikSDK hk sdk init
func InitHikSDK() {
// 初始化SDK
C.NET_DVR_Init()
C.NET_DVR_SetConnectTime(2000, 5)
C.NET_DVR_SetReconnect(10000, 1)
}
// HKExit hk sdk clean
func HKExit() {
C.NET_DVR_Cleanup()
}
// NewHKDevice new hk-device instance
func NewHKDevice(info DeviceInfo) Device {
return &HKDevice{
ip: info.IP,
port: info.Port,
username: info.UserName,
password: info.Password}
}
// Login hk device loin
func (device *HKDevice) Login() (int, error) {
// init data
var deviceInfoV30 C.NET_DVR_DEVICEINFO_V30
ip := C.CString(device.ip)
usr := C.CString(device.username)
passwd := C.CString(device.password)
defer func() {
C.free(unsafe.Pointer(ip))
C.free(unsafe.Pointer(usr))
C.free(unsafe.Pointer(passwd))
}()
device.loginId = int(C.NET_DVR_Login_V30(ip, C.WORD(device.port), usr, passwd,
(*C.NET_DVR_DEVICEINFO_V30)(unsafe.Pointer(&deviceInfoV30)),
))
// 正确地将_Ctype_BYTE数组转换为string
serialNumber := string(unsafe.Slice(&deviceInfoV30.sSerialNumber[0], len(deviceInfoV30.sSerialNumber)))
// 去除可能的空字符
serialNumber = strings.Trim(serialNumber, "\x00")
fmt.Println("设备序列号:", serialNumber)
fmt.Println("登录成功,设备信息已获取")
if device.loginId < 0 {
return -1, device.HKErr("login ")
}
log.Println("login success")
return device.loginId, nil
}
// Logout hk device logout
func (device *HKDevice) Logout() error {
C.NET_DVR_Logout_V30(C.LONG(device.loginId))
if err := device.HKErr("NVRLogout"); err != nil {
return err
}
return nil
}
// Login hk device loin
func (device *HKDevice) LoginV4() (int, error) {
// init data
var deviceInfoV40 C.NET_DVR_DEVICEINFO_V40
var userLoginInfo C.NET_DVR_USER_LOGIN_INFO
// 使用strcpy函数将字符串复制到C结构体的字符数组中
strcpy(unsafe.Pointer(&userLoginInfo.sDeviceAddress[0]), device.ip, len(userLoginInfo.sDeviceAddress))
userLoginInfo.wPort = C.WORD(device.port)
strcpy(unsafe.Pointer(&userLoginInfo.sUserName[0]), device.username, len(userLoginInfo.sUserName))
strcpy(unsafe.Pointer(&userLoginInfo.sPassword[0]), device.password, len(userLoginInfo.sPassword))
// 正确调用NET_DVR_Login_V40函数
device.loginId = int(C.NET_DVR_Login_V40(&userLoginInfo, (*C.NET_DVR_DEVICEINFO_V40)(unsafe.Pointer(&deviceInfoV40))))
// 正确地将_Ctype_BYTE数组转换为string
serialNumber := string(unsafe.Slice(&deviceInfoV40.struDeviceV30.sSerialNumber[0], len(deviceInfoV40.struDeviceV30.sSerialNumber)))
// 去除可能的空字符
serialNumber = strings.Trim(serialNumber, "\x00")
// fmt.Println("设备序列号:", serialNumber)
// bySupportDev5是一个字节值而不是数组直接输出其数值
// fmt.Println("支持的设备类型标志:", int(deviceInfoV40.bySupportDev5))
// fmt.Println("登录成功,设备信息已获取")
if device.loginId < 0 {
return -1, device.HKErr("login ")
}
log.Println("login success")
return device.loginId, nil
}
func (device *HKDevice) GetDeiceInfo() (*DeviceInfo, error) {
// BOOL NET_DVR_GetDVRConfig(LONG lUserID, DWORD dwCommand,LONG lChannel, LPVOID lpOutBuffer, DWORD dwOutBufferSize, LPDWORD lpBytesReturned);
var deviceInfo C.NET_DVR_DEVICECFG
var bytesReturned C.DWORD
if C.NET_DVR_GetDVRConfig(C.LONG(device.loginId), C.DWORD(NET_DVR_GET_DEVICECFG), C.LONG(0), (C.LPVOID)(unsafe.Pointer(&deviceInfo)), C.DWORD(unsafe.Sizeof(deviceInfo)), &bytesReturned) != C.TRUE {
// fmt.Println("获取设备信息失败")
}
sDVRName := string(unsafe.Slice(&deviceInfo.sDVRName[0], len(deviceInfo.sDVRName)))
sSerialNumber := string(unsafe.Slice(&deviceInfo.sSerialNumber[0], len(deviceInfo.sSerialNumber)))
// 清理字符串中的空字符和空格
sDVRName = strings.TrimRight(sDVRName, "\x00")
sDVRName = strings.TrimSpace(sDVRName)
sSerialNumber = strings.TrimRight(sSerialNumber, "\x00")
sSerialNumber = strings.TrimSpace(sSerialNumber)
sDVRName, _ = GBKToUTF8([]byte(sDVRName))
device.byChanNum = int(deviceInfo.byChanNum)
return &DeviceInfo{
IP: device.ip,
Port: device.port,
UserName: device.username,
Password: device.password,
DeviceID: sSerialNumber,
DeviceName: sDVRName,
ByChanNum: int(deviceInfo.byChanNum),
}, nil
}
// 获取通道名称,俯仰角,横滚角
func (device *HKDevice) GetChannelName() (map[int]string, error) {
channelNames := make(map[int]string)
for i := 1; i <= int(device.byChanNum); i++ {
var channelInfo C.NET_DVR_PICCFG
var bytesReturned C.DWORD
var sDVRName string
// if C.NET_DVR_GetChannelInfo(C.LONG(device.loginId), C.LONG(i), (*C.NET_DVR_CHANNELINFO)(unsafe.Pointer(&channelInfo))) != C.TRUE {
// return nil, device.HKErr("get channel info")
// }
if C.NET_DVR_GetDVRConfig(C.LONG(device.loginId), C.DWORD(NET_DVR_GET_PICCFG), C.LONG(i), (C.LPVOID)(unsafe.Pointer(&channelInfo)), C.DWORD(unsafe.Sizeof(channelInfo)), &bytesReturned) != C.TRUE {
// fmt.Println("获取通道名称失败")
// return nil, device.HKErr("get device info")
sDVRName = "camera" + fmt.Sprintf("%d", i)
} else {
sDVRName = string(unsafe.Slice(&channelInfo.sChanName[0], len(channelInfo.sChanName)))
// 清理字符串中的空字符和空格
sDVRName = strings.TrimRight(sDVRName, "\x00")
sDVRName = strings.TrimSpace(sDVRName)
sDVRName, _ = GBKToUTF8([]byte(sDVRName))
}
channelNames[i] = sDVRName
}
return channelNames, nil
}
// 获取通道名称,俯仰角,横滚角
func (device *HKDevice) GetChannelPTZ(channel int) {
var ptzPos C.NET_DVR_PTZPOS
var ptzScope C.NET_DVR_PTZSCOPE
var bytesReturned C.DWORD
// if C.NET_DVR_GetChannelInfo(C.LONG(device.loginId), C.LONG(i), (*C.NET_DVR_CHANNELINFO)(unsafe.Pointer(&channelInfo))) != C.TRUE {
// return nil, device.HKErr("get channel info")
// }
if C.NET_DVR_GetDVRConfig(C.LONG(device.loginId), C.DWORD(NET_DVR_GET_PTZPOS), C.LONG(channel), (C.LPVOID)(unsafe.Pointer(&ptzPos)), C.DWORD(unsafe.Sizeof(ptzPos)), &bytesReturned) != C.TRUE {
// fmt.Println("获取PTZ位置信息失败")
}
// fmt.Println("PTZ位置信息:", ptzPos)
if C.NET_DVR_GetDVRConfig(C.LONG(device.loginId), C.DWORD(NET_DVR_GET_PTZSCOPE), C.LONG(channel), (C.LPVOID)(unsafe.Pointer(&ptzScope)), C.DWORD(unsafe.Sizeof(ptzScope)), &bytesReturned) != C.TRUE {
// fmt.Println("获取PTZ范围信息失败")
}
fmt.Println("PTZ范围信息:", ptzScope)
// 计算PTZ位置 - 显示原始值用于调试
fmt.Println("原始PTZ值 - wPanPos:", ptzPos.wPanPos, "wPanPosMin:", ptzScope.wPanPosMin, "wPanPosMax:", ptzScope.wPanPosMax)
fmt.Println("原始PTZ值 - wTiltPos:", ptzPos.wTiltPos, "wTiltPosMin:", ptzScope.wTiltPosMin, "wTiltPosMax:", ptzScope.wTiltPosMax)
fmt.Println("原始PTZ值 - wZoomPos:", ptzPos.wZoomPos, "wZoomPosMin:", ptzScope.wZoomPosMin, "wZoomPosMax:", ptzScope.wZoomPosMax)
// 计算差值
deltaPan := ptzScope.wPanPosMax - ptzScope.wPanPosMin
deltaTilt := ptzScope.wTiltPosMax - ptzScope.wTiltPosMin
deltaZoom := ptzScope.wZoomPosMax - ptzScope.wZoomPosMin
fmt.Println("差值计算 - deltaPan:", deltaPan, "deltaTilt:", deltaTilt, "deltaZoom:", deltaZoom)
fmt.Println("位置差值 - Pan:", ptzPos.wPanPos-ptzScope.wPanPosMin, "Tilt:", ptzPos.wTiltPos-ptzScope.wTiltPosMin, "Zoom:", ptzPos.wZoomPos-ptzScope.wZoomPosMin)
// 添加除零检查和边界处理
// 计算水平位置 (Pan)
if deltaPan > 0 {
// 计算实际比例
panRatio := float64(ptzPos.wPanPos-ptzScope.wPanPosMin) / float64(deltaPan)
fmt.Println("Pan比例:", panRatio)
// 计算Pan位置0-360度
panPos := int(panRatio * 360)
// 确保结果在0-360范围内
if panPos < 0 {
panPos = 0
} else if panPos > 360 {
panPos = 360
}
ptzPos.wPanPos = C.WORD(panPos)
fmt.Println("计算后Pan位置:", panPos)
} else {
ptzPos.wPanPos = 0 // 当范围无效时设置默认值
fmt.Println("警告: PTZ水平范围无效设置为默认值")
}
// 计算垂直位置 (Tilt)
if deltaTilt > 0 {
// 计算实际比例
tiltRatio := float64(ptzPos.wTiltPos-ptzScope.wTiltPosMin) / float64(deltaTilt)
fmt.Println("Tilt比例:", tiltRatio)
// 计算Tilt位置0-360度
tiltPos := int(tiltRatio * 90)
// 确保结果在0-360范围内
if tiltPos < 0 {
tiltPos = 0
} else if tiltPos > 90 {
tiltPos = 90
}
ptzPos.wTiltPos = C.WORD(tiltPos)
fmt.Println("计算后Tilt位置:", tiltPos)
} else {
ptzPos.wTiltPos = 0 // 当范围无效时设置默认值
fmt.Println("警告: PTZ垂直范围无效设置为默认值")
}
// 计算缩放位置 (Zoom)
if deltaZoom > 0 {
// 计算实际比例
zoomRatio := float64(ptzPos.wZoomPos-ptzScope.wZoomPosMin) / float64(deltaZoom)
fmt.Println("Zoom比例:", zoomRatio)
// 计算Zoom位置0-100%
zoomPos := int(zoomRatio * 100)
// 确保结果在0-100范围内
if zoomPos < 0 {
zoomPos = 0
} else if zoomPos > 100 {
zoomPos = 100
}
ptzPos.wZoomPos = C.WORD(zoomPos)
fmt.Println("计算后Zoom位置:", zoomPos)
} else {
ptzPos.wZoomPos = 0 // 当范围无效时设置默认值
fmt.Println("警告: PTZ缩放范围无效设置为默认值")
}
fmt.Println("PTZ位置信息:", ptzPos)
}
func (device *HKDevice) SetAlarmCallBack() error { //监听报警信息
if C.NET_DVR_SetDVRMessageCallBack_V30(C.MSGCallBack(C.AlarmCallBack), C.NULL) != C.TRUE {
return device.HKErr(device.ip + ":set alarm callback")
}
return nil
}
func (device *HKDevice) StartListenAlarmMsg() error {
var struAlarmParam C.NET_DVR_SETUPALARM_PARAM
// 根据平台使用正确的类型
struAlarmParam.dwSize = C.DWORD(unsafe.Sizeof(struAlarmParam))
struAlarmParam.byAlarmInfoType = 0
// 转换为正确的类型
device.alarmHandle = int(C.NET_DVR_SetupAlarmChan_V41(C.LONG(device.loginId), &struAlarmParam))
if device.alarmHandle < 0 {
return device.HKErr("setup alarm chan")
}
return nil
}
func (device *HKDevice) StopListenAlarmMsg() error {
if C.NET_DVR_CloseAlarmChan_V30(C.LONG(device.alarmHandle)) != C.TRUE {
return device.HKErr("stop alarm chan")
}
return nil
}
// HKErr Detect success of operation
func (device *HKDevice) HKErr(operation string) error {
errno := int64(C.NET_DVR_GetLastError())
if errno > 0 {
reMsg := fmt.Sprintf("%s:%s摄像头失败,失败代码号:%d", device.ip, operation, errno)
return errors.New(reMsg)
}
return nil
}
// // Login hk device loin
func (device *HKDevice) PTZControlWithSpeed(dwPTZCommand, dwStop, dwSpeed int) (bool, error) {
// init data
if C.NET_DVR_PTZControlWithSpeed(C.LONG(device.loginId), C.DWORD(dwPTZCommand), C.DWORD(dwStop), C.DWORD(dwSpeed)) != C.TRUE {
return false, nil
}
log.Println("control success")
return true, nil
}
// // Login hk device loin
func (device *HKDevice) PTZControlWithSpeed_Other(lChannel, dwPTZCommand, dwStop, dwSpeed int) (bool, error) {
if C.NET_DVR_PTZControlWithSpeed_Other(C.LONG(device.loginId), C.LONG(lChannel), C.DWORD(dwPTZCommand), C.DWORD(dwStop), C.DWORD(dwSpeed)) != C.TRUE {
return false, nil
}
log.Println("control success")
return true, nil
}
// // Login hk device loin
func (device *HKDevice) PTZControl(dwPTZCommand, dwStop int) (bool, error) {
// init data
if C.NET_DVR_PTZControl(C.LONG(device.loginId), C.DWORD(dwPTZCommand), C.DWORD(dwStop)) != C.TRUE {
return false, nil
}
log.Println("control success")
return true, nil
}
// // Login hk device loin
func (device *HKDevice) PTZControl_Other(lChannel, dwPTZCommand, dwStop int) (bool, error) {
// init data
if C.NET_DVR_PTZControl_Other(C.LONG(device.loginId), C.LONG(lChannel), C.DWORD(dwPTZCommand), C.DWORD(dwStop)) != C.TRUE {
return false, nil
}
log.Println("control success")
return true, nil
}
func (device *HKDevice) RealPlay_V40(ChannelId int, receiver *Receiver) (int, error) {
previewInfo := C.NET_DVR_PREVIEWINFO{}
previewInfo.hPlayWnd = nil
previewInfo.lChannel = C.LONG(ChannelId)
previewInfo.dwStreamType = C.DWORD(0)
previewInfo.dwLinkMode = C.DWORD(0)
previewInfo.bBlocked = C.DWORD(0)
previewInfo.byProtoType = C.BYTE(0)
// 存储receiver指针并获取安全ID
receiverID := storePointer(receiver)
device.receiverID = receiverID
//LONG NET_DVR_RealPlay_V40(LONG lUserID, LPNET_DVR_PREVIEWINFO lpPreviewInfo, REALDATACALLBACK fRealDataCallBack_V30, void* pUser);
device.lRealHandle = int(C.NET_DVR_RealPlay_V40(C.LONG(device.loginId), &previewInfo, C.REALDATACALLBACK(C.RealDataCallBack_V30), unsafe.Pointer(receiverID)))
log.Println("Play success", device.lRealHandle)
return device.lRealHandle, nil
}
// StopRealPlay 停止预览
func (device *HKDevice) StopRealPlay() {
if device.lRealHandle != 0 {
C.NET_DVR_StopRealPlay(C.LONG(device.lRealHandle))
device.lRealHandle = 0
// 清理指针映射
if device.receiverID != 0 {
removePointer(device.receiverID)
device.receiverID = 0
}
log.Println("Stop preview success")
}
}

416
plugin/hiksdk/pkg/audio.go Normal file
View File

@@ -0,0 +1,416 @@
package pkg
import (
"encoding/hex"
"fmt"
"strings"
"time"
"unsafe"
"github.com/bluenviron/mediacommon/pkg/bits"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
. "m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
)
type RTPData struct {
Sample
Packets util.ReuseArray[rtp.Packet]
}
func (r *RTPData) Recycle() {
r.RecyclableMemory.Recycle()
r.Packets.Reset()
}
func (r *RTPData) String() (s string) {
for p := range r.Packets.RangePoint {
s += fmt.Sprintf("t: %d, s: %d, p: %02X %d\n", p.Timestamp, p.SequenceNumber, p.Payload[0:2], len(p.Payload))
}
return
}
func (r *RTPData) GetSize() (s int) {
for p := range r.Packets.RangePoint {
s += p.MarshalSize()
}
return
}
type (
RTPCtx struct {
webrtc.RTPCodecParameters
Fmtp map[string]string
SequenceNumber uint16
SSRC uint32
}
PCMACtx struct {
RTPCtx
*codec.PCMACtx
}
PCMUCtx struct {
RTPCtx
*codec.PCMUCtx
}
OPUSCtx struct {
RTPCtx
*codec.OPUSCtx
}
AACCtx struct {
RTPCtx
*codec.AACCtx
SizeLength int // 通常为13
IndexLength int
IndexDeltaLength int
}
IRTPCtx interface {
GetRTPCodecParameter() webrtc.RTPCodecParameters
}
)
func (r *RTPCtx) ParseFmtpLine(cp *webrtc.RTPCodecParameters) {
r.RTPCodecParameters = *cp
r.Fmtp = make(map[string]string)
kvs := strings.Split(r.SDPFmtpLine, ";")
for _, kv := range kvs {
if kv = strings.TrimSpace(kv); kv == "" {
continue
}
if key, value, found := strings.Cut(kv, "="); found {
r.Fmtp[strings.TrimSpace(key)] = strings.TrimSpace(value)
}
}
}
func (r *RTPCtx) GetInfo() string {
return r.GetRTPCodecParameter().SDPFmtpLine
}
func (r *AACCtx) GetInfo() string {
return r.AACCtx.GetInfo()
}
func (r *OPUSCtx) GetInfo() string {
return r.OPUSCtx.GetInfo()
}
func (r *RTPCtx) GetRTPCodecParameter() webrtc.RTPCodecParameters {
return r.RTPCodecParameters
}
func (r *RTPData) Append(ctx *RTPCtx, ts uint32, payload []byte) *rtp.Packet {
ctx.SequenceNumber++
r.Packets = append(r.Packets, rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: ts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: payload,
})
return &r.Packets[len(r.Packets)-1]
}
var _ IAVFrame = (*AudioFrame)(nil)
type AudioFrame struct {
RTPData
}
func (r *AudioFrame) Parse(data IAVFrame) (err error) {
input := data.(*AudioFrame)
r.Packets = append(r.Packets[:0], input.Packets...)
return
}
func payloadLengthInfoDecode(buf []byte) (int, int, error) {
lb := len(buf)
l := 0
n := 0
for {
if (lb - n) == 0 {
return 0, 0, fmt.Errorf("not enough bytes")
}
b := buf[n]
n++
l += int(b)
if b != 255 {
break
}
}
return l, n, nil
}
func (r *AudioFrame) Demux() (err error) {
if len(r.Packets) == 0 {
return ErrSkip
}
data := r.GetAudioData()
// 从编解码器上下文获取 MimeType
var mimeType string
if rtpCtx, ok := r.ICodecCtx.(IRTPCtx); ok {
mimeType = rtpCtx.GetRTPCodecParameter().MimeType
}
switch mimeType {
case "audio/MP4A-LATM":
var fragments util.Memory
var fragmentsExpected int
var fragmentsSize int
for packet := range r.Packets.RangePoint {
if len(packet.Payload) == 0 {
continue
}
if packet.Padding {
packet.Padding = false
}
buf := packet.Payload
if fragments.Size == 0 {
pl, n, err := payloadLengthInfoDecode(buf)
if err != nil {
return err
}
buf = buf[n:]
bl := len(buf)
if pl <= bl {
data.PushOne(buf[:pl])
// there could be other data, due to otherDataPresent. Ignore it.
} else {
if pl > 5*1024 {
fragments = util.Memory{} // discard pending fragments
return fmt.Errorf("access unit size (%d) is too big, maximum is %d",
pl, 5*1024)
}
fragments.PushOne(buf)
fragmentsSize = pl
fragmentsExpected = pl - bl
continue
}
} else {
bl := len(buf)
if fragmentsExpected > bl {
fragments.PushOne(buf)
fragmentsExpected -= bl
continue
}
fragments.PushOne(buf[:fragmentsExpected])
// there could be other data, due to otherDataPresent. Ignore it.
data.Push(fragments.Buffers...)
if fragments.Size != fragmentsSize {
return fmt.Errorf("fragmented AU size is not correct %d != %d", data.Size, fragmentsSize)
}
fragments = util.Memory{}
}
}
case "audio/MPEG4-GENERIC":
var fragments util.Memory
for packet := range r.Packets.RangePoint {
if len(packet.Payload) < 2 {
continue
}
auHeaderLen := util.ReadBE[int](packet.Payload[:2])
if auHeaderLen == 0 {
data.PushOne(packet.Payload)
} else {
dataLens, err := r.readAUHeaders(r.ICodecCtx.(*AACCtx), packet.Payload[2:], auHeaderLen)
if err != nil {
return err
}
payload := packet.Payload[2:]
pos := auHeaderLen >> 3
if (auHeaderLen % 8) != 0 {
pos++
}
payload = payload[pos:]
if fragments.Size == 0 {
if packet.Marker {
for _, dataLen := range dataLens {
if len(payload) < int(dataLen) {
return fmt.Errorf("invalid data len %d", dataLen)
}
data.PushOne(payload[:dataLen])
payload = payload[dataLen:]
}
} else {
if len(dataLens) != 1 {
return fmt.Errorf("a fragmented packet can only contain one AU")
}
fragments.PushOne(payload)
}
} else {
if len(dataLens) != 1 {
return fmt.Errorf("a fragmented packet can only contain one AU")
}
fragments.PushOne(payload)
if !packet.Header.Marker {
continue
}
if uint64(fragments.Size) != dataLens[0] {
return fmt.Errorf("fragmented AU size is not correct %d != %d", dataLens[0], fragments.Size)
}
data.Push(fragments.Buffers...)
fragments = util.Memory{}
}
}
break
}
default:
for packet := range r.Packets.RangePoint {
data.PushOne(packet.Payload)
}
}
return nil
}
func (r *AudioFrame) Mux(from *Sample) (err error) {
data := from.Raw.(*AudioData)
var ctx *RTPCtx
var lastPacket *rtp.Packet
switch base := from.GetBase().(type) {
case *codec.AACCtx:
var c *AACCtx
if r.ICodecCtx == nil {
c = &AACCtx{}
c.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
c.AACCtx = base
c.MimeType = "audio/MPEG4-GENERIC"
c.SDPFmtpLine = fmt.Sprintf("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=%s", hex.EncodeToString(c.ConfigBytes))
c.IndexLength = 3
c.IndexDeltaLength = 3
c.SizeLength = 13
c.RTPCtx.Channels = uint16(base.GetChannels())
c.PayloadType = 97
c.ClockRate = uint32(base.CodecData.SampleRate())
r.ICodecCtx = c
} else {
c = r.ICodecCtx.(*AACCtx)
}
ctx = &c.RTPCtx
pts := uint32(from.Timestamp * time.Duration(ctx.ClockRate) / time.Second)
//AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度又因为单个auheader字节长度2字节所以再除以2就是auheader的个数。
auHeaderLen := []byte{0x00, 0x10, (byte)((data.Size & 0x1fe0) >> 5), (byte)((data.Size & 0x1f) << 3)} // 3 = 16-13, 5 = 8-3
for reader := data.NewReader(); reader.Length > 0; {
payloadLen := MTUSize
if reader.Length+4 < MTUSize {
payloadLen = reader.Length + 4
}
mem := r.NextN(payloadLen)
copy(mem, auHeaderLen)
reader.Read(mem[4:])
lastPacket = r.Append(ctx, pts, mem)
}
lastPacket.Header.Marker = true
return
case *codec.PCMACtx:
if r.ICodecCtx == nil {
var ctx PCMACtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.PCMACtx = base
ctx.MimeType = webrtc.MimeTypePCMA
ctx.PayloadType = 8
ctx.ClockRate = uint32(ctx.SampleRate)
r.ICodecCtx = &ctx
} else {
ctx = &r.ICodecCtx.(*PCMACtx).RTPCtx
}
case *codec.PCMUCtx:
if r.ICodecCtx == nil {
var ctx PCMUCtx
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
ctx.PCMUCtx = base
ctx.MimeType = webrtc.MimeTypePCMU
ctx.PayloadType = 0
ctx.ClockRate = uint32(ctx.SampleRate)
r.ICodecCtx = &ctx
} else {
ctx = &r.ICodecCtx.(*PCMUCtx).RTPCtx
}
}
pts := uint32(from.Timestamp * time.Duration(ctx.ClockRate) / time.Second)
if reader := data.NewReader(); reader.Length > MTUSize {
for reader.Length > 0 {
payloadLen := MTUSize
if reader.Length < MTUSize {
payloadLen = reader.Length
}
mem := r.NextN(payloadLen)
reader.Read(mem)
lastPacket = r.Append(ctx, pts, mem)
}
} else {
mem := r.NextN(reader.Length)
reader.Read(mem)
lastPacket = r.Append(ctx, pts, mem)
}
lastPacket.Header.Marker = true
return
}
func (r *AudioFrame) readAUHeaders(ctx *AACCtx, buf []byte, headersLen int) ([]uint64, error) {
firstRead := false
count := 0
for i := 0; i < headersLen; {
if i == 0 {
i += ctx.SizeLength
i += ctx.IndexLength
} else {
i += ctx.SizeLength
i += ctx.IndexDeltaLength
}
count++
}
dataLens := make([]uint64, count)
pos := 0
i := 0
for headersLen > 0 {
dataLen, err := bits.ReadBits(buf, &pos, ctx.SizeLength)
if err != nil {
return nil, err
}
headersLen -= ctx.SizeLength
if !firstRead {
firstRead = true
if ctx.IndexLength > 0 {
auIndex, err := bits.ReadBits(buf, &pos, ctx.IndexLength)
if err != nil {
return nil, err
}
headersLen -= ctx.IndexLength
if auIndex != 0 {
return nil, fmt.Errorf("AU-index different than zero is not supported")
}
}
} else if ctx.IndexDeltaLength > 0 {
auIndexDelta, err := bits.ReadBits(buf, &pos, ctx.IndexDeltaLength)
if err != nil {
return nil, err
}
headersLen -= ctx.IndexDeltaLength
if auIndexDelta != 0 {
return nil, fmt.Errorf("AU-index-delta different than zero is not supported")
}
}
dataLens[i] = dataLen
i++
}
return dataLens, nil
}

View File

@@ -0,0 +1,28 @@
package pkg
type Device interface {
Login() (int, error)
LoginV4() (int, error)
GetDeiceInfo() (*DeviceInfo, error)
GetChannelName() (map[int]string, error)
Logout() error
SetAlarmCallBack() error
StartListenAlarmMsg() error
StopListenAlarmMsg() error
PTZControlWithSpeed(dwPTZCommand, dwStop, dwSpeed int) (bool, error)
PTZControlWithSpeed_Other(lChannel, dwPTZCommand, dwStop, dwSpeed int) (bool, error)
PTZControl(dwPTZCommand, dwStop int) (bool, error)
PTZControl_Other(lChannel, dwPTZCommand, dwStop int) (bool, error)
GetChannelPTZ(channel int)
RealPlay_V40(ChannelId int,receiver *Receiver) (int, error)
StopRealPlay()
}
type DeviceInfo struct {
IP string
Port int
UserName string
Password string
DeviceID string //序列号
DeviceName string //DVR名称
ByChanNum int //通道数量
}

View File

@@ -0,0 +1,164 @@
package pkg
import (
"fmt"
"io"
mpegps "m7s.live/v5/pkg/format/ps"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
)
type ChanReader chan []byte
func (r ChanReader) Read(buf []byte) (n int, err error) {
b, ok := <-r
if !ok {
return 0, io.EOF
}
copy(buf, b)
return len(b), nil
}
type Receiver struct {
task.Task
*util.BufReader
SSRC uint32 // RTP SSRC
PSMouth chan []byte // 直接处理PS数据的通道
psBuffer []byte // PS数据缓冲区用于处理跨包的PS起始码
}
type PSReceiver struct {
Device Device // 设备
ChannelId int // 通道号
Receiver
mpegps.MpegPsDemuxer
}
func (p *PSReceiver) Start() error {
err := p.Receiver.Start()
if err == nil {
p.Using(p.Publisher)
}
// 初始化播放控制通道(如果未初始化)
p.Device.RealPlay_V40(p.ChannelId, &p.Receiver)
return err
}
func (p *PSReceiver) Run() error {
p.MpegPsDemuxer.Allocator = util.NewScalableMemoryAllocator(1 << util.MinPowerOf2)
p.Using(p.MpegPsDemuxer.Allocator)
// 确保Publisher已设置
if p.MpegPsDemuxer.Publisher == nil {
return fmt.Errorf("Publisher未设置")
}
err := p.MpegPsDemuxer.Feed(p.BufReader)
return err
}
func (p *PSReceiver) Dispose() {
p.Device.StopRealPlay()
// 停止设备播放
}
func (p *Receiver) Start() (err error) {
p.PSMouth = make(chan []byte, 500) // 增加PS数据通道缓冲区到500避免数据丢失
psReader := (ChanReader)(p.PSMouth) // 直接使用PS数据通道
p.BufReader = util.NewBufReader(psReader)
return
}
func (p *Receiver) ReadPSData(data util.Buffer) (err error) {
// 将新数据添加到缓冲区
p.psBuffer = append(p.psBuffer, data...)
// 处理缓冲区中的完整PS包
for {
syncedData, remaining := p.extractSynchronizedPSData(p.psBuffer)
if syncedData == nil {
// 没有找到完整的PS包保留剩余数据等待更多数据
p.psBuffer = remaining
break
}
// 发送同步后的PS数据到处理通道
select {
case p.PSMouth <- syncedData:
// 成功发送数据到PS处理通道
// fmt.Printf("发送同步PS数据到通道长度: %d\n", len(syncedData))
default:
// 通道满了,跳过这个数据包
fmt.Printf("PS通道满了跳过数据包当前缓冲区大小: %d/%d\n", len(p.PSMouth), cap(p.PSMouth))
// 跳过当前数据包,但不返回错误,避免阻塞
}
// 更新缓冲区为剩余数据
p.psBuffer = remaining
}
return nil
}
// extractSynchronizedPSData 从缓冲区中提取同步的PS数据包
func (p *Receiver) extractSynchronizedPSData(buffer []byte) ([]byte, []byte) {
if len(buffer) < 4 {
return nil, buffer // 数据不足,返回所有数据等待更多
}
// 寻找PS起始码
startIndex := -1
for i := 0; i <= len(buffer)-4; i++ {
if buffer[i] == 0x00 && buffer[i+1] == 0x00 && buffer[i+2] == 0x01 {
// 检查第四个字节是否为有效的PS起始码
startCode := uint32(buffer[i])<<24 | uint32(buffer[i+1])<<16 |
uint32(buffer[i+2])<<8 | uint32(buffer[i+3])
switch startCode {
case mpegps.StartCodePS, mpegps.StartCodeVideo, mpegps.StartCodeVideo1,
mpegps.StartCodeVideo2, mpegps.StartCodeAudio, mpegps.StartCodeMAP,
mpegps.StartCodeSYS, mpegps.PrivateStreamCode:
startIndex = i
// fmt.Println("在数据源头找到PS起始码:", fmt.Sprintf("0x%08x", startCode))
goto found
}
}
}
found:
if startIndex == -1 {
// 没有找到有效起始码
if len(buffer) > 3 {
// 保留最后3个字节丢弃其余数据
return nil, buffer[len(buffer)-3:]
}
return nil, buffer
}
// 寻找下一个起始码来确定当前包的结束位置
nextStartIndex := -1
for i := startIndex + 4; i <= len(buffer)-4; i++ {
if buffer[i] == 0x00 && buffer[i+1] == 0x00 && buffer[i+2] == 0x01 {
startCode := uint32(buffer[i])<<24 | uint32(buffer[i+1])<<16 |
uint32(buffer[i+2])<<8 | uint32(buffer[i+3])
switch startCode {
case mpegps.StartCodePS, mpegps.StartCodeVideo, mpegps.StartCodeVideo1,
mpegps.StartCodeVideo2, mpegps.StartCodeAudio, mpegps.StartCodeMAP,
mpegps.StartCodeSYS, mpegps.PrivateStreamCode:
nextStartIndex = i
goto nextFound
}
}
}
nextFound:
if nextStartIndex == -1 {
// 没有找到下一个起始码,返回从当前起始码到缓冲区末尾的所有数据
return buffer[startIndex:], nil
}
// 返回从当前起始码到下一个起始码之间的数据
return buffer[startIndex:nextStartIndex], buffer[nextStartIndex:]
}

493
plugin/hiksdk/pkg/video.go Normal file
View File

@@ -0,0 +1,493 @@
package pkg
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"slices"
"time"
"unsafe"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/codec/h265parser"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
. "m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/util"
)
type (
H26xCtx struct {
RTPCtx
seq uint16
dtsEst util.DTSEstimator
}
H264Ctx struct {
H26xCtx
*codec.H264Ctx
}
H265Ctx struct {
H26xCtx
*codec.H265Ctx
DONL bool
}
AV1Ctx struct {
RTPCtx
*codec.AV1Ctx
}
VP9Ctx struct {
RTPCtx
}
VideoFrame struct {
RTPData
}
)
var (
_ IAVFrame = (*VideoFrame)(nil)
_ IVideoCodecCtx = (*H264Ctx)(nil)
_ IVideoCodecCtx = (*H265Ctx)(nil)
_ IVideoCodecCtx = (*AV1Ctx)(nil)
)
const (
H265_NALU_AP = h265parser.NAL_UNIT_UNSPECIFIED_48
H265_NALU_FU = h265parser.NAL_UNIT_UNSPECIFIED_49
startBit = 1 << 7
endBit = 1 << 6
MTUSize = 1460
)
func (r *VideoFrame) Parse(data IAVFrame) (err error) {
input := data.(*VideoFrame)
r.Packets = append(r.Packets[:0], input.Packets...)
return
}
func (r *VideoFrame) Recycle() {
r.RecyclableMemory.Recycle()
r.Packets.Reset()
}
func (r *VideoFrame) CheckCodecChange() (err error) {
if len(r.Packets) == 0 {
return ErrSkip
}
old := r.ICodecCtx
// 解复用数据
if err = r.Demux(); err != nil {
return
}
// 处理时间戳和序列号
pts := r.Packets[0].Timestamp
nalus := r.Raw.(*Nalus)
switch ctx := old.(type) {
case *H264Ctx:
dts := ctx.dtsEst.Feed(pts)
r.SetDTS(time.Duration(dts))
r.SetPTS(time.Duration(pts))
// 检查 SPS、PPS 和 IDR 帧
var sps, pps []byte
var hasSPSPPS bool
for nalu := range nalus.RangePoint {
nalType := codec.ParseH264NALUType(nalu.Buffers[0][0])
switch nalType {
case h264parser.NALU_SPS:
sps = nalu.ToBytes()
defer nalus.Remove(nalu)
case h264parser.NALU_PPS:
pps = nalu.ToBytes()
defer nalus.Remove(nalu)
case codec.NALU_IDR_Picture:
r.IDR = true
}
}
// 如果发现新的 SPS/PPS更新编解码器上下文
if hasSPSPPS = sps != nil && pps != nil; hasSPSPPS && (len(ctx.Record) == 0 || !bytes.Equal(sps, ctx.SPS()) || !bytes.Equal(pps, ctx.PPS())) {
var newCodecData h264parser.CodecData
if newCodecData, err = h264parser.NewCodecDataFromSPSAndPPS(sps, pps); err != nil {
return
}
newCtx := &H264Ctx{
H26xCtx: ctx.H26xCtx,
H264Ctx: &codec.H264Ctx{
CodecData: newCodecData,
},
}
// 保持原有的 RTP 参数
if oldCtx, ok := old.(*H264Ctx); ok {
newCtx.RTPCtx = oldCtx.RTPCtx
}
r.ICodecCtx = newCtx
} else {
// 如果是 IDR 帧但没有 SPS/PPS需要插入
if r.IDR && len(ctx.SPS()) > 0 && len(ctx.PPS()) > 0 {
spsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.SPS(),
}
ppsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.PPS(),
}
r.Packets = slices.Insert(r.Packets, 0, spsRTP, ppsRTP)
}
}
// 更新序列号
for p := range r.Packets.RangePoint {
p.SequenceNumber = ctx.seq
ctx.seq++
}
case *H265Ctx:
dts := ctx.dtsEst.Feed(pts)
r.SetDTS(time.Duration(dts))
r.SetPTS(time.Duration(pts))
// 检查 VPS、SPS、PPS 和 IDR 帧
var vps, sps, pps []byte
var hasVPSSPSPPS bool
for nalu := range nalus.RangePoint {
switch codec.ParseH265NALUType(nalu.Buffers[0][0]) {
case h265parser.NAL_UNIT_VPS:
vps = nalu.ToBytes()
defer nalus.Remove(nalu)
case h265parser.NAL_UNIT_SPS:
sps = nalu.ToBytes()
defer nalus.Remove(nalu)
case h265parser.NAL_UNIT_PPS:
pps = nalu.ToBytes()
defer nalus.Remove(nalu)
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_IDR_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_IDR_N_LP,
h265parser.NAL_UNIT_CODED_SLICE_CRA:
r.IDR = true
}
}
// 如果发现新的 VPS/SPS/PPS更新编解码器上下文
if hasVPSSPSPPS = vps != nil && sps != nil && pps != nil; hasVPSSPSPPS && (len(ctx.Record) == 0 || !bytes.Equal(vps, ctx.VPS()) || !bytes.Equal(sps, ctx.SPS()) || !bytes.Equal(pps, ctx.PPS())) {
var newCodecData h265parser.CodecData
if newCodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(vps, sps, pps); err != nil {
return
}
newCtx := &H265Ctx{
H26xCtx: ctx.H26xCtx,
H265Ctx: &codec.H265Ctx{
CodecData: newCodecData,
},
}
// 保持原有的 RTP 参数
if oldCtx, ok := old.(*H265Ctx); ok {
newCtx.RTPCtx = oldCtx.RTPCtx
}
r.ICodecCtx = newCtx
} else {
// 如果是 IDR 帧但没有 VPS/SPS/PPS需要插入
if r.IDR && len(ctx.VPS()) > 0 && len(ctx.SPS()) > 0 && len(ctx.PPS()) > 0 {
vpsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.VPS(),
}
spsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.SPS(),
}
ppsRTP := rtp.Packet{
Header: rtp.Header{
Version: 2,
SequenceNumber: ctx.SequenceNumber,
Timestamp: pts,
SSRC: ctx.SSRC,
PayloadType: uint8(ctx.PayloadType),
},
Payload: ctx.PPS(),
}
r.Packets = slices.Insert(r.Packets, 0, vpsRTP, spsRTP, ppsRTP)
}
}
// 更新序列号
for p := range r.Packets.RangePoint {
p.SequenceNumber = ctx.seq
ctx.seq++
}
}
return
}
func (h264 *H264Ctx) GetInfo() string {
return h264.SDPFmtpLine
}
func (h265 *H265Ctx) GetInfo() string {
return h265.SDPFmtpLine
}
func (av1 *AV1Ctx) GetInfo() string {
return av1.SDPFmtpLine
}
func (r *VideoFrame) Mux(baseFrame *Sample) error {
// 获取编解码器上下文
codecCtx := r.ICodecCtx
if codecCtx == nil {
switch base := baseFrame.GetBase().(type) {
case *codec.H264Ctx:
var ctx H264Ctx
ctx.H264Ctx = base
ctx.PayloadType = 96
ctx.MimeType = webrtc.MimeTypeH264
ctx.ClockRate = 90000
spsInfo := ctx.SPSInfo
ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc)
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
codecCtx = &ctx
case *codec.H265Ctx:
var ctx H265Ctx
ctx.H265Ctx = base
ctx.PayloadType = 98
ctx.MimeType = webrtc.MimeTypeH265
ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS()), base64.StdEncoding.EncodeToString(ctx.PPS()), base64.StdEncoding.EncodeToString(ctx.VPS()))
ctx.ClockRate = 90000
ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx)))
codecCtx = &ctx
}
r.ICodecCtx = codecCtx
}
// 获取时间戳信息
pts := uint32(baseFrame.GetPTS())
switch c := codecCtx.(type) {
case *H264Ctx:
ctx := &c.RTPCtx
var lastPacket *rtp.Packet
if baseFrame.IDR && len(c.RecordInfo.SPS) > 0 && len(c.RecordInfo.PPS) > 0 {
r.Append(ctx, pts, c.SPS())
r.Append(ctx, pts, c.PPS())
}
for nalu := range baseFrame.Raw.(*Nalus).RangePoint {
if reader := nalu.NewReader(); reader.Length > MTUSize {
payloadLen := MTUSize
if reader.Length+1 < payloadLen {
payloadLen = reader.Length + 1
}
//fu-a
mem := r.NextN(payloadLen)
reader.Read(mem[1:])
fuaHead, naluType := codec.NALU_FUA.Or(mem[1]&0x60), mem[1]&0x1f
mem[0], mem[1] = fuaHead, naluType|startBit
lastPacket = r.Append(ctx, pts, mem)
for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) {
if reader.Length+2 < payloadLen {
payloadLen = reader.Length + 2
}
mem = r.NextN(payloadLen)
reader.Read(mem[2:])
mem[0], mem[1] = fuaHead, naluType
}
lastPacket.Payload[1] |= endBit
} else {
mem := r.NextN(reader.Length)
reader.Read(mem)
lastPacket = r.Append(ctx, pts, mem)
}
}
lastPacket.Header.Marker = true
case *H265Ctx:
ctx := &c.RTPCtx
var lastPacket *rtp.Packet
if baseFrame.IDR && len(c.RecordInfo.SPS) > 0 && len(c.RecordInfo.PPS) > 0 && len(c.RecordInfo.VPS) > 0 {
r.Append(ctx, pts, c.VPS())
r.Append(ctx, pts, c.SPS())
r.Append(ctx, pts, c.PPS())
}
for nalu := range baseFrame.Raw.(*Nalus).RangePoint {
if reader := nalu.NewReader(); reader.Length > MTUSize {
var b0, b1 byte
_ = reader.ReadByteTo(&b0, &b1)
//fu
naluType := byte(codec.ParseH265NALUType(b0))
b0 = (byte(H265_NALU_FU) << 1) | (b0 & 0b10000001)
payloadLen := MTUSize
if reader.Length+3 < payloadLen {
payloadLen = reader.Length + 3
}
mem := r.NextN(payloadLen)
reader.Read(mem[3:])
mem[0], mem[1], mem[2] = b0, b1, naluType|startBit
lastPacket = r.Append(ctx, pts, mem)
for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) {
if reader.Length+3 < payloadLen {
payloadLen = reader.Length + 3
}
mem = r.NextN(payloadLen)
reader.Read(mem[3:])
mem[0], mem[1], mem[2] = b0, b1, naluType
}
lastPacket.Payload[2] |= endBit
} else {
mem := r.NextN(reader.Length)
reader.Read(mem)
lastPacket = r.Append(ctx, pts, mem)
}
}
lastPacket.Header.Marker = true
}
return nil
}
func (r *VideoFrame) Demux() (err error) {
if len(r.Packets) == 0 {
return ErrSkip
}
switch c := r.ICodecCtx.(type) {
case *H264Ctx:
nalus := r.GetNalus()
nalu := nalus.GetNextPointer()
var naluType codec.H264NALUType
gotNalu := func() {
if nalu.Size > 0 {
nalu = nalus.GetNextPointer()
}
}
for packet := range r.Packets.RangePoint {
if len(packet.Payload) < 2 {
continue
}
if packet.Padding {
packet.Padding = false
}
if len(packet.Payload) == 0 {
continue
}
b0 := packet.Payload[0]
if t := codec.ParseH264NALUType(b0); t < 24 {
nalu.PushOne(packet.Payload)
gotNalu()
} else {
offset := t.Offset()
switch t {
case codec.NALU_STAPA, codec.NALU_STAPB:
if len(packet.Payload) <= offset {
return fmt.Errorf("invalid nalu size %d", len(packet.Payload))
}
for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); {
if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize {
nalu.PushOne(buffer.ReadN(nextSize))
gotNalu()
} else {
return fmt.Errorf("invalid nalu size %d", nextSize)
}
}
case codec.NALU_FUA, codec.NALU_FUB:
b1 := packet.Payload[1]
if util.Bit1(b1, 0) {
naluType.Parse(b1)
nalu.PushOne([]byte{naluType.Or(b0 & 0x60)})
}
if nalu.Size > 0 {
nalu.PushOne(packet.Payload[offset:])
} else {
continue
}
if util.Bit1(b1, 1) {
gotNalu()
}
default:
return fmt.Errorf("unsupported nalu type %d", t)
}
}
}
nalus.Reduce()
return nil
case *H265Ctx:
nalus := r.GetNalus()
nalu := nalus.GetNextPointer()
gotNalu := func() {
if nalu.Size > 0 {
nalu = nalus.GetNextPointer()
}
}
for _, packet := range r.Packets {
if len(packet.Payload) == 0 {
continue
}
b0 := packet.Payload[0]
if t := codec.ParseH265NALUType(b0); t < H265_NALU_AP {
nalu.PushOne(packet.Payload)
gotNalu()
} else {
var buffer = util.Buffer(packet.Payload)
switch t {
case H265_NALU_AP:
buffer.ReadUint16()
if c.DONL {
buffer.ReadUint16()
}
for buffer.CanRead() {
nalu.PushOne(buffer.ReadN(int(buffer.ReadUint16())))
gotNalu()
}
if c.DONL {
buffer.ReadByte()
}
case H265_NALU_FU:
if buffer.Len() < 3 {
return io.ErrShortBuffer
}
first3 := buffer.ReadN(3)
fuHeader := first3[2]
if c.DONL {
buffer.ReadUint16()
}
if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) {
nalu.PushOne([]byte{first3[0]&0b10000001 | (naluType << 1), first3[1]})
}
nalu.PushOne(buffer)
if util.Bit1(fuHeader, 1) {
gotNalu()
}
default:
return fmt.Errorf("unsupported nalu type %d", t)
}
}
}
nalus.Reduce()
return nil
}
return ErrUnsupportCodec
}

15
plugin/hiksdk/readme.md Normal file
View File

@@ -0,0 +1,15 @@
需要将lib下的linux或windows下的文件复制到main.go同级目录下,才能正常运行
配置文件
hik:
loglevel: debug //根据实际情况修改
client:
- ip: "172.16.9.35"
username: "admin"
password: "123456"
port: 8000
- ip: "172.16.9.200"
username: "admin"
password: "123456"
port: 8000

View File

@@ -449,11 +449,11 @@ func (nc *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
if sid, ok := msg.(HaveStreamID); ok {
head.MessageStreamID = sid.GetStreamID()
}
nc.SetWriteDeadline(time.Now().Add(time.Second * 5)) // 设置写入超时时间为5秒
return nc.sendChunk(util.NewMemory(nc.tmpBuf), head, RTMP_CHUNK_HEAD_12)
}
func (nc *NetConnection) sendChunk(mem util.Memory, head *ChunkHeader, headType byte) (err error) {
nc.SetWriteDeadline(time.Now().Add(time.Second * 5)) // 设置写入超时时间为5秒
head.WriteTo(headType, &nc.chunkHeaderBuf)
defer func(reuse net.Buffers) {
nc.sendBuffers = reuse

View File

@@ -16,7 +16,6 @@ type IRTPReader interface {
type RTPUDPReader struct {
io.Reader
RTPReorder[*rtp.Packet]
UdpCacheSize int
}
func NewRTPUDPReader(r io.Reader) *RTPUDPReader {
@@ -24,11 +23,11 @@ func NewRTPUDPReader(r io.Reader) *RTPUDPReader {
}
func (r *RTPUDPReader) Read(packet *rtp.Packet) error {
for {
ordered := r.Pop()
var ordered *rtp.Packet
for ordered == nil {
ordered = r.Pop()
if ordered != nil {
*packet = *ordered
return nil
break
}
var buf [MTUSize]byte
var pack rtp.Packet
@@ -40,8 +39,10 @@ func (r *RTPUDPReader) Read(packet *rtp.Packet) error {
if err != nil {
return err
}
r.Push(pack.SequenceNumber, &pack)
ordered = r.Push(pack.SequenceNumber, &pack)
}
*packet = *ordered
return nil
}
type RTPTCPReader struct {