diff --git a/api.go b/api.go index ca82599..faccb12 100644 --- a/api.go +++ b/api.go @@ -808,9 +808,9 @@ func (s *Server) GetRecordList(ctx context.Context, req *pb.ReqRecordList) (resp return } resp = &pb.ResponseList{ - TotalCount: uint32(totalCount), - PageNum: req.PageNum, - PageSize: req.PageSize, + Total: uint32(totalCount), + PageNum: req.PageNum, + PageSize: req.PageSize, } for _, recordFile := range result { resp.Data = append(resp.Data, &pb.RecordFile{ diff --git a/pkg/config/types.go b/pkg/config/types.go index 867f297..a8242b1 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -73,7 +73,8 @@ type ( RetryInterval time.Duration `default:"5s" desc:"重试间隔"` // 重试间隔 Proxy string `desc:"代理地址"` // 代理地址 Header HTTPValues - Args HTTPValues `gorm:"-:all"` // 拉流参数 + Args HTTPValues `gorm:"-:all"` // 拉流参数 + TestMode int `desc:"测试模式,0:关闭,1:只拉流不发布"` // 测试模式 } Push struct { URL string `desc:"推送地址"` // 推送地址 diff --git a/plugin/debug/index.go b/plugin/debug/index.go index 73364a9..9129b15 100644 --- a/plugin/debug/index.go +++ b/plugin/debug/index.go @@ -18,6 +18,7 @@ import ( myproc "github.com/cloudwego/goref/pkg/proc" "github.com/go-delve/delve/pkg/config" "github.com/go-delve/delve/service/debugger" + "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "m7s.live/v5" "m7s.live/v5/plugin/debug/pb" @@ -445,7 +446,7 @@ func (p *DebugPlugin) GetHeapGraph(ctx context.Context, empty *emptypb.Empty) (* }, nil } -func (p *DebugPlugin) StartTcpDump(ctx context.Context, req *pb.TcpDumpRequest) (*pb.TcpDumpResponse, error) { +func (p *DebugPlugin) StartTcpDump(req *pb.TcpDumpRequest, stream grpc.ServerStreamingServer[pb.TcpDumpResponse]) error { args := []string{} if req.Interface != "" { args = append(args, "-i", req.Interface) @@ -459,29 +460,17 @@ func (p *DebugPlugin) StartTcpDump(ctx context.Context, req *pb.TcpDumpRequest) if req.ExtraArgs != "" { args = append(args, strings.Fields(req.ExtraArgs)...) } - - cmd := exec.CommandContext(ctx, "tcpdump", args...) - - output, err := cmd.CombinedOutput() - if err != nil { - // 如果 tcpdump 因为超时而退出,这通常不是一个错误,而是预期的行为 - if exitErr, ok := err.(*exec.ExitError); ok && strings.Contains(string(exitErr.Stderr), "timeout") { - return &pb.TcpDumpResponse{ - Code: 0, - Message: "tcpdump completed with timeout as expected.", - Data: string(output), - }, nil + cmd := exec.CommandContext(p, "tcpdump", args...) + stdout, _ := cmd.StdoutPipe() + var buf [2048]byte + for { + stdout.Read(buf[:]) + if len(buf) == 0 { + break } - return &pb.TcpDumpResponse{ - Code: 1, - Message: fmt.Sprintf("failed to run tcpdump: %v. Output: %s", err, string(output)), - Data: string(output), - }, nil + stream.Send(&pb.TcpDumpResponse{ + Data: buf[:], + }) } - - return &pb.TcpDumpResponse{ - Code: 0, - Message: "tcpdump completed successfully.", - Data: string(output), - }, nil + return cmd.Run() } diff --git a/plugin/debug/pb/debug.pb.go b/plugin/debug/pb/debug.pb.go index e9d5ea0..083689e 100644 --- a/plugin/debug/pb/debug.pb.go +++ b/plugin/debug/pb/debug.pb.go @@ -1078,9 +1078,7 @@ func (x *TcpDumpRequest) GetExtraArgs() string { // TCP Dump响应数据 type TcpDumpResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` // 状态码 (0 表示成功) - Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` // 状态消息 - Data string `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // tcpdump 的文本输出内容 + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1115,25 +1113,11 @@ func (*TcpDumpResponse) Descriptor() ([]byte, []int) { return file_debug_proto_rawDescGZIP(), []int{15} } -func (x *TcpDumpResponse) GetCode() uint32 { - if x != nil { - return x.Code - } - return 0 -} - -func (x *TcpDumpResponse) GetMessage() string { - if x != nil { - return x.Message - } - return "" -} - -func (x *TcpDumpResponse) GetData() string { +func (x *TcpDumpResponse) GetData() []byte { if x != nil { return x.Data } - return "" + return nil } var File_debug_proto protoreflect.FileDescriptor @@ -1230,17 +1214,15 @@ const file_debug_proto_rawDesc = "" + "\x06filter\x18\x02 \x01(\tR\x06filter\x12\x1a\n" + "\bduration\x18\x03 \x01(\rR\bduration\x12\x1d\n" + "\n" + - "extra_args\x18\x04 \x01(\tR\textraArgs\"S\n" + + "extra_args\x18\x04 \x01(\tR\textraArgs\"%\n" + "\x0fTcpDumpResponse\x12\x12\n" + - "\x04code\x18\x01 \x01(\rR\x04code\x12\x18\n" + - "\amessage\x18\x02 \x01(\tR\amessage\x12\x12\n" + - "\x04data\x18\x03 \x01(\tR\x04data2\xb4\x03\n" + + "\x04data\x18\x01 \x01(\fR\x04data2\xb6\x03\n" + "\x03api\x12O\n" + "\aGetHeap\x12\x16.google.protobuf.Empty\x1a\x13.debug.HeapResponse\"\x17\x82\xd3\xe4\x93\x02\x11\x12\x0f/debug/api/heap\x12_\n" + "\fGetHeapGraph\x12\x16.google.protobuf.Empty\x1a\x18.debug.HeapGraphResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x12\x15/debug/api/heap/graph\x12W\n" + "\vGetCpuGraph\x12\x11.debug.CpuRequest\x1a\x17.debug.CpuGraphResponse\"\x1c\x82\xd3\xe4\x93\x02\x16\x12\x14/debug/api/cpu/graph\x12G\n" + - "\x06GetCpu\x12\x11.debug.CpuRequest\x1a\x12.debug.CpuResponse\"\x16\x82\xd3\xe4\x93\x02\x10\x12\x0e/debug/api/cpu\x12Y\n" + - "\fStartTcpDump\x12\x15.debug.TcpDumpRequest\x1a\x16.debug.TcpDumpResponse\"\x1a\x82\xd3\xe4\x93\x02\x14\"\x12/debug/api/tcpdumpB\x1dZ\x1bm7s.live/v5/plugin/debug/pbb\x06proto3" + "\x06GetCpu\x12\x11.debug.CpuRequest\x1a\x12.debug.CpuResponse\"\x16\x82\xd3\xe4\x93\x02\x10\x12\x0e/debug/api/cpu\x12[\n" + + "\fStartTcpDump\x12\x15.debug.TcpDumpRequest\x1a\x16.debug.TcpDumpResponse\"\x1a\x82\xd3\xe4\x93\x02\x14\x12\x12/debug/api/tcpdump0\x01B\x1dZ\x1bm7s.live/v5/plugin/debug/pbb\x06proto3" var ( file_debug_proto_rawDescOnce sync.Once diff --git a/plugin/debug/pb/debug.pb.gw.go b/plugin/debug/pb/debug.pb.gw.go index 51f1d98..e3513e0 100644 --- a/plugin/debug/pb/debug.pb.gw.go +++ b/plugin/debug/pb/debug.pb.gw.go @@ -144,7 +144,7 @@ var ( filter_Api_StartTcpDump_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)} ) -func request_Api_StartTcpDump_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { +func request_Api_StartTcpDump_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (Api_StartTcpDumpClient, runtime.ServerMetadata, error) { var protoReq TcpDumpRequest var metadata runtime.ServerMetadata @@ -155,24 +155,16 @@ func request_Api_StartTcpDump_0(ctx context.Context, marshaler runtime.Marshaler return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := client.StartTcpDump(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) - return msg, metadata, err - -} - -func local_request_Api_StartTcpDump_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq TcpDumpRequest - var metadata runtime.ServerMetadata - - if err := req.ParseForm(); err != nil { - return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + stream, err := client.StartTcpDump(ctx, &protoReq) + if err != nil { + return nil, metadata, err } - if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_StartTcpDump_0); err != nil { - return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + header, err := stream.Header() + if err != nil { + return nil, metadata, err } - - msg, err := server.StartTcpDump(ctx, &protoReq) - return msg, metadata, err + metadata.HeaderMD = header + return stream, metadata, nil } @@ -282,29 +274,11 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server }) - mux.Handle("POST", pattern_Api_StartTcpDump_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { - ctx, cancel := context.WithCancel(req.Context()) - defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) - inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/debug.Api/StartTcpDump", runtime.WithHTTPPathPattern("/debug/api/tcpdump")) - if err != nil { - runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) - return - } - resp, md, err := local_request_Api_StartTcpDump_0(annotatedContext, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) - annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) - if err != nil { - runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) - return - } - - forward_Api_StartTcpDump_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - + mux.Handle("GET", pattern_Api_StartTcpDump_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") + _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return }) return nil @@ -436,7 +410,7 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client }) - mux.Handle("POST", pattern_Api_StartTcpDump_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle("GET", pattern_Api_StartTcpDump_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) @@ -454,7 +428,7 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client return } - forward_Api_StartTcpDump_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + forward_Api_StartTcpDump_0(annotatedContext, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) }) @@ -482,5 +456,5 @@ var ( forward_Api_GetCpu_0 = runtime.ForwardResponseMessage - forward_Api_StartTcpDump_0 = runtime.ForwardResponseMessage + forward_Api_StartTcpDump_0 = runtime.ForwardResponseStream ) diff --git a/plugin/debug/pb/debug.proto b/plugin/debug/pb/debug.proto index 297395f..c3ae37f 100644 --- a/plugin/debug/pb/debug.proto +++ b/plugin/debug/pb/debug.proto @@ -27,9 +27,9 @@ service api { }; } - rpc StartTcpDump (TcpDumpRequest) returns (TcpDumpResponse) { + rpc StartTcpDump (TcpDumpRequest) returns (stream TcpDumpResponse) { option (google.api.http) = { - post: "/debug/api/tcpdump" + get: "/debug/api/tcpdump" }; } } @@ -150,7 +150,5 @@ message TcpDumpRequest { // TCP Dump响应数据 message TcpDumpResponse { - uint32 code = 1; // 状态码 (0 表示成功) - string message = 2; // 状态消息 - string data = 3; // tcpdump 的文本输出内容 + bytes data = 1; } \ No newline at end of file diff --git a/plugin/debug/pb/debug_grpc.pb.go b/plugin/debug/pb/debug_grpc.pb.go index 1040807..cf69263 100644 --- a/plugin/debug/pb/debug_grpc.pb.go +++ b/plugin/debug/pb/debug_grpc.pb.go @@ -35,7 +35,7 @@ type ApiClient interface { GetHeapGraph(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*HeapGraphResponse, error) GetCpuGraph(ctx context.Context, in *CpuRequest, opts ...grpc.CallOption) (*CpuGraphResponse, error) GetCpu(ctx context.Context, in *CpuRequest, opts ...grpc.CallOption) (*CpuResponse, error) - StartTcpDump(ctx context.Context, in *TcpDumpRequest, opts ...grpc.CallOption) (*TcpDumpResponse, error) + StartTcpDump(ctx context.Context, in *TcpDumpRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TcpDumpResponse], error) } type apiClient struct { @@ -86,16 +86,25 @@ func (c *apiClient) GetCpu(ctx context.Context, in *CpuRequest, opts ...grpc.Cal return out, nil } -func (c *apiClient) StartTcpDump(ctx context.Context, in *TcpDumpRequest, opts ...grpc.CallOption) (*TcpDumpResponse, error) { +func (c *apiClient) StartTcpDump(ctx context.Context, in *TcpDumpRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TcpDumpResponse], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(TcpDumpResponse) - err := c.cc.Invoke(ctx, Api_StartTcpDump_FullMethodName, in, out, cOpts...) + stream, err := c.cc.NewStream(ctx, &Api_ServiceDesc.Streams[0], Api_StartTcpDump_FullMethodName, cOpts...) if err != nil { return nil, err } - return out, nil + x := &grpc.GenericClientStream[TcpDumpRequest, TcpDumpResponse]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil } +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Api_StartTcpDumpClient = grpc.ServerStreamingClient[TcpDumpResponse] + // ApiServer is the server API for Api service. // All implementations must embed UnimplementedApiServer // for forward compatibility. @@ -104,7 +113,7 @@ type ApiServer interface { GetHeapGraph(context.Context, *emptypb.Empty) (*HeapGraphResponse, error) GetCpuGraph(context.Context, *CpuRequest) (*CpuGraphResponse, error) GetCpu(context.Context, *CpuRequest) (*CpuResponse, error) - StartTcpDump(context.Context, *TcpDumpRequest) (*TcpDumpResponse, error) + StartTcpDump(*TcpDumpRequest, grpc.ServerStreamingServer[TcpDumpResponse]) error mustEmbedUnimplementedApiServer() } @@ -127,8 +136,8 @@ func (UnimplementedApiServer) GetCpuGraph(context.Context, *CpuRequest) (*CpuGra func (UnimplementedApiServer) GetCpu(context.Context, *CpuRequest) (*CpuResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetCpu not implemented") } -func (UnimplementedApiServer) StartTcpDump(context.Context, *TcpDumpRequest) (*TcpDumpResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method StartTcpDump not implemented") +func (UnimplementedApiServer) StartTcpDump(*TcpDumpRequest, grpc.ServerStreamingServer[TcpDumpResponse]) error { + return status.Errorf(codes.Unimplemented, "method StartTcpDump not implemented") } func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {} func (UnimplementedApiServer) testEmbeddedByValue() {} @@ -223,24 +232,17 @@ func _Api_GetCpu_Handler(srv interface{}, ctx context.Context, dec func(interfac return interceptor(ctx, in, info, handler) } -func _Api_StartTcpDump_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TcpDumpRequest) - if err := dec(in); err != nil { - return nil, err +func _Api_StartTcpDump_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(TcpDumpRequest) + if err := stream.RecvMsg(m); err != nil { + return err } - if interceptor == nil { - return srv.(ApiServer).StartTcpDump(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Api_StartTcpDump_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ApiServer).StartTcpDump(ctx, req.(*TcpDumpRequest)) - } - return interceptor(ctx, in, info, handler) + return srv.(ApiServer).StartTcpDump(m, &grpc.GenericServerStream[TcpDumpRequest, TcpDumpResponse]{ServerStream: stream}) } +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Api_StartTcpDumpServer = grpc.ServerStreamingServer[TcpDumpResponse] + // Api_ServiceDesc is the grpc.ServiceDesc for Api service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -264,11 +266,13 @@ var Api_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetCpu", Handler: _Api_GetCpu_Handler, }, + }, + Streams: []grpc.StreamDesc{ { - MethodName: "StartTcpDump", - Handler: _Api_StartTcpDump_Handler, + StreamName: "StartTcpDump", + Handler: _Api_StartTcpDump_Handler, + ServerStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "debug.proto", } diff --git a/plugin/flv/pkg/pull-httpfile.go b/plugin/flv/pkg/pull-httpfile.go index 44e337e..0071f59 100644 --- a/plugin/flv/pkg/pull-httpfile.go +++ b/plugin/flv/pkg/pull-httpfile.go @@ -2,6 +2,7 @@ package flv import ( "errors" + "io" "m7s.live/v5" "m7s.live/v5/pkg/util" @@ -15,6 +16,10 @@ type Puller struct { func (p *Puller) Run() (err error) { reader := util.NewBufReader(p.ReadCloser) publisher := p.PullJob.Publisher + if publisher == nil { + io.Copy(io.Discard, p.ReadCloser) + return + } var hasAudio, hasVideo bool var absTS uint32 var head util.Memory diff --git a/plugin/flv/pkg/pull-recorder.go b/plugin/flv/pkg/pull-recorder.go index a596b2c..f61bf2e 100644 --- a/plugin/flv/pkg/pull-recorder.go +++ b/plugin/flv/pkg/pull-recorder.go @@ -9,6 +9,7 @@ import ( "time" m7s "m7s.live/v5" + "m7s.live/v5/pkg" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" @@ -47,6 +48,9 @@ func (p *RecordReader) Dispose() { func (p *RecordReader) Run() (err error) { pullJob := &p.PullJob publisher := pullJob.Publisher + if publisher == nil { + return pkg.ErrDisabled + } allocator := util.NewScalableMemoryAllocator(1 << 10) var tagHeader [11]byte var ts int64 @@ -57,9 +61,12 @@ func (p *RecordReader) Run() (err error) { defer func() { allocator.Recycle() }() - publisher.OnGetPosition = func() time.Time { - return realTime + if publisher != nil { + publisher.OnGetPosition = func() time.Time { + return realTime + } } + for loop := 0; loop < p.Loop; loop++ { nextStream: for i, stream := range p.Streams { @@ -85,15 +92,15 @@ func (p *RecordReader) Run() (err error) { err = head.NewReader().ReadByteTo(&flvHead[0], &flvHead[1], &flvHead[2], &version, &flag) hasAudio := (flag & 0x04) != 0 hasVideo := (flag & 0x01) != 0 + if err != nil { + return + } if !hasAudio { publisher.NoAudio() } if !hasVideo { publisher.NoVideo() } - if err != nil { - return - } if flvHead != [3]byte{'F', 'L', 'V'} { return errors.New("not flv file") } @@ -194,7 +201,7 @@ func (p *RecordReader) Run() (err error) { } } } else { - publisher.Info("script", name, obj) + p.Info("script", name, obj) } default: err = fmt.Errorf("unknown tag type: %d", t) diff --git a/plugin/mp4/pkg/pull-httpfile.go b/plugin/mp4/pkg/pull-httpfile.go index 209458e..bdc53e6 100644 --- a/plugin/mp4/pkg/pull-httpfile.go +++ b/plugin/mp4/pkg/pull-httpfile.go @@ -20,6 +20,10 @@ type HTTPReader struct { func (p *HTTPReader) Run() (err error) { pullJob := &p.PullJob publisher := pullJob.Publisher + if publisher == nil { + io.Copy(io.Discard, p.ReadCloser) + return + } allocator := util.NewScalableMemoryAllocator(1 << 10) var demuxer *Demuxer defer allocator.Recycle() @@ -36,12 +40,12 @@ func (p *HTTPReader) Run() (err error) { } publisher.OnSeek = func(seekTime time.Time) { p.Stop(errors.New("seek")) - pullJob.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat)) + pullJob.Connection.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat)) newHTTPReader := &HTTPReader{} pullJob.AddTask(newHTTPReader) } - if pullJob.Args.Get(util.StartKey) != "" { - seekTime, _ := time.Parse(util.LocalTimeFormat, pullJob.Args.Get(util.StartKey)) + if pullJob.Connection.Args.Get(util.StartKey) != "" { + seekTime, _ := time.Parse(util.LocalTimeFormat, pullJob.Connection.Args.Get(util.StartKey)) demuxer.SeekTime(uint64(seekTime.UnixMilli())) } for _, track := range demuxer.Tracks { diff --git a/plugin/mp4/pkg/pull-recorder.go b/plugin/mp4/pkg/pull-recorder.go index b790876..a946116 100644 --- a/plugin/mp4/pkg/pull-recorder.go +++ b/plugin/mp4/pkg/pull-recorder.go @@ -6,6 +6,7 @@ import ( "time" m7s "m7s.live/v5" + "m7s.live/v5/pkg" "m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" @@ -39,6 +40,9 @@ func NewPuller(conf config.Pull) m7s.IPuller { func (p *RecordReader) Run() (err error) { pullJob := &p.PullJob publisher := pullJob.Publisher + if publisher == nil { + return pkg.ErrDisabled + } // allocator := util.NewScalableMemoryAllocator(1 << 10) var ts, tsOffset int64 var realTime time.Time diff --git a/plugin/rtmp/pkg/client.go b/plugin/rtmp/pkg/client.go index 623f7f9..a871670 100644 --- a/plugin/rtmp/pkg/client.go +++ b/plugin/rtmp/pkg/client.go @@ -15,7 +15,7 @@ import ( func (c *Client) Start() (err error) { var addr string - if c.direction == DIRECTION_PULL { + if c.direction == DIRECTION_PULL && c.pullCtx.TestMode == 0 { addr = c.pullCtx.Connection.RemoteURL err = c.pullCtx.Publish() if err != nil { @@ -158,7 +158,9 @@ func (c *Client) Run() (err error) { if len(args) > 0 { m.StreamName += "?" + args.Encode() } - c.Receivers[response.StreamId] = c.pullCtx.Publisher + if c.pullCtx.Publisher != nil { + c.Receivers[response.StreamId] = c.pullCtx.Publisher + } err = c.SendMessage(RTMP_MSG_AMF0_COMMAND, m) // if response, ok := msg.MsgData.(*ResponsePlayMessage); ok { // if response.Object["code"] == "NetStream.Play.Start" { diff --git a/plugin/rtsp/pkg/client.go b/plugin/rtsp/pkg/client.go index 86e241a..de6df9b 100644 --- a/plugin/rtsp/pkg/client.go +++ b/plugin/rtsp/pkg/client.go @@ -59,9 +59,11 @@ func (c *Client) Run() (err error) { return } if c.direction == DIRECTION_PULL { - err = c.pullCtx.Publish() - if err != nil { - return + if c.pullCtx.TestMode == 0 { + err = c.pullCtx.Publish() + if err != nil { + return + } } var medias []*Media if medias, err = c.Describe(); err != nil { diff --git a/plugin/rtsp/pkg/transceiver.go b/plugin/rtsp/pkg/transceiver.go index e6163d0..8e96df0 100644 --- a/plugin/rtsp/pkg/transceiver.go +++ b/plugin/rtsp/pkg/transceiver.go @@ -447,7 +447,7 @@ func (r *Receiver) Receive() (err error) { }, } return r.NetConnection.Receive(false, func(channelID byte, buf []byte) error { - if r.Publisher.Paused != nil { + if r.Publisher != nil && r.Publisher.Paused != nil { r.Stream.Pause() r.Publisher.Paused.Await() r.Stream.Play() @@ -471,6 +471,9 @@ func (r *Receiver) Receive() (err error) { return err } } + if r.Publisher == nil { + return pkg.ErrMuted + } switch int(channelID) { case r.AudioChannelID: if !r.PubAudio { @@ -561,8 +564,6 @@ func (r *Receiver) Receive() (err error) { videoFrame.SetAllocator(r.MemoryAllocator) return pkg.ErrDiscard } - default: - } return pkg.ErrUnsupportCodec }, func(channelID byte, buf []byte) error { diff --git a/plugin/stress/api.go b/plugin/stress/api.go index bd35f7d..98ec378 100644 --- a/plugin/stress/api.go +++ b/plugin/stress/api.go @@ -19,12 +19,13 @@ import ( "m7s.live/v5/plugin/stress/pb" ) -func (r *StressPlugin) pull(count int, url string, puller m7s.PullerFactory) (err error) { +func (r *StressPlugin) pull(count int, url string, testMode int32, puller m7s.PullerFactory) (err error) { hasPlaceholder := strings.Contains(url, "%d") if i := r.pullers.Length; count > i { for j := i; j < count; j++ { conf := config.Pull{} defaults.SetDefaults(&conf) + conf.TestMode = int(testMode) if hasPlaceholder { conf.URL = fmt.Sprintf(url, j) } else { @@ -100,7 +101,7 @@ func (r *StressPlugin) StartPull(ctx context.Context, req *pb.PullRequest) (res default: return nil, fmt.Errorf("unsupport protocol %s", req.Protocol) } - return &gpb.SuccessResponse{}, r.pull(int(req.PullCount), req.RemoteURL, puller) + return &gpb.SuccessResponse{}, r.pull(int(req.PullCount), req.RemoteURL, req.TestMode, puller) } func (r *StressPlugin) StopPush(ctx context.Context, req *emptypb.Empty) (res *gpb.SuccessResponse, err error) { diff --git a/plugin/stress/pb/stress.pb.go b/plugin/stress/pb/stress.pb.go index 68cdb2d..50e23a6 100644 --- a/plugin/stress/pb/stress.pb.go +++ b/plugin/stress/pb/stress.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.19.1 +// protoc-gen-go v1.36.6 +// protoc v5.29.3 // source: stress.proto package pb @@ -14,6 +14,7 @@ import ( pb "m7s.live/v5/pb" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -24,21 +25,18 @@ const ( ) type CountResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + PushCount uint32 `protobuf:"varint,1,opt,name=pushCount,proto3" json:"pushCount,omitempty"` + PullCount uint32 `protobuf:"varint,2,opt,name=pullCount,proto3" json:"pullCount,omitempty"` unknownFields protoimpl.UnknownFields - - PushCount uint32 `protobuf:"varint,1,opt,name=pushCount,proto3" json:"pushCount,omitempty"` - PullCount uint32 `protobuf:"varint,2,opt,name=pullCount,proto3" json:"pullCount,omitempty"` + sizeCache protoimpl.SizeCache } func (x *CountResponse) Reset() { *x = CountResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_stress_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_stress_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *CountResponse) String() string { @@ -49,7 +47,7 @@ func (*CountResponse) ProtoMessage() {} func (x *CountResponse) ProtoReflect() protoreflect.Message { mi := &file_stress_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -79,23 +77,20 @@ func (x *CountResponse) GetPullCount() uint32 { } type PushRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` + Protocol string `protobuf:"bytes,2,opt,name=protocol,proto3" json:"protocol,omitempty"` + RemoteURL string `protobuf:"bytes,3,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"` + PushCount int32 `protobuf:"varint,4,opt,name=pushCount,proto3" json:"pushCount,omitempty"` unknownFields protoimpl.UnknownFields - - StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` - Protocol string `protobuf:"bytes,2,opt,name=protocol,proto3" json:"protocol,omitempty"` - RemoteURL string `protobuf:"bytes,3,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"` - PushCount int32 `protobuf:"varint,4,opt,name=pushCount,proto3" json:"pushCount,omitempty"` + sizeCache protoimpl.SizeCache } func (x *PushRequest) Reset() { *x = PushRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_stress_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_stress_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *PushRequest) String() string { @@ -106,7 +101,7 @@ func (*PushRequest) ProtoMessage() {} func (x *PushRequest) ProtoReflect() protoreflect.Message { mi := &file_stress_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -150,22 +145,20 @@ func (x *PushRequest) GetPushCount() int32 { } type PullRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + RemoteURL string `protobuf:"bytes,1,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"` + Protocol string `protobuf:"bytes,2,opt,name=protocol,proto3" json:"protocol,omitempty"` + PullCount int32 `protobuf:"varint,3,opt,name=pullCount,proto3" json:"pullCount,omitempty"` + TestMode int32 `protobuf:"varint,4,opt,name=testMode,proto3" json:"testMode,omitempty"` // 0: pull, 1: pull without publish unknownFields protoimpl.UnknownFields - - RemoteURL string `protobuf:"bytes,1,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"` - Protocol string `protobuf:"bytes,2,opt,name=protocol,proto3" json:"protocol,omitempty"` - PullCount int32 `protobuf:"varint,3,opt,name=pullCount,proto3" json:"pullCount,omitempty"` + sizeCache protoimpl.SizeCache } func (x *PullRequest) Reset() { *x = PullRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_stress_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_stress_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *PullRequest) String() string { @@ -176,7 +169,7 @@ func (*PullRequest) ProtoMessage() {} func (x *PullRequest) ProtoReflect() protoreflect.Message { mi := &file_stress_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -212,85 +205,54 @@ func (x *PullRequest) GetPullCount() int32 { return 0 } +func (x *PullRequest) GetTestMode() int32 { + if x != nil { + return x.TestMode + } + return 0 +} + var File_stress_proto protoreflect.FileDescriptor -var file_stress_proto_rawDesc = []byte{ - 0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, - 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, - 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x1a, 0x0c, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, - 0x4b, 0x0a, 0x0d, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x75, 0x73, 0x68, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0d, 0x52, 0x09, 0x70, 0x75, 0x73, 0x68, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1c, - 0x0a, 0x09, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x09, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x85, 0x01, 0x0a, - 0x0b, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, - 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x6d, 0x6f, - 0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, - 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x75, 0x73, 0x68, 0x43, 0x6f, - 0x75, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x70, 0x75, 0x73, 0x68, 0x43, - 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x65, 0x0a, 0x0b, 0x50, 0x75, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, - 0x4c, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x1c, 0x0a, - 0x09, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x09, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x32, 0xf1, 0x03, 0x0a, 0x03, - 0x61, 0x70, 0x69, 0x12, 0x6d, 0x0a, 0x09, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, 0x73, 0x68, - 0x12, 0x13, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, - 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x32, - 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x2c, 0x22, 0x27, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, - 0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73, 0x68, 0x2f, 0x7b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, - 0x6f, 0x6c, 0x7d, 0x2f, 0x7b, 0x70, 0x75, 0x73, 0x68, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x7d, 0x3a, - 0x01, 0x2a, 0x12, 0x6d, 0x0a, 0x09, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, 0x6c, 0x6c, 0x12, - 0x13, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x75, - 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x32, 0x82, - 0xd3, 0xe4, 0x93, 0x02, 0x2c, 0x22, 0x27, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, - 0x70, 0x69, 0x2f, 0x70, 0x75, 0x6c, 0x6c, 0x2f, 0x7b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, - 0x6c, 0x7d, 0x2f, 0x7b, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x7d, 0x3a, 0x01, - 0x2a, 0x12, 0x54, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x16, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x43, - 0x6f, 0x75, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, - 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x70, - 0x69, 0x2f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x5a, 0x0a, 0x08, 0x53, 0x74, 0x6f, 0x70, 0x50, - 0x75, 0x73, 0x68, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x67, 0x6c, - 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x22, 0x15, 0x2f, 0x73, - 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x70, - 0x75, 0x73, 0x68, 0x12, 0x5a, 0x0a, 0x08, 0x53, 0x74, 0x6f, 0x70, 0x50, 0x75, 0x6c, 0x6c, 0x12, - 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, - 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x22, 0x15, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, - 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x70, 0x75, 0x6c, 0x6c, 0x42, - 0x1e, 0x5a, 0x1c, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x70, - 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x70, 0x62, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +const file_stress_proto_rawDesc = "" + + "\n" + + "\fstress.proto\x12\x06stress\x1a\x1cgoogle/api/annotations.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a\fglobal.proto\"K\n" + + "\rCountResponse\x12\x1c\n" + + "\tpushCount\x18\x01 \x01(\rR\tpushCount\x12\x1c\n" + + "\tpullCount\x18\x02 \x01(\rR\tpullCount\"\x85\x01\n" + + "\vPushRequest\x12\x1e\n" + + "\n" + + "streamPath\x18\x01 \x01(\tR\n" + + "streamPath\x12\x1a\n" + + "\bprotocol\x18\x02 \x01(\tR\bprotocol\x12\x1c\n" + + "\tremoteURL\x18\x03 \x01(\tR\tremoteURL\x12\x1c\n" + + "\tpushCount\x18\x04 \x01(\x05R\tpushCount\"\x81\x01\n" + + "\vPullRequest\x12\x1c\n" + + "\tremoteURL\x18\x01 \x01(\tR\tremoteURL\x12\x1a\n" + + "\bprotocol\x18\x02 \x01(\tR\bprotocol\x12\x1c\n" + + "\tpullCount\x18\x03 \x01(\x05R\tpullCount\x12\x1a\n" + + "\btestMode\x18\x04 \x01(\x05R\btestMode2\xf1\x03\n" + + "\x03api\x12m\n" + + "\tStartPush\x12\x13.stress.PushRequest\x1a\x17.global.SuccessResponse\"2\x82\xd3\xe4\x93\x02,:\x01*\"'/stress/api/push/{protocol}/{pushCount}\x12m\n" + + "\tStartPull\x12\x13.stress.PullRequest\x1a\x17.global.SuccessResponse\"2\x82\xd3\xe4\x93\x02,:\x01*\"'/stress/api/pull/{protocol}/{pullCount}\x12T\n" + + "\bGetCount\x12\x16.google.protobuf.Empty\x1a\x15.stress.CountResponse\"\x19\x82\xd3\xe4\x93\x02\x13\x12\x11/stress/api/count\x12Z\n" + + "\bStopPush\x12\x16.google.protobuf.Empty\x1a\x17.global.SuccessResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\"\x15/stress/api/stop/push\x12Z\n" + + "\bStopPull\x12\x16.google.protobuf.Empty\x1a\x17.global.SuccessResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\"\x15/stress/api/stop/pullB\x1eZ\x1cm7s.live/v5/plugin/stress/pbb\x06proto3" var ( file_stress_proto_rawDescOnce sync.Once - file_stress_proto_rawDescData = file_stress_proto_rawDesc + file_stress_proto_rawDescData []byte ) func file_stress_proto_rawDescGZIP() []byte { file_stress_proto_rawDescOnce.Do(func() { - file_stress_proto_rawDescData = protoimpl.X.CompressGZIP(file_stress_proto_rawDescData) + file_stress_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_stress_proto_rawDesc), len(file_stress_proto_rawDesc))) }) return file_stress_proto_rawDescData } var file_stress_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_stress_proto_goTypes = []interface{}{ +var file_stress_proto_goTypes = []any{ (*CountResponse)(nil), // 0: stress.CountResponse (*PushRequest)(nil), // 1: stress.PushRequest (*PullRequest)(nil), // 2: stress.PullRequest @@ -320,49 +282,11 @@ func file_stress_proto_init() { if File_stress_proto != nil { return } - if !protoimpl.UnsafeEnabled { - file_stress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CountResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_stress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PushRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_stress_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PullRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_stress_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_stress_proto_rawDesc), len(file_stress_proto_rawDesc)), NumEnums: 0, NumMessages: 3, NumExtensions: 0, @@ -373,7 +297,6 @@ func file_stress_proto_init() { MessageInfos: file_stress_proto_msgTypes, }.Build() File_stress_proto = out.File - file_stress_proto_rawDesc = nil file_stress_proto_goTypes = nil file_stress_proto_depIdxs = nil } diff --git a/plugin/stress/pb/stress.proto b/plugin/stress/pb/stress.proto index b4e0443..8e90910 100644 --- a/plugin/stress/pb/stress.proto +++ b/plugin/stress/pb/stress.proto @@ -51,4 +51,5 @@ message PullRequest { string remoteURL = 1; string protocol = 2; int32 pullCount = 3; + int32 testMode = 4; // 0: pull, 1: pull without publish } \ No newline at end of file diff --git a/plugin/stress/pb/stress_grpc.pb.go b/plugin/stress/pb/stress_grpc.pb.go index 926152c..08e4d7a 100644 --- a/plugin/stress/pb/stress_grpc.pb.go +++ b/plugin/stress/pb/stress_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v3.19.1 +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.29.3 // source: stress.proto package pb @@ -17,8 +17,16 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + Api_StartPush_FullMethodName = "/stress.api/StartPush" + Api_StartPull_FullMethodName = "/stress.api/StartPull" + Api_GetCount_FullMethodName = "/stress.api/GetCount" + Api_StopPush_FullMethodName = "/stress.api/StopPush" + Api_StopPull_FullMethodName = "/stress.api/StopPull" +) // ApiClient is the client API for Api service. // @@ -40,8 +48,9 @@ func NewApiClient(cc grpc.ClientConnInterface) ApiClient { } func (c *apiClient) StartPush(ctx context.Context, in *PushRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(pb.SuccessResponse) - err := c.cc.Invoke(ctx, "/stress.api/StartPush", in, out, opts...) + err := c.cc.Invoke(ctx, Api_StartPush_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -49,8 +58,9 @@ func (c *apiClient) StartPush(ctx context.Context, in *PushRequest, opts ...grpc } func (c *apiClient) StartPull(ctx context.Context, in *PullRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(pb.SuccessResponse) - err := c.cc.Invoke(ctx, "/stress.api/StartPull", in, out, opts...) + err := c.cc.Invoke(ctx, Api_StartPull_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -58,8 +68,9 @@ func (c *apiClient) StartPull(ctx context.Context, in *PullRequest, opts ...grpc } func (c *apiClient) GetCount(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*CountResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(CountResponse) - err := c.cc.Invoke(ctx, "/stress.api/GetCount", in, out, opts...) + err := c.cc.Invoke(ctx, Api_GetCount_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -67,8 +78,9 @@ func (c *apiClient) GetCount(ctx context.Context, in *emptypb.Empty, opts ...grp } func (c *apiClient) StopPush(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*pb.SuccessResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(pb.SuccessResponse) - err := c.cc.Invoke(ctx, "/stress.api/StopPush", in, out, opts...) + err := c.cc.Invoke(ctx, Api_StopPush_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -76,8 +88,9 @@ func (c *apiClient) StopPush(ctx context.Context, in *emptypb.Empty, opts ...grp } func (c *apiClient) StopPull(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*pb.SuccessResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(pb.SuccessResponse) - err := c.cc.Invoke(ctx, "/stress.api/StopPull", in, out, opts...) + err := c.cc.Invoke(ctx, Api_StopPull_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -86,7 +99,7 @@ func (c *apiClient) StopPull(ctx context.Context, in *emptypb.Empty, opts ...grp // ApiServer is the server API for Api service. // All implementations must embed UnimplementedApiServer -// for forward compatibility +// for forward compatibility. type ApiServer interface { StartPush(context.Context, *PushRequest) (*pb.SuccessResponse, error) StartPull(context.Context, *PullRequest) (*pb.SuccessResponse, error) @@ -96,9 +109,12 @@ type ApiServer interface { mustEmbedUnimplementedApiServer() } -// UnimplementedApiServer must be embedded to have forward compatible implementations. -type UnimplementedApiServer struct { -} +// UnimplementedApiServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedApiServer struct{} func (UnimplementedApiServer) StartPush(context.Context, *PushRequest) (*pb.SuccessResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method StartPush not implemented") @@ -116,6 +132,7 @@ func (UnimplementedApiServer) StopPull(context.Context, *emptypb.Empty) (*pb.Suc return nil, status.Errorf(codes.Unimplemented, "method StopPull not implemented") } func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {} +func (UnimplementedApiServer) testEmbeddedByValue() {} // UnsafeApiServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to ApiServer will @@ -125,6 +142,13 @@ type UnsafeApiServer interface { } func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) { + // If the following call pancis, it indicates UnimplementedApiServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Api_ServiceDesc, srv) } @@ -138,7 +162,7 @@ func _Api_StartPush_Handler(srv interface{}, ctx context.Context, dec func(inter } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/stress.api/StartPush", + FullMethod: Api_StartPush_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ApiServer).StartPush(ctx, req.(*PushRequest)) @@ -156,7 +180,7 @@ func _Api_StartPull_Handler(srv interface{}, ctx context.Context, dec func(inter } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/stress.api/StartPull", + FullMethod: Api_StartPull_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ApiServer).StartPull(ctx, req.(*PullRequest)) @@ -174,7 +198,7 @@ func _Api_GetCount_Handler(srv interface{}, ctx context.Context, dec func(interf } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/stress.api/GetCount", + FullMethod: Api_GetCount_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ApiServer).GetCount(ctx, req.(*emptypb.Empty)) @@ -192,7 +216,7 @@ func _Api_StopPush_Handler(srv interface{}, ctx context.Context, dec func(interf } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/stress.api/StopPush", + FullMethod: Api_StopPush_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ApiServer).StopPush(ctx, req.(*emptypb.Empty)) @@ -210,7 +234,7 @@ func _Api_StopPull_Handler(srv interface{}, ctx context.Context, dec func(interf } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/stress.api/StopPull", + FullMethod: Api_StopPull_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ApiServer).StopPull(ctx, req.(*emptypb.Empty)) diff --git a/puller.go b/puller.go index c32d8f4..b6a9d96 100644 --- a/puller.go +++ b/puller.go @@ -25,7 +25,6 @@ type ( Args url.Values RemoteURL string // 远程服务器地址(用于推拉) HTTPClient *http.Client - Header http.Header } IPuller interface { @@ -37,10 +36,10 @@ type ( PullJob struct { Connection + *config.Pull Publisher *Publisher PublishConfig config.Publish puller IPuller - conf *config.Pull } HTTPFilePuller struct { @@ -66,11 +65,10 @@ type ( } ) -func (conn *Connection) Init(plugin *Plugin, streamPath string, href string, proxyConf string, header http.Header) { +func (conn *Connection) Init(plugin *Plugin, streamPath string, href string, proxyConf string) { conn.RemoteURL = href conn.StreamPath = streamPath conn.Plugin = plugin - conn.Header = header conn.HTTPClient = http.DefaultClient if proxyConf != "" { proxy, err := url.Parse(proxyConf) @@ -93,8 +91,8 @@ func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf c p.PublishConfig = *pubConf } p.PublishConfig.PubType = PublishTypePull - p.Args = url.Values(conf.Args.DeepClone()) - p.conf = &conf + p.Connection.Args = url.Values(conf.Args.DeepClone()) + p.Pull = &conf remoteURL := conf.URL u, err := url.Parse(remoteURL) if err == nil { @@ -102,17 +100,17 @@ func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf c // file remoteURL = u.Path } - if p.Args == nil { - p.Args = u.Query() + if p.Connection.Args == nil { + p.Connection.Args = u.Query() } else { for k, v := range u.Query() { for _, vv := range v { - p.Args.Add(k, vv) + p.Connection.Args.Add(k, vv) } } } } - p.Connection.Init(plugin, streamPath, remoteURL, conf.Proxy, http.Header(conf.Header)) + p.Connection.Init(plugin, streamPath, remoteURL, conf.Proxy) p.puller = puller p.SetDescriptions(task.Description{ "plugin": plugin.Meta.Name, @@ -159,13 +157,13 @@ func (p *PullJob) GetKey() string { func (p *PullJob) Publish() (err error) { streamPath := p.StreamPath - if len(p.Args) > 0 { - streamPath += "?" + p.Args.Encode() + if len(p.Connection.Args) > 0 { + streamPath += "?" + p.Connection.Args.Encode() } p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, streamPath, p.PublishConfig) if err == nil { p.Publisher.OnDispose(func() { - if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) || p.conf.MaxRetry == 0 { + if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) || p.MaxRetry == 0 { p.Stop(p.Publisher.StopReason()) } else { p.puller.Stop(p.Publisher.StopReason()) @@ -185,8 +183,10 @@ func (p *PullJob) Start() (err error) { } func (p *HTTPFilePuller) Start() (err error) { - if err = p.PullJob.Publish(); err != nil { - return + if p.PullJob.TestMode == 0 { + if err = p.PullJob.Publish(); err != nil { + return + } } if p.ReadCloser != nil { return @@ -260,30 +260,34 @@ func (p *RecordFilePuller) Start() (err error) { return pkg.ErrNoDB } p.PullJob.PublishConfig.PubType = PublishTypeVod - if err = p.PullJob.Publish(); err != nil { - return + if p.PullJob.TestMode == 0 { + if err = p.PullJob.Publish(); err != nil { + return + } } - if p.PullStartTime, p.PullEndTime, err = util.TimeRangeQueryParse(p.PullJob.Args); err != nil { + if p.PullStartTime, p.PullEndTime, err = util.TimeRangeQueryParse(p.PullJob.Connection.Args); err != nil { return } p.seekChan = make(chan time.Time, 1) - loop := p.PullJob.Args.Get(util.LoopKey) + loop := p.PullJob.Connection.Args.Get(util.LoopKey) p.Loop, err = strconv.Atoi(loop) if err != nil || p.Loop < 0 { p.Loop = math.MaxInt32 } publisher := p.PullJob.Publisher - publisher.OnSeek = func(seekTime time.Time) { - // p.PullStartTime = seekTime - // p.SetRetry(1, 0) - // if util.UnixTimeReg.MatchString(p.PullJob.Args.Get(util.EndKey)) { - // p.PullJob.Args.Set(util.StartKey, strconv.FormatInt(seekTime.Unix(), 10)) - // } else { - // p.PullJob.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat)) - // } - select { - case p.seekChan <- seekTime: - default: + if publisher != nil { + publisher.OnSeek = func(seekTime time.Time) { + // p.PullStartTime = seekTime + // p.SetRetry(1, 0) + // if util.UnixTimeReg.MatchString(p.PullJob.Args.Get(util.EndKey)) { + // p.PullJob.Args.Set(util.StartKey, strconv.FormatInt(seekTime.Unix(), 10)) + // } else { + // p.PullJob.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat)) + // } + select { + case p.seekChan <- seekTime: + default: + } } } return p.queryRecordStreams(p.PullStartTime, p.PullEndTime) diff --git a/pusher.go b/pusher.go index ff96d36..f8d0666 100644 --- a/pusher.go +++ b/pusher.go @@ -1,7 +1,6 @@ package m7s import ( - "net/http" "time" "m7s.live/v5/pkg" @@ -29,7 +28,7 @@ func (p *PushJob) GetKey() string { } func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf config.Push, subConf *config.Subscribe) *PushJob { - p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy, http.Header(conf.Header)) + p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy) p.pusher = pusher if subConf == nil { p.SubConf = plugin.config.Subscribe