feat: add device system

This commit is contained in:
langhuihui
2024-09-23 08:20:09 +08:00
parent ddf8fc2c73
commit 8901f41384
23 changed files with 2102 additions and 1164 deletions

62
api.go
View File

@@ -543,3 +543,65 @@ func (s *Server) ModifyConfig(_ context.Context, req *pb.ModifyConfigRequest) (r
conf.ParseModifyFile(modified)
return
}
func (s *Server) GetDeviceList(ctx context.Context, req *emptypb.Empty) (res *pb.DeviceListResponse, err error) {
res = &pb.DeviceListResponse{}
for device := range s.Devices.Range {
res.Data = append(res.Data, &pb.DeviceInfo{
Name: device.Name,
CreateTime: timestamppb.New(device.CreatedAt),
UpdateTime: timestamppb.New(device.UpdatedAt),
Type: uint32(device.Type),
PullURL: device.PullURL,
ParentID: uint32(device.ParentID),
ID: uint32(device.ID),
})
}
return
}
func (s *Server) AddDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.SuccessResponse, err error) {
device := &Device{
server: s,
Name: req.Name,
Type: byte(req.Type),
PullURL: req.PullURL,
ParentID: uint(req.ParentID),
}
if s.DB == nil {
err = pkg.ErrNoDB
return
}
s.DB.Create(device)
s.Devices.Add(device)
res = &pb.SuccessResponse{}
return
}
func (s *Server) UpdateDevice(ctx context.Context, req *pb.DeviceInfo) (res *pb.SuccessResponse, err error) {
if s.DB == nil {
err = pkg.ErrNoDB
return
}
target := &Device{}
target.ID = uint(req.ID)
target.Name = req.Name
target.PullURL = req.PullURL
target.ParentID = uint(req.ParentID)
target.Type = byte(req.Type)
s.DB.Save(target)
res = &pb.SuccessResponse{}
return
}
func (s *Server) RemoveDevice(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
if s.DB == nil {
err = pkg.ErrNoDB
return
}
s.DB.Delete(&Device{}, req.Id)
s.Devices.RemoveByKey(uint32(req.Id))
res = &pb.SuccessResponse{}
return
}

View File

