feat: add logrotate

This commit is contained in:
langhuihui
2024-05-09 10:36:09 +08:00
parent d268e786d7
commit 0b0fd72504
17 changed files with 1630 additions and 213 deletions

122
api.go
View File

@@ -2,19 +2,48 @@ package m7s
import (
"context"
"errors"
"net"
"net/http"
"strings"
"time"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/mem"
. "github.com/shirou/gopsutil/v3/net"
"google.golang.org/protobuf/types/known/emptypb"
"m7s.live/m7s/v5/pkg"
"google.golang.org/protobuf/types/known/timestamppb"
"m7s.live/m7s/v5/pb"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/util"
)
func (s *Server) StreamSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamSnapShot, err error) {
result, err := s.Call(req)
if err != nil {
return nil, err
var localIP string
func (s *Server) SysInfo(context.Context, *emptypb.Empty) (res *pb.SysInfoResponse, err error) {
if localIP == "" {
if conn, err := net.Dial("udp", "114.114.114.114:80"); err == nil {
localIP, _, _ = strings.Cut(conn.LocalAddr().String(), ":")
}
}
puber := result.(*Publisher)
return puber.SnapShot(), nil
res = &pb.SysInfoResponse{
Version: Version,
LocalIP: localIP,
StartTime: timestamppb.New(s.StartTime),
}
return
}
func (s *Server) StreamSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamSnapShot, err error) {
s.Call(func() {
if pub, ok := s.Streams.Get(req.StreamPath); ok {
res = pub.SnapShot()
} else {
err = pkg.ErrNotFound
}
})
return
}
func (s *Server) Restart(ctx context.Context, req *pb.RequestWithId) (res *emptypb.Empty, err error) {
@@ -34,15 +63,82 @@ func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *empt
}
func (s *Server) StopSubscribe(ctx context.Context, req *pb.StopSubscribeRequest) (res *pb.StopSubscribeResponse, err error) {
_, err = s.Call(req)
s.Call(func() {
if subscriber, ok := s.Subscribers.Get(int(req.Id)); ok {
subscriber.Stop(errors.New("stop by api"))
} else {
err = pkg.ErrNotFound
}
})
return &pb.StopSubscribeResponse{
Success: err == nil,
}, err
}
func (s *Server) StreamList(ctx context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) {
var result any
result, err = s.Call(req)
return result.(*pb.StreamListResponse), err
func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) {
s.Call(func() {
var streams []*pb.StreamSummay
for _, publisher := range s.Streams.Items {
streams = append(streams, &pb.StreamSummay{
Path: publisher.StreamPath,
})
}
res = &pb.StreamListResponse{List: streams, Total: int32(s.Streams.Length), PageNum: req.PageNum, PageSize: req.PageSize}
})
return
}
func (s *Server) API_Summary_SSE(rw http.ResponseWriter, r *http.Request) {
util.ReturnFetchValue(func() *pb.SummaryResponse {
ret, _ := s.Summary(r.Context(), nil)
return ret
}, rw, r)
}
func (s *Server) Summary(context.Context, *emptypb.Empty) (res *pb.SummaryResponse, err error) {
s.Call(func() {
dur := time.Since(s.lastSummaryTime)
if dur < time.Second {
res = s.lastSummary
return
}
v, _ := mem.VirtualMemory()
d, _ := disk.Usage("/")
nv, _ := IOCounters(true)
res = &pb.SummaryResponse{
Memory: &pb.Usage{
Total: v.Total >> 20,
Free: v.Available >> 20,
Used: v.Used >> 20,
Usage: float32(v.UsedPercent),
},
HardDisk: &pb.Usage{
Total: d.Total >> 30,
Free: d.Free >> 30,
Used: d.Used >> 30,
Usage: float32(d.UsedPercent),
},
}
if cc, _ := cpu.Percent(time.Second, false); len(cc) > 0 {
res.CpuUsage = float32(cc[0])
}
netWorks := []*pb.NetWorkInfo{}
for i, n := range nv {
info := &pb.NetWorkInfo{
Name: n.Name,
Receive: n.BytesRecv,
Sent: n.BytesSent,
}
if s.lastSummary != nil && len(s.lastSummary.NetWork) > i {
info.ReceiveSpeed = (n.BytesRecv - s.lastSummary.NetWork[i].Receive) / uint64(dur.Seconds())
info.SentSpeed = (n.BytesSent - s.lastSummary.NetWork[i].Sent) / uint64(dur.Seconds())
}
netWorks = append(netWorks, info)
}
res.StreamCount = int32(s.Streams.Length)
res.NetWork = netWorks
s.lastSummary = res
s.lastSummaryTime = time.Now()
})
return
}

View File

@@ -1,9 +1,11 @@
global:
loglevel: trace
loglevel: debug
tcp:
listenaddr: :50051
console:
secret: de2c0bb9fd47684adc07a426e139239b
logrotate:
level: trace
level: info
webrtc:
publish:
pubaudio: false

View File

View File

@@ -9,6 +9,7 @@ import (
_ "m7s.live/m7s/v5/plugin/webrtc"
_ "m7s.live/m7s/v5/plugin/rtmp"
_ "m7s.live/m7s/v5/plugin/logrotate"
_ "m7s.live/m7s/v5/plugin/console"
)
func main() {

File diff suppressed because it is too large Load Diff

View File

@@ -21,6 +21,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)
// Suppress "imported and not used" errors
@@ -31,6 +32,42 @@ var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
func request_Global_SysInfo_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.SysInfo(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Global_SysInfo_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.SysInfo(ctx, &protoReq)
return msg, metadata, err
}
func request_Global_Summary_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.Summary(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Global_Summary_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.Summary(ctx, &protoReq)
return msg, metadata, err
}
func request_Global_Shutdown_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq RequestWithId
var metadata runtime.ServerMetadata
@@ -135,10 +172,21 @@ func local_request_Global_Restart_0(ctx context.Context, marshaler runtime.Marsh
}
var (
filter_Global_StreamList_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
)
func request_Global_StreamList_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StreamListRequest
var metadata runtime.ServerMetadata
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Global_StreamList_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.StreamList(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
@@ -148,6 +196,13 @@ func local_request_Global_StreamList_0(ctx context.Context, marshaler runtime.Ma
var protoReq StreamListRequest
var metadata runtime.ServerMetadata
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Global_StreamList_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.StreamList(ctx, &protoReq)
return msg, metadata, err
@@ -271,6 +326,56 @@ func local_request_Global_StopSubscribe_0(ctx context.Context, marshaler runtime
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterGlobalHandlerFromEndpoint instead.
func RegisterGlobalHandlerServer(ctx context.Context, mux *runtime.ServeMux, server GlobalServer) error {
mux.Handle("GET", pattern_Global_SysInfo_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/SysInfo", runtime.WithHTTPPathPattern("/api/sysinfo"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Global_SysInfo_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_SysInfo_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Global_Summary_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/Summary", runtime.WithHTTPPathPattern("/api/summary"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Global_Summary_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_Summary_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Global_Shutdown_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -437,6 +542,50 @@ func RegisterGlobalHandler(ctx context.Context, mux *runtime.ServeMux, conn *grp
// "GlobalClient" to call the correct interceptors.
func RegisterGlobalHandlerClient(ctx context.Context, mux *runtime.ServeMux, client GlobalClient) error {
mux.Handle("GET", pattern_Global_SysInfo_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/SysInfo", runtime.WithHTTPPathPattern("/api/sysinfo"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Global_SysInfo_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_SysInfo_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Global_Summary_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/Summary", runtime.WithHTTPPathPattern("/api/summary"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Global_Summary_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_Summary_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Global_Shutdown_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
@@ -551,6 +700,10 @@ func RegisterGlobalHandlerClient(ctx context.Context, mux *runtime.ServeMux, cli
}
var (
pattern_Global_SysInfo_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"api", "sysinfo"}, ""))
pattern_Global_Summary_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"api", "summary"}, ""))
pattern_Global_Shutdown_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"api", "shutdown", "id"}, ""))
pattern_Global_Restart_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"api", "restart", "id"}, ""))
@@ -563,6 +716,10 @@ var (
)
var (
forward_Global_SysInfo_0 = runtime.ForwardResponseMessage
forward_Global_Summary_0 = runtime.ForwardResponseMessage
forward_Global_Shutdown_0 = runtime.ForwardResponseMessage
forward_Global_Restart_0 = runtime.ForwardResponseMessage

View File

@@ -1,10 +1,21 @@
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
package m7s;
option go_package="m7s.live/m7s/v5/pb";
service Global {
rpc SysInfo (google.protobuf.Empty) returns (SysInfoResponse) {
option (google.api.http) = {
get: "/api/sysinfo"
};
}
rpc Summary (google.protobuf.Empty) returns (SummaryResponse) {
option (google.api.http) = {
get: "/api/summary"
};
}
rpc Shutdown (RequestWithId) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/api/shutdown/{id}"
@@ -32,14 +43,57 @@ service Global {
};
}
}
message StreamInfo {
string path = 1;
}
message StreamListRequest {
message NetWorkInfo {
string name = 1;
uint64 receive = 2;
uint64 sent = 3;
uint64 receiveSpeed = 4;
uint64 sentSpeed = 5;
}
message Usage {
uint64 total = 1;
uint64 free = 2;
uint64 used = 3;
float usage = 4;
}
message SummaryResponse {
string address = 1;
Usage memory = 2;
float cpuUsage = 3;
Usage hardDisk = 4;
repeated NetWorkInfo netWork = 5;
int32 streamCount = 6;
}
message StreamSummay {
string path = 1;
int32 state = 2;
int32 subscribers = 3;
repeated string tracks = 4;
google.protobuf.Timestamp startTime = 5;
string type = 6;
int32 bps = 7;
}
message SysInfoResponse {
google.protobuf.Timestamp startTime = 1;
string localIP = 2;
string version = 3;
}
message StreamListRequest {
int32 pageNum = 1;
int32 pageSize = 2;
}
message StreamListResponse {
repeated StreamInfo list = 1;
int32 total = 1;
int32 pageNum = 2;
int32 pageSize = 3;
repeated StreamSummay list = 4;
}
message StreamSnapRequest {

View File

@@ -23,6 +23,8 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type GlobalClient interface {
SysInfo(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SysInfoResponse, error)
Summary(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SummaryResponse, error)
Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error)
Restart(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error)
StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error)
@@ -38,6 +40,24 @@ func NewGlobalClient(cc grpc.ClientConnInterface) GlobalClient {
return &globalClient{cc}
}
func (c *globalClient) SysInfo(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SysInfoResponse, error) {
out := new(SysInfoResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/SysInfo", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) Summary(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SummaryResponse, error) {
out := new(SummaryResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/Summary", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/m7s.Global/Shutdown", in, out, opts...)
@@ -87,6 +107,8 @@ func (c *globalClient) StopSubscribe(ctx context.Context, in *StopSubscribeReque
// All implementations must embed UnimplementedGlobalServer
// for forward compatibility
type GlobalServer interface {
SysInfo(context.Context, *emptypb.Empty) (*SysInfoResponse, error)
Summary(context.Context, *emptypb.Empty) (*SummaryResponse, error)
Shutdown(context.Context, *RequestWithId) (*emptypb.Empty, error)
Restart(context.Context, *RequestWithId) (*emptypb.Empty, error)
StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error)
@@ -99,6 +121,12 @@ type GlobalServer interface {
type UnimplementedGlobalServer struct {
}
func (UnimplementedGlobalServer) SysInfo(context.Context, *emptypb.Empty) (*SysInfoResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SysInfo not implemented")
}
func (UnimplementedGlobalServer) Summary(context.Context, *emptypb.Empty) (*SummaryResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Summary not implemented")
}
func (UnimplementedGlobalServer) Shutdown(context.Context, *RequestWithId) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Shutdown not implemented")
}
@@ -127,6 +155,42 @@ func RegisterGlobalServer(s grpc.ServiceRegistrar, srv GlobalServer) {
s.RegisterService(&Global_ServiceDesc, srv)
}
func _Global_SysInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).SysInfo(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/SysInfo",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).SysInfo(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Global_Summary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).Summary(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/Summary",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).Summary(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _Global_Shutdown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestWithId)
if err := dec(in); err != nil {
@@ -224,6 +288,14 @@ var Global_ServiceDesc = grpc.ServiceDesc{
ServiceName: "m7s.Global",
HandlerType: (*GlobalServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SysInfo",
Handler: _Global_SysInfo_Handler,
},
{
MethodName: "Summary",
Handler: _Global_Summary_Handler,
},
{
MethodName: "Shutdown",
Handler: _Global_Shutdown_Handler,

View File

@@ -1,5 +1,11 @@
package util
import (
"log"
"os"
"path/filepath"
)
func Conditoinal[T any](cond bool, t, f T) T {
if cond {
return t
@@ -19,3 +25,22 @@ func LenOfBuffers(b [][]byte) (n int) {
}
return
}
func initFatalLog() *os.File {
fatal_log_dir := "./fatal"
if _fatal_log := os.Getenv("M7S_FATAL_LOG"); _fatal_log != "" {
fatal_log_dir = _fatal_log
}
os.MkdirAll(fatal_log_dir, 0766)
fatal_log := filepath.Join(fatal_log_dir, "latest.log")
info, err := os.Stat(fatal_log)
if err == nil && info.Size() != 0 {
os.Rename(fatal_log, filepath.Join(fatal_log_dir, info.ModTime().Format("2006-01-02 15:04:05")+".log"))
}
logFile, err := os.OpenFile(fatal_log, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
if err != nil {
log.Println("服务启动出错", "打开异常日志文件失败", err)
return nil
}
return logFile
}

272
pkg/util/socket.go Normal file
View File

@@ -0,0 +1,272 @@
package util
import (
"crypto/sha256"
"crypto/subtle"
"encoding/json"
"fmt"
"net"
"net/http"
"reflect"
"strconv"
"time"
"gopkg.in/yaml.v3"
)
func FetchValue[T any](t T) func() T {
return func() T {
return t
}
}
const (
APIErrorNone = 0
APIErrorDecode = iota + 4000
APIErrorQueryParse
APIErrorNoBody
)
const (
APIErrorNotFound = iota + 4040
APIErrorNoStream
APIErrorNoConfig
APIErrorNoPusher
APIErrorNoSubscriber
APIErrorNoSEI
)
const (
APIErrorInternal = iota + 5000
APIErrorJSONEncode
APIErrorPublish
APIErrorSave
APIErrorOpen
)
type APIError struct {
Code int `json:"code"`
Message string `json:"msg"`
}
type APIResult struct {
Code int `json:"code"`
Data any `json:"data"`
Message string `json:"msg"`
}
func ReturnValue(v any, rw http.ResponseWriter, r *http.Request) {
ReturnFetchValue(FetchValue(v), rw, r)
}
func ReturnOK(rw http.ResponseWriter, r *http.Request) {
ReturnError(0, "ok", rw, r)
}
func ReturnError(code int, msg string, rw http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
isJson := query.Get("format") == "json"
if isJson {
if err := json.NewEncoder(rw).Encode(APIError{code, msg}); err != nil {
json.NewEncoder(rw).Encode(APIError{
Code: APIErrorJSONEncode,
Message: err.Error(),
})
}
} else {
switch true {
case code == 0:
http.Error(rw, msg, http.StatusOK)
case code/10 == 404:
http.Error(rw, msg, http.StatusNotFound)
case code > 5000:
http.Error(rw, msg, http.StatusInternalServerError)
default:
http.Error(rw, msg, http.StatusBadRequest)
}
}
}
func ReturnFetchList[T any](fetch func() []T, rw http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
isYaml := query.Get("format") == "yaml"
isJson := query.Get("format") == "json"
pageSize := query.Get("pageSize")
pageNum := query.Get("pageNum")
data := fetch()
var output any
output = data
if pageSize != "" && pageNum != "" {
pageSizeInt, _ := strconv.Atoi(pageSize)
pageNumInt, _ := strconv.Atoi(pageNum)
if pageSizeInt > 0 && pageNumInt > 0 {
start := (pageNumInt - 1) * pageSizeInt
end := pageNumInt * pageSizeInt
if start > len(data) {
start = len(data)
}
if end > len(data) {
end = len(data)
}
output = map[string]any{
"total": len(data),
"list": data[start:end],
"pageSize": pageSizeInt,
"pageNum": pageNumInt,
}
}
}
rw.Header().Set("Content-Type", Conditoinal(isYaml, "text/yaml", "application/json"))
if isYaml {
if err := yaml.NewEncoder(rw).Encode(output); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
} else if isJson {
if err := json.NewEncoder(rw).Encode(APIResult{
Code: 0,
Data: output,
Message: "ok",
}); err != nil {
json.NewEncoder(rw).Encode(APIError{
Code: APIErrorJSONEncode,
Message: err.Error(),
})
}
} else {
if err := json.NewEncoder(rw).Encode(output); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
}
}
func ReturnFetchValue[T any](fetch func() T, rw http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
isYaml := query.Get("format") == "yaml"
isJson := query.Get("format") == "json"
tickDur, err := time.ParseDuration(query.Get("interval"))
if err != nil {
tickDur = time.Second
}
if r.Header.Get("Accept") == "text/event-stream" {
sse := NewSSE(rw, r.Context())
tick := time.NewTicker(tickDur)
defer tick.Stop()
writer := Conditoinal(isYaml, sse.WriteYAML, sse.WriteJSON)
writer(fetch())
for range tick.C {
if writer(fetch()) != nil {
return
}
}
} else {
data := fetch()
rw.Header().Set("Content-Type", Conditoinal(isYaml, "text/yaml", "application/json"))
if isYaml {
if err := yaml.NewEncoder(rw).Encode(data); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
} else if isJson {
if err := json.NewEncoder(rw).Encode(APIResult{
Code: 0,
Data: data,
Message: "ok",
}); err != nil {
json.NewEncoder(rw).Encode(APIError{
Code: APIErrorJSONEncode,
Message: err.Error(),
})
}
} else {
t := reflect.TypeOf(data)
switch t.Kind() {
case reflect.String, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
rw.Header().Set("Content-Type", "text/plain")
fmt.Fprint(rw, data)
default:
if err := json.NewEncoder(rw).Encode(data); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
}
}
}
}
func ListenUDP(address string, networkBuffer int) (*net.UDPConn, error) {
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
if err = conn.SetReadBuffer(networkBuffer); err != nil {
return nil, err
}
if err = conn.SetWriteBuffer(networkBuffer); err != nil {
return nil, err
}
return conn, err
}
// CORS 加入跨域策略头包含CORP
func CORS(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
header := w.Header()
header.Set("Access-Control-Allow-Credentials", "true")
header.Set("Cross-Origin-Resource-Policy", "cross-origin")
header.Set("Access-Control-Allow-Headers", "Content-Type,Access-Token")
origin := r.Header["Origin"]
if len(origin) == 0 {
header.Set("Access-Control-Allow-Origin", "*")
} else {
header.Set("Access-Control-Allow-Origin", origin[0])
}
if next != nil && r.Method != "OPTIONS" {
next.ServeHTTP(w, r)
}
})
}
func BasicAuth(u, p string, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Extract the username and password from the request
// Authorization header. If no Authentication header is present
// or the header value is invalid, then the 'ok' return value
// will be false.
username, password, ok := r.BasicAuth()
if ok {
// Calculate SHA-256 hashes for the provided and expected
// usernames and passwords.
usernameHash := sha256.Sum256([]byte(username))
passwordHash := sha256.Sum256([]byte(password))
expectedUsernameHash := sha256.Sum256([]byte(u))
expectedPasswordHash := sha256.Sum256([]byte(p))
// 使用 subtle.ConstantTimeCompare() 进行校验
// the provided username and password hashes equal the
// expected username and password hashes. ConstantTimeCompare
// 如果值相等则返回1否则返回0。
// Importantly, we should to do the work to evaluate both the
// username and password before checking the return values to
// 避免泄露信息。
usernameMatch := (subtle.ConstantTimeCompare(usernameHash[:], expectedUsernameHash[:]) == 1)
passwordMatch := (subtle.ConstantTimeCompare(passwordHash[:], expectedPasswordHash[:]) == 1)
// If the username and password are correct, then call
// the next handler in the chain. Make sure to return
// afterwards, so that none of the code below is run.
if usernameMatch && passwordMatch {
if next != nil {
next.ServeHTTP(w, r)
}
return
}
}
// If the Authentication header is not present, is invalid, or the
// username or password is wrong, then set a WWW-Authenticate
// header to inform the client that we expect them to use basic
// authentication and send a 401 Unauthorized response.
w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
})
}

View File

@@ -52,6 +52,10 @@ func NewSSE(w http.ResponseWriter, ctx context.Context) *SSE {
header.Set("Connection", "keep-alive")
header.Set("X-Accel-Buffering", "no")
header.Set("Access-Control-Allow-Origin", "*")
// rw.Header().Set("Access-Control-Allow-Headers", "Content-Type,Authorization")
// rw.Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
// rw.Header().Set("Access-Control-Allow-Credentials", "true")
// rw.Header().Set("Transfer-Encoding", "chunked")
return &SSE{
ResponseWriter: w,
Context: ctx,

16
pkg/util/stderr.go Normal file
View File

@@ -0,0 +1,16 @@
//go:build (linux && !arm64) || darwin
package util
import (
"os"
"syscall"
)
func init() {
logFile := initFatalLog()
if logFile != nil {
// 将进程标准出错重定向至文件,进程崩溃时运行时将向该文件记录协程调用栈信息
syscall.Dup2(int(logFile.Fd()), int(os.Stderr.Fd()))
}
}

16
pkg/util/stderr_arm64.go Normal file
View File

@@ -0,0 +1,16 @@
//go:build linux && !darwin
package util
import (
"os"
"syscall"
)
func init() {
logFile := initFatalLog()
if logFile != nil {
// 将进程标准出错重定向至文件,进程崩溃时运行时将向该文件记录协程调用栈信息
syscall.Dup3(int(logFile.Fd()), int(os.Stderr.Fd()), 0)
}
}

View File

@@ -0,0 +1,38 @@
//go:build windows
package util
import (
"log"
"os"
"syscall"
)
var (
kernel32 = syscall.MustLoadDLL("kernel32.dll")
procSetStdHandle = kernel32.MustFindProc("SetStdHandle")
)
func setStdHandle(stdhandle int32, handle syscall.Handle) error {
r0, _, e1 := syscall.Syscall(procSetStdHandle.Addr(), 2, uintptr(stdhandle), uintptr(handle), 0)
if r0 == 0 {
if e1 != 0 {
return error(e1)
}
return syscall.EINVAL
}
return nil
}
// redirectStderr to the file passed in
func init() {
logFile := initFatalLog()
if logFile != nil {
err := setStdHandle(syscall.STD_ERROR_HANDLE, syscall.Handle(logFile.Fd()))
if err != nil {
log.Fatalf("Failed to redirect stderr to file: %v", err)
}
// SetStdHandle does not affect prior references to stderr
os.Stderr = logFile
}
}

View File

@@ -40,6 +40,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
p.Meta = plugin
p.server = s
p.Logger = s.Logger.With("plugin", plugin.Name)
p.Context, p.CancelCauseFunc = context.WithCancelCause(s.Context)
if os.Getenv(strings.ToUpper(plugin.Name)+"_ENABLE") == "false" {
p.Disabled = true
p.Warn("disabled by env")
@@ -85,7 +86,7 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
if plugin.ServiceDesc != nil && s.grpcServer != nil {
s.grpcServer.RegisterService(plugin.ServiceDesc, instance)
if plugin.RegisterGRPCHandler != nil {
err = plugin.RegisterGRPCHandler(s.Context, s.config.HTTP.GetGRPCMux(), s.grpcClientConn)
err = plugin.RegisterGRPCHandler(p.Context, s.config.HTTP.GetGRPCMux(), s.grpcClientConn)
if err != nil {
p.Error("init", "error", err)
p.Stop(err)
@@ -155,6 +156,10 @@ func (Plugin) nothing() {
}
func (p *Plugin) GetGlobalCommonConf() *config.Common {
return p.server.GetCommonConf()
}
func (p *Plugin) GetCommonConf() *config.Common {
return &p.config
}
@@ -184,7 +189,6 @@ func (p *Plugin) Stop(err error) {
}
func (p *Plugin) Start() {
p.Context, p.CancelCauseFunc = context.WithCancelCause(p.server.Context)
httpConf := &p.config.HTTP
if httpConf.ListenAddrTLS != "" && (httpConf.ListenAddrTLS != p.server.config.HTTP.ListenAddrTLS) {
p.Info("https listen at ", "addr", httpConf.ListenAddrTLS)

154
plugin/console/index.go Normal file
View File

@@ -0,0 +1,154 @@
package plugin_console
import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"
"github.com/quic-go/quic-go"
"m7s.live/m7s/v5"
)
type myResponseWriter struct {
}
func (*myResponseWriter) Header() http.Header {
return make(http.Header)
}
func (*myResponseWriter) WriteHeader(statusCode int) {
}
func (w *myResponseWriter) Flush() {
}
type myResponseWriter2 struct {
quic.Stream
myResponseWriter
}
type myResponseWriter3 struct {
handshake bool
myResponseWriter2
quic.Connection
}
func (w *myResponseWriter3) Write(b []byte) (int, error) {
if !w.handshake {
w.handshake = true
return len(b), nil
}
println(string(b))
return w.Stream.Write(b)
}
func (w *myResponseWriter3) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return net.Conn(w), bufio.NewReadWriter(bufio.NewReader(w), bufio.NewWriter(w)), nil
}
type ConsolePlugin struct {
m7s.Plugin
Server string `default:"console.monibuca.com:44944" desc:"远程控制台地址"` //远程控制台地址
Secret string `desc:"远程控制台密钥"` //远程控制台密钥
}
var _ = m7s.InstallPlugin[ConsolePlugin]()
func (cfg *ConsolePlugin) OnInit() error {
tlsConf := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"monibuca"},
}
conn, err := quic.DialAddr(cfg.Context, cfg.Server, tlsConf, &quic.Config{
KeepAlivePeriod: time.Second * 10,
EnableDatagrams: true,
})
if stream := quic.Stream(nil); err == nil {
if stream, err = conn.OpenStreamSync(cfg.Context); err == nil {
_, err = stream.Write(append([]byte{1}, (cfg.Secret + "\n")...))
if msg := []byte(nil); err == nil {
if msg, err = bufio.NewReader(stream).ReadSlice(0); err == nil {
var rMessage map[string]any
if err = json.Unmarshal(msg[:len(msg)-1], &rMessage); err == nil {
if rMessage["code"].(float64) != 0 {
// cfg.Error("response from console server ", cfg.Server, rMessage["msg"])
return fmt.Errorf("response from console server %s %s", cfg.Server, rMessage["msg"])
} else {
// cfg.reportStream = stream
cfg.Info("response from console server ", cfg.Server, rMessage)
// if v, ok := rMessage["enableReport"]; ok {
// cfg.enableReport = v.(bool)
// }
// if v, ok := rMessage["instanceId"]; ok {
// cfg.instanceId = v.(string)
// }
}
}
}
}
}
}
go func() {
for err == nil {
var s quic.Stream
if s, err = conn.AcceptStream(cfg.Context); err == nil {
go cfg.ReceiveRequest(s, conn)
}
}
}()
return err
}
func (cfg *ConsolePlugin) ReceiveRequest(s quic.Stream, conn quic.Connection) error {
defer s.Close()
wr := &myResponseWriter2{Stream: s}
reader := bufio.NewReader(s)
var req *http.Request
url, _, err := reader.ReadLine()
if err == nil {
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
req, err = http.NewRequestWithContext(ctx, "GET", string(url), reader)
for err == nil {
var h []byte
if h, _, err = reader.ReadLine(); len(h) > 0 {
if b, a, f := strings.Cut(string(h), ": "); f {
req.Header.Set(b, a)
}
} else {
break
}
}
if err == nil {
h := cfg.GetGlobalCommonConf().GetHandler()
if req.Header.Get("Accept") == "text/event-stream" {
go h.ServeHTTP(wr, req)
} else if req.Header.Get("Upgrade") == "websocket" {
var writer myResponseWriter3
writer.Stream = s
writer.Connection = conn
req.Host = req.Header.Get("Host")
if req.Host == "" {
req.Host = req.URL.Host
}
if req.Host == "" {
req.Host = "m7s.live"
}
h.ServeHTTP(&writer, req) //建立websocket连接,握手
} else {
h.ServeHTTP(wr, req)
}
}
io.ReadAll(s)
}
if err != nil {
cfg.Error("read console server", "err", err)
}
return err
}

View File

@@ -2,7 +2,6 @@ package m7s
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
@@ -45,20 +44,22 @@ type Server struct {
pb.UnimplementedGlobalServer
Plugin
config.Engine
ID int
eventChan chan any
Plugins []*Plugin
Streams util.Collection[string, *Publisher]
Pulls util.Collection[string, *Puller]
Pushs util.Collection[string, *Pusher]
Waiting map[string][]*Subscriber
Subscribers util.Collection[int, *Subscriber]
LogHandler MultiLogHandler
pidG int
sidG int
apiList []string
grpcServer *grpc.Server
grpcClientConn *grpc.ClientConn
ID int
eventChan chan any
Plugins []*Plugin
Streams util.Collection[string, *Publisher]
Pulls util.Collection[string, *Puller]
Pushs util.Collection[string, *Pusher]
Waiting map[string][]*Subscriber
Subscribers util.Collection[int, *Subscriber]
LogHandler MultiLogHandler
pidG int
sidG int
apiList []string
grpcServer *grpc.Server
grpcClientConn *grpc.ClientConn
lastSummaryTime time.Time
lastSummary *pb.SummaryResponse
}
func NewServer() (s *Server) {
@@ -103,6 +104,7 @@ func (s *Server) reset() {
}
func (s *Server) Run(ctx context.Context, conf any) (err error) {
s.StartTime = time.Now()
for err = s.run(ctx, conf); err == ErrRestart; err = s.run(ctx, conf) {
s.reset()
}
@@ -265,6 +267,13 @@ func (s *Server) eventLoop() {
switch v := event.(type) {
case *util.Promise[any]:
switch vv := v.Value.(type) {
case func():
vv()
v.Fulfill(nil)
continue
case func() error:
v.Fulfill(vv())
continue
case *Publisher:
err := s.OnPublish(vv)
if v.Fulfill(err); err != nil {
@@ -310,30 +319,6 @@ func (s *Server) eventLoop() {
s.Pushs.Add(vv)
event = v.Value
}
case *pb.StreamSnapRequest:
if pub, ok := s.Streams.Get(vv.StreamPath); ok {
v.Resolve(pub)
} else {
v.Fulfill(ErrNotFound)
}
continue
case *pb.StopSubscribeRequest:
if subscriber, ok := s.Subscribers.Get(int(vv.Id)); ok {
subscriber.Stop(errors.New("stop by api"))
v.Fulfill(nil)
} else {
v.Fulfill(ErrNotFound)
}
continue
case *pb.StreamListRequest:
var streams []*pb.StreamInfo
for _, publisher := range s.Streams.Items {
streams = append(streams, &pb.StreamInfo{
Path: publisher.StreamPath,
})
}
v.Resolve(&pb.StreamListResponse{List: streams})
continue
}
case slog.Handler:
s.LogHandler.Add(v)