diff --git a/api.go b/api.go index b3603ac..5f59697 100644 --- a/api.go +++ b/api.go @@ -396,6 +396,16 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.RequestWithId) (res return &pb.SuccessResponse{}, err } +func (s *Server) StopPublish(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.SuccessResponse, err error) { + s.Streams.Call(func() error { + if s, ok := s.Streams.Get(req.StreamPath); ok { + s.Stop(pkg.ErrStopFromAPI) + } + return nil + }) + return &pb.SuccessResponse{}, err +} + // /api/stream/list func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) { s.Streams.Call(func() error { diff --git a/pb/global.pb.go b/pb/global.pb.go index a9ca67b..f472002 100644 --- a/pb/global.pb.go +++ b/pb/global.pb.go @@ -3083,7 +3083,7 @@ var file_global_proto_rawDesc = []byte{ 0x0a, 0x05, 0x61, 0x6c, 0x69, 0x61, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x61, 0x6c, 0x69, 0x61, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x61, 0x75, 0x74, 0x6f, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x61, 0x75, 0x74, 0x6f, 0x52, 0x65, - 0x6d, 0x6f, 0x76, 0x65, 0x32, 0xc3, 0x10, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x50, 0x0a, 0x07, + 0x6d, 0x6f, 0x76, 0x65, 0x32, 0xb3, 0x11, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x50, 0x0a, 0x07, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x79, 0x73, 0x49, 0x6e, 0x66, 0x6f, @@ -3166,7 +3166,14 @@ var file_global_proto_rawDesc = []byte{ 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1c, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x16, 0x22, 0x11, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x61, - 0x6c, 0x69, 0x61, 0x73, 0x3a, 0x01, 0x2a, 0x12, 0x64, 0x0a, 0x0d, 0x53, 0x74, 0x6f, 0x70, 0x53, + 0x6c, 0x69, 0x61, 0x73, 0x3a, 0x01, 0x2a, 0x12, 0x6e, 0x0a, 0x0b, 0x53, 0x74, 0x6f, 0x70, 0x50, + 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x19, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, + 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x2b, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x25, 0x22, 0x20, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, + 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, + 0x3d, 0x2a, 0x2a, 0x7d, 0x3a, 0x01, 0x2a, 0x12, 0x64, 0x0a, 0x0d, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x15, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, @@ -3332,37 +3339,39 @@ var file_global_proto_depIdxs = []int32{ 15, // 48: global.api.VideoTrackSnap:input_type -> global.StreamSnapRequest 27, // 49: global.api.ChangeSubscribe:input_type -> global.ChangeSubscribeRequest 34, // 50: global.api.SetStreamAlias:input_type -> global.SetStreamAliasRequest - 26, // 51: global.api.StopSubscribe:input_type -> global.RequestWithId - 0, // 52: global.api.GetConfig:input_type -> global.GetConfigRequest - 0, // 53: global.api.GetFormily:input_type -> global.GetConfigRequest - 4, // 54: global.api.ModifyConfig:input_type -> global.ModifyConfigRequest - 44, // 55: global.api.GetDeviceList:input_type -> google.protobuf.Empty - 33, // 56: global.api.AddDevice:input_type -> global.DeviceInfo - 26, // 57: global.api.RemoveDevice:input_type -> global.RequestWithId - 33, // 58: global.api.UpdateDevice:input_type -> global.DeviceInfo - 10, // 59: global.api.SysInfo:output_type -> global.SysInfoResponse - 7, // 60: global.api.Summary:output_type -> global.SummaryResponse - 44, // 61: global.api.Shutdown:output_type -> google.protobuf.Empty - 44, // 62: global.api.Restart:output_type -> google.protobuf.Empty - 11, // 63: global.api.TaskTree:output_type -> global.TaskTreeResponse - 13, // 64: global.api.StreamList:output_type -> global.StreamListResponse - 14, // 65: global.api.WaitList:output_type -> global.StreamWaitListResponse - 16, // 66: global.api.StreamInfo:output_type -> global.StreamInfoResponse - 31, // 67: global.api.GetSubscribers:output_type -> global.SubscribersResponse - 23, // 68: global.api.AudioTrackSnap:output_type -> global.TrackSnapShotResponse - 23, // 69: global.api.VideoTrackSnap:output_type -> global.TrackSnapShotResponse - 25, // 70: global.api.ChangeSubscribe:output_type -> global.SuccessResponse - 25, // 71: global.api.SetStreamAlias:output_type -> global.SuccessResponse - 25, // 72: global.api.StopSubscribe:output_type -> global.SuccessResponse - 3, // 73: global.api.GetConfig:output_type -> global.GetConfigResponse - 3, // 74: global.api.GetFormily:output_type -> global.GetConfigResponse - 25, // 75: global.api.ModifyConfig:output_type -> global.SuccessResponse - 32, // 76: global.api.GetDeviceList:output_type -> global.DeviceListResponse - 25, // 77: global.api.AddDevice:output_type -> global.SuccessResponse - 25, // 78: global.api.RemoveDevice:output_type -> global.SuccessResponse - 25, // 79: global.api.UpdateDevice:output_type -> global.SuccessResponse - 59, // [59:80] is the sub-list for method output_type - 38, // [38:59] is the sub-list for method input_type + 15, // 51: global.api.StopPublish:input_type -> global.StreamSnapRequest + 26, // 52: global.api.StopSubscribe:input_type -> global.RequestWithId + 0, // 53: global.api.GetConfig:input_type -> global.GetConfigRequest + 0, // 54: global.api.GetFormily:input_type -> global.GetConfigRequest + 4, // 55: global.api.ModifyConfig:input_type -> global.ModifyConfigRequest + 44, // 56: global.api.GetDeviceList:input_type -> google.protobuf.Empty + 33, // 57: global.api.AddDevice:input_type -> global.DeviceInfo + 26, // 58: global.api.RemoveDevice:input_type -> global.RequestWithId + 33, // 59: global.api.UpdateDevice:input_type -> global.DeviceInfo + 10, // 60: global.api.SysInfo:output_type -> global.SysInfoResponse + 7, // 61: global.api.Summary:output_type -> global.SummaryResponse + 44, // 62: global.api.Shutdown:output_type -> google.protobuf.Empty + 44, // 63: global.api.Restart:output_type -> google.protobuf.Empty + 11, // 64: global.api.TaskTree:output_type -> global.TaskTreeResponse + 13, // 65: global.api.StreamList:output_type -> global.StreamListResponse + 14, // 66: global.api.WaitList:output_type -> global.StreamWaitListResponse + 16, // 67: global.api.StreamInfo:output_type -> global.StreamInfoResponse + 31, // 68: global.api.GetSubscribers:output_type -> global.SubscribersResponse + 23, // 69: global.api.AudioTrackSnap:output_type -> global.TrackSnapShotResponse + 23, // 70: global.api.VideoTrackSnap:output_type -> global.TrackSnapShotResponse + 25, // 71: global.api.ChangeSubscribe:output_type -> global.SuccessResponse + 25, // 72: global.api.SetStreamAlias:output_type -> global.SuccessResponse + 25, // 73: global.api.StopPublish:output_type -> global.SuccessResponse + 25, // 74: global.api.StopSubscribe:output_type -> global.SuccessResponse + 3, // 75: global.api.GetConfig:output_type -> global.GetConfigResponse + 3, // 76: global.api.GetFormily:output_type -> global.GetConfigResponse + 25, // 77: global.api.ModifyConfig:output_type -> global.SuccessResponse + 32, // 78: global.api.GetDeviceList:output_type -> global.DeviceListResponse + 25, // 79: global.api.AddDevice:output_type -> global.SuccessResponse + 25, // 80: global.api.RemoveDevice:output_type -> global.SuccessResponse + 25, // 81: global.api.UpdateDevice:output_type -> global.SuccessResponse + 60, // [60:82] is the sub-list for method output_type + 38, // [38:60] is the sub-list for method input_type 38, // [38:38] is the sub-list for extension type_name 38, // [38:38] is the sub-list for extension extendee 0, // [0:38] is the sub-list for field type_name diff --git a/pb/global.pb.gw.go b/pb/global.pb.gw.go index 1097612..8c15047 100644 --- a/pb/global.pb.gw.go +++ b/pb/global.pb.gw.go @@ -576,6 +576,66 @@ func local_request_Api_SetStreamAlias_0(ctx context.Context, marshaler runtime.M } +func request_Api_StopPublish_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq StreamSnapRequest + var metadata runtime.ServerMetadata + + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + var ( + val string + ok bool + err error + _ = err + ) + + val, ok = pathParams["streamPath"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath") + } + + protoReq.StreamPath, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err) + } + + msg, err := client.StopPublish(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Api_StopPublish_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq StreamSnapRequest + var metadata runtime.ServerMetadata + + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + var ( + val string + ok bool + err error + _ = err + ) + + val, ok = pathParams["streamPath"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath") + } + + protoReq.StreamPath, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err) + } + + msg, err := server.StopPublish(ctx, &protoReq) + return msg, metadata, err + +} + func request_Api_StopSubscribe_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var protoReq RequestWithId var metadata runtime.ServerMetadata @@ -1261,6 +1321,31 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server }) + mux.Handle("POST", pattern_Api_StopPublish_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/global.Api/StopPublish", runtime.WithHTTPPathPattern("/api/stream/stop/{streamPath=**}")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Api_StopPublish_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Api_StopPublish_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + mux.Handle("POST", pattern_Api_StopSubscribe_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() @@ -1788,6 +1873,28 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client }) + mux.Handle("POST", pattern_Api_StopPublish_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/global.Api/StopPublish", runtime.WithHTTPPathPattern("/api/stream/stop/{streamPath=**}")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Api_StopPublish_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Api_StopPublish_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + mux.Handle("POST", pattern_Api_StopSubscribe_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() @@ -1994,6 +2101,8 @@ var ( pattern_Api_SetStreamAlias_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "stream", "alias"}, "")) + pattern_Api_StopPublish_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "stream", "stop", "streamPath"}, "")) + pattern_Api_StopSubscribe_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "subscribe", "stop", "id"}, "")) pattern_Api_GetConfig_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "config", "get", "name"}, "")) @@ -2038,6 +2147,8 @@ var ( forward_Api_SetStreamAlias_0 = runtime.ForwardResponseMessage + forward_Api_StopPublish_0 = runtime.ForwardResponseMessage + forward_Api_StopSubscribe_0 = runtime.ForwardResponseMessage forward_Api_GetConfig_0 = runtime.ForwardResponseMessage diff --git a/pb/global.proto b/pb/global.proto index 7d0b11e..c8dbcdc 100644 --- a/pb/global.proto +++ b/pb/global.proto @@ -75,6 +75,12 @@ service api { body: "*" }; } + rpc StopPublish(StreamSnapRequest) returns (SuccessResponse) { + option (google.api.http) = { + post: "/api/stream/stop/{streamPath=**}" + body: "*" + }; + } rpc StopSubscribe (RequestWithId) returns (SuccessResponse) { option (google.api.http) = { post: "/api/subscribe/stop/{id}" diff --git a/pb/global_grpc.pb.go b/pb/global_grpc.pb.go index 91f8ff5..701123f 100644 --- a/pb/global_grpc.pb.go +++ b/pb/global_grpc.pb.go @@ -36,6 +36,7 @@ type ApiClient interface { VideoTrackSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*TrackSnapShotResponse, error) ChangeSubscribe(ctx context.Context, in *ChangeSubscribeRequest, opts ...grpc.CallOption) (*SuccessResponse, error) SetStreamAlias(ctx context.Context, in *SetStreamAliasRequest, opts ...grpc.CallOption) (*SuccessResponse, error) + StopPublish(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*SuccessResponse, error) StopSubscribe(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) GetConfig(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error) GetFormily(ctx context.Context, in *GetConfigRequest, opts ...grpc.CallOption) (*GetConfigResponse, error) @@ -171,6 +172,15 @@ func (c *apiClient) SetStreamAlias(ctx context.Context, in *SetStreamAliasReques return out, nil } +func (c *apiClient) StopPublish(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*SuccessResponse, error) { + out := new(SuccessResponse) + err := c.cc.Invoke(ctx, "/global.api/StopPublish", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *apiClient) StopSubscribe(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*SuccessResponse, error) { out := new(SuccessResponse) err := c.cc.Invoke(ctx, "/global.api/StopSubscribe", in, out, opts...) @@ -260,6 +270,7 @@ type ApiServer interface { VideoTrackSnap(context.Context, *StreamSnapRequest) (*TrackSnapShotResponse, error) ChangeSubscribe(context.Context, *ChangeSubscribeRequest) (*SuccessResponse, error) SetStreamAlias(context.Context, *SetStreamAliasRequest) (*SuccessResponse, error) + StopPublish(context.Context, *StreamSnapRequest) (*SuccessResponse, error) StopSubscribe(context.Context, *RequestWithId) (*SuccessResponse, error) GetConfig(context.Context, *GetConfigRequest) (*GetConfigResponse, error) GetFormily(context.Context, *GetConfigRequest) (*GetConfigResponse, error) @@ -314,6 +325,9 @@ func (UnimplementedApiServer) ChangeSubscribe(context.Context, *ChangeSubscribeR func (UnimplementedApiServer) SetStreamAlias(context.Context, *SetStreamAliasRequest) (*SuccessResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SetStreamAlias not implemented") } +func (UnimplementedApiServer) StopPublish(context.Context, *StreamSnapRequest) (*SuccessResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method StopPublish not implemented") +} func (UnimplementedApiServer) StopSubscribe(context.Context, *RequestWithId) (*SuccessResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method StopSubscribe not implemented") } @@ -585,6 +599,24 @@ func _Api_SetStreamAlias_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _Api_StopPublish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StreamSnapRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ApiServer).StopPublish(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/global.api/StopPublish", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ApiServer).StopPublish(ctx, req.(*StreamSnapRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Api_StopSubscribe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(RequestWithId) if err := dec(in); err != nil { @@ -788,6 +820,10 @@ var Api_ServiceDesc = grpc.ServiceDesc{ MethodName: "SetStreamAlias", Handler: _Api_SetStreamAlias_Handler, }, + { + MethodName: "StopPublish", + Handler: _Api_StopPublish_Handler, + }, { MethodName: "StopSubscribe", Handler: _Api_StopSubscribe_Handler, diff --git a/pkg/task/task.go b/pkg/task/task.go index fa52239..0bbae8e 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -199,8 +199,14 @@ func (task *Task) StopReason() error { return context.Cause(task.Context) } -func (task *Task) StopReasonIs(err error) bool { - return errors.Is(err, task.StopReason()) +func (task *Task) StopReasonIs(errs ...error) bool { + stopReason := task.StopReason() + for _, err := range errs { + if errors.Is(err, stopReason) { + return true + } + } + return false } func (task *Task) Stop(err error) { diff --git a/puller.go b/puller.go index 35107ab..4a90c3a 100644 --- a/puller.go +++ b/puller.go @@ -134,8 +134,8 @@ func (p *PullJob) Publish() (err error) { p.Publisher.Type = PublishTypePull if err == nil && p.conf.MaxRetry != 0 { p.Publisher.OnDispose(func() { - if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout) { - p.Stop(pkg.ErrPublishDelayCloseTimeout) + if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout,pkg.ErrStopFromAPI) { + p.Stop(p.Publisher.StopReason()) } }) } diff --git a/recoder.go b/recoder.go index 5ce1de5..33585f2 100644 --- a/recoder.go +++ b/recoder.go @@ -1,11 +1,10 @@ package m7s import ( - "m7s.live/pro/pkg/config" - "os" - "path/filepath" "time" + "m7s.live/pro/pkg/config" + "m7s.live/pro/pkg/task" "m7s.live/pro/pkg" @@ -80,14 +79,14 @@ func (p *RecordJob) Start() (err error) { if _, ok := s.Records.Get(p.GetKey()); ok { return pkg.ErrRecordSamePath } - dir := p.FilePath - if p.Fragment == 0 || p.Append { - dir = filepath.Dir(p.FilePath) - } - p.SetDescription("filePath", p.FilePath) - if err = os.MkdirAll(dir, 0755); err != nil { - return - } + // dir := p.FilePath + // if p.Fragment == 0 || p.Append { + // dir = filepath.Dir(p.FilePath) + // } + // p.SetDescription("filePath", p.FilePath) + // if err = os.MkdirAll(dir, 0755); err != nil { + // return + // } p.AddTask(p.recorder, p.Logger) return } diff --git a/scripts/README.md b/scripts/README.md index b204fb1..fce4511 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -1,4 +1,9 @@ # use protoc to generate the go code from the proto file 1. cd to plugin/xxx -2. sh ../../scripts/protoc.sh \ No newline at end of file +2. sh ../../scripts/protoc.sh + +# use protoc_global to genertate the go code form the proto.file + +1. cd to the root +2. sh scripts/protoc_global.sh \ No newline at end of file diff --git a/scripts/protoc_global.sh b/scripts/protoc_global.sh new file mode 100644 index 0000000..f586195 --- /dev/null +++ b/scripts/protoc_global.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +cd pb +# Run the protoc command +protoc -I. \ + --go_out=. \ + --go_opt=paths=source_relative \ + --go-grpc_out=. \ + --go-grpc_opt=paths=source_relative \ + --grpc-gateway_out=. \ + --grpc-gateway_opt=paths=source_relative \ + "global.proto" + +# Check if the command was successful +if [ $? -eq 0 ]; then + echo "Proto files for global built successfully" +else + echo "Error building proto files for global" + exit 1 +fi \ No newline at end of file diff --git a/transformer.go b/transformer.go index a203bbd..2a15fa2 100644 --- a/transformer.go +++ b/transformer.go @@ -78,6 +78,13 @@ func (p *TransformJob) Subscribe() (err error) { func (p *TransformJob) Publish(streamPath string) (err error) { p.Publisher, err = p.Plugin.Publish(context.WithValue(p.Transformer, Owner, p.Transformer), streamPath) p.Publisher.Type = PublishTypeTransform + if err == nil { + p.Publisher.OnDispose(func() { + if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout,pkg.ErrStopFromAPI) { + p.Stop(p.Publisher.StopReason()) + } + }) + } return }