feat: add pull testMode

This commit is contained in:
langhuihui
2025-05-27 10:43:34 +08:00
parent 09175f0255
commit 5aa8503aeb
20 changed files with 264 additions and 339 deletions

6
api.go
View File

@@ -808,9 +808,9 @@ func (s *Server) GetRecordList(ctx context.Context, req *pb.ReqRecordList) (resp
return
}
resp = &pb.ResponseList{
TotalCount: uint32(totalCount),
PageNum: req.PageNum,
PageSize: req.PageSize,
Total: uint32(totalCount),
PageNum: req.PageNum,
PageSize: req.PageSize,
}
for _, recordFile := range result {
resp.Data = append(resp.Data, &pb.RecordFile{

View File

@@ -73,7 +73,8 @@ type (
RetryInterval time.Duration `default:"5s" desc:"重试间隔"` // 重试间隔
Proxy string `desc:"代理地址"` // 代理地址
Header HTTPValues
Args HTTPValues `gorm:"-:all"` // 拉流参数
Args HTTPValues `gorm:"-:all"` // 拉流参数
TestMode int `desc:"测试模式,0:关闭,1:只拉流不发布"` // 测试模式
}
Push struct {
URL string `desc:"推送地址"` // 推送地址

View File

@@ -18,6 +18,7 @@ import (
myproc "github.com/cloudwego/goref/pkg/proc"
"github.com/go-delve/delve/pkg/config"
"github.com/go-delve/delve/service/debugger"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"m7s.live/v5"
"m7s.live/v5/plugin/debug/pb"
@@ -445,7 +446,7 @@ func (p *DebugPlugin) GetHeapGraph(ctx context.Context, empty *emptypb.Empty) (*
}, nil
}
func (p *DebugPlugin) StartTcpDump(ctx context.Context, req *pb.TcpDumpRequest) (*pb.TcpDumpResponse, error) {
func (p *DebugPlugin) StartTcpDump(req *pb.TcpDumpRequest, stream grpc.ServerStreamingServer[pb.TcpDumpResponse]) error {
args := []string{}
if req.Interface != "" {
args = append(args, "-i", req.Interface)
@@ -459,29 +460,17 @@ func (p *DebugPlugin) StartTcpDump(ctx context.Context, req *pb.TcpDumpRequest)
if req.ExtraArgs != "" {
args = append(args, strings.Fields(req.ExtraArgs)...)
}
cmd := exec.CommandContext(ctx, "tcpdump", args...)
output, err := cmd.CombinedOutput()
if err != nil {
// 如果 tcpdump 因为超时而退出,这通常不是一个错误,而是预期的行为
if exitErr, ok := err.(*exec.ExitError); ok && strings.Contains(string(exitErr.Stderr), "timeout") {
return &pb.TcpDumpResponse{
Code: 0,
Message: "tcpdump completed with timeout as expected.",
Data: string(output),
}, nil
cmd := exec.CommandContext(p, "tcpdump", args...)
stdout, _ := cmd.StdoutPipe()
var buf [2048]byte
for {
stdout.Read(buf[:])
if len(buf) == 0 {
break
}
return &pb.TcpDumpResponse{
Code: 1,
Message: fmt.Sprintf("failed to run tcpdump: %v. Output: %s", err, string(output)),
Data: string(output),
}, nil
stream.Send(&pb.TcpDumpResponse{
Data: buf[:],
})
}
return &pb.TcpDumpResponse{
Code: 0,
Message: "tcpdump completed successfully.",
Data: string(output),
}, nil
return cmd.Run()
}

View File

@@ -1078,9 +1078,7 @@ func (x *TcpDumpRequest) GetExtraArgs() string {
// TCP Dump响应数据
type TcpDumpResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` // 状态码 (0 表示成功)
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` // 状态消息
Data string `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // tcpdump 的文本输出内容
Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1115,25 +1113,11 @@ func (*TcpDumpResponse) Descriptor() ([]byte, []int) {
return file_debug_proto_rawDescGZIP(), []int{15}
}
func (x *TcpDumpResponse) GetCode() uint32 {
if x != nil {
return x.Code
}
return 0
}
func (x *TcpDumpResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *TcpDumpResponse) GetData() string {
func (x *TcpDumpResponse) GetData() []byte {
if x != nil {
return x.Data
}
return ""
return nil
}
var File_debug_proto protoreflect.FileDescriptor
@@ -1230,17 +1214,15 @@ const file_debug_proto_rawDesc = "" +
"\x06filter\x18\x02 \x01(\tR\x06filter\x12\x1a\n" +
"\bduration\x18\x03 \x01(\rR\bduration\x12\x1d\n" +
"\n" +
"extra_args\x18\x04 \x01(\tR\textraArgs\"S\n" +
"extra_args\x18\x04 \x01(\tR\textraArgs\"%\n" +
"\x0fTcpDumpResponse\x12\x12\n" +
"\x04code\x18\x01 \x01(\rR\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12\x12\n" +
"\x04data\x18\x03 \x01(\tR\x04data2\xb4\x03\n" +
"\x04data\x18\x01 \x01(\fR\x04data2\xb6\x03\n" +
"\x03api\x12O\n" +
"\aGetHeap\x12\x16.google.protobuf.Empty\x1a\x13.debug.HeapResponse\"\x17\x82\xd3\xe4\x93\x02\x11\x12\x0f/debug/api/heap\x12_\n" +
"\fGetHeapGraph\x12\x16.google.protobuf.Empty\x1a\x18.debug.HeapGraphResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x12\x15/debug/api/heap/graph\x12W\n" +
"\vGetCpuGraph\x12\x11.debug.CpuRequest\x1a\x17.debug.CpuGraphResponse\"\x1c\x82\xd3\xe4\x93\x02\x16\x12\x14/debug/api/cpu/graph\x12G\n" +
"\x06GetCpu\x12\x11.debug.CpuRequest\x1a\x12.debug.CpuResponse\"\x16\x82\xd3\xe4\x93\x02\x10\x12\x0e/debug/api/cpu\x12Y\n" +
"\fStartTcpDump\x12\x15.debug.TcpDumpRequest\x1a\x16.debug.TcpDumpResponse\"\x1a\x82\xd3\xe4\x93\x02\x14\"\x12/debug/api/tcpdumpB\x1dZ\x1bm7s.live/v5/plugin/debug/pbb\x06proto3"
"\x06GetCpu\x12\x11.debug.CpuRequest\x1a\x12.debug.CpuResponse\"\x16\x82\xd3\xe4\x93\x02\x10\x12\x0e/debug/api/cpu\x12[\n" +
"\fStartTcpDump\x12\x15.debug.TcpDumpRequest\x1a\x16.debug.TcpDumpResponse\"\x1a\x82\xd3\xe4\x93\x02\x14\x12\x12/debug/api/tcpdump0\x01B\x1dZ\x1bm7s.live/v5/plugin/debug/pbb\x06proto3"
var (
file_debug_proto_rawDescOnce sync.Once

View File

@@ -144,7 +144,7 @@ var (
filter_Api_StartTcpDump_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
)
func request_Api_StartTcpDump_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
func request_Api_StartTcpDump_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (Api_StartTcpDumpClient, runtime.ServerMetadata, error) {
var protoReq TcpDumpRequest
var metadata runtime.ServerMetadata
@@ -155,24 +155,16 @@ func request_Api_StartTcpDump_0(ctx context.Context, marshaler runtime.Marshaler
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.StartTcpDump(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_StartTcpDump_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq TcpDumpRequest
var metadata runtime.ServerMetadata
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
stream, err := client.StartTcpDump(ctx, &protoReq)
if err != nil {
return nil, metadata, err
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_StartTcpDump_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
header, err := stream.Header()
if err != nil {
return nil, metadata, err
}
msg, err := server.StartTcpDump(ctx, &protoReq)
return msg, metadata, err
metadata.HeaderMD = header
return stream, metadata, nil
}
@@ -282,29 +274,11 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
})
mux.Handle("POST", pattern_Api_StartTcpDump_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/debug.Api/StartTcpDump", runtime.WithHTTPPathPattern("/debug/api/tcpdump"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_StartTcpDump_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_StartTcpDump_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
mux.Handle("GET", pattern_Api_StartTcpDump_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport")
_, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
})
return nil
@@ -436,7 +410,7 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
})
mux.Handle("POST", pattern_Api_StartTcpDump_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle("GET", pattern_Api_StartTcpDump_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
@@ -454,7 +428,7 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
return
}
forward_Api_StartTcpDump_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
forward_Api_StartTcpDump_0(annotatedContext, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...)
})
@@ -482,5 +456,5 @@ var (
forward_Api_GetCpu_0 = runtime.ForwardResponseMessage
forward_Api_StartTcpDump_0 = runtime.ForwardResponseMessage
forward_Api_StartTcpDump_0 = runtime.ForwardResponseStream
)

View File

@@ -27,9 +27,9 @@ service api {
};
}
rpc StartTcpDump (TcpDumpRequest) returns (TcpDumpResponse) {
rpc StartTcpDump (TcpDumpRequest) returns (stream TcpDumpResponse) {
option (google.api.http) = {
post: "/debug/api/tcpdump"
get: "/debug/api/tcpdump"
};
}
}
@@ -150,7 +150,5 @@ message TcpDumpRequest {
// TCP Dump响应数据
message TcpDumpResponse {
uint32 code = 1; // 状态码 (0 表示成功)
string message = 2; // 状态消息
string data = 3; // tcpdump 的文本输出内容
bytes data = 1;
}

View File

@@ -35,7 +35,7 @@ type ApiClient interface {
GetHeapGraph(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*HeapGraphResponse, error)
GetCpuGraph(ctx context.Context, in *CpuRequest, opts ...grpc.CallOption) (*CpuGraphResponse, error)
GetCpu(ctx context.Context, in *CpuRequest, opts ...grpc.CallOption) (*CpuResponse, error)
StartTcpDump(ctx context.Context, in *TcpDumpRequest, opts ...grpc.CallOption) (*TcpDumpResponse, error)
StartTcpDump(ctx context.Context, in *TcpDumpRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TcpDumpResponse], error)
}
type apiClient struct {
@@ -86,16 +86,25 @@ func (c *apiClient) GetCpu(ctx context.Context, in *CpuRequest, opts ...grpc.Cal
return out, nil
}
func (c *apiClient) StartTcpDump(ctx context.Context, in *TcpDumpRequest, opts ...grpc.CallOption) (*TcpDumpResponse, error) {
func (c *apiClient) StartTcpDump(ctx context.Context, in *TcpDumpRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TcpDumpResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(TcpDumpResponse)
err := c.cc.Invoke(ctx, Api_StartTcpDump_FullMethodName, in, out, cOpts...)
stream, err := c.cc.NewStream(ctx, &Api_ServiceDesc.Streams[0], Api_StartTcpDump_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
return out, nil
x := &grpc.GenericClientStream[TcpDumpRequest, TcpDumpResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type Api_StartTcpDumpClient = grpc.ServerStreamingClient[TcpDumpResponse]
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility.
@@ -104,7 +113,7 @@ type ApiServer interface {
GetHeapGraph(context.Context, *emptypb.Empty) (*HeapGraphResponse, error)
GetCpuGraph(context.Context, *CpuRequest) (*CpuGraphResponse, error)
GetCpu(context.Context, *CpuRequest) (*CpuResponse, error)
StartTcpDump(context.Context, *TcpDumpRequest) (*TcpDumpResponse, error)
StartTcpDump(*TcpDumpRequest, grpc.ServerStreamingServer[TcpDumpResponse]) error
mustEmbedUnimplementedApiServer()
}
@@ -127,8 +136,8 @@ func (UnimplementedApiServer) GetCpuGraph(context.Context, *CpuRequest) (*CpuGra
func (UnimplementedApiServer) GetCpu(context.Context, *CpuRequest) (*CpuResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetCpu not implemented")
}
func (UnimplementedApiServer) StartTcpDump(context.Context, *TcpDumpRequest) (*TcpDumpResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StartTcpDump not implemented")
func (UnimplementedApiServer) StartTcpDump(*TcpDumpRequest, grpc.ServerStreamingServer[TcpDumpResponse]) error {
return status.Errorf(codes.Unimplemented, "method StartTcpDump not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
func (UnimplementedApiServer) testEmbeddedByValue() {}
@@ -223,24 +232,17 @@ func _Api_GetCpu_Handler(srv interface{}, ctx context.Context, dec func(interfac
return interceptor(ctx, in, info, handler)
}
func _Api_StartTcpDump_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TcpDumpRequest)
if err := dec(in); err != nil {
return nil, err
func _Api_StartTcpDump_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(TcpDumpRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
if interceptor == nil {
return srv.(ApiServer).StartTcpDump(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_StartTcpDump_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).StartTcpDump(ctx, req.(*TcpDumpRequest))
}
return interceptor(ctx, in, info, handler)
return srv.(ApiServer).StartTcpDump(m, &grpc.GenericServerStream[TcpDumpRequest, TcpDumpResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type Api_StartTcpDumpServer = grpc.ServerStreamingServer[TcpDumpResponse]
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -264,11 +266,13 @@ var Api_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetCpu",
Handler: _Api_GetCpu_Handler,
},
},
Streams: []grpc.StreamDesc{
{
MethodName: "StartTcpDump",
Handler: _Api_StartTcpDump_Handler,
StreamName: "StartTcpDump",
Handler: _Api_StartTcpDump_Handler,
ServerStreams: true,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "debug.proto",
}

View File

@@ -2,6 +2,7 @@ package flv
import (
"errors"
"io"
"m7s.live/v5"
"m7s.live/v5/pkg/util"
@@ -15,6 +16,10 @@ type Puller struct {
func (p *Puller) Run() (err error) {
reader := util.NewBufReader(p.ReadCloser)
publisher := p.PullJob.Publisher
if publisher == nil {
io.Copy(io.Discard, p.ReadCloser)
return
}
var hasAudio, hasVideo bool
var absTS uint32
var head util.Memory

View File

@@ -9,6 +9,7 @@ import (
"time"
m7s "m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
@@ -47,6 +48,9 @@ func (p *RecordReader) Dispose() {
func (p *RecordReader) Run() (err error) {
pullJob := &p.PullJob
publisher := pullJob.Publisher
if publisher == nil {
return pkg.ErrDisabled
}
allocator := util.NewScalableMemoryAllocator(1 << 10)
var tagHeader [11]byte
var ts int64
@@ -57,9 +61,12 @@ func (p *RecordReader) Run() (err error) {
defer func() {
allocator.Recycle()
}()
publisher.OnGetPosition = func() time.Time {
return realTime
if publisher != nil {
publisher.OnGetPosition = func() time.Time {
return realTime
}
}
for loop := 0; loop < p.Loop; loop++ {
nextStream:
for i, stream := range p.Streams {
@@ -85,15 +92,15 @@ func (p *RecordReader) Run() (err error) {
err = head.NewReader().ReadByteTo(&flvHead[0], &flvHead[1], &flvHead[2], &version, &flag)
hasAudio := (flag & 0x04) != 0
hasVideo := (flag & 0x01) != 0
if err != nil {
return
}
if !hasAudio {
publisher.NoAudio()
}
if !hasVideo {
publisher.NoVideo()
}
if err != nil {
return
}
if flvHead != [3]byte{'F', 'L', 'V'} {
return errors.New("not flv file")
}
@@ -194,7 +201,7 @@ func (p *RecordReader) Run() (err error) {
}
}
} else {
publisher.Info("script", name, obj)
p.Info("script", name, obj)
}
default:
err = fmt.Errorf("unknown tag type: %d", t)

View File

@@ -20,6 +20,10 @@ type HTTPReader struct {
func (p *HTTPReader) Run() (err error) {
pullJob := &p.PullJob
publisher := pullJob.Publisher
if publisher == nil {
io.Copy(io.Discard, p.ReadCloser)
return
}
allocator := util.NewScalableMemoryAllocator(1 << 10)
var demuxer *Demuxer
defer allocator.Recycle()
@@ -36,12 +40,12 @@ func (p *HTTPReader) Run() (err error) {
}
publisher.OnSeek = func(seekTime time.Time) {
p.Stop(errors.New("seek"))
pullJob.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat))
pullJob.Connection.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat))
newHTTPReader := &HTTPReader{}
pullJob.AddTask(newHTTPReader)
}
if pullJob.Args.Get(util.StartKey) != "" {
seekTime, _ := time.Parse(util.LocalTimeFormat, pullJob.Args.Get(util.StartKey))
if pullJob.Connection.Args.Get(util.StartKey) != "" {
seekTime, _ := time.Parse(util.LocalTimeFormat, pullJob.Connection.Args.Get(util.StartKey))
demuxer.SeekTime(uint64(seekTime.UnixMilli()))
}
for _, track := range demuxer.Tracks {

View File

@@ -6,6 +6,7 @@ import (
"time"
m7s "m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
@@ -39,6 +40,9 @@ func NewPuller(conf config.Pull) m7s.IPuller {
func (p *RecordReader) Run() (err error) {
pullJob := &p.PullJob
publisher := pullJob.Publisher
if publisher == nil {
return pkg.ErrDisabled
}
// allocator := util.NewScalableMemoryAllocator(1 << 10)
var ts, tsOffset int64
var realTime time.Time

View File

@@ -15,7 +15,7 @@ import (
func (c *Client) Start() (err error) {
var addr string
if c.direction == DIRECTION_PULL {
if c.direction == DIRECTION_PULL && c.pullCtx.TestMode == 0 {
addr = c.pullCtx.Connection.RemoteURL
err = c.pullCtx.Publish()
if err != nil {
@@ -158,7 +158,9 @@ func (c *Client) Run() (err error) {
if len(args) > 0 {
m.StreamName += "?" + args.Encode()
}
c.Receivers[response.StreamId] = c.pullCtx.Publisher
if c.pullCtx.Publisher != nil {
c.Receivers[response.StreamId] = c.pullCtx.Publisher
}
err = c.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// if response, ok := msg.MsgData.(*ResponsePlayMessage); ok {
// if response.Object["code"] == "NetStream.Play.Start" {

View File

@@ -59,9 +59,11 @@ func (c *Client) Run() (err error) {
return
}
if c.direction == DIRECTION_PULL {
err = c.pullCtx.Publish()
if err != nil {
return
if c.pullCtx.TestMode == 0 {
err = c.pullCtx.Publish()
if err != nil {
return
}
}
var medias []*Media
if medias, err = c.Describe(); err != nil {

View File

@@ -447,7 +447,7 @@ func (r *Receiver) Receive() (err error) {
},
}
return r.NetConnection.Receive(false, func(channelID byte, buf []byte) error {
if r.Publisher.Paused != nil {
if r.Publisher != nil && r.Publisher.Paused != nil {
r.Stream.Pause()
r.Publisher.Paused.Await()
r.Stream.Play()
@@ -471,6 +471,9 @@ func (r *Receiver) Receive() (err error) {
return err
}
}
if r.Publisher == nil {
return pkg.ErrMuted
}
switch int(channelID) {
case r.AudioChannelID:
if !r.PubAudio {
@@ -561,8 +564,6 @@ func (r *Receiver) Receive() (err error) {
videoFrame.SetAllocator(r.MemoryAllocator)
return pkg.ErrDiscard
}
default:
}
return pkg.ErrUnsupportCodec
}, func(channelID byte, buf []byte) error {

View File

@@ -19,12 +19,13 @@ import (
"m7s.live/v5/plugin/stress/pb"
)
func (r *StressPlugin) pull(count int, url string, puller m7s.PullerFactory) (err error) {
func (r *StressPlugin) pull(count int, url string, testMode int32, puller m7s.PullerFactory) (err error) {
hasPlaceholder := strings.Contains(url, "%d")
if i := r.pullers.Length; count > i {
for j := i; j < count; j++ {
conf := config.Pull{}
defaults.SetDefaults(&conf)
conf.TestMode = int(testMode)
if hasPlaceholder {
conf.URL = fmt.Sprintf(url, j)
} else {
@@ -100,7 +101,7 @@ func (r *StressPlugin) StartPull(ctx context.Context, req *pb.PullRequest) (res
default:
return nil, fmt.Errorf("unsupport protocol %s", req.Protocol)
}
return &gpb.SuccessResponse{}, r.pull(int(req.PullCount), req.RemoteURL, puller)
return &gpb.SuccessResponse{}, r.pull(int(req.PullCount), req.RemoteURL, req.TestMode, puller)
}
func (r *StressPlugin) StopPush(ctx context.Context, req *emptypb.Empty) (res *gpb.SuccessResponse, err error) {

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.19.1
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: stress.proto
package pb
@@ -14,6 +14,7 @@ import (
pb "m7s.live/v5/pb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
@@ -24,21 +25,18 @@ const (
)
type CountResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
PushCount uint32 `protobuf:"varint,1,opt,name=pushCount,proto3" json:"pushCount,omitempty"`
PullCount uint32 `protobuf:"varint,2,opt,name=pullCount,proto3" json:"pullCount,omitempty"`
unknownFields protoimpl.UnknownFields
PushCount uint32 `protobuf:"varint,1,opt,name=pushCount,proto3" json:"pushCount,omitempty"`
PullCount uint32 `protobuf:"varint,2,opt,name=pullCount,proto3" json:"pullCount,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *CountResponse) Reset() {
*x = CountResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_stress_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_stress_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *CountResponse) String() string {
@@ -49,7 +47,7 @@ func (*CountResponse) ProtoMessage() {}
func (x *CountResponse) ProtoReflect() protoreflect.Message {
mi := &file_stress_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -79,23 +77,20 @@ func (x *CountResponse) GetPullCount() uint32 {
}
type PushRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"`
Protocol string `protobuf:"bytes,2,opt,name=protocol,proto3" json:"protocol,omitempty"`
RemoteURL string `protobuf:"bytes,3,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"`
PushCount int32 `protobuf:"varint,4,opt,name=pushCount,proto3" json:"pushCount,omitempty"`
unknownFields protoimpl.UnknownFields
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"`
Protocol string `protobuf:"bytes,2,opt,name=protocol,proto3" json:"protocol,omitempty"`
RemoteURL string `protobuf:"bytes,3,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"`
PushCount int32 `protobuf:"varint,4,opt,name=pushCount,proto3" json:"pushCount,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *PushRequest) Reset() {
*x = PushRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_stress_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_stress_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PushRequest) String() string {
@@ -106,7 +101,7 @@ func (*PushRequest) ProtoMessage() {}
func (x *PushRequest) ProtoReflect() protoreflect.Message {
mi := &file_stress_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -150,22 +145,20 @@ func (x *PushRequest) GetPushCount() int32 {
}
type PullRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
RemoteURL string `protobuf:"bytes,1,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"`
Protocol string `protobuf:"bytes,2,opt,name=protocol,proto3" json:"protocol,omitempty"`
PullCount int32 `protobuf:"varint,3,opt,name=pullCount,proto3" json:"pullCount,omitempty"`
TestMode int32 `protobuf:"varint,4,opt,name=testMode,proto3" json:"testMode,omitempty"` // 0: pull, 1: pull without publish
unknownFields protoimpl.UnknownFields
RemoteURL string `protobuf:"bytes,1,opt,name=remoteURL,proto3" json:"remoteURL,omitempty"`
Protocol string `protobuf:"bytes,2,opt,name=protocol,proto3" json:"protocol,omitempty"`
PullCount int32 `protobuf:"varint,3,opt,name=pullCount,proto3" json:"pullCount,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *PullRequest) Reset() {
*x = PullRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_stress_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_stress_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PullRequest) String() string {
@@ -176,7 +169,7 @@ func (*PullRequest) ProtoMessage() {}
func (x *PullRequest) ProtoReflect() protoreflect.Message {
mi := &file_stress_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -212,85 +205,54 @@ func (x *PullRequest) GetPullCount() int32 {
return 0
}
func (x *PullRequest) GetTestMode() int32 {
if x != nil {
return x.TestMode
}
return 0
}
var File_stress_proto protoreflect.FileDescriptor
var file_stress_proto_rawDesc = []byte{
0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06,
0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61,
0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x1a, 0x0c, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22,
0x4b, 0x0a, 0x0d, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x1c, 0x0a, 0x09, 0x70, 0x75, 0x73, 0x68, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0d, 0x52, 0x09, 0x70, 0x75, 0x73, 0x68, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1c,
0x0a, 0x09, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28,
0x0d, 0x52, 0x09, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x85, 0x01, 0x0a,
0x0b, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x6d, 0x6f,
0x74, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x6d,
0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x75, 0x73, 0x68, 0x43, 0x6f,
0x75, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x70, 0x75, 0x73, 0x68, 0x43,
0x6f, 0x75, 0x6e, 0x74, 0x22, 0x65, 0x0a, 0x0b, 0x50, 0x75, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52, 0x4c,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x55, 0x52,
0x4c, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x1c, 0x0a,
0x09, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05,
0x52, 0x09, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x32, 0xf1, 0x03, 0x0a, 0x03,
0x61, 0x70, 0x69, 0x12, 0x6d, 0x0a, 0x09, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, 0x73, 0x68,
0x12, 0x13, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53,
0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x32,
0x82, 0xd3, 0xe4, 0x93, 0x02, 0x2c, 0x22, 0x27, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f,
0x61, 0x70, 0x69, 0x2f, 0x70, 0x75, 0x73, 0x68, 0x2f, 0x7b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
0x6f, 0x6c, 0x7d, 0x2f, 0x7b, 0x70, 0x75, 0x73, 0x68, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x7d, 0x3a,
0x01, 0x2a, 0x12, 0x6d, 0x0a, 0x09, 0x53, 0x74, 0x61, 0x72, 0x74, 0x50, 0x75, 0x6c, 0x6c, 0x12,
0x13, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x75,
0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x32, 0x82,
0xd3, 0xe4, 0x93, 0x02, 0x2c, 0x22, 0x27, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61,
0x70, 0x69, 0x2f, 0x70, 0x75, 0x6c, 0x6c, 0x2f, 0x7b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f,
0x6c, 0x7d, 0x2f, 0x7b, 0x70, 0x75, 0x6c, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x7d, 0x3a, 0x01,
0x2a, 0x12, 0x54, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x16, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x43,
0x6f, 0x75, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3,
0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x70,
0x69, 0x2f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x5a, 0x0a, 0x08, 0x53, 0x74, 0x6f, 0x70, 0x50,
0x75, 0x73, 0x68, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x67, 0x6c,
0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x22, 0x15, 0x2f, 0x73,
0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x70,
0x75, 0x73, 0x68, 0x12, 0x5a, 0x0a, 0x08, 0x53, 0x74, 0x6f, 0x70, 0x50, 0x75, 0x6c, 0x6c, 0x12,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c,
0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x22, 0x15, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73,
0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x70, 0x75, 0x6c, 0x6c, 0x42,
0x1e, 0x5a, 0x1c, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x76, 0x35, 0x2f, 0x70,
0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x70, 0x62, 0x62,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
const file_stress_proto_rawDesc = "" +
"\n" +
"\fstress.proto\x12\x06stress\x1a\x1cgoogle/api/annotations.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a\fglobal.proto\"K\n" +
"\rCountResponse\x12\x1c\n" +
"\tpushCount\x18\x01 \x01(\rR\tpushCount\x12\x1c\n" +
"\tpullCount\x18\x02 \x01(\rR\tpullCount\"\x85\x01\n" +
"\vPushRequest\x12\x1e\n" +
"\n" +
"streamPath\x18\x01 \x01(\tR\n" +
"streamPath\x12\x1a\n" +
"\bprotocol\x18\x02 \x01(\tR\bprotocol\x12\x1c\n" +
"\tremoteURL\x18\x03 \x01(\tR\tremoteURL\x12\x1c\n" +
"\tpushCount\x18\x04 \x01(\x05R\tpushCount\"\x81\x01\n" +
"\vPullRequest\x12\x1c\n" +
"\tremoteURL\x18\x01 \x01(\tR\tremoteURL\x12\x1a\n" +
"\bprotocol\x18\x02 \x01(\tR\bprotocol\x12\x1c\n" +
"\tpullCount\x18\x03 \x01(\x05R\tpullCount\x12\x1a\n" +
"\btestMode\x18\x04 \x01(\x05R\btestMode2\xf1\x03\n" +
"\x03api\x12m\n" +
"\tStartPush\x12\x13.stress.PushRequest\x1a\x17.global.SuccessResponse\"2\x82\xd3\xe4\x93\x02,:\x01*\"'/stress/api/push/{protocol}/{pushCount}\x12m\n" +
"\tStartPull\x12\x13.stress.PullRequest\x1a\x17.global.SuccessResponse\"2\x82\xd3\xe4\x93\x02,:\x01*\"'/stress/api/pull/{protocol}/{pullCount}\x12T\n" +
"\bGetCount\x12\x16.google.protobuf.Empty\x1a\x15.stress.CountResponse\"\x19\x82\xd3\xe4\x93\x02\x13\x12\x11/stress/api/count\x12Z\n" +
"\bStopPush\x12\x16.google.protobuf.Empty\x1a\x17.global.SuccessResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\"\x15/stress/api/stop/push\x12Z\n" +
"\bStopPull\x12\x16.google.protobuf.Empty\x1a\x17.global.SuccessResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\"\x15/stress/api/stop/pullB\x1eZ\x1cm7s.live/v5/plugin/stress/pbb\x06proto3"
var (
file_stress_proto_rawDescOnce sync.Once
file_stress_proto_rawDescData = file_stress_proto_rawDesc
file_stress_proto_rawDescData []byte
)
func file_stress_proto_rawDescGZIP() []byte {
file_stress_proto_rawDescOnce.Do(func() {
file_stress_proto_rawDescData = protoimpl.X.CompressGZIP(file_stress_proto_rawDescData)
file_stress_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_stress_proto_rawDesc), len(file_stress_proto_rawDesc)))
})
return file_stress_proto_rawDescData
}
var file_stress_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_stress_proto_goTypes = []interface{}{
var file_stress_proto_goTypes = []any{
(*CountResponse)(nil), // 0: stress.CountResponse
(*PushRequest)(nil), // 1: stress.PushRequest
(*PullRequest)(nil), // 2: stress.PullRequest
@@ -320,49 +282,11 @@ func file_stress_proto_init() {
if File_stress_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_stress_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CountResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_stress_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PushRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_stress_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PullRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_stress_proto_rawDesc,
RawDescriptor: unsafe.Slice(unsafe.StringData(file_stress_proto_rawDesc), len(file_stress_proto_rawDesc)),
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
@@ -373,7 +297,6 @@ func file_stress_proto_init() {
MessageInfos: file_stress_proto_msgTypes,
}.Build()
File_stress_proto = out.File
file_stress_proto_rawDesc = nil
file_stress_proto_goTypes = nil
file_stress_proto_depIdxs = nil
}

View File

@@ -51,4 +51,5 @@ message PullRequest {
string remoteURL = 1;
string protocol = 2;
int32 pullCount = 3;
int32 testMode = 4; // 0: pull, 1: pull without publish
}

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.1
// - protoc-gen-go-grpc v1.5.1
// - protoc v5.29.3
// source: stress.proto
package pb
@@ -17,8 +17,16 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
Api_StartPush_FullMethodName = "/stress.api/StartPush"
Api_StartPull_FullMethodName = "/stress.api/StartPull"
Api_GetCount_FullMethodName = "/stress.api/GetCount"
Api_StopPush_FullMethodName = "/stress.api/StopPush"
Api_StopPull_FullMethodName = "/stress.api/StopPull"
)
// ApiClient is the client API for Api service.
//
@@ -40,8 +48,9 @@ func NewApiClient(cc grpc.ClientConnInterface) ApiClient {
}
func (c *apiClient) StartPush(ctx context.Context, in *PushRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, "/stress.api/StartPush", in, out, opts...)
err := c.cc.Invoke(ctx, Api_StartPush_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -49,8 +58,9 @@ func (c *apiClient) StartPush(ctx context.Context, in *PushRequest, opts ...grpc
}
func (c *apiClient) StartPull(ctx context.Context, in *PullRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, "/stress.api/StartPull", in, out, opts...)
err := c.cc.Invoke(ctx, Api_StartPull_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -58,8 +68,9 @@ func (c *apiClient) StartPull(ctx context.Context, in *PullRequest, opts ...grpc
}
func (c *apiClient) GetCount(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*CountResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CountResponse)
err := c.cc.Invoke(ctx, "/stress.api/GetCount", in, out, opts...)
err := c.cc.Invoke(ctx, Api_GetCount_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -67,8 +78,9 @@ func (c *apiClient) GetCount(ctx context.Context, in *emptypb.Empty, opts ...grp
}
func (c *apiClient) StopPush(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, "/stress.api/StopPush", in, out, opts...)
err := c.cc.Invoke(ctx, Api_StopPush_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -76,8 +88,9 @@ func (c *apiClient) StopPush(ctx context.Context, in *emptypb.Empty, opts ...grp
}
func (c *apiClient) StopPull(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, "/stress.api/StopPull", in, out, opts...)
err := c.cc.Invoke(ctx, Api_StopPull_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -86,7 +99,7 @@ func (c *apiClient) StopPull(ctx context.Context, in *emptypb.Empty, opts ...grp
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility
// for forward compatibility.
type ApiServer interface {
StartPush(context.Context, *PushRequest) (*pb.SuccessResponse, error)
StartPull(context.Context, *PullRequest) (*pb.SuccessResponse, error)
@@ -96,9 +109,12 @@ type ApiServer interface {
mustEmbedUnimplementedApiServer()
}
// UnimplementedApiServer must be embedded to have forward compatible implementations.
type UnimplementedApiServer struct {
}
// UnimplementedApiServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedApiServer struct{}
func (UnimplementedApiServer) StartPush(context.Context, *PushRequest) (*pb.SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StartPush not implemented")
@@ -116,6 +132,7 @@ func (UnimplementedApiServer) StopPull(context.Context, *emptypb.Empty) (*pb.Suc
return nil, status.Errorf(codes.Unimplemented, "method StopPull not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
func (UnimplementedApiServer) testEmbeddedByValue() {}
// UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ApiServer will
@@ -125,6 +142,13 @@ type UnsafeApiServer interface {
}
func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) {
// If the following call pancis, it indicates UnimplementedApiServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&Api_ServiceDesc, srv)
}
@@ -138,7 +162,7 @@ func _Api_StartPush_Handler(srv interface{}, ctx context.Context, dec func(inter
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/stress.api/StartPush",
FullMethod: Api_StartPush_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).StartPush(ctx, req.(*PushRequest))
@@ -156,7 +180,7 @@ func _Api_StartPull_Handler(srv interface{}, ctx context.Context, dec func(inter
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/stress.api/StartPull",
FullMethod: Api_StartPull_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).StartPull(ctx, req.(*PullRequest))
@@ -174,7 +198,7 @@ func _Api_GetCount_Handler(srv interface{}, ctx context.Context, dec func(interf
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/stress.api/GetCount",
FullMethod: Api_GetCount_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).GetCount(ctx, req.(*emptypb.Empty))
@@ -192,7 +216,7 @@ func _Api_StopPush_Handler(srv interface{}, ctx context.Context, dec func(interf
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/stress.api/StopPush",
FullMethod: Api_StopPush_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).StopPush(ctx, req.(*emptypb.Empty))
@@ -210,7 +234,7 @@ func _Api_StopPull_Handler(srv interface{}, ctx context.Context, dec func(interf
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/stress.api/StopPull",
FullMethod: Api_StopPull_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).StopPull(ctx, req.(*emptypb.Empty))

View File

@@ -25,7 +25,6 @@ type (
Args url.Values
RemoteURL string // 远程服务器地址(用于推拉)
HTTPClient *http.Client
Header http.Header
}
IPuller interface {
@@ -37,10 +36,10 @@ type (
PullJob struct {
Connection
*config.Pull
Publisher *Publisher
PublishConfig config.Publish
puller IPuller
conf *config.Pull
}
HTTPFilePuller struct {
@@ -66,11 +65,10 @@ type (
}
)
func (conn *Connection) Init(plugin *Plugin, streamPath string, href string, proxyConf string, header http.Header) {
func (conn *Connection) Init(plugin *Plugin, streamPath string, href string, proxyConf string) {
conn.RemoteURL = href
conn.StreamPath = streamPath
conn.Plugin = plugin
conn.Header = header
conn.HTTPClient = http.DefaultClient
if proxyConf != "" {
proxy, err := url.Parse(proxyConf)
@@ -93,8 +91,8 @@ func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf c
p.PublishConfig = *pubConf
}
p.PublishConfig.PubType = PublishTypePull
p.Args = url.Values(conf.Args.DeepClone())
p.conf = &conf
p.Connection.Args = url.Values(conf.Args.DeepClone())
p.Pull = &conf
remoteURL := conf.URL
u, err := url.Parse(remoteURL)
if err == nil {
@@ -102,17 +100,17 @@ func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf c
// file
remoteURL = u.Path
}
if p.Args == nil {
p.Args = u.Query()
if p.Connection.Args == nil {
p.Connection.Args = u.Query()
} else {
for k, v := range u.Query() {
for _, vv := range v {
p.Args.Add(k, vv)
p.Connection.Args.Add(k, vv)
}
}
}
}
p.Connection.Init(plugin, streamPath, remoteURL, conf.Proxy, http.Header(conf.Header))
p.Connection.Init(plugin, streamPath, remoteURL, conf.Proxy)
p.puller = puller
p.SetDescriptions(task.Description{
"plugin": plugin.Meta.Name,
@@ -159,13 +157,13 @@ func (p *PullJob) GetKey() string {
func (p *PullJob) Publish() (err error) {
streamPath := p.StreamPath
if len(p.Args) > 0 {
streamPath += "?" + p.Args.Encode()
if len(p.Connection.Args) > 0 {
streamPath += "?" + p.Connection.Args.Encode()
}
p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, streamPath, p.PublishConfig)
if err == nil {
p.Publisher.OnDispose(func() {
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) || p.conf.MaxRetry == 0 {
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) || p.MaxRetry == 0 {
p.Stop(p.Publisher.StopReason())
} else {
p.puller.Stop(p.Publisher.StopReason())
@@ -185,8 +183,10 @@ func (p *PullJob) Start() (err error) {
}
func (p *HTTPFilePuller) Start() (err error) {
if err = p.PullJob.Publish(); err != nil {
return
if p.PullJob.TestMode == 0 {
if err = p.PullJob.Publish(); err != nil {
return
}
}
if p.ReadCloser != nil {
return
@@ -260,30 +260,34 @@ func (p *RecordFilePuller) Start() (err error) {
return pkg.ErrNoDB
}
p.PullJob.PublishConfig.PubType = PublishTypeVod
if err = p.PullJob.Publish(); err != nil {
return
if p.PullJob.TestMode == 0 {
if err = p.PullJob.Publish(); err != nil {
return
}
}
if p.PullStartTime, p.PullEndTime, err = util.TimeRangeQueryParse(p.PullJob.Args); err != nil {
if p.PullStartTime, p.PullEndTime, err = util.TimeRangeQueryParse(p.PullJob.Connection.Args); err != nil {
return
}
p.seekChan = make(chan time.Time, 1)
loop := p.PullJob.Args.Get(util.LoopKey)
loop := p.PullJob.Connection.Args.Get(util.LoopKey)
p.Loop, err = strconv.Atoi(loop)
if err != nil || p.Loop < 0 {
p.Loop = math.MaxInt32
}
publisher := p.PullJob.Publisher
publisher.OnSeek = func(seekTime time.Time) {
// p.PullStartTime = seekTime
// p.SetRetry(1, 0)
// if util.UnixTimeReg.MatchString(p.PullJob.Args.Get(util.EndKey)) {
// p.PullJob.Args.Set(util.StartKey, strconv.FormatInt(seekTime.Unix(), 10))
// } else {
// p.PullJob.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat))
// }
select {
case p.seekChan <- seekTime:
default:
if publisher != nil {
publisher.OnSeek = func(seekTime time.Time) {
// p.PullStartTime = seekTime
// p.SetRetry(1, 0)
// if util.UnixTimeReg.MatchString(p.PullJob.Args.Get(util.EndKey)) {
// p.PullJob.Args.Set(util.StartKey, strconv.FormatInt(seekTime.Unix(), 10))
// } else {
// p.PullJob.Args.Set(util.StartKey, seekTime.Local().Format(util.LocalTimeFormat))
// }
select {
case p.seekChan <- seekTime:
default:
}
}
}
return p.queryRecordStreams(p.PullStartTime, p.PullEndTime)

View File

@@ -1,7 +1,6 @@
package m7s
import (
"net/http"
"time"
"m7s.live/v5/pkg"
@@ -29,7 +28,7 @@ func (p *PushJob) GetKey() string {
}
func (p *PushJob) Init(pusher IPusher, plugin *Plugin, streamPath string, conf config.Push, subConf *config.Subscribe) *PushJob {
p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy, http.Header(conf.Header))
p.Connection.Init(plugin, streamPath, conf.URL, conf.Proxy)
p.pusher = pusher
if subConf == nil {
p.SubConf = plugin.config.Subscribe