feat: add config api

This commit is contained in:
langhuihui
2024-05-11 15:11:46 +08:00
parent 0b0fd72504
commit de76bde317
14 changed files with 1389 additions and 261 deletions

99
api.go
View File

@@ -2,6 +2,7 @@ package m7s
import (
"context"
"encoding/json"
"errors"
"net"
"net/http"
@@ -14,8 +15,10 @@ import (
. "github.com/shirou/gopsutil/v3/net"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v3"
"m7s.live/m7s/v5/pb"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
)
@@ -74,13 +77,29 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.StopSubscribeRequest
Success: err == nil,
}, err
}
// /api/stream/list
func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) {
s.Call(func() {
var streams []*pb.StreamSummay
for _, publisher := range s.Streams.Items {
var tracks []string
var bps int32
if !publisher.VideoTrack.IsEmpty() {
bps += int32(publisher.VideoTrack.AVTrack.BPS)
tracks = append(tracks, publisher.VideoTrack.FourCC().String())
}
if !publisher.AudioTrack.IsEmpty() {
bps += int32(publisher.AudioTrack.AVTrack.BPS)
tracks = append(tracks, publisher.AudioTrack.FourCC().String())
}
streams = append(streams, &pb.StreamSummay{
Path: publisher.StreamPath,
State: int32(publisher.State),
StartTime: timestamppb.New(publisher.StartTime),
Subscribers: int32(len(publisher.Subscribers)),
Tracks: tracks,
Bps: bps,
Type: publisher.Plugin.Meta.Name,
})
}
res = &pb.StreamListResponse{List: streams, Total: int32(s.Streams.Length), PageNum: req.PageNum, PageSize: req.PageSize}
@@ -142,3 +161,81 @@ func (s *Server) Summary(context.Context, *emptypb.Empty) (res *pb.SummaryRespon
})
return
}
// /api/config/json/{name}
func (s *Server) api_Config_JSON_(rw http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
var conf *config.Config
if name == "global" {
conf = &s.Config
} else {
p, ok := s.Plugins.Get(name)
if !ok {
http.Error(rw, pkg.ErrNotFound.Error(), http.StatusNotFound)
return
}
conf = &p.Config
}
rw.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(rw).Encode(conf.GetMap())
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
}
func (s *Server) GetConfig(_ context.Context, req *pb.GetConfigRequest) (res *pb.GetConfigResponse, err error) {
res = &pb.GetConfigResponse{}
var conf *config.Config
if req.Name == "global" {
conf = &s.Config
} else {
p, ok := s.Plugins.Get(req.Name)
if !ok {
err = pkg.ErrNotFound
return
}
conf = &p.Config
}
var mm []byte
mm, err = yaml.Marshal(conf.File)
if err != nil {
return
}
res.File = string(mm)
mm, err = yaml.Marshal(conf.Modify)
if err != nil {
return
}
res.Modified = string(mm)
mm, err = yaml.Marshal(conf.GetMap())
if err != nil {
return
}
res.Merged = string(mm)
return
}
func (s *Server) ModifyConfig(_ context.Context, req *pb.ModifyConfigRequest) (res *pb.ModifyConfigResponse, err error) {
var conf *config.Config
if req.Name == "global" {
conf = &s.Config
defer s.SaveConfig()
} else {
p, ok := s.Plugins.Get(req.Name)
if !ok {
err = pkg.ErrNotFound
return
}
defer p.SaveConfig()
conf = &p.Config
}
var modified map[string]any
err = yaml.Unmarshal([]byte(req.Yaml), &modified)
if err != nil {
return
}
conf.ParseModifyFile(modified)
return
}

BIN
architecture.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 625 KiB

File diff suppressed because it is too large Load Diff

View File

@@ -320,6 +320,170 @@ func local_request_Global_StopSubscribe_0(ctx context.Context, marshaler runtime
}
func request_Global_GetConfig_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetConfigRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["name"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "name")
}
protoReq.Name, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "name", err)
}
msg, err := client.GetConfig(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Global_GetConfig_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetConfigRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["name"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "name")
}
protoReq.Name, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "name", err)
}
msg, err := server.GetConfig(ctx, &protoReq)
return msg, metadata, err
}
func request_Global_GetFormily_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetConfigRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["name"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "name")
}
protoReq.Name, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "name", err)
}
msg, err := client.GetFormily(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Global_GetFormily_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq GetConfigRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["name"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "name")
}
protoReq.Name, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "name", err)
}
msg, err := server.GetFormily(ctx, &protoReq)
return msg, metadata, err
}
func request_Global_ModifyConfig_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ModifyConfigRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq.Yaml); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["name"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "name")
}
protoReq.Name, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "name", err)
}
msg, err := client.ModifyConfig(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Global_ModifyConfig_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ModifyConfigRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq.Yaml); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["name"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "name")
}
protoReq.Name, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "name", err)
}
msg, err := server.ModifyConfig(ctx, &protoReq)
return msg, metadata, err
}
// RegisterGlobalHandlerServer registers the http handlers for service Global to "mux".
// UnaryRPC :call GlobalServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
@@ -501,6 +665,81 @@ func RegisterGlobalHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser
})
mux.Handle("GET", pattern_Global_GetConfig_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/GetConfig", runtime.WithHTTPPathPattern("/api/config/get/{name}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Global_GetConfig_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_GetConfig_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Global_GetFormily_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/GetFormily", runtime.WithHTTPPathPattern("/api/config/formily/{name}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Global_GetFormily_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_GetFormily_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Global_ModifyConfig_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/ModifyConfig", runtime.WithHTTPPathPattern("/api/config/modify/{name}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Global_ModifyConfig_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_ModifyConfig_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@@ -696,6 +935,72 @@ func RegisterGlobalHandlerClient(ctx context.Context, mux *runtime.ServeMux, cli
})
mux.Handle("GET", pattern_Global_GetConfig_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/GetConfig", runtime.WithHTTPPathPattern("/api/config/get/{name}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Global_GetConfig_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_GetConfig_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Global_GetFormily_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/GetFormily", runtime.WithHTTPPathPattern("/api/config/formily/{name}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Global_GetFormily_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_GetFormily_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Global_ModifyConfig_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/ModifyConfig", runtime.WithHTTPPathPattern("/api/config/modify/{name}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Global_ModifyConfig_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_ModifyConfig_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
@@ -713,6 +1018,12 @@ var (
pattern_Global_StreamSnap_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "stream", "snap", "streamPath"}, ""))
pattern_Global_StopSubscribe_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "stop", "subscribe", "id"}, ""))
pattern_Global_GetConfig_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "config", "get", "name"}, ""))
pattern_Global_GetFormily_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "config", "formily", "name"}, ""))
pattern_Global_ModifyConfig_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "config", "modify", "name"}, ""))
)
var (
@@ -729,4 +1040,10 @@ var (
forward_Global_StreamSnap_0 = runtime.ForwardResponseMessage
forward_Global_StopSubscribe_0 = runtime.ForwardResponseMessage
forward_Global_GetConfig_0 = runtime.ForwardResponseMessage
forward_Global_GetFormily_0 = runtime.ForwardResponseMessage
forward_Global_ModifyConfig_0 = runtime.ForwardResponseMessage
)

View File

@@ -2,6 +2,7 @@ syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";
package m7s;
option go_package="m7s.live/m7s/v5/pb";
@@ -42,6 +43,53 @@ service Global {
body: "*"
};
}
rpc GetConfig (GetConfigRequest) returns (GetConfigResponse) {
option (google.api.http) = {
get: "/api/config/get/{name}"
};
}
rpc GetFormily (GetConfigRequest) returns (GetConfigResponse) {
option (google.api.http) = {
get: "/api/config/formily/{name}"
};
}
rpc ModifyConfig (ModifyConfigRequest) returns (ModifyConfigResponse) {
option (google.api.http) = {
post: "/api/config/modify/{name}"
body: "yaml"
};
}
}
message GetConfigRequest {
string name = 1;
}
message Formily {
string type = 1;
map<string, Formily> properties = 2;
string component = 3;
map<string, google.protobuf.Any> componentProps = 4;
}
message FormilyResponse {
string type = 1;
map<string, Formily> properties = 2;
}
message GetConfigResponse {
string file = 1;
string modified = 2;
string merged = 3;
}
message ModifyConfigRequest {
string name = 1;
string yaml = 2;
}
message ModifyConfigResponse {
bool success = 1;
}
message NetWorkInfo {

View File

@@ -30,6 +30,9 @@ type GlobalClient interface {
StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error)
StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error)
StopSubscribe(ctx context.Context, in *StopSubscribeRequest, opts ...grpc.CallOption) (*StopSubscribeResponse, error)
GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error)
GetFormily(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error)
ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*ModifyConfigResponse, error)
}
type globalClient struct {
@@ -103,6 +106,33 @@ func (c *globalClient) StopSubscribe(ctx context.Context, in *StopSubscribeReque
return out, nil
}
func (c *globalClient) GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error) {
out := new(GetConfigResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/GetConfig", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) GetFormily(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error) {
out := new(GetConfigResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/GetFormily", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) ModifyConfig(ctx context.Context, in *ModifyConfigRequest, opts ...grpc.CallOption) (*ModifyConfigResponse, error) {
out := new(ModifyConfigResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/ModifyConfig", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// GlobalServer is the server API for Global service.
// All implementations must embed UnimplementedGlobalServer
// for forward compatibility
@@ -114,6 +144,9 @@ type GlobalServer interface {
StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error)
StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error)
StopSubscribe(context.Context, *StopSubscribeRequest) (*StopSubscribeResponse, error)
GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error)
GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error)
ModifyConfig(context.Context, *ModifyConfigRequest) (*ModifyConfigResponse, error)
mustEmbedUnimplementedGlobalServer()
}
@@ -142,6 +175,15 @@ func (UnimplementedGlobalServer) StreamSnap(context.Context, *StreamSnapRequest)
func (UnimplementedGlobalServer) StopSubscribe(context.Context, *StopSubscribeRequest) (*StopSubscribeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StopSubscribe not implemented")
}
func (UnimplementedGlobalServer) GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetConfig not implemented")
}
func (UnimplementedGlobalServer) GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetFormily not implemented")
}
func (UnimplementedGlobalServer) ModifyConfig(context.Context, *ModifyConfigRequest) (*ModifyConfigResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ModifyConfig not implemented")
}
func (UnimplementedGlobalServer) mustEmbedUnimplementedGlobalServer() {}
// UnsafeGlobalServer may be embedded to opt out of forward compatibility for this service.
@@ -281,6 +323,60 @@ func _Global_StopSubscribe_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler)
}
func _Global_GetConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetConfigRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).GetConfig(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/GetConfig",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).GetConfig(ctx, req.(*GetConfigRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_GetFormily_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetConfigRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).GetFormily(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/GetFormily",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).GetFormily(ctx, req.(*GetConfigRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_ModifyConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ModifyConfigRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).ModifyConfig(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/ModifyConfig",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).ModifyConfig(ctx, req.(*ModifyConfigRequest))
}
return interceptor(ctx, in, info, handler)
}
// Global_ServiceDesc is the grpc.ServiceDesc for Global service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -316,6 +412,18 @@ var Global_ServiceDesc = grpc.ServiceDesc{
MethodName: "StopSubscribe",
Handler: _Global_StopSubscribe_Handler,
},
{
MethodName: "GetConfig",
Handler: _Global_GetConfig_Handler,
},
{
MethodName: "GetFormily",
Handler: _Global_GetFormily_Handler,
},
{
MethodName: "ModifyConfig",
Handler: _Global_ModifyConfig_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "global.proto",

View File

@@ -50,21 +50,19 @@ type (
DataFrame
IDR bool
Timestamp time.Duration // 绝对时间戳
Wraps []IAVFrame // 封装格式
Wraps []IAVFrame // 封装格式
}
AVRing = util.Ring[AVFrame]
DataFrame struct {
sync.RWMutex `json:"-" yaml:"-"` // 读写锁
discard bool
Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Raw any `json:"-" yaml:"-"` // 裸格式
}
)
func (frame *AVFrame) Reset() {
frame.BytesIn = 0
frame.Timestamp = 0
for _, wrap := range frame.Wraps {
wrap.Recycle()

View File

@@ -6,15 +6,19 @@ import (
"reflect"
"slices"
"sync/atomic"
"time"
"m7s.live/m7s/v5/pkg/util"
)
type (
Track struct {
*slog.Logger `json:"-" yaml:"-"`
Ready *util.Promise[struct{}]
FrameType reflect.Type
*slog.Logger
Ready *util.Promise[struct{}]
FrameType reflect.Type
bytesIn int
lastBPSTime time.Time
BPS int
}
DataTrack struct {
@@ -62,6 +66,15 @@ func (t *Track) GetKey() reflect.Type {
return t.FrameType
}
func (t *Track) AddBytesIn(n int) {
t.bytesIn += n
if dur := time.Since(t.lastBPSTime); dur > time.Second {
t.BPS = int(float64(t.bytesIn) / dur.Seconds())
t.bytesIn = 0
t.lastBPSTime = time.Now()
}
}
func (t *Track) Trace(msg string, fields ...any) {
t.Log(context.TODO(), TraceLevel, msg, fields...)
}

View File

@@ -93,7 +93,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
}
}
}
s.Plugins = append(s.Plugins, p)
s.Plugins.Add(p)
p.Start()
}
@@ -156,6 +156,10 @@ func (Plugin) nothing() {
}
func (p *Plugin) GetKey() string {
return p.Meta.Name
}
func (p *Plugin) GetGlobalCommonConf() *config.Common {
return p.server.GetCommonConf()
}
@@ -179,7 +183,13 @@ func (p *Plugin) assign() {
}
p.Config.ParseModifyFile(modifyConfig)
}
p.registerHandler()
var handlerMap map[string]http.HandlerFunc
if v, ok := p.handler.(interface {
RegisterHandler() map[string]http.HandlerFunc
}); ok {
handlerMap = v.RegisterHandler()
}
p.registerHandler(handlerMap)
}
func (p *Plugin) Stop(err error) {
@@ -293,7 +303,7 @@ func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subsc
return
}
func (p *Plugin) registerHandler() {
func (p *Plugin) registerHandler(handlers map[string]http.HandlerFunc) {
t := reflect.TypeOf(p.handler)
v := reflect.ValueOf(p.handler)
// 注册http响应
@@ -308,6 +318,9 @@ func (p *Plugin) registerHandler() {
p.handle(patten, http.HandlerFunc(handler))
}
}
for patten, handler := range handlers {
p.handle(patten, handler)
}
if rootHandler, ok := p.handler.(http.Handler); ok {
p.handle("/", rootHandler)
}
@@ -365,3 +378,22 @@ func (p *Plugin) handle(pattern string, handler http.Handler) {
func (p *Plugin) PostToServer(event any) {
p.server.PostMessage(event)
}
func (p *Plugin) SaveConfig() (err error) {
_, err = p.server.Call(func() error {
if p.Modify == nil {
os.Remove(p.settingPath())
return nil
}
file, err := os.OpenFile(p.settingPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err == nil {
defer file.Close()
err = yaml.NewEncoder(file).Encode(p.Modify)
}
if err == nil {
p.Info("config saved")
}
return err
})
return
}

View File

@@ -100,6 +100,10 @@ func (cfg *ConsolePlugin) OnInit() error {
go cfg.ReceiveRequest(s, conn)
}
}
if !cfg.IsStopped() {
<-time.After(time.Second)
cfg.OnInit()
}
}()
return err
}

View File

@@ -235,9 +235,9 @@ func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) {
if len(frame.Packets) == 0 || packet.Timestamp == frame.Packets[0].Timestamp {
frame.Packets = append(frame.Packets, &packet)
} else {
t := time.Now()
// t := time.Now()
publisher.WriteVideo(frame)
fmt.Println("write video", time.Since(t))
// fmt.Println("write video", time.Since(t))
frame = &mrtp.RTPVideo{}
frame.Packets = []*rtp.Packet{&packet}
frame.RTPCodecParameters = &codecP

View File

@@ -151,11 +151,13 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
p.baseTs -= ts
}
frame.Timestamp = max(1, p.baseTs+ts)
bytesIn := frame.Wraps[0].GetSize()
t.AddBytesIn(bytesIn)
p.lastTs = frame.Timestamp
if p.Enabled(p, TraceLevel) {
codec := t.FourCC().String()
size, data := frame.Wraps[0].GetSize(), frame.Wraps[0].String()
p.Trace("write", "seq", frame.Sequence, "ts", frame.Timestamp, "codec", codec, "size", size, "data", data)
data := frame.Wraps[0].String()
p.Trace("write", "seq", frame.Sequence, "ts", frame.Timestamp, "codec", codec, "size", bytesIn, "data", data)
}
t.Step()
p.speedControl(p.Publish.Speed, p.lastTs)

4
scripts/protoc.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/bash
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. \
--go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative ${1}.proto

View File

@@ -46,7 +46,7 @@ type Server struct {
config.Engine
ID int
eventChan chan any
Plugins []*Plugin
Plugins util.Collection[string, *Plugin]
Streams util.Collection[string, *Publisher]
Pulls util.Collection[string, *Puller]
Pushs util.Collection[string, *Pusher]
@@ -152,7 +152,9 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
lv.Set(TraceLevel)
}
s.LogHandler.SetLevel(lv.Level())
s.registerHandler()
s.registerHandler(map[string]http.HandlerFunc{
"/api/config/json/{name}": s.api_Config_JSON_,
})
if httpConf.ListenAddrTLS != "" {
s.Info("https listen at ", "addr", httpConf.ListenAddrTLS)
@@ -215,7 +217,7 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
for _, subscriber := range s.Subscribers.Items {
subscriber.Stop(err)
}
for _, p := range s.Plugins {
for _, p := range s.Plugins.Items {
p.Stop(err)
}
httpConf.StopListen()
@@ -323,7 +325,7 @@ func (s *Server) eventLoop() {
case slog.Handler:
s.LogHandler.Add(v)
}
for _, plugin := range s.Plugins {
for _, plugin := range s.Plugins.Items {
if plugin.Disabled {
continue
}
@@ -434,7 +436,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
fmt.Fprintf(w, "Monibuca Engine %s StartTime:%s\n", Version, s.StartTime)
for _, plugin := range s.Plugins {
for _, plugin := range s.Plugins.Items {
fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version)
}
for _, api := range s.apiList {