@@ -1,6 +1,8 @@
package m7s
import (
"fmt"
"gorm.io/gorm"
"m7s.live/m7s/v5/pkg/task"
)
@@ -20,15 +22,42 @@ const (
)
type (
IDevice interface {
task.ITask
Pull()
}
Device struct {
server *Server `gorm:"-:all"`
task.Work `gorm:"-:all"`
gorm.Model
ParentID uint
Type byte
StreamURL string
Status byte
Name string
PullURL string
ParentID uint
Type byte
Status byte
Handler IDevice `gorm:"-:all"`
}
DeviceManager struct {
task.Manager[uint32, *Device]
}
)
func (d *Device) GetStreamPath() string {
return fmt.Sprintf("device/%d/%d", d.Type, d.ID)
}
func (d *Device) Start() (err error) {
for plugin := range d.server.Plugins.Range {
if devicePlugin, ok := plugin.handler.(IDevicePlugin); ok {
task := devicePlugin.OnDeviceAdd(d)
if task != nil {
d.AddTask(task)
}
}
}
return
}
func (d *Device) Update() {
d.server.DB.Save(d)
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -3,10 +3,10 @@ import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";
package m7s;
package global;
option go_package="m7s.live/m7s/v5/pb";
service Global {
service api {
rpc SysInfo (google.protobuf.Empty) returns (SysInfoResponse) {
option (google.api.http) = {
get: "/api/sysinfo"
@@ -91,6 +91,29 @@ service Global {
body: "yaml"
};
}
rpc GetDeviceList (google.protobuf.Empty) returns (DeviceListResponse) {
option (google.api.http) = {
get: "/api/device/list"
};
}
rpc AddDevice (DeviceInfo) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/device/add"
body: "*"
};
}
rpc RemoveDevice (RequestWithId) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/device/remove/{id}"
body: "*"
};
}
rpc UpdateDevice (DeviceInfo) returns (SuccessResponse) {
option (google.api.http) = {
post: "/api/device/update"
body: "*"
};
}
}
message GetConfigRequest {
@@ -260,7 +283,7 @@ message VideoTrackInfo {
message SuccessResponse {
int32 code = 1;
string msg = 2;
string message = 2;
}
message RequestWithId {
@@ -298,4 +321,21 @@ message SubscribersResponse {
int32 pageNum = 2;
int32 pageSize = 3;
repeated SubscriberSnapShot list = 4;
}
message DeviceListResponse {
int32 code = 1;
string message = 2;
repeated DeviceInfo data = 3;
}
message DeviceInfo {
uint32 ID = 1;
google.protobuf.Timestamp createTime = 2;
google.protobuf.Timestamp updateTime = 3; // 更新时间
uint32 parentID = 4; // 父设备ID
string name = 5; // 设备名称
uint32 type = 6; // 设备类型
uint32 status = 7; // 设备状态
string pullURL = 8; // 拉流地址
}

View File

@@ -19,10 +19,10 @@ import (
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// GlobalClient is the client API for Global service.
// ApiClient is the client API for Api service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type GlobalClient interface {
type ApiClient interface {
SysInfo(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SysInfoResponse, error)
Summary(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SummaryResponse, error)
Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error)
@@ -39,164 +39,204 @@ type GlobalClient interface {
GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error)
GetFormily(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error)
ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*SuccessResponse, error)
GetDeviceList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DeviceListResponse, error)
AddDevice(ctx context.Context, in *DeviceInfo, opts ...grpc.CallOption) (*SuccessResponse, error)
RemoveDevice(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error)
UpdateDevice(ctx context.Context, in *DeviceInfo, opts ...grpc.CallOption) (*SuccessResponse, error)
}
type globalClient struct {
type apiClient struct {
cc grpc.ClientConnInterface
}
func NewGlobalClient(cc grpc.ClientConnInterface) GlobalClient {
return &globalClient{cc}
func NewApiClient(cc grpc.ClientConnInterface) ApiClient {
return &apiClient{cc}
}
func (c *globalClient) SysInfo(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SysInfoResponse, error) {
func (c *apiClient) SysInfo(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SysInfoResponse, error) {
out := new(SysInfoResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/SysInfo", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/SysInfo", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) Summary(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SummaryResponse, error) {
func (c *apiClient) Summary(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SummaryResponse, error) {
out := new(SummaryResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/Summary", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/Summary", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) {
func (c *apiClient) Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/m7s.Global/Shutdown", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/Shutdown", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) Restart(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) {
func (c *apiClient) Restart(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/m7s.Global/Restart", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/Restart", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) TaskTree(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*TaskTreeResponse, error) {
func (c *apiClient) TaskTree(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*TaskTreeResponse, error) {
out := new(TaskTreeResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/TaskTree", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/TaskTree", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error) {
func (c *apiClient) StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error) {
out := new(StreamListResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/StreamList", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/StreamList", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) WaitList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*StreamWaitListResponse, error) {
func (c *apiClient) WaitList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*StreamWaitListResponse, error) {
out := new(StreamWaitListResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/WaitList", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/WaitList", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) StreamInfo(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamInfoResponse, error) {
func (c *apiClient) StreamInfo(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamInfoResponse, error) {
out := new(StreamInfoResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/StreamInfo", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/StreamInfo", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) GetSubscribers(ctx context.Context, in *SubscribersRequest, opts ...grpc.CallOption) (*SubscribersResponse, error) {
func (c *apiClient) GetSubscribers(ctx context.Context, in *SubscribersRequest, opts ...grpc.CallOption) (*SubscribersResponse, error) {
out := new(SubscribersResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/GetSubscribers", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/GetSubscribers", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) AudioTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error) {
func (c *apiClient) AudioTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error) {
out := new(TrackSnapShotResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/AudioTrackSnap", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/AudioTrackSnap", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) VideoTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error) {
func (c *apiClient) VideoTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error) {
out := new(TrackSnapShotResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/VideoTrackSnap", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/VideoTrackSnap", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) ChangeSubscribe(ctx context.Context, in *ChangeSubscribeRequest, opts ...grpc.CallOption) (*SuccessResponse, error) {
func (c *apiClient) ChangeSubscribe(ctx context.Context, in *ChangeSubscribeRequest, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/ChangeSubscribe", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/ChangeSubscribe", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) StopSubscribe(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) {
func (c *apiClient) StopSubscribe(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/StopSubscribe", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/StopSubscribe", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error) {
func (c *apiClient) GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error) {
out := new(GetConfigResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/GetConfig", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/GetConfig", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) GetFormily(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error) {
func (c *apiClient) GetFormily(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error) {
out := new(GetConfigResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/GetFormily", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/GetFormily", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*SuccessResponse, error) {
func (c *apiClient) ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/ModifyConfig", in, out, opts...)
err := c.cc.Invoke(ctx, "/global.api/ModifyConfig", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// GlobalServer is the server API for Global service.
// All implementations must embed UnimplementedGlobalServer
func (c *apiClient) GetDeviceList(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DeviceListResponse, error) {
out := new(DeviceListResponse)
err := c.cc.Invoke(ctx, "/global.api/GetDeviceList", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) AddDevice(ctx context.Context, in *DeviceInfo, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/AddDevice", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) RemoveDevice(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/RemoveDevice", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) UpdateDevice(ctx context.Context, in *DeviceInfo, opts ...grpc.CallOption) (*SuccessResponse, error) {
out := new(SuccessResponse)
err := c.cc.Invoke(ctx, "/global.api/UpdateDevice", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility
type GlobalServer interface {
type ApiServer interface {
SysInfo(context.Context, *emptypb.Empty) (*SysInfoResponse, error)
Summary(context.Context, *emptypb.Empty) (*SummaryResponse, error)
Shutdown(context.Context, *RequestWithId) (*emptypb.Empty, error)
@@ -213,432 +253,536 @@ type GlobalServer interface {
GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error)
GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error)
ModifyConfig(context.Context, *ModifyConfigRequest) (*SuccessResponse, error)
mustEmbedUnimplementedGlobalServer()
GetDeviceList(context.Context, *emptypb.Empty) (*DeviceListResponse, error)
AddDevice(context.Context, *DeviceInfo) (*SuccessResponse, error)
RemoveDevice(context.Context, *RequestWithId) (*SuccessResponse, error)
UpdateDevice(context.Context, *DeviceInfo) (*SuccessResponse, error)
mustEmbedUnimplementedApiServer()
}
// UnimplementedGlobalServer must be embedded to have forward compatible implementations.
type UnimplementedGlobalServer struct {
// UnimplementedApiServer must be embedded to have forward compatible implementations.
type UnimplementedApiServer struct {
}
func (UnimplementedGlobalServer) SysInfo(context.Context, *emptypb.Empty) (*SysInfoResponse, error) {
func (UnimplementedApiServer) SysInfo(context.Context, *emptypb.Empty) (*SysInfoResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SysInfo not implemented")
}
func (UnimplementedGlobalServer) Summary(context.Context, *emptypb.Empty) (*SummaryResponse, error) {
func (UnimplementedApiServer) Summary(context.Context, *emptypb.Empty) (*SummaryResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Summary not implemented")
}
func (UnimplementedGlobalServer) Shutdown(context.Context, *RequestWithId) (*emptypb.Empty, error) {
func (UnimplementedApiServer) Shutdown(context.Context, *RequestWithId) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Shutdown not implemented")
}
func (UnimplementedGlobalServer) Restart(context.Context, *RequestWithId) (*emptypb.Empty, error) {
func (UnimplementedApiServer) Restart(context.Context, *RequestWithId) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Restart not implemented")
}
func (UnimplementedGlobalServer) TaskTree(context.Context, *emptypb.Empty) (*TaskTreeResponse, error) {
func (UnimplementedApiServer) TaskTree(context.Context, *emptypb.Empty) (*TaskTreeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TaskTree not implemented")
}
func (UnimplementedGlobalServer) StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error) {
func (UnimplementedApiServer) StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StreamList not implemented")
}
func (UnimplementedGlobalServer) WaitList(context.Context, *emptypb.Empty) (*StreamWaitListResponse, error) {
func (UnimplementedApiServer) WaitList(context.Context, *emptypb.Empty) (*StreamWaitListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method WaitList not implemented")
}
func (UnimplementedGlobalServer) StreamInfo(context.Context, *StreamSnapRequest) (*StreamInfoResponse, error) {
func (UnimplementedApiServer) StreamInfo(context.Context, *StreamSnapRequest) (*StreamInfoResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StreamInfo not implemented")
}
func (UnimplementedGlobalServer) GetSubscribers(context.Context, *SubscribersRequest) (*SubscribersResponse, error) {
func (UnimplementedApiServer) GetSubscribers(context.Context, *SubscribersRequest) (*SubscribersResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetSubscribers not implemented")
}
func (UnimplementedGlobalServer) AudioTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error) {
func (UnimplementedApiServer) AudioTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AudioTrackSnap not implemented")
}
func (UnimplementedGlobalServer) VideoTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error) {
func (UnimplementedApiServer) VideoTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VideoTrackSnap not implemented")
}
func (UnimplementedGlobalServer) ChangeSubscribe(context.Context, *ChangeSubscribeRequest) (*SuccessResponse, error) {
func (UnimplementedApiServer) ChangeSubscribe(context.Context, *ChangeSubscribeRequest) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ChangeSubscribe not implemented")
}
func (UnimplementedGlobalServer) StopSubscribe(context.Context, *RequestWithId) (*SuccessResponse, error) {
func (UnimplementedApiServer) StopSubscribe(context.Context, *RequestWithId) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StopSubscribe not implemented")
}
func (UnimplementedGlobalServer) GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error) {
func (UnimplementedApiServer) GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetConfig not implemented")
}
func (UnimplementedGlobalServer) GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error) {
func (UnimplementedApiServer) GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetFormily not implemented")
}
func (UnimplementedGlobalServer) ModifyConfig(context.Context, *ModifyConfigRequest) (*SuccessResponse, error) {
func (UnimplementedApiServer) ModifyConfig(context.Context, *ModifyConfigRequest) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ModifyConfig not implemented")
}
func (UnimplementedGlobalServer) mustEmbedUnimplementedGlobalServer() {}
func (UnimplementedApiServer) GetDeviceList(context.Context, *emptypb.Empty) (*DeviceListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetDeviceList not implemented")
}
func (UnimplementedApiServer) AddDevice(context.Context, *DeviceInfo) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddDevice not implemented")
}
func (UnimplementedApiServer) RemoveDevice(context.Context, *RequestWithId) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemoveDevice not implemented")
}
func (UnimplementedApiServer) UpdateDevice(context.Context, *DeviceInfo) (*SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateDevice not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
// UnsafeGlobalServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to GlobalServer will
// UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ApiServer will
// result in compilation errors.
type UnsafeGlobalServer interface {
mustEmbedUnimplementedGlobalServer()
type UnsafeApiServer interface {
mustEmbedUnimplementedApiServer()
}
func RegisterGlobalServer(s grpc.ServiceRegistrar, srv GlobalServer) {
s.RegisterService(&Global_ServiceDesc, srv)
func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) {
s.RegisterService(&Api_ServiceDesc, srv)
}
func _Global_SysInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_SysInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).SysInfo(ctx, in)
return srv.(ApiServer).SysInfo(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/SysInfo",
FullMethod: "/global.api/SysInfo",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).SysInfo(ctx, req.(*emptypb.Empty))
return srv.(ApiServer).SysInfo(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Global_Summary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_Summary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).Summary(ctx, in)
return srv.(ApiServer).Summary(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/Summary",
FullMethod: "/global.api/Summary",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).Summary(ctx, req.(*emptypb.Empty))
return srv.(ApiServer).Summary(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Global_Shutdown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_Shutdown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestWithId)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).Shutdown(ctx, in)
return srv.(ApiServer).Shutdown(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/Shutdown",
FullMethod: "/global.api/Shutdown",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).Shutdown(ctx, req.(*RequestWithId))
return srv.(ApiServer).Shutdown(ctx, req.(*RequestWithId))
}
return interceptor(ctx, in, info, handler)
}
func _Global_Restart_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_Restart_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestWithId)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).Restart(ctx, in)
return srv.(ApiServer).Restart(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/Restart",
FullMethod: "/global.api/Restart",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).Restart(ctx, req.(*RequestWithId))
return srv.(ApiServer).Restart(ctx, req.(*RequestWithId))
}
return interceptor(ctx, in, info, handler)
}
func _Global_TaskTree_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_TaskTree_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).TaskTree(ctx, in)
return srv.(ApiServer).TaskTree(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/TaskTree",
FullMethod: "/global.api/TaskTree",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).TaskTree(ctx, req.(*emptypb.Empty))
return srv.(ApiServer).TaskTree(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Global_StreamList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_StreamList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StreamListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).StreamList(ctx, in)
return srv.(ApiServer).StreamList(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/StreamList",
FullMethod: "/global.api/StreamList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).StreamList(ctx, req.(*StreamListRequest))
return srv.(ApiServer).StreamList(ctx, req.(*StreamListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_WaitList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_WaitList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).WaitList(ctx, in)
return srv.(ApiServer).WaitList(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/WaitList",
FullMethod: "/global.api/WaitList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).WaitList(ctx, req.(*emptypb.Empty))
return srv.(ApiServer).WaitList(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Global_StreamInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_StreamInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StreamSnapRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).StreamInfo(ctx, in)
return srv.(ApiServer).StreamInfo(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/StreamInfo",
FullMethod: "/global.api/StreamInfo",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).StreamInfo(ctx, req.(*StreamSnapRequest))
return srv.(ApiServer).StreamInfo(ctx, req.(*StreamSnapRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_GetSubscribers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_GetSubscribers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SubscribersRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).GetSubscribers(ctx, in)
return srv.(ApiServer).GetSubscribers(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/GetSubscribers",
FullMethod: "/global.api/GetSubscribers",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).GetSubscribers(ctx, req.(*SubscribersRequest))
return srv.(ApiServer).GetSubscribers(ctx, req.(*SubscribersRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_AudioTrackSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_AudioTrackSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StreamSnapRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).AudioTrackSnap(ctx, in)
return srv.(ApiServer).AudioTrackSnap(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/AudioTrackSnap",
FullMethod: "/global.api/AudioTrackSnap",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).AudioTrackSnap(ctx, req.(*StreamSnapRequest))
return srv.(ApiServer).AudioTrackSnap(ctx, req.(*StreamSnapRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_VideoTrackSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_VideoTrackSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StreamSnapRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).VideoTrackSnap(ctx, in)
return srv.(ApiServer).VideoTrackSnap(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/VideoTrackSnap",
FullMethod: "/global.api/VideoTrackSnap",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).VideoTrackSnap(ctx, req.(*StreamSnapRequest))
return srv.(ApiServer).VideoTrackSnap(ctx, req.(*StreamSnapRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_ChangeSubscribe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_ChangeSubscribe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ChangeSubscribeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).ChangeSubscribe(ctx, in)
return srv.(ApiServer).ChangeSubscribe(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/ChangeSubscribe",
FullMethod: "/global.api/ChangeSubscribe",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).ChangeSubscribe(ctx, req.(*ChangeSubscribeRequest))
return srv.(ApiServer).ChangeSubscribe(ctx, req.(*ChangeSubscribeRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_StopSubscribe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_StopSubscribe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestWithId)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).StopSubscribe(ctx, in)
return srv.(ApiServer).StopSubscribe(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/StopSubscribe",
FullMethod: "/global.api/StopSubscribe",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).StopSubscribe(ctx, req.(*RequestWithId))
return srv.(ApiServer).StopSubscribe(ctx, req.(*RequestWithId))
}
return interceptor(ctx, in, info, handler)
}
func _Global_GetConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_GetConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetConfigRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).GetConfig(ctx, in)
return srv.(ApiServer).GetConfig(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/GetConfig",
FullMethod: "/global.api/GetConfig",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).GetConfig(ctx, req.(*GetConfigRequest))
return srv.(ApiServer).GetConfig(ctx, req.(*GetConfigRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_GetFormily_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_GetFormily_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetConfigRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).GetFormily(ctx, in)
return srv.(ApiServer).GetFormily(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/GetFormily",
FullMethod: "/global.api/GetFormily",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).GetFormily(ctx, req.(*GetConfigRequest))
return srv.(ApiServer).GetFormily(ctx, req.(*GetConfigRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_ModifyConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_ModifyConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ModifyConfigRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).ModifyConfig(ctx, in)
return srv.(ApiServer).ModifyConfig(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/ModifyConfig",
FullMethod: "/global.api/ModifyConfig",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).ModifyConfig(ctx, req.(*ModifyConfigRequest))
return srv.(ApiServer).ModifyConfig(ctx, req.(*ModifyConfigRequest))
}
return interceptor(ctx, in, info, handler)
}
// Global_ServiceDesc is the grpc.ServiceDesc for Global service.
func _Api_GetDeviceList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).GetDeviceList(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/GetDeviceList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).GetDeviceList(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Api_AddDevice_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeviceInfo)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).AddDevice(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/AddDevice",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).AddDevice(ctx, req.(*DeviceInfo))
}
return interceptor(ctx, in, info, handler)
}
func _Api_RemoveDevice_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestWithId)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).RemoveDevice(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/RemoveDevice",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).RemoveDevice(ctx, req.(*RequestWithId))
}
return interceptor(ctx, in, info, handler)
}
func _Api_UpdateDevice_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeviceInfo)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).UpdateDevice(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/global.api/UpdateDevice",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).UpdateDevice(ctx, req.(*DeviceInfo))
}
return interceptor(ctx, in, info, handler)
}
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Global_ServiceDesc = grpc.ServiceDesc{
ServiceName: "m7s.Global",
HandlerType: (*GlobalServer)(nil),
var Api_ServiceDesc = grpc.ServiceDesc{
ServiceName: "global.api",
HandlerType: (*ApiServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SysInfo",
Handler: _Global_SysInfo_Handler,
Handler: _Api_SysInfo_Handler,
},
{
MethodName: "Summary",
Handler: _Global_Summary_Handler,
Handler: _Api_Summary_Handler,
},
{
MethodName: "Shutdown",
Handler: _Global_Shutdown_Handler,
Handler: _Api_Shutdown_Handler,
},
{
MethodName: "Restart",
Handler: _Global_Restart_Handler,
Handler: _Api_Restart_Handler,
},
{
MethodName: "TaskTree",
Handler: _Global_TaskTree_Handler,
Handler: _Api_TaskTree_Handler,
},
{
MethodName: "StreamList",
Handler: _Global_StreamList_Handler,
Handler: _Api_StreamList_Handler,
},
{
MethodName: "WaitList",
Handler: _Global_WaitList_Handler,
Handler: _Api_WaitList_Handler,
},
{
MethodName: "StreamInfo",
Handler: _Global_StreamInfo_Handler,
Handler: _Api_StreamInfo_Handler,
},
{
MethodName: "GetSubscribers",
Handler: _Global_GetSubscribers_Handler,
Handler: _Api_GetSubscribers_Handler,
},
{
MethodName: "AudioTrackSnap",
Handler: _Global_AudioTrackSnap_Handler,
Handler: _Api_AudioTrackSnap_Handler,
},
{
MethodName: "VideoTrackSnap",
Handler: _Global_VideoTrackSnap_Handler,
Handler: _Api_VideoTrackSnap_Handler,
},
{
MethodName: "ChangeSubscribe",
Handler: _Global_ChangeSubscribe_Handler,
Handler: _Api_ChangeSubscribe_Handler,
},
{
MethodName: "StopSubscribe",
Handler: _Global_StopSubscribe_Handler,
Handler: _Api_StopSubscribe_Handler,
},
{
MethodName: "GetConfig",
Handler: _Global_GetConfig_Handler,
Handler: _Api_GetConfig_Handler,
},
{
MethodName: "GetFormily",
Handler: _Global_GetFormily_Handler,
Handler: _Api_GetFormily_Handler,
},
{
MethodName: "ModifyConfig",
Handler: _Global_ModifyConfig_Handler,
Handler: _Api_ModifyConfig_Handler,
},
{
MethodName: "GetDeviceList",
Handler: _Api_GetDeviceList_Handler,
},
{
MethodName: "AddDevice",
Handler: _Api_AddDevice_Handler,
},
{
MethodName: "RemoveDevice",
Handler: _Api_RemoveDevice_Handler,
},
{
MethodName: "UpdateDevice",
Handler: _Api_UpdateDevice_Handler,
},
},
Streams: []grpc.StreamDesc{},

View File

@@ -18,6 +18,7 @@ var (
ErrUnsupportCodec = errors.New("unsupport codec")
ErrMuted = errors.New("muted")
ErrNoTrack = errors.New("no track")
ErrNoDB = errors.New("no db")
ErrLost = errors.New("lost")
ErrRecordSamePath = errors.New("record same path")

View File

@@ -182,6 +182,9 @@ func (mt *Job) run() {
switch tt := child.(type) {
case IChannelTask:
tt.Tick(rev.Interface())
if tt.IsStopped() {
mt.onChildDispose(child)
}
}
if !ok {
if mt.onChildDispose(child); child.checkRetry(child.StopReason()) {

View File

@@ -79,6 +79,7 @@ type (
Post(func() error) *Task
}
IChannelTask interface {
ITask
Tick(any)
}
TaskStarter interface {

View File

@@ -118,6 +118,10 @@ func (c *Collection[K, T]) RemoveByKey(key K) bool {
// item = reflect.New(reflect.TypeOf(item).Elem()).Interface().(T)
// return
// }
func (c *Collection[K, T]) Has(key K) bool {
_, ok := c.Get(key)
return ok
}
func (c *Collection[K, T]) Get(key K) (item T, ok bool) {
if c.L != nil {

View File

@@ -82,6 +82,10 @@ type (
IQUICPlugin interface {
OnQUICConnect(quic.Connection) task.ITask
}
IDevicePlugin interface {
OnDeviceAdd(device *Device) task.ITask
}
)
var plugins []PluginMeta
@@ -415,6 +419,9 @@ func (p *Plugin) OnSubscribe(sub *Subscriber) {
p.handler.Pull(sub.StreamPath, conf)
}
}
for device := range p.Server.Devices.Range {
device.Handler.Pull()
}
//if !avoidTrans {
// for reg, conf := range plugin.GetCommonConf().OnSub.Transform {
// if plugin.Meta.Transformer != nil {

View File

@@ -11,7 +11,7 @@ import (
)
type RTMPPlugin struct {
pb.UnimplementedRtmpServer
pb.UnimplementedApiServer
m7s.Plugin
ChunkSize int `default:"1024"`
KeepAlive bool
@@ -19,7 +19,7 @@ type RTMPPlugin struct {
}
var _ = m7s.InstallPlugin[RTMPPlugin](m7s.DefaultYaml(`tcp:
listenaddr: :1935`), &pb.Rtmp_ServiceDesc, pb.RegisterRtmpHandler, NewPusher, NewPuller)
listenaddr: :1935`), &pb.Api_ServiceDesc, pb.RegisterApiHandler, NewPusher, NewPuller)
type RTMPServer struct {
NetConnection

View File

@@ -80,25 +80,25 @@ func (x *PushRequest) GetRemoteURL() string {
var File_rtmp_proto protoreflect.FileDescriptor
var file_rtmp_proto_rawDesc = []byte{
0x0a, 0x0a, 0x72, 0x74, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x6d, 0x37,
0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e,
0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a,
0x0c, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4b, 0x0a,
0x0b, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09,
0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x32, 0x6c, 0x0a, 0x04, 0x72, 0x74,
0x6d, 0x70, 0x12, 0x64, 0x0a, 0x07, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x75, 0x74, 0x12, 0x10, 0x2e,
0x6d, 0x37, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x14, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x31, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x2b, 0x22, 0x1e, 0x2f,
0x72, 0x74, 0x6d, 0x70, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73, 0x68, 0x2f, 0x7b, 0x73,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x3a, 0x09, 0x72,
0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x42, 0x20, 0x5a, 0x1e, 0x6d, 0x37, 0x73, 0x2e,
0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67,
0x69, 0x6e, 0x2f, 0x72, 0x74, 0x6d, 0x70, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
0x0a, 0x0a, 0x72, 0x74, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x72, 0x74,
0x6d, 0x70, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61,
0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x1a, 0x0c, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4b,
0x0a, 0x0b, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a,
0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1c, 0x0a,
0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x32, 0x6f, 0x0a, 0x03, 0x61,
0x70, 0x69, 0x12, 0x68, 0x0a, 0x07, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x75, 0x74, 0x12, 0x11, 0x2e,
0x72, 0x74, 0x6d, 0x70, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73,
0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x31, 0x82, 0xd3, 0xe4, 0x93, 0x02,
0x2b, 0x22, 0x1e, 0x2f, 0x72, 0x74, 0x6d, 0x70, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73,
0x68, 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a,
0x7d, 0x3a, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x42, 0x20, 0x5a, 0x1e,
0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f,
0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x72, 0x74, 0x6d, 0x70, 0x2f, 0x70, 0x62, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -115,12 +115,12 @@ func file_rtmp_proto_rawDescGZIP() []byte {
var file_rtmp_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_rtmp_proto_goTypes = []interface{}{
(*PushRequest)(nil), // 0: m7s.PushRequest
(*pb.SuccessResponse)(nil), // 1: m7s.SuccessResponse
(*PushRequest)(nil), // 0: rtmp.PushRequest
(*pb.SuccessResponse)(nil), // 1: global.SuccessResponse
}
var file_rtmp_proto_depIdxs = []int32{
0, // 0: m7s.rtmp.PushOut:input_type -> m7s.PushRequest
1, // 1: m7s.rtmp.PushOut:output_type -> m7s.SuccessResponse
0, // 0: rtmp.api.PushOut:input_type -> rtmp.PushRequest
1, // 1: rtmp.api.PushOut:output_type -> global.SuccessResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name

View File

@@ -31,7 +31,7 @@ var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
func request_Rtmp_PushOut_0(ctx context.Context, marshaler runtime.Marshaler, client RtmpClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
func request_Api_PushOut_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PushRequest
var metadata runtime.ServerMetadata
@@ -61,7 +61,7 @@ func request_Rtmp_PushOut_0(ctx context.Context, marshaler runtime.Marshaler, cl
}
func local_request_Rtmp_PushOut_0(ctx context.Context, marshaler runtime.Marshaler, server RtmpServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
func local_request_Api_PushOut_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq PushRequest
var metadata runtime.ServerMetadata
@@ -91,13 +91,13 @@ func local_request_Rtmp_PushOut_0(ctx context.Context, marshaler runtime.Marshal
}
// RegisterRtmpHandlerServer registers the http handlers for service Rtmp to "mux".
// UnaryRPC :call RtmpServer directly.
// RegisterApiHandlerServer registers the http handlers for service Api to "mux".
// UnaryRPC :call ApiServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterRtmpHandlerFromEndpoint instead.
func RegisterRtmpHandlerServer(ctx context.Context, mux *runtime.ServeMux, server RtmpServer) error {
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterApiHandlerFromEndpoint instead.
func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server ApiServer) error {
mux.Handle("POST", pattern_Rtmp_PushOut_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("POST", pattern_Api_PushOut_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
@@ -105,12 +105,12 @@ func RegisterRtmpHandlerServer(ctx context.Context, mux *runtime.ServeMux, serve
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Rtmp/PushOut", runtime.WithHTTPPathPattern("/rtmp/api/push/{streamPath=**}"))
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/rtmp.Api/PushOut", runtime.WithHTTPPathPattern("/rtmp/api/push/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Rtmp_PushOut_0(annotatedContext, inboundMarshaler, server, req, pathParams)
resp, md, err := local_request_Api_PushOut_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
@@ -118,16 +118,16 @@ func RegisterRtmpHandlerServer(ctx context.Context, mux *runtime.ServeMux, serve
return
}
forward_Rtmp_PushOut_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_PushOut_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
// RegisterRtmpHandlerFromEndpoint is same as RegisterRtmpHandler but
// RegisterApiHandlerFromEndpoint is same as RegisterApiHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterRtmpHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
func RegisterApiHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
if err != nil {
return err
@@ -147,41 +147,41 @@ func RegisterRtmpHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux,
}()
}()
return RegisterRtmpHandler(ctx, mux, conn)
return RegisterApiHandler(ctx, mux, conn)
}
// RegisterRtmpHandler registers the http handlers for service Rtmp to "mux".
// RegisterApiHandler registers the http handlers for service Api to "mux".
// The handlers forward requests to the grpc endpoint over "conn".
func RegisterRtmpHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterRtmpHandlerClient(ctx, mux, NewRtmpClient(conn))
func RegisterApiHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterApiHandlerClient(ctx, mux, NewApiClient(conn))
}
// RegisterRtmpHandlerClient registers the http handlers for service Rtmp
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "RtmpClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "RtmpClient"
// RegisterApiHandlerClient registers the http handlers for service Api
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "ApiClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "ApiClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "RtmpClient" to call the correct interceptors.
func RegisterRtmpHandlerClient(ctx context.Context, mux *runtime.ServeMux, client RtmpClient) error {
// "ApiClient" to call the correct interceptors.
func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client ApiClient) error {
mux.Handle("POST", pattern_Rtmp_PushOut_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("POST", pattern_Api_PushOut_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Rtmp/PushOut", runtime.WithHTTPPathPattern("/rtmp/api/push/{streamPath=**}"))
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/rtmp.Api/PushOut", runtime.WithHTTPPathPattern("/rtmp/api/push/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Rtmp_PushOut_0(annotatedContext, inboundMarshaler, client, req, pathParams)
resp, md, err := request_Api_PushOut_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Rtmp_PushOut_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_PushOut_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
@@ -189,9 +189,9 @@ func RegisterRtmpHandlerClient(ctx context.Context, mux *runtime.ServeMux, clien
}
var (
pattern_Rtmp_PushOut_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"rtmp", "api", "push", "streamPath"}, ""))
pattern_Api_PushOut_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"rtmp", "api", "push", "streamPath"}, ""))
)
var (
forward_Rtmp_PushOut_0 = runtime.ForwardResponseMessage
forward_Api_PushOut_0 = runtime.ForwardResponseMessage
)

View File

@@ -2,11 +2,11 @@ syntax = "proto3";
import "google/api/annotations.proto";
//import "google/protobuf/empty.proto";
import "global.proto";
package m7s;
package rtmp;
option go_package="m7s.live/m7s/v5/plugin/rtmp/pb";
service rtmp {
rpc PushOut (PushRequest) returns (SuccessResponse) {
service api {
rpc PushOut (PushRequest) returns (global.SuccessResponse) {
option (google.api.http) = {
post: "/rtmp/api/push/{streamPath=**}"
body: "remoteURL"

View File

@@ -19,86 +19,86 @@ import (
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// RtmpClient is the client API for Rtmp service.
// ApiClient is the client API for Api service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RtmpClient interface {
type ApiClient interface {
PushOut(ctx context.Context, in *PushRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error)
}
type rtmpClient struct {
type apiClient struct {
cc grpc.ClientConnInterface
}
func NewRtmpClient(cc grpc.ClientConnInterface) RtmpClient {
return &rtmpClient{cc}
func NewApiClient(cc grpc.ClientConnInterface) ApiClient {
return &apiClient{cc}
}
func (c *rtmpClient) PushOut(ctx context.Context, in *PushRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
func (c *apiClient) PushOut(ctx context.Context, in *PushRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, "/m7s.rtmp/PushOut", in, out, opts...)
err := c.cc.Invoke(ctx, "/rtmp.api/PushOut", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// RtmpServer is the server API for Rtmp service.
// All implementations must embed UnimplementedRtmpServer
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility
type RtmpServer interface {
type ApiServer interface {
PushOut(context.Context, *PushRequest) (*pb.SuccessResponse, error)
mustEmbedUnimplementedRtmpServer()
mustEmbedUnimplementedApiServer()
}
// UnimplementedRtmpServer must be embedded to have forward compatible implementations.
type UnimplementedRtmpServer struct {
// UnimplementedApiServer must be embedded to have forward compatible implementations.
type UnimplementedApiServer struct {
}
func (UnimplementedRtmpServer) PushOut(context.Context, *PushRequest) (*pb.SuccessResponse, error) {
func (UnimplementedApiServer) PushOut(context.Context, *PushRequest) (*pb.SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PushOut not implemented")
}
func (UnimplementedRtmpServer) mustEmbedUnimplementedRtmpServer() {}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
// UnsafeRtmpServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to RtmpServer will
// UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ApiServer will
// result in compilation errors.
type UnsafeRtmpServer interface {
mustEmbedUnimplementedRtmpServer()
type UnsafeApiServer interface {
mustEmbedUnimplementedApiServer()
}
func RegisterRtmpServer(s grpc.ServiceRegistrar, srv RtmpServer) {
s.RegisterService(&Rtmp_ServiceDesc, srv)
func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) {
s.RegisterService(&Api_ServiceDesc, srv)
}
func _Rtmp_PushOut_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _Api_PushOut_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PushRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RtmpServer).PushOut(ctx, in)
return srv.(ApiServer).PushOut(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.rtmp/PushOut",
FullMethod: "/rtmp.api/PushOut",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RtmpServer).PushOut(ctx, req.(*PushRequest))
return srv.(ApiServer).PushOut(ctx, req.(*PushRequest))
}
return interceptor(ctx, in, info, handler)
}
// Rtmp_ServiceDesc is the grpc.ServiceDesc for Rtmp service.
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Rtmp_ServiceDesc = grpc.ServiceDesc{
ServiceName: "m7s.rtmp",
HandlerType: (*RtmpServer)(nil),
var Api_ServiceDesc = grpc.ServiceDesc{
ServiceName: "rtmp.api",
HandlerType: (*ApiServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "PushOut",
Handler: _Rtmp_PushOut_Handler,
Handler: _Api_PushOut_Handler,
},
},
Streams: []grpc.StreamDesc{},

49
plugin/rtsp/device.go Normal file
View File

@@ -0,0 +1,49 @@
package plugin_rtsp
import (
"time"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
. "m7s.live/m7s/v5/plugin/rtsp/pkg"
)
type RTSPDevice struct {
task.TickTask
conn Stream
device *m7s.Device
plugin *RTSPPlugin
}
func (d *RTSPDevice) Start() (err error) {
d.conn.NetConnection = new(NetConnection)
err = d.conn.Connect(d.device.PullURL)
if err != nil {
return
}
d.device.Status = m7s.DeviceStatusOnline
d.device.Update()
return d.TickTask.Start()
}
func (d *RTSPDevice) GetTickInterval() time.Duration {
return time.Second * 5
}
func (d *RTSPDevice) Pull() {
d.plugin.Pull(d.device.GetStreamPath(), config.Pull{URL: d.device.PullURL})
}
func (d *RTSPDevice) Tick(any) {
err := d.conn.Options()
if err != nil {
d.Stop(err)
}
}
func (d *RTSPDevice) Dispose() {
d.device.Status = m7s.DeviceStatusOffline
d.device.Update()
d.TickTask.Dispose()
}

View File

@@ -1,16 +1,11 @@
package plugin_rtsp
import (
"errors"
"fmt"
"m7s.live/m7s/v5/pkg/task"
"net"
"net/http"
"strconv"
"strings"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
. "m7s.live/m7s/v5/plugin/rtsp/pkg"
)
@@ -23,194 +18,18 @@ type RTSPPlugin struct {
m7s.Plugin
}
type RTSPServer struct {
*NetConnection
conf *RTSPPlugin
}
func (p *RTSPPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
ret := &RTSPServer{NetConnection: NewNetConnection(conn), conf: p}
ret.Logger = p.With("remote", conn.RemoteAddr().String())
return ret
}
func (task *RTSPServer) Go() (err error) {
var receiver *Receiver
var sender *Sender
var req *util.Request
var sendMode bool
for {
req, err = task.ReadRequest()
if err != nil {
return
}
if task.URL == nil {
task.URL = req.URL
task.Logger = task.With("url", task.URL.String())
task.UserAgent = req.Header.Get("User-Agent")
task.Info("connect", "userAgent", task.UserAgent)
}
//if !c.auth.Validate(req) {
// res := &tcp.Response{
// Status: "401 Unauthorized",
// Header: map[string][]string{"Www-Authenticate": {`Basic realm="go2rtc"`}},
// Request: req,
// }
// if err = c.WriteResponse(res); err != nil {
// return err
// }
// continue
//}
// Receiver: OPTIONS > DESCRIBE > SETUP... > PLAY > TEARDOWN
// Sender: OPTIONS > ANNOUNCE > SETUP... > RECORD > TEARDOWN
switch req.Method {
case MethodOptions:
res := &util.Response{
Header: map[string][]string{
"Public": {"OPTIONS, SETUP, TEARDOWN, DESCRIBE, PLAY, PAUSE, ANNOUNCE, RECORD"},
},
Request: req,
}
if err = task.WriteResponse(res); err != nil {
return
}
case MethodAnnounce:
if req.Header.Get("Content-Type") != "application/sdp" {
err = errors.New("wrong content type")
return
}
task.SDP = string(req.Body) // for info
var medias []*Media
if medias, err = UnmarshalSDP(req.Body); err != nil {
return
}
receiver = &Receiver{}
receiver.NetConnection = task.NetConnection
if receiver.Publisher, err = task.conf.Publish(task, strings.TrimPrefix(task.URL.Path, "/")); err != nil {
receiver = nil
err = task.WriteResponse(&util.Response{
StatusCode: 500, Status: err.Error(),
})
return
}
if err = receiver.SetMedia(medias); err != nil {
return
}
res := &util.Response{Request: req}
if err = task.WriteResponse(res); err != nil {
return
}
receiver.Publisher.OnDispose(func() {
task.Stop(receiver.Publisher.StopReason())
})
case MethodDescribe:
sendMode = true
sender = &Sender{}
sender.NetConnection = task.NetConnection
sender.Subscriber, err = task.conf.Subscribe(task, strings.TrimPrefix(task.URL.Path, "/"))
if err != nil {
res := &util.Response{
StatusCode: http.StatusBadRequest,
Status: err.Error(),
Request: req,
}
_ = task.WriteResponse(res)
return
}
res := &util.Response{
Header: map[string][]string{
"Content-Type": {"application/sdp"},
},
Request: req,
}
// convert tracks to real output medias
var medias []*Media
if medias, err = sender.GetMedia(); err != nil {
return
}
if res.Body, err = MarshalSDP(task.SessionName, medias); err != nil {
return
}
task.SDP = string(res.Body) // for info
if err = task.WriteResponse(res); err != nil {
return
}
case MethodSetup:
tr := req.Header.Get("Transport")
res := &util.Response{
Header: map[string][]string{},
Request: req,
}
const transport = "RTP/AVP/TCP;unicast;interleaved="
if strings.HasPrefix(tr, transport) {
task.Session = util.RandomString(10)
if sendMode {
if i := reqTrackID(req); i >= 0 {
tr = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1)
res.Header.Set("Transport", tr)
} else {
res.Status = "400 Bad Request"
}
} else {
res.Header.Set("Transport", tr[:len(transport)+3])
}
} else {
res.Status = "461 Unsupported transport"
}
if err = task.WriteResponse(res); err != nil {
return
}
case MethodRecord:
res := &util.Response{Request: req}
if err = task.WriteResponse(res); err != nil {
return
}
err = receiver.Receive()
return
case MethodPlay:
res := &util.Response{Request: req}
if err = task.WriteResponse(res); err != nil {
return
}
err = sender.Send()
return
case MethodTeardown:
res := &util.Response{Request: req}
_ = task.WriteResponse(res)
return
default:
task.Warn("unsupported method", "method", req.Method)
}
func (p *RTSPPlugin) OnDeviceAdd(device *m7s.Device) task.ITask {
if device.Type != m7s.DeviceTypeRTSP {
return nil
}
}
func reqTrackID(req *util.Request) int {
var s string
if req.URL.RawQuery != "" {
s = req.URL.RawQuery
} else {
s = req.URL.Path
}
if i := strings.LastIndexByte(s, '='); i > 0 {
if i, err := strconv.Atoi(s[i+1:]); err == nil {
return i
}
}
return -1
ret := &RTSPDevice{device: device, plugin: p}
ret.Logger = p.With("device", device.Name)
device.Handler = ret
return ret
}

View File

@@ -1,11 +1,7 @@
package rtsp
import (
"crypto/tls"
"m7s.live/m7s/v5/pkg/config"
"net"
"net/url"
"strings"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/util"
@@ -24,43 +20,11 @@ type Client struct {
}
func (c *Client) Start() (err error) {
var rtspURL *url.URL
if c.direction == DIRECTION_PULL {
rtspURL, err = url.Parse(c.pullCtx.RemoteURL)
err = c.NetConnection.Connect(c.pullCtx.RemoteURL)
} else {
rtspURL, err = url.Parse(c.pushCtx.RemoteURL)
err = c.NetConnection.Connect(c.pushCtx.RemoteURL)
}
if err != nil {
return
}
//ps := strings.Split(u.Path, "/")
//if len(ps) < 3 {
// return errors.New("illegal rtsp url")
//}
istls := rtspURL.Scheme == "rtsps"
if strings.Count(rtspURL.Host, ":") == 0 {
if istls {
rtspURL.Host += ":443"
} else {
rtspURL.Host += ":554"
}
}
var conn net.Conn
if istls {
var tlsconn *tls.Conn
tlsconn, err = tls.Dial("tcp", rtspURL.Host, &tls.Config{})
conn = tlsconn
} else {
conn, err = net.Dial("tcp", rtspURL.Host)
}
if err != nil {
return
}
c.conn = conn
c.URL = rtspURL
c.UserAgent = "monibuca" + m7s.Version
c.auth = util.NewAuth(c.URL.User)
c.Backchannel = true
return
}

View File

@@ -1,11 +1,13 @@
package rtsp
import (
"crypto/tls"
"encoding/binary"
"net"
"net/url"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -108,6 +110,38 @@ const (
StatePlay
)
func (c *NetConnection) Connect(remoteURL string) (err error) {
rtspURL, err := url.Parse(remoteURL)
if err != nil {
return
}
istls := rtspURL.Scheme == "rtsps"
if strings.Count(rtspURL.Host, ":") == 0 {
if istls {
rtspURL.Host += ":443"
} else {
rtspURL.Host += ":554"
}
}
var conn net.Conn
if istls {
var tlsconn *tls.Conn
tlsconn, err = tls.Dial("tcp", rtspURL.Host, &tls.Config{})
conn = tlsconn
} else {
conn, err = net.Dial("tcp", rtspURL.Host)
}
if err != nil {
return
}
c.conn = conn
c.URL = rtspURL
c.UserAgent = "monibuca" + m7s.Version
c.auth = util.NewAuth(c.URL.User)
c.Backchannel = true
return
}
func (c *NetConnection) WriteRequest(req *util.Request) (err error) {
if req.Proto == "" {
req.Proto = ProtoRTSP

198
plugin/rtsp/server.go Normal file
View File

@@ -0,0 +1,198 @@
package plugin_rtsp
import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"m7s.live/m7s/v5/pkg/util"
. "m7s.live/m7s/v5/plugin/rtsp/pkg"
)
type RTSPServer struct {
*NetConnection
conf *RTSPPlugin
}
func (task *RTSPServer) Go() (err error) {
var receiver *Receiver
var sender *Sender
var req *util.Request
var sendMode bool
for {
req, err = task.ReadRequest()
if err != nil {
return
}
if task.URL == nil {
task.URL = req.URL
task.Logger = task.With("url", task.URL.String())
task.UserAgent = req.Header.Get("User-Agent")
task.Info("connect", "userAgent", task.UserAgent)
}
//if !c.auth.Validate(req) {
// res := &tcp.Response{
// Status: "401 Unauthorized",
// Header: map[string][]string{"Www-Authenticate": {`Basic realm="go2rtc"`}},
// Request: req,
// }
// if err = c.WriteResponse(res); err != nil {
// return err
// }
// continue
//}
// Receiver: OPTIONS > DESCRIBE > SETUP... > PLAY > TEARDOWN
// Sender: OPTIONS > ANNOUNCE > SETUP... > RECORD > TEARDOWN
switch req.Method {
case MethodOptions:
res := &util.Response{
Header: map[string][]string{
"Public": {"OPTIONS, SETUP, TEARDOWN, DESCRIBE, PLAY, PAUSE, ANNOUNCE, RECORD"},
},
Request: req,
}
if err = task.WriteResponse(res); err != nil {
return
}
case MethodAnnounce:
if req.Header.Get("Content-Type") != "application/sdp" {
err = errors.New("wrong content type")
return
}
task.SDP = string(req.Body) // for info
var medias []*Media
if medias, err = UnmarshalSDP(req.Body); err != nil {
return
}
receiver = &Receiver{}
receiver.NetConnection = task.NetConnection
if receiver.Publisher, err = task.conf.Publish(task, strings.TrimPrefix(task.URL.Path, "/")); err != nil {
receiver = nil
err = task.WriteResponse(&util.Response{
StatusCode: 500, Status: err.Error(),
})
return
}
if err = receiver.SetMedia(medias); err != nil {
return
}
res := &util.Response{Request: req}
if err = task.WriteResponse(res); err != nil {
return
}
receiver.Publisher.OnDispose(func() {
task.Stop(receiver.Publisher.StopReason())
})
case MethodDescribe:
sendMode = true
sender = &Sender{}
sender.NetConnection = task.NetConnection
sender.Subscriber, err = task.conf.Subscribe(task, strings.TrimPrefix(task.URL.Path, "/"))
if err != nil {
res := &util.Response{
StatusCode: http.StatusBadRequest,
Status: err.Error(),
Request: req,
}
_ = task.WriteResponse(res)
return
}
res := &util.Response{
Header: map[string][]string{
"Content-Type": {"application/sdp"},
},
Request: req,
}
// convert tracks to real output medias
var medias []*Media
if medias, err = sender.GetMedia(); err != nil {
return
}
if res.Body, err = MarshalSDP(task.SessionName, medias); err != nil {
return
}
task.SDP = string(res.Body) // for info
if err = task.WriteResponse(res); err != nil {
return
}
case MethodSetup:
tr := req.Header.Get("Transport")
res := &util.Response{
Header: map[string][]string{},
Request: req,
}
const transport = "RTP/AVP/TCP;unicast;interleaved="
if strings.HasPrefix(tr, transport) {
task.Session = util.RandomString(10)
if sendMode {
if i := reqTrackID(req); i >= 0 {
tr = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1)
res.Header.Set("Transport", tr)
} else {
res.Status = "400 Bad Request"
}
} else {
res.Header.Set("Transport", tr[:len(transport)+3])
}
} else {
res.Status = "461 Unsupported transport"
}
if err = task.WriteResponse(res); err != nil {
return
}
case MethodRecord:
res := &util.Response{Request: req}
if err = task.WriteResponse(res); err != nil {
return
}
err = receiver.Receive()
return
case MethodPlay:
res := &util.Response{Request: req}
if err = task.WriteResponse(res); err != nil {
return
}
err = sender.Send()
return
case MethodTeardown:
res := &util.Response{Request: req}
_ = task.WriteResponse(res)
return
default:
task.Warn("unsupported method", "method", req.Method)
}
}
}
func reqTrackID(req *util.Request) int {
var s string
if req.URL.RawQuery != "" {
s = req.URL.RawQuery
} else {
s = req.URL.Path
}
if i := strings.LastIndexByte(s, '='); i > 0 {
if i, err := strconv.Atoi(s[i+1:]); err == nil {
return i
}
}
return -1
}

View File

@@ -130,6 +130,7 @@ type Publisher struct {
Subscribers SubscriberCollection
GOP int
OnSeek func(time.Duration)
Device *Device
dumpFile *os.File
}
@@ -163,6 +164,15 @@ func (p *Publisher) Start() (err error) {
}
s.Streams.Set(p)
p.Info("publish")
if device, ok := s.Devices.Find(func(device *Device) bool {
return device.GetStreamPath() == p.StreamPath
}); ok {
p.Device = device
if device.Status == DeviceStatusOnline {
device.Status = DeviceStatusPulling
device.Update()
}
}
p.audioReady = util.NewPromiseWithTimeout(p, time.Second*5)
p.videoReady = util.NewPromiseWithTimeout(p, time.Second*5)
if p.Dump {
@@ -204,6 +214,10 @@ func (p *PublishTimeout) Start() error {
func (p *PublishTimeout) Dispose() {
p.Publisher.TimeoutTimer.Stop()
if p.Publisher.Device != nil && p.Publisher.Device.Status == DeviceStatusPulling && p.Publisher.Plugin.Server.Devices.Has(p.Publisher.Device.GetTaskID()) {
p.Publisher.Device.Status = DeviceStatusOnline
p.Publisher.Device.Update()
}
}
func (p *PublishTimeout) Tick(any) {

View File

@@ -61,7 +61,7 @@ type (
baseTsAudio, baseTsVideo time.Duration
}
Server struct {
pb.UnimplementedGlobalServer
pb.UnimplementedApiServer
Plugin
ServerConfig
Plugins util.Collection[string, *Plugin]
@@ -216,14 +216,14 @@ func (s *Server) Start() (err error) {
if tcpConf.ListenAddr != "" {
var opts []grpc.ServerOption
s.grpcServer = grpc.NewServer(opts...)
pb.RegisterGlobalServer(s.grpcServer, s)
pb.RegisterApiServer(s.grpcServer, s)
s.grpcClientConn, err = grpc.DialContext(s.Context, tcpConf.ListenAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
s.Error("failed to dial", "error", err)
return
}
if err = pb.RegisterGlobalHandler(s.Context, mux, s.grpcClientConn); err != nil {
if err = pb.RegisterApiHandler(s.Context, mux, s.grpcClientConn); err != nil {
s.Error("register handler faild", "error", err)
return
}
@@ -238,6 +238,7 @@ func (s *Server) Start() (err error) {
s.AddTaskLazy(&s.Pulls)
s.AddTaskLazy(&s.Pushs)
s.AddTaskLazy(&s.Transforms)
s.AddTaskLazy(&s.Devices)
for _, plugin := range plugins {
plugin.Init(s, cg[strings.ToLower(plugin.Name)])
}
@@ -256,6 +257,15 @@ func (s *Server) Start() (err error) {
}
}
}
if s.DB != nil {
s.DB.AutoMigrate(&Device{})
var devices []*Device
s.DB.Find(&devices)
for _, device := range devices {
device.server = s
s.Devices.Add(device)
}
}
return nil
})
return