From d76d69b3274b65a762b72cfa5d7325bd38c80d05 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Fri, 19 Apr 2024 16:42:14 +0800 Subject: [PATCH] feat: use my buf reader --- api.go | 7 + example/default/config.yaml | 8 +- example/readflv/{config.yaml => config1.yaml} | 4 +- example/readflv/config2.yaml | 16 + example/readflv/main.go | 7 +- example/rtmp-push/config1.yaml | 4 +- pb/global.pb.go | 413 +++++++++++++----- pb/global.pb.gw.go | 69 +++ pb/global.proto | 14 + pb/global_grpc.pb.go | 36 ++ pkg/util/buf-reader.go | 51 ++- pkg/util/buffers.go | 18 +- plugin/hdl/pkg/pull.go | 72 +-- plugin/rtmp/index.go | 8 +- plugin/rtmp/pkg/chunk.go | 7 +- plugin/rtmp/pkg/client.go | 8 +- plugin/rtmp/pkg/const.go | 11 +- plugin/rtmp/pkg/handshake.go | 37 +- plugin/rtmp/pkg/net-connection.go | 69 ++- publisher.go | 11 +- server.go | 17 +- 21 files changed, 630 insertions(+), 257 deletions(-) rename example/readflv/{config.yaml => config1.yaml} (79%) create mode 100644 example/readflv/config2.yaml diff --git a/api.go b/api.go index 2342a21..056ae35 100644 --- a/api.go +++ b/api.go @@ -39,3 +39,10 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.StopSubscribeRequest Success: err == nil, }, err } + + +func (s *Server) StreamList(ctx context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) { + var result any + result, err = s.Call(req) + return result.(*pb.StreamListResponse), err +} \ No newline at end of file diff --git a/example/default/config.yaml b/example/default/config.yaml index 83623f1..b406063 100644 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -1,5 +1,5 @@ global: - loglevel: debug + loglevel: trace tcp: listenaddr: :50051 rtmp: @@ -12,4 +12,8 @@ rtmp: # subaudio: false pull: pullonsub: - live/pull: rtmp://localhost/live/test \ No newline at end of file + live/pull: rtmp://localhost/live/test +# hdl: +# pull: +# pullonstart: +# live/test: /Users/dexter/Movies/jb-demo.flv \ No newline at end of file diff --git a/example/readflv/config.yaml b/example/readflv/config1.yaml similarity index 79% rename from example/readflv/config.yaml rename to example/readflv/config1.yaml index dde3993..e761ffc 100644 --- a/example/readflv/config.yaml +++ b/example/readflv/config1.yaml @@ -1,10 +1,10 @@ global: - loglevel: debug + loglevel: trace tcp: listenaddr: :50051 hdl: publish: - speed: 2 + # speed: 2 pull: pullonstart: live/test: /Users/dexter/Movies/jb-demo.flv \ No newline at end of file diff --git a/example/readflv/config2.yaml b/example/readflv/config2.yaml new file mode 100644 index 0000000..9dd6577 --- /dev/null +++ b/example/readflv/config2.yaml @@ -0,0 +1,16 @@ +global: + loglevel: trace + tcp: + listenaddr: :50052 + http: + listenaddr: :8081 + listenaddrtls: :8555 +rtmp: + tcp: + listenaddr: :1936 +hdl: + publish: + # speed: 2 + pull: + pullonstart: + live/test: http://localhost:8080/hdl/live/test.flv \ No newline at end of file diff --git a/example/readflv/main.go b/example/readflv/main.go index ed1053c..1d8ed3d 100644 --- a/example/readflv/main.go +++ b/example/readflv/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "time" "m7s.live/m7s/v5" _ "m7s.live/m7s/v5/plugin/debug" @@ -10,6 +11,8 @@ import ( ) func main() { - // ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100)) - m7s.Run(context.Background(), "config.yaml") + ctx := context.Background() + go m7s.Run(ctx, "config1.yaml") + time.Sleep(2 * time.Second) + m7s.NewServer().Run(ctx, "config2.yaml") } diff --git a/example/rtmp-push/config1.yaml b/example/rtmp-push/config1.yaml index 918d091..947a3b4 100644 --- a/example/rtmp-push/config1.yaml +++ b/example/rtmp-push/config1.yaml @@ -1,4 +1,6 @@ global: - # loglevel: debug + loglevel: debug + tcp: + listenaddr: :50050 rtmp: chunksize: 2048 diff --git a/pb/global.pb.go b/pb/global.pb.go index 2cffe4e..0d9db9b 100644 --- a/pb/global.pb.go +++ b/pb/global.pb.go @@ -22,6 +22,138 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type StreamInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` +} + +func (x *StreamInfo) Reset() { + *x = StreamInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_global_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamInfo) ProtoMessage() {} + +func (x *StreamInfo) ProtoReflect() protoreflect.Message { + mi := &file_global_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamInfo.ProtoReflect.Descriptor instead. +func (*StreamInfo) Descriptor() ([]byte, []int) { + return file_global_proto_rawDescGZIP(), []int{0} +} + +func (x *StreamInfo) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +type StreamListRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *StreamListRequest) Reset() { + *x = StreamListRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_global_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamListRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamListRequest) ProtoMessage() {} + +func (x *StreamListRequest) ProtoReflect() protoreflect.Message { + mi := &file_global_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamListRequest.ProtoReflect.Descriptor instead. +func (*StreamListRequest) Descriptor() ([]byte, []int) { + return file_global_proto_rawDescGZIP(), []int{1} +} + +type StreamListResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + List []*StreamInfo `protobuf:"bytes,1,rep,name=list,proto3" json:"list,omitempty"` +} + +func (x *StreamListResponse) Reset() { + *x = StreamListResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_global_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamListResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamListResponse) ProtoMessage() {} + +func (x *StreamListResponse) ProtoReflect() protoreflect.Message { + mi := &file_global_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamListResponse.ProtoReflect.Descriptor instead. +func (*StreamListResponse) Descriptor() ([]byte, []int) { + return file_global_proto_rawDescGZIP(), []int{2} +} + +func (x *StreamListResponse) GetList() []*StreamInfo { + if x != nil { + return x.List + } + return nil +} + type StreamSnapRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -33,7 +165,7 @@ type StreamSnapRequest struct { func (x *StreamSnapRequest) Reset() { *x = StreamSnapRequest{} if protoimpl.UnsafeEnabled { - mi := &file_global_proto_msgTypes[0] + mi := &file_global_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -46,7 +178,7 @@ func (x *StreamSnapRequest) String() string { func (*StreamSnapRequest) ProtoMessage() {} func (x *StreamSnapRequest) ProtoReflect() protoreflect.Message { - mi := &file_global_proto_msgTypes[0] + mi := &file_global_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -59,7 +191,7 @@ func (x *StreamSnapRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamSnapRequest.ProtoReflect.Descriptor instead. func (*StreamSnapRequest) Descriptor() ([]byte, []int) { - return file_global_proto_rawDescGZIP(), []int{0} + return file_global_proto_rawDescGZIP(), []int{3} } func (x *StreamSnapRequest) GetStreamPath() string { @@ -82,7 +214,7 @@ type Wrap struct { func (x *Wrap) Reset() { *x = Wrap{} if protoimpl.UnsafeEnabled { - mi := &file_global_proto_msgTypes[1] + mi := &file_global_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -95,7 +227,7 @@ func (x *Wrap) String() string { func (*Wrap) ProtoMessage() {} func (x *Wrap) ProtoReflect() protoreflect.Message { - mi := &file_global_proto_msgTypes[1] + mi := &file_global_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -108,7 +240,7 @@ func (x *Wrap) ProtoReflect() protoreflect.Message { // Deprecated: Use Wrap.ProtoReflect.Descriptor instead. func (*Wrap) Descriptor() ([]byte, []int) { - return file_global_proto_rawDescGZIP(), []int{1} + return file_global_proto_rawDescGZIP(), []int{4} } func (x *Wrap) GetTimestamp() uint32 { @@ -147,7 +279,7 @@ type TrackSnapShot struct { func (x *TrackSnapShot) Reset() { *x = TrackSnapShot{} if protoimpl.UnsafeEnabled { - mi := &file_global_proto_msgTypes[2] + mi := &file_global_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -160,7 +292,7 @@ func (x *TrackSnapShot) String() string { func (*TrackSnapShot) ProtoMessage() {} func (x *TrackSnapShot) ProtoReflect() protoreflect.Message { - mi := &file_global_proto_msgTypes[2] + mi := &file_global_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -173,7 +305,7 @@ func (x *TrackSnapShot) ProtoReflect() protoreflect.Message { // Deprecated: Use TrackSnapShot.ProtoReflect.Descriptor instead. func (*TrackSnapShot) Descriptor() ([]byte, []int) { - return file_global_proto_rawDescGZIP(), []int{2} + return file_global_proto_rawDescGZIP(), []int{5} } func (x *TrackSnapShot) GetSequence() uint32 { @@ -223,7 +355,7 @@ type StreamSnapShot struct { func (x *StreamSnapShot) Reset() { *x = StreamSnapShot{} if protoimpl.UnsafeEnabled { - mi := &file_global_proto_msgTypes[3] + mi := &file_global_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -236,7 +368,7 @@ func (x *StreamSnapShot) String() string { func (*StreamSnapShot) ProtoMessage() {} func (x *StreamSnapShot) ProtoReflect() protoreflect.Message { - mi := &file_global_proto_msgTypes[3] + mi := &file_global_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -249,7 +381,7 @@ func (x *StreamSnapShot) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamSnapShot.ProtoReflect.Descriptor instead. func (*StreamSnapShot) Descriptor() ([]byte, []int) { - return file_global_proto_rawDescGZIP(), []int{3} + return file_global_proto_rawDescGZIP(), []int{6} } func (x *StreamSnapShot) GetVideoTrack() []*TrackSnapShot { @@ -277,7 +409,7 @@ type StopSubscribeRequest struct { func (x *StopSubscribeRequest) Reset() { *x = StopSubscribeRequest{} if protoimpl.UnsafeEnabled { - mi := &file_global_proto_msgTypes[4] + mi := &file_global_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -290,7 +422,7 @@ func (x *StopSubscribeRequest) String() string { func (*StopSubscribeRequest) ProtoMessage() {} func (x *StopSubscribeRequest) ProtoReflect() protoreflect.Message { - mi := &file_global_proto_msgTypes[4] + mi := &file_global_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -303,7 +435,7 @@ func (x *StopSubscribeRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StopSubscribeRequest.ProtoReflect.Descriptor instead. func (*StopSubscribeRequest) Descriptor() ([]byte, []int) { - return file_global_proto_rawDescGZIP(), []int{4} + return file_global_proto_rawDescGZIP(), []int{7} } func (x *StopSubscribeRequest) GetId() uint32 { @@ -324,7 +456,7 @@ type StopSubscribeResponse struct { func (x *StopSubscribeResponse) Reset() { *x = StopSubscribeResponse{} if protoimpl.UnsafeEnabled { - mi := &file_global_proto_msgTypes[5] + mi := &file_global_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -337,7 +469,7 @@ func (x *StopSubscribeResponse) String() string { func (*StopSubscribeResponse) ProtoMessage() {} func (x *StopSubscribeResponse) ProtoReflect() protoreflect.Message { - mi := &file_global_proto_msgTypes[5] + mi := &file_global_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -350,7 +482,7 @@ func (x *StopSubscribeResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StopSubscribeResponse.ProtoReflect.Descriptor instead. func (*StopSubscribeResponse) Descriptor() ([]byte, []int) { - return file_global_proto_rawDescGZIP(), []int{5} + return file_global_proto_rawDescGZIP(), []int{8} } func (x *StopSubscribeResponse) GetSuccess() bool { @@ -371,7 +503,7 @@ type RequestWithId struct { func (x *RequestWithId) Reset() { *x = RequestWithId{} if protoimpl.UnsafeEnabled { - mi := &file_global_proto_msgTypes[6] + mi := &file_global_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -384,7 +516,7 @@ func (x *RequestWithId) String() string { func (*RequestWithId) ProtoMessage() {} func (x *RequestWithId) ProtoReflect() protoreflect.Message { - mi := &file_global_proto_msgTypes[6] + mi := &file_global_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -397,7 +529,7 @@ func (x *RequestWithId) ProtoReflect() protoreflect.Message { // Deprecated: Use RequestWithId.ProtoReflect.Descriptor instead. func (*RequestWithId) Descriptor() ([]byte, []int) { - return file_global_proto_rawDescGZIP(), []int{6} + return file_global_proto_rawDescGZIP(), []int{9} } func (x *RequestWithId) GetId() uint32 { @@ -414,67 +546,80 @@ var file_global_proto_rawDesc = []byte{ 0x6d, 0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x33, - 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, - 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, - 0x61, 0x74, 0x68, 0x22, 0x4c, 0x0a, 0x04, 0x57, 0x72, 0x61, 0x70, 0x12, 0x1c, 0x0a, 0x09, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, - 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, 0x12, 0x0a, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, - 0x61, 0x22, 0xa0, 0x01, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, - 0x68, 0x6f, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, - 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x1c, 0x0a, - 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, - 0x61, 0x6e, 0x52, 0x65, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x63, 0x61, - 0x6e, 0x52, 0x65, 0x61, 0x64, 0x12, 0x1d, 0x0a, 0x04, 0x77, 0x72, 0x61, 0x70, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x57, 0x72, 0x61, 0x70, 0x52, 0x04, - 0x77, 0x72, 0x61, 0x70, 0x22, 0x78, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, - 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x12, 0x32, 0x0a, 0x0a, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x54, - 0x72, 0x61, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d, 0x37, 0x73, - 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x52, 0x0a, - 0x76, 0x69, 0x64, 0x65, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x12, 0x32, 0x0a, 0x0a, 0x61, 0x75, - 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, - 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, - 0x6f, 0x74, 0x52, 0x0a, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x22, 0x26, - 0x0a, 0x14, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x22, 0x31, 0x0a, 0x15, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x1f, 0x0a, 0x0d, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x32, 0x80, 0x03, 0x0a, 0x06, 0x47, - 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x12, 0x52, 0x0a, 0x08, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, - 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x57, - 0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x1a, 0x82, - 0xd3, 0xe4, 0x93, 0x02, 0x14, 0x22, 0x12, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x75, 0x74, - 0x64, 0x6f, 0x77, 0x6e, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x12, 0x50, 0x0a, 0x07, 0x52, 0x65, 0x73, - 0x74, 0x61, 0x72, 0x74, 0x12, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x22, 0x11, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x72, - 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x12, 0x63, 0x0a, 0x0a, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x12, 0x16, 0x2e, 0x6d, 0x37, 0x73, 0x2e, - 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, - 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x22, 0x28, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x22, 0x12, 0x20, - 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x6e, 0x61, 0x70, - 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, - 0x12, 0x6b, 0x0a, 0x0d, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, - 0x65, 0x12, 0x19, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6d, + 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x20, + 0x0a, 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, + 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, + 0x22, 0x13, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x39, 0x0a, 0x12, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4c, + 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x04, 0x6c, + 0x69, 0x73, 0x74, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6d, 0x37, 0x73, 0x2e, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x04, 0x6c, 0x69, 0x73, 0x74, + 0x22, 0x33, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, + 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x50, 0x61, 0x74, 0x68, 0x22, 0x4c, 0x0a, 0x04, 0x57, 0x72, 0x61, 0x70, 0x12, 0x1c, 0x0a, + 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x73, + 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, + 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x22, 0xa0, 0x01, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, + 0x70, 0x53, 0x68, 0x6f, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, + 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, + 0x1c, 0x0a, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x18, 0x0a, + 0x07, 0x63, 0x61, 0x6e, 0x52, 0x65, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, + 0x63, 0x61, 0x6e, 0x52, 0x65, 0x61, 0x64, 0x12, 0x1d, 0x0a, 0x04, 0x77, 0x72, 0x61, 0x70, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x57, 0x72, 0x61, 0x70, + 0x52, 0x04, 0x77, 0x72, 0x61, 0x70, 0x22, 0x78, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x12, 0x32, 0x0a, 0x0a, 0x76, 0x69, 0x64, 0x65, + 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d, + 0x37, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, + 0x52, 0x0a, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x12, 0x32, 0x0a, 0x0a, + 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, + 0x53, 0x68, 0x6f, 0x74, 0x52, 0x0a, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, + 0x22, 0x26, 0x0a, 0x14, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x22, 0x31, 0x0a, 0x15, 0x53, 0x74, 0x6f, 0x70, + 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x1f, 0x0a, 0x0d, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x12, 0x0e, 0x0a, 0x02, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x32, 0xd9, 0x03, 0x0a, + 0x06, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x12, 0x52, 0x0a, 0x08, 0x53, 0x68, 0x75, 0x74, 0x64, + 0x6f, 0x77, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, + 0x1a, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x14, 0x22, 0x12, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, + 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x12, 0x50, 0x0a, 0x07, 0x52, + 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x22, 0x11, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x72, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x12, 0x57, 0x0a, + 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x16, 0x2e, 0x6d, 0x37, + 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x18, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x12, 0x12, 0x10, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x2f, 0x6c, 0x69, 0x73, 0x74, 0x12, 0x63, 0x0a, 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x53, 0x6e, 0x61, 0x70, 0x12, 0x16, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, + 0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f, + 0x74, 0x22, 0x28, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x22, 0x12, 0x20, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x6e, 0x61, 0x70, 0x2f, 0x7b, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x12, 0x6b, 0x0a, 0x0d, 0x53, + 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x19, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, - 0x22, 0x18, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x3a, 0x01, 0x2a, 0x42, 0x14, 0x5a, - 0x12, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, - 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, + 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x22, 0x18, 0x2f, 0x61, 0x70, + 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x3a, 0x01, 0x2a, 0x42, 0x14, 0x5a, 0x12, 0x6d, 0x37, 0x73, 0x2e, + 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -489,34 +634,40 @@ func file_global_proto_rawDescGZIP() []byte { return file_global_proto_rawDescData } -var file_global_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_global_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_global_proto_goTypes = []interface{}{ - (*StreamSnapRequest)(nil), // 0: m7s.StreamSnapRequest - (*Wrap)(nil), // 1: m7s.Wrap - (*TrackSnapShot)(nil), // 2: m7s.TrackSnapShot - (*StreamSnapShot)(nil), // 3: m7s.StreamSnapShot - (*StopSubscribeRequest)(nil), // 4: m7s.StopSubscribeRequest - (*StopSubscribeResponse)(nil), // 5: m7s.StopSubscribeResponse - (*RequestWithId)(nil), // 6: m7s.RequestWithId - (*emptypb.Empty)(nil), // 7: google.protobuf.Empty + (*StreamInfo)(nil), // 0: m7s.StreamInfo + (*StreamListRequest)(nil), // 1: m7s.StreamListRequest + (*StreamListResponse)(nil), // 2: m7s.StreamListResponse + (*StreamSnapRequest)(nil), // 3: m7s.StreamSnapRequest + (*Wrap)(nil), // 4: m7s.Wrap + (*TrackSnapShot)(nil), // 5: m7s.TrackSnapShot + (*StreamSnapShot)(nil), // 6: m7s.StreamSnapShot + (*StopSubscribeRequest)(nil), // 7: m7s.StopSubscribeRequest + (*StopSubscribeResponse)(nil), // 8: m7s.StopSubscribeResponse + (*RequestWithId)(nil), // 9: m7s.RequestWithId + (*emptypb.Empty)(nil), // 10: google.protobuf.Empty } var file_global_proto_depIdxs = []int32{ - 1, // 0: m7s.TrackSnapShot.wrap:type_name -> m7s.Wrap - 2, // 1: m7s.StreamSnapShot.videoTrack:type_name -> m7s.TrackSnapShot - 2, // 2: m7s.StreamSnapShot.audioTrack:type_name -> m7s.TrackSnapShot - 6, // 3: m7s.Global.Shutdown:input_type -> m7s.RequestWithId - 6, // 4: m7s.Global.Restart:input_type -> m7s.RequestWithId - 0, // 5: m7s.Global.StreamSnap:input_type -> m7s.StreamSnapRequest - 4, // 6: m7s.Global.StopSubscribe:input_type -> m7s.StopSubscribeRequest - 7, // 7: m7s.Global.Shutdown:output_type -> google.protobuf.Empty - 7, // 8: m7s.Global.Restart:output_type -> google.protobuf.Empty - 3, // 9: m7s.Global.StreamSnap:output_type -> m7s.StreamSnapShot - 5, // 10: m7s.Global.StopSubscribe:output_type -> m7s.StopSubscribeResponse - 7, // [7:11] is the sub-list for method output_type - 3, // [3:7] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 0, // 0: m7s.StreamListResponse.list:type_name -> m7s.StreamInfo + 4, // 1: m7s.TrackSnapShot.wrap:type_name -> m7s.Wrap + 5, // 2: m7s.StreamSnapShot.videoTrack:type_name -> m7s.TrackSnapShot + 5, // 3: m7s.StreamSnapShot.audioTrack:type_name -> m7s.TrackSnapShot + 9, // 4: m7s.Global.Shutdown:input_type -> m7s.RequestWithId + 9, // 5: m7s.Global.Restart:input_type -> m7s.RequestWithId + 1, // 6: m7s.Global.StreamList:input_type -> m7s.StreamListRequest + 3, // 7: m7s.Global.StreamSnap:input_type -> m7s.StreamSnapRequest + 7, // 8: m7s.Global.StopSubscribe:input_type -> m7s.StopSubscribeRequest + 10, // 9: m7s.Global.Shutdown:output_type -> google.protobuf.Empty + 10, // 10: m7s.Global.Restart:output_type -> google.protobuf.Empty + 2, // 11: m7s.Global.StreamList:output_type -> m7s.StreamListResponse + 6, // 12: m7s.Global.StreamSnap:output_type -> m7s.StreamSnapShot + 8, // 13: m7s.Global.StopSubscribe:output_type -> m7s.StopSubscribeResponse + 9, // [9:14] is the sub-list for method output_type + 4, // [4:9] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_global_proto_init() } @@ -526,7 +677,7 @@ func file_global_proto_init() { } if !protoimpl.UnsafeEnabled { file_global_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StreamSnapRequest); i { + switch v := v.(*StreamInfo); i { case 0: return &v.state case 1: @@ -538,7 +689,7 @@ func file_global_proto_init() { } } file_global_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Wrap); i { + switch v := v.(*StreamListRequest); i { case 0: return &v.state case 1: @@ -550,7 +701,7 @@ func file_global_proto_init() { } } file_global_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TrackSnapShot); i { + switch v := v.(*StreamListResponse); i { case 0: return &v.state case 1: @@ -562,7 +713,7 @@ func file_global_proto_init() { } } file_global_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StreamSnapShot); i { + switch v := v.(*StreamSnapRequest); i { case 0: return &v.state case 1: @@ -574,7 +725,7 @@ func file_global_proto_init() { } } file_global_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StopSubscribeRequest); i { + switch v := v.(*Wrap); i { case 0: return &v.state case 1: @@ -586,7 +737,7 @@ func file_global_proto_init() { } } file_global_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StopSubscribeResponse); i { + switch v := v.(*TrackSnapShot); i { case 0: return &v.state case 1: @@ -598,6 +749,42 @@ func file_global_proto_init() { } } file_global_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamSnapShot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_global_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StopSubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_global_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StopSubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_global_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*RequestWithId); i { case 0: return &v.state @@ -616,7 +803,7 @@ func file_global_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_global_proto_rawDesc, NumEnums: 0, - NumMessages: 7, + NumMessages: 10, NumExtensions: 0, NumServices: 1, }, diff --git a/pb/global.pb.gw.go b/pb/global.pb.gw.go index d169967..5bbc23b 100644 --- a/pb/global.pb.gw.go +++ b/pb/global.pb.gw.go @@ -135,6 +135,24 @@ func local_request_Global_Restart_0(ctx context.Context, marshaler runtime.Marsh } +func request_Global_StreamList_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq StreamListRequest + var metadata runtime.ServerMetadata + + msg, err := client.StreamList(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Global_StreamList_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq StreamListRequest + var metadata runtime.ServerMetadata + + msg, err := server.StreamList(ctx, &protoReq) + return msg, metadata, err + +} + func request_Global_StreamSnap_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var protoReq StreamSnapRequest var metadata runtime.ServerMetadata @@ -303,6 +321,31 @@ func RegisterGlobalHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser }) + mux.Handle("GET", pattern_Global_StreamList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/StreamList", runtime.WithHTTPPathPattern("/api/stream/list")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Global_StreamList_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Global_StreamList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + mux.Handle("GET", pattern_Global_StreamSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() @@ -438,6 +481,28 @@ func RegisterGlobalHandlerClient(ctx context.Context, mux *runtime.ServeMux, cli }) + mux.Handle("GET", pattern_Global_StreamList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/StreamList", runtime.WithHTTPPathPattern("/api/stream/list")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Global_StreamList_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Global_StreamList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + mux.Handle("GET", pattern_Global_StreamSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() @@ -490,6 +555,8 @@ var ( pattern_Global_Restart_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"api", "restart", "id"}, "")) + pattern_Global_StreamList_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "stream", "list"}, "")) + pattern_Global_StreamSnap_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "stream", "snap", "streamPath"}, "")) pattern_Global_StopSubscribe_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "stop", "subscribe", "id"}, "")) @@ -500,6 +567,8 @@ var ( forward_Global_Restart_0 = runtime.ForwardResponseMessage + forward_Global_StreamList_0 = runtime.ForwardResponseMessage + forward_Global_StreamSnap_0 = runtime.ForwardResponseMessage forward_Global_StopSubscribe_0 = runtime.ForwardResponseMessage diff --git a/pb/global.proto b/pb/global.proto index e416940..00eb7ec 100644 --- a/pb/global.proto +++ b/pb/global.proto @@ -15,6 +15,11 @@ service Global { post: "/api/restart/{id}" }; } + rpc StreamList (StreamListRequest) returns (StreamListResponse) { + option (google.api.http) = { + get: "/api/stream/list" + }; + } rpc StreamSnap (StreamSnapRequest) returns (StreamSnapShot) { option (google.api.http) = { get: "/api/stream/snap/{streamPath=**}" @@ -27,6 +32,15 @@ service Global { }; } } +message StreamInfo { + string path = 1; +} +message StreamListRequest { + +} +message StreamListResponse { + repeated StreamInfo list = 1; +} message StreamSnapRequest { string streamPath = 1; diff --git a/pb/global_grpc.pb.go b/pb/global_grpc.pb.go index 1623353..1fb6c79 100644 --- a/pb/global_grpc.pb.go +++ b/pb/global_grpc.pb.go @@ -25,6 +25,7 @@ const _ = grpc.SupportPackageIsVersion7 type GlobalClient interface { Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) Restart(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) + StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error) StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error) StopSubscribe(ctx context.Context, in *StopSubscribeRequest, opts ...grpc.CallOption) (*StopSubscribeResponse, error) } @@ -55,6 +56,15 @@ func (c *globalClient) Restart(ctx context.Context, in *RequestWithId, opts ...g return out, nil } +func (c *globalClient) StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error) { + out := new(StreamListResponse) + err := c.cc.Invoke(ctx, "/m7s.Global/StreamList", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *globalClient) StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error) { out := new(StreamSnapShot) err := c.cc.Invoke(ctx, "/m7s.Global/StreamSnap", in, out, opts...) @@ -79,6 +89,7 @@ func (c *globalClient) StopSubscribe(ctx context.Context, in *StopSubscribeReque type GlobalServer interface { Shutdown(context.Context, *RequestWithId) (*emptypb.Empty, error) Restart(context.Context, *RequestWithId) (*emptypb.Empty, error) + StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error) StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error) StopSubscribe(context.Context, *StopSubscribeRequest) (*StopSubscribeResponse, error) mustEmbedUnimplementedGlobalServer() @@ -94,6 +105,9 @@ func (UnimplementedGlobalServer) Shutdown(context.Context, *RequestWithId) (*emp func (UnimplementedGlobalServer) Restart(context.Context, *RequestWithId) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Restart not implemented") } +func (UnimplementedGlobalServer) StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method StreamList not implemented") +} func (UnimplementedGlobalServer) StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error) { return nil, status.Errorf(codes.Unimplemented, "method StreamSnap not implemented") } @@ -149,6 +163,24 @@ func _Global_Restart_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Global_StreamList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StreamListRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GlobalServer).StreamList(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/m7s.Global/StreamList", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GlobalServer).StreamList(ctx, req.(*StreamListRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Global_StreamSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(StreamSnapRequest) if err := dec(in); err != nil { @@ -200,6 +232,10 @@ var Global_ServiceDesc = grpc.ServiceDesc{ MethodName: "Restart", Handler: _Global_Restart_Handler, }, + { + MethodName: "StreamList", + Handler: _Global_StreamList_Handler, + }, { MethodName: "StreamSnap", Handler: _Global_StreamSnap_Handler, diff --git a/pkg/util/buf-reader.go b/pkg/util/buf-reader.go index 78afb67..6d292b6 100644 --- a/pkg/util/buf-reader.go +++ b/pkg/util/buf-reader.go @@ -4,7 +4,7 @@ import ( "io" ) -const defaultBufSize = 4096 +const defaultBufSize = 65536 type BufReader struct { reader io.Reader @@ -15,7 +15,7 @@ type BufReader struct { func NewBufReader(reader io.Reader) (r *BufReader) { r = &BufReader{} r.reader = reader - r.buf.ScalableMemoryAllocator = NewScalableMemoryAllocator(4096) + r.buf.ScalableMemoryAllocator = NewScalableMemoryAllocator(65536) r.BufLen = defaultBufSize return } @@ -31,12 +31,11 @@ func (r *BufReader) eat() error { } func (r *BufReader) ReadByte() (byte, error) { - if r.buf.Length > 0 { - return r.buf.ReadByte() - } - err := r.eat() - if err != nil { - return 0, err + for r.buf.Length == 0 { + err := r.eat() + if err != nil { + return 0, err + } } return r.buf.ReadByte() } @@ -45,22 +44,46 @@ func (r *BufReader) ReadBE(n int) (num int, err error) { for i := range n { b, err := r.ReadByte() if err != nil { - return -1, err + return 0, err } num += int(b) << ((n - i - 1) << 3) } return } +func (r *BufReader) ReadLE32(n int) (num uint32, err error) { + for i := range n { + b, err := r.ReadByte() + if err != nil { + return 0, err + } + num += uint32(b) << (i << 3) + } + return +} + +func (r *BufReader) ReadBE32(n int) (num uint32, err error) { + for i := range n { + b, err := r.ReadByte() + if err != nil { + return 0, err + } + num += uint32(b) << ((n - i - 1) << 3) + } + return +} + func (r *BufReader) ReadBytes(n int) (mem *RecyclableBuffers, err error) { mem = &RecyclableBuffers{ScalableMemoryAllocator: r.buf.ScalableMemoryAllocator} for r.buf.RecycleFront(); n > 0 && err == nil; err = r.eat() { - if r.buf.Length >= n { - mem.ReadFromBytes(r.buf.Buffers.Cut(n)...) - return + if r.buf.Length > 0 { + if r.buf.Length >= n { + mem.ReadFromBytes(r.buf.Buffers.Cut(n)...) + return + } + n -= r.buf.Length + mem.ReadFromBytes(r.buf.CutAll()...) } - n -= r.buf.Length - mem.ReadFromBytes(r.buf.CutAll()...) } return } diff --git a/pkg/util/buffers.go b/pkg/util/buffers.go index 04668d0..33d7743 100644 --- a/pkg/util/buffers.go +++ b/pkg/util/buffers.go @@ -160,6 +160,22 @@ func (buffers *Buffers) ReadBytes(n int) ([]byte, error) { return b, err } +func (buffers *Buffers) WriteTo(w io.Writer) (n int64, err error) { + var buf net.Buffers + if len(buffers.Buffers) > buffers.offset { + buf = append(buf, buffers.Buffers[buffers.offset:]...) + } + if buffers.curBufLen > 0 { + buf[0] = buffers.curBuf + } + buffers.curBuf = nil + buffers.curBufLen = 0 + buffers.offset = len(buffers.Buffers) + buffers.Offset = buffers.Length + buffers.Length = 0 + return buf.WriteTo(w) +} + func (buffers *Buffers) WriteNTo(n int, result *net.Buffers) (actual int) { for actual = n; buffers.Length > 0 && n > 0; buffers.skipBuf() { if buffers.curBufLen > n { @@ -248,7 +264,7 @@ func (buffers *Buffers) ClipBack(n int) []byte { func (buffers *Buffers) CutAll() (r net.Buffers) { r = append(r, buffers.curBuf) - for i := buffers.offset+1; i < len(buffers.Buffers); i++ { + for i := buffers.offset + 1; i < len(buffers.Buffers); i++ { r = append(r, buffers.Buffers[i]) } if len(buffers.Buffers[buffers.offset]) == buffers.curBufLen { diff --git a/plugin/hdl/pkg/pull.go b/plugin/hdl/pkg/pull.go index 4a75acf..fdbf7c3 100644 --- a/plugin/hdl/pkg/pull.go +++ b/plugin/hdl/pkg/pull.go @@ -1,7 +1,6 @@ package hdl import ( - "bufio" "errors" "io" "net/http" @@ -15,17 +14,14 @@ import ( ) type HDLPuller struct { - *bufio.Reader + *util.BufReader hasAudio bool hasVideo bool absTS uint32 //绝对时间戳 - pool *util.ScalableMemoryAllocator } func NewHDLPuller() *HDLPuller { - return &HDLPuller{ - pool: util.NewScalableMemoryAllocator(1024), - } + return &HDLPuller{} } func (puller *HDLPuller) Connect(p *m7s.Client) (err error) { @@ -45,24 +41,28 @@ func (puller *HDLPuller) Connect(p *m7s.Client) (err error) { return io.EOF } p.Closer = res.Body - puller.Reader = bufio.NewReader(res.Body) + puller.BufReader = util.NewBufReader(res.Body) } } else { var res *os.File if res, err = os.Open(p.RemoteURL); err == nil { p.Closer = res - puller.Reader = bufio.NewReader(res) + puller.BufReader = util.NewBufReader(res) } } if err == nil { - header := puller.pool.Malloc(13) - defer puller.pool.Free(header) - if _, err = io.ReadFull(puller, header); err == nil { - if header[0] != 'F' || header[1] != 'L' || header[2] != 'V' { + var head *util.RecyclableBuffers + head, err = puller.BufReader.ReadBytes(13) + defer head.Recycle() + if err == nil { + var flvHead [3]byte + var version, flag byte + head.ReadByteTo(&flvHead[0], &flvHead[1], &flvHead[2], &version, &flag) + if flvHead != [...]byte{'F', 'L', 'V'} { err = errors.New("not flv file") } else { - puller.hasAudio = header[4]&0x04 != 0 - puller.hasVideo = header[4]&0x01 != 0 + puller.hasAudio = flag&0x04 != 0 + puller.hasVideo = flag&0x01 != 0 } } } @@ -71,7 +71,6 @@ func (puller *HDLPuller) Connect(p *m7s.Client) (err error) { func (puller *HDLPuller) Pull(p *m7s.Puller) (err error) { var startTs uint32 - var buf15 [15]byte pubConf := p.GetPublishConfig() if !puller.hasAudio { pubConf.PubAudio = false @@ -79,43 +78,48 @@ func (puller *HDLPuller) Pull(p *m7s.Puller) (err error) { if !puller.hasVideo { pubConf.PubVideo = false } - pubaudio, pubvideo := pubConf.PubAudio, pubConf.PubVideo - for offsetTs := puller.absTS; err == nil; _, err = io.ReadFull(puller, buf15[11:]) { - tmp := util.Buffer(buf15[:11]) - _, err = io.ReadFull(puller, tmp) + for offsetTs := puller.absTS; err == nil; _, err = puller.ReadBE(4) { + t, err := puller.ReadByte() if err != nil { - return + return err } - t := tmp.ReadByte() - dataSize := tmp.ReadUint24() - timestamp := tmp.ReadUint24() | uint32(tmp.ReadByte())<<24 + dataSize, err := puller.ReadBE32(3) + if err != nil { + return err + } + timestamp, err := puller.ReadBE32(3) + if err != nil { + return err + } + h, err := puller.ReadByte() + if err != nil { + return err + } + timestamp = timestamp | uint32(h)<<24 if startTs == 0 { startTs = timestamp } - tmp.ReadUint24() + puller.ReadBE(3) // stream id always 0 var frame rtmp.RTMPData - frame.ScalableMemoryAllocator = puller.pool - mem := frame.Malloc(int(dataSize)) - _, err = io.ReadFull(puller, mem) + frame.RecyclableBuffers, err = puller.ReadBytes(int(dataSize)) if err != nil { frame.Recycle() - return + return err } - frame.ReadFromBytes(mem) puller.absTS = offsetTs + (timestamp - startTs) frame.Timestamp = puller.absTS // fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS) switch t { case FLV_TAG_TYPE_AUDIO: - if pubaudio { - p.WriteAudio(&rtmp.RTMPAudio{frame}) + if pubConf.PubAudio { + p.WriteAudio(frame.WrapAudio()) } case FLV_TAG_TYPE_VIDEO: - if pubvideo { - p.WriteVideo(&rtmp.RTMPVideo{frame}) + if pubConf.PubVideo { + p.WriteVideo(frame.WrapVideo()) } case FLV_TAG_TYPE_SCRIPT: - p.Info("script", "data", mem) + p.Info("script") frame.Recycle() } } diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go index 20828ac..04d0850 100644 --- a/plugin/rtmp/index.go +++ b/plugin/rtmp/index.go @@ -193,17 +193,13 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) { } case RTMP_MSG_AUDIO: if r, ok := receivers[msg.MessageStreamID]; ok { - r.WriteAudio(&RTMPAudio{msg.AVData}) - msg.AVData = RTMPData{} - msg.AVData.ScalableMemoryAllocator = nc.ReadPool + r.WriteAudio(msg.AVData.WrapAudio()) } else { logger.Warn("ReceiveAudio", "MessageStreamID", msg.MessageStreamID) } case RTMP_MSG_VIDEO: if r, ok := receivers[msg.MessageStreamID]; ok { - r.WriteVideo(&RTMPVideo{msg.AVData}) - msg.AVData = RTMPData{} - msg.AVData.ScalableMemoryAllocator = nc.ReadPool + r.WriteVideo(msg.AVData.WrapVideo()) } else { logger.Warn("ReceiveVideo", "MessageStreamID", msg.MessageStreamID) } diff --git a/plugin/rtmp/pkg/chunk.go b/plugin/rtmp/pkg/chunk.go index 572fb23..f8b3cb5 100644 --- a/plugin/rtmp/pkg/chunk.go +++ b/plugin/rtmp/pkg/chunk.go @@ -29,6 +29,7 @@ type Chunk struct { ChunkHeader AVData RTMPData MsgData RtmpMessage + bufLen int } type ChunkHeader struct { @@ -82,10 +83,10 @@ func (h *ChunkHeader) WriteTo(t byte, b *util.Buffer) { } type ( - ChunkHeader8 ChunkHeader + ChunkHeader8 ChunkHeader ChunkHeader12 ChunkHeader - ChunkHeader1 ChunkHeader - IChunkHeader interface { + ChunkHeader1 ChunkHeader + IChunkHeader interface { WriteTo(*util.Buffer) } ) diff --git a/plugin/rtmp/pkg/client.go b/plugin/rtmp/pkg/client.go index b56983c..9687e98 100644 --- a/plugin/rtmp/pkg/client.go +++ b/plugin/rtmp/pkg/client.go @@ -110,13 +110,9 @@ func (puller *Client) Pull(p *m7s.Puller) (err error) { } switch msg.MessageTypeID { case RTMP_MSG_AUDIO: - p.WriteAudio(&RTMPAudio{msg.AVData}) - msg.AVData = RTMPData{} - msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ReadPool + p.WriteAudio(msg.AVData.WrapAudio()) case RTMP_MSG_VIDEO: - p.WriteVideo(&RTMPVideo{msg.AVData}) - msg.AVData = RTMPData{} - msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ReadPool + p.WriteVideo(msg.AVData.WrapVideo()) case RTMP_MSG_AMF0_COMMAND: cmd := msg.MsgData.(Commander).GetCommand() switch cmd.CommandName { diff --git a/plugin/rtmp/pkg/const.go b/plugin/rtmp/pkg/const.go index b86d166..95279dc 100644 --- a/plugin/rtmp/pkg/const.go +++ b/plugin/rtmp/pkg/const.go @@ -18,8 +18,7 @@ const ( type RTMPData struct { Timestamp uint32 - util.Buffers - util.RecyclableMemory + *util.RecyclableBuffers } func (avcc *RTMPData) GetSize() int { @@ -40,3 +39,11 @@ func (avcc *RTMPData) GetTimestamp() time.Duration { func (avcc *RTMPData) IsIDR() bool { return false } + +func (avcc *RTMPData) WrapAudio() *RTMPAudio { + return &RTMPAudio{RTMPData: *avcc} +} + +func (avcc *RTMPData) WrapVideo() *RTMPVideo { + return &RTMPVideo{RTMPData: *avcc} +} \ No newline at end of file diff --git a/plugin/rtmp/pkg/handshake.go b/plugin/rtmp/pkg/handshake.go index 899ace1..3dfed46 100644 --- a/plugin/rtmp/pkg/handshake.go +++ b/plugin/rtmp/pkg/handshake.go @@ -66,16 +66,10 @@ var ( // C2 S2 : 参考C1 S1 -func (nc *NetConnection) ReadBuf(length int) (buf []byte, err error) { - buf = nc.ReadPool.Malloc(length) - _, err = io.ReadFull(nc.Reader, buf) - return -} - -func (nc *NetConnection) Handshake() error { - C0C1, err := nc.ReadBuf(C1S1_SIZE + 1) - defer nc.ReadPool.Free(C0C1) - if err != nil { +func (nc *NetConnection) Handshake() (err error) { + C0C1 := nc.writePool.Malloc(C1S1_SIZE + 1) + defer nc.writePool.Recycle() + if _, err = io.ReadFull(nc.Conn, C0C1); err != nil { return err } if C0C1[0] != RTMP_HANDSHAKE_VERSION { @@ -96,37 +90,36 @@ func (nc *NetConnection) Handshake() error { } func (client *NetConnection) ClientHandshake() (err error) { - C0C1 := client.ReadPool.Malloc(C1S1_SIZE + 1) - defer client.ReadPool.Free(C0C1) + C0C1 := client.writePool.Malloc(C1S1_SIZE + 1) + defer client.writePool.Recycle() C0C1[0] = RTMP_HANDSHAKE_VERSION if _, err = client.Write(C0C1); err == nil { // read S0 S1 - if _, err = io.ReadFull(client.Reader, C0C1); err == nil { + if _, err = io.ReadFull(client.Conn, C0C1); err == nil { if C0C1[0] != RTMP_HANDSHAKE_VERSION { err = errors.New("S1 C1 Error") // C2 } else if _, err = client.Write(C0C1[1:]); err == nil { - _, err = io.ReadFull(client.Reader, C0C1[1:]) // S2 + _, err = io.ReadFull(client.Conn, C0C1[1:]) // S2 } } } - return + return err } func (nc *NetConnection) simple_handshake(C1 []byte) error { - S0S1 := nc.ReadPool.Malloc(C1S1_SIZE + 1) + S0S1 := nc.writePool.Malloc(C1S1_SIZE + 1) S0S1[0] = RTMP_HANDSHAKE_VERSION util.PutBE(S0S1[1:5], time.Now().Unix()&0xFFFFFFFF) copy(S0S1[5:], "Monibuca") nc.Write(S0S1) nc.Write(C1) // S2 - defer nc.ReadPool.Free(S0S1) - C2, err := nc.ReadBuf(C1S1_SIZE) - defer nc.ReadPool.Free(C2) + C2, err := nc.ReadBytes(C1S1_SIZE) + defer C2.Recycle() if err != nil { return err } - if !bytes.Equal(C2[8:], S0S1[9:]) { + if !bytes.Equal(C2.ToBytes()[8:], S0S1[9:]) { return errors.New("C2 Error") } return nil @@ -181,8 +174,8 @@ func (nc *NetConnection) complex_handshake(C1 []byte) error { buffer := net.Buffers{[]byte{RTMP_HANDSHAKE_VERSION}, S1, S2_Random, S2_Digest} buffer.WriteTo(nc) - b, _ := nc.ReadBuf(1536) - nc.ReadPool.Free(b) + b, _ := nc.ReadBytes(1536) + b.Recycle() return nil } diff --git a/plugin/rtmp/pkg/net-connection.go b/plugin/rtmp/pkg/net-connection.go index fcebcca..f0d8b43 100644 --- a/plugin/rtmp/pkg/net-connection.go +++ b/plugin/rtmp/pkg/net-connection.go @@ -1,10 +1,7 @@ package rtmp import ( - "bufio" - "encoding/binary" "errors" - "io" "log/slog" "net" "runtime" @@ -47,7 +44,7 @@ const ( type NetConnection struct { *slog.Logger `json:"-" yaml:"-"` - *bufio.Reader `json:"-" yaml:"-"` + *util.BufReader `json:"-" yaml:"-"` net.Conn `json:"-" yaml:"-"` bandwidth uint32 readSeqNum uint32 // 当前读的字节 @@ -61,8 +58,7 @@ type NetConnection struct { AppName string tmpBuf util.Buffer //用来接收/发送小数据,复用内存 chunkHeaderBuf util.Buffer - ReadPool *util.ScalableMemoryAllocator - WritePool util.RecyclableMemory + writePool util.RecyclableMemory writing atomic.Bool // false 可写,true 不可写 } @@ -70,26 +66,18 @@ func NewNetConnection(conn net.Conn, logger *slog.Logger) (ret *NetConnection) { ret = &NetConnection{ Logger: logger, Conn: conn, - Reader: bufio.NewReader(conn), + BufReader: util.NewBufReader(conn), WriteChunkSize: RTMP_DEFAULT_CHUNK_SIZE, readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, incommingChunks: make(map[uint32]*Chunk), bandwidth: RTMP_MAX_CHUNK_SIZE << 3, tmpBuf: make(util.Buffer, 4), chunkHeaderBuf: make(util.Buffer, 0, 20), - ReadPool: util.NewScalableMemoryAllocator(2048), } - ret.WritePool.ScalableMemoryAllocator = util.NewScalableMemoryAllocator(1024) + ret.writePool.ScalableMemoryAllocator = util.NewScalableMemoryAllocator(1024) return } -func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) { - n, err = io.ReadFull(conn.Reader, buf) - if err == nil { - conn.readSeqNum += uint32(n) - } - return -} func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) { return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID}) } @@ -136,41 +124,47 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { // println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType) chunk, ok := conn.incommingChunks[ChunkStreamID] - if ChunkType != 3 && ok && chunk.AVData.Length > 0 { + if ChunkType != 3 && ok && chunk.bufLen > 0 { // 如果块类型不为3,那么这个rtmp的body应该为空. return nil, errors.New("incompleteRtmpBody error") } if !ok { chunk = &Chunk{} conn.incommingChunks[ChunkStreamID] = chunk - chunk.AVData.ScalableMemoryAllocator = conn.ReadPool } if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil { return nil, errors.New("get chunk type error :" + err.Error()) } msgLen := int(chunk.MessageLength) - - mem := chunk.AVData.Malloc(conn.readChunkSize) - if unRead := msgLen - chunk.AVData.Length; unRead < conn.readChunkSize { - mem = mem[:unRead] - } - if n, err := conn.ReadFull(mem); err != nil { - chunk.AVData.Recycle() - return nil, err + var mem *util.RecyclableBuffers + if unRead := msgLen - chunk.bufLen; unRead < conn.readChunkSize { + mem, err = conn.ReadBytes(unRead) } else { - conn.readSeqNum += uint32(n) + mem, err = conn.ReadBytes(conn.readChunkSize) } - if chunk.AVData.ReadFromBytes(mem); chunk.AVData.Length == msgLen { + if err != nil { + mem.Recycle() + return nil, err + } + conn.readSeqNum += uint32(mem.Length) + if chunk.bufLen == 0 { + chunk.AVData.RecyclableBuffers = mem + } else { + chunk.AVData.ReadFromBytes(mem.Buffers.Buffers...) + } + chunk.bufLen += mem.Length + if chunk.AVData.Length == msgLen { chunk.ChunkHeader.ExtendTimestamp += chunk.ChunkHeader.Timestamp msg = chunk switch chunk.MessageTypeID { case RTMP_MSG_AUDIO, RTMP_MSG_VIDEO: msg.AVData.Timestamp = chunk.ChunkHeader.ExtendTimestamp default: - err = GetRtmpMessage(msg, msg.AVData.ToBytes()) msg.AVData.Recycle() + err = GetRtmpMessage(msg, msg.AVData.ToBytes()) } + msg.bufLen = 0 } return } @@ -208,22 +202,18 @@ func (conn *NetConnection) readChunkStreamID(csid uint32) (chunkStreamID uint32, } func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err error) { - conn.tmpBuf.Reset() - b4 := conn.tmpBuf.Malloc(4) - b3 := b4[:3] if chunkType == 3 { // 3个字节的时间戳 } else { // Timestamp 3 bytes - if _, err = conn.ReadFull(b3); err != nil { + if h.Timestamp, err = conn.ReadBE32(3); err != nil { return err } - util.GetBE(b3, &h.Timestamp) + if chunkType != 2 { - if _, err = conn.ReadFull(b3); err != nil { + if h.MessageLength, err = conn.ReadBE32(3); err != nil { return err } - util.GetBE(b3, &h.MessageLength) // Message Type ID 1 bytes if h.MessageTypeID, err = conn.ReadByte(); err != nil { return err @@ -231,20 +221,18 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err er conn.readSeqNum++ if chunkType == 0 { // Message Stream ID 4bytes - if _, err = conn.ReadFull(b4); err != nil { // 读取Message Stream ID + if h.MessageStreamID, err = conn.ReadBE32(4); err != nil { // 读取Message Stream ID return err } - h.MessageStreamID = binary.LittleEndian.Uint32(b4) } } } // ExtendTimestamp 4 bytes if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求 - if _, err = conn.ReadFull(b4); err != nil { + if h.Timestamp, err = conn.ReadBE32(4); err != nil { return err } - util.GetBE(b4, &h.Timestamp) } if chunkType == 0 { h.ExtendTimestamp = h.Timestamp @@ -331,6 +319,5 @@ func (conn *NetConnection) sendChunk(r util.Buffers, head *ChunkHeader, headType var nw int64 nw, err = chunks.WriteTo(conn.Conn) conn.writeSeqNum += uint32(nw) - conn.WritePool.Recycle() return err } diff --git a/publisher.go b/publisher.go index d6bef20..9d76fd9 100644 --- a/publisher.go +++ b/publisher.go @@ -5,9 +5,9 @@ import ( "sync" "time" + "m7s.live/m7s/v5/pb" . "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/config" - "m7s.live/m7s/v5/pb" ) type PublisherState int @@ -136,7 +136,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { } frame.Timestamp = max(1, p.baseTs+ts) p.lastTs = frame.Timestamp - p.Trace("write", "seq", frame.Sequence) + p.Trace("write", "seq", frame.Sequence, "ts", frame.Timestamp, "codec", t.Codec.String(), "size", frame.Wrap.GetSize(), "data", frame.Wrap.String()) t.Step() p.speedControl(p.Publish.Speed, p.lastTs) } @@ -167,7 +167,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { if t.IDRing != nil { p.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence) if t.HistoryRing == nil { - if l := t.Size - p.GOP; l > 12 { + if l := t.Size - p.GOP; l > 12 && t.Size > 100 { t.Debug("resize", "gop", p.GOP, "before", t.Size, "after", t.Size-5) t.Reduce(5) //缩小缓冲环节省内存 } @@ -233,7 +233,10 @@ func (p *Publisher) GetAudioTrack(dataType reflect.Type) (t *AVTrack) { return t } p.RUnlock() - return p.createTransTrack(dataType) + if p.AudioTrack != nil { + return p.createTransTrack(dataType) + } + return } func (p *Publisher) GetVideoTrack(dataType reflect.Type) (t *AVTrack) { diff --git a/server.go b/server.go index cf45099..a8711dd 100644 --- a/server.go +++ b/server.go @@ -105,10 +105,10 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) { } func (s *Server) run(ctx context.Context, conf any) (err error) { - mux := runtime.NewServeMux(runtime.WithMarshalerOption("text/plain", &pb.TextPlain{}), runtime.WithRoutingErrorHandler(runtime.RoutingErrorHandlerFunc(func(_ context.Context, _ *runtime.ServeMux, _ runtime.Marshaler, w http.ResponseWriter, r *http.Request, _ int) { - s.config.HTTP.GetHttpMux().ServeHTTP(w, r) - }))) httpConf, tcpConf := &s.config.HTTP, &s.config.TCP + mux := runtime.NewServeMux(runtime.WithMarshalerOption("text/plain", &pb.TextPlain{}), runtime.WithRoutingErrorHandler(runtime.RoutingErrorHandlerFunc(func(_ context.Context, _ *runtime.ServeMux, _ runtime.Marshaler, w http.ResponseWriter, r *http.Request, _ int) { + httpConf.GetHttpMux().ServeHTTP(w, r) + }))) httpConf.SetMux(mux) s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx) s.Info("start") @@ -146,7 +146,7 @@ func (s *Server) run(ctx context.Context, conf any) (err error) { } s.Logger = slog.New( console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}), - ) + ).With("server", s.ID) s.registerHandler() if httpConf.ListenAddrTLS != "" { @@ -322,6 +322,15 @@ func (s *Server) eventLoop() { v.Fulfill(ErrNotFound) } continue + case *pb.StreamListRequest: + var streams []*pb.StreamInfo + for _, publisher := range s.Streams.Items { + streams = append(streams, &pb.StreamInfo{ + Path: publisher.StreamPath, + }) + } + v.Resolve(&pb.StreamListResponse{List: streams}) + continue } } for _, plugin := range s.Plugins {