feat: use my buf reader

This commit is contained in:
langhuihui
2024-04-19 16:42:14 +08:00
parent 788923749e
commit d76d69b327
21 changed files with 630 additions and 257 deletions

7
api.go
View File

@@ -39,3 +39,10 @@ func (s *Server) StopSubscribe(ctx context.Context, req *pb.StopSubscribeRequest
Success: err == nil, Success: err == nil,
}, err }, err
} }
func (s *Server) StreamList(ctx context.Context, req *pb.StreamListRequest) (res *pb.StreamListResponse, err error) {
var result any
result, err = s.Call(req)
return result.(*pb.StreamListResponse), err
}

View File

@@ -1,5 +1,5 @@
global: global:
loglevel: debug loglevel: trace
tcp: tcp:
listenaddr: :50051 listenaddr: :50051
rtmp: rtmp:
@@ -12,4 +12,8 @@ rtmp:
# subaudio: false # subaudio: false
pull: pull:
pullonsub: pullonsub:
live/pull: rtmp://localhost/live/test live/pull: rtmp://localhost/live/test
# hdl:
# pull:
# pullonstart:
# live/test: /Users/dexter/Movies/jb-demo.flv

View File

@@ -1,10 +1,10 @@
global: global:
loglevel: debug loglevel: trace
tcp: tcp:
listenaddr: :50051 listenaddr: :50051
hdl: hdl:
publish: publish:
speed: 2 # speed: 2
pull: pull:
pullonstart: pullonstart:
live/test: /Users/dexter/Movies/jb-demo.flv live/test: /Users/dexter/Movies/jb-demo.flv

View File

@@ -0,0 +1,16 @@
global:
loglevel: trace
tcp:
listenaddr: :50052
http:
listenaddr: :8081
listenaddrtls: :8555
rtmp:
tcp:
listenaddr: :1936
hdl:
publish:
# speed: 2
pull:
pullonstart:
live/test: http://localhost:8080/hdl/live/test.flv

View File

@@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"time"
"m7s.live/m7s/v5" "m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/debug" _ "m7s.live/m7s/v5/plugin/debug"
@@ -10,6 +11,8 @@ import (
) )
func main() { func main() {
// ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100)) ctx := context.Background()
m7s.Run(context.Background(), "config.yaml") go m7s.Run(ctx, "config1.yaml")
time.Sleep(2 * time.Second)
m7s.NewServer().Run(ctx, "config2.yaml")
} }

View File

@@ -1,4 +1,6 @@
global: global:
# loglevel: debug loglevel: debug
tcp:
listenaddr: :50050
rtmp: rtmp:
chunksize: 2048 chunksize: 2048

View File

@@ -22,6 +22,138 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
) )
type StreamInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
}
func (x *StreamInfo) Reset() {
*x = StreamInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamInfo) ProtoMessage() {}
func (x *StreamInfo) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamInfo.ProtoReflect.Descriptor instead.
func (*StreamInfo) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{0}
}
func (x *StreamInfo) GetPath() string {
if x != nil {
return x.Path
}
return ""
}
type StreamListRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *StreamListRequest) Reset() {
*x = StreamListRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamListRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamListRequest) ProtoMessage() {}
func (x *StreamListRequest) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamListRequest.ProtoReflect.Descriptor instead.
func (*StreamListRequest) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{1}
}
type StreamListResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
List []*StreamInfo `protobuf:"bytes,1,rep,name=list,proto3" json:"list,omitempty"`
}
func (x *StreamListResponse) Reset() {
*x = StreamListResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamListResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamListResponse) ProtoMessage() {}
func (x *StreamListResponse) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamListResponse.ProtoReflect.Descriptor instead.
func (*StreamListResponse) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{2}
}
func (x *StreamListResponse) GetList() []*StreamInfo {
if x != nil {
return x.List
}
return nil
}
type StreamSnapRequest struct { type StreamSnapRequest struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@@ -33,7 +165,7 @@ type StreamSnapRequest struct {
func (x *StreamSnapRequest) Reset() { func (x *StreamSnapRequest) Reset() {
*x = StreamSnapRequest{} *x = StreamSnapRequest{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[0] mi := &file_global_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -46,7 +178,7 @@ func (x *StreamSnapRequest) String() string {
func (*StreamSnapRequest) ProtoMessage() {} func (*StreamSnapRequest) ProtoMessage() {}
func (x *StreamSnapRequest) ProtoReflect() protoreflect.Message { func (x *StreamSnapRequest) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[0] mi := &file_global_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -59,7 +191,7 @@ func (x *StreamSnapRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use StreamSnapRequest.ProtoReflect.Descriptor instead. // Deprecated: Use StreamSnapRequest.ProtoReflect.Descriptor instead.
func (*StreamSnapRequest) Descriptor() ([]byte, []int) { func (*StreamSnapRequest) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{0} return file_global_proto_rawDescGZIP(), []int{3}
} }
func (x *StreamSnapRequest) GetStreamPath() string { func (x *StreamSnapRequest) GetStreamPath() string {
@@ -82,7 +214,7 @@ type Wrap struct {
func (x *Wrap) Reset() { func (x *Wrap) Reset() {
*x = Wrap{} *x = Wrap{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[1] mi := &file_global_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -95,7 +227,7 @@ func (x *Wrap) String() string {
func (*Wrap) ProtoMessage() {} func (*Wrap) ProtoMessage() {}
func (x *Wrap) ProtoReflect() protoreflect.Message { func (x *Wrap) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[1] mi := &file_global_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -108,7 +240,7 @@ func (x *Wrap) ProtoReflect() protoreflect.Message {
// Deprecated: Use Wrap.ProtoReflect.Descriptor instead. // Deprecated: Use Wrap.ProtoReflect.Descriptor instead.
func (*Wrap) Descriptor() ([]byte, []int) { func (*Wrap) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{1} return file_global_proto_rawDescGZIP(), []int{4}
} }
func (x *Wrap) GetTimestamp() uint32 { func (x *Wrap) GetTimestamp() uint32 {
@@ -147,7 +279,7 @@ type TrackSnapShot struct {
func (x *TrackSnapShot) Reset() { func (x *TrackSnapShot) Reset() {
*x = TrackSnapShot{} *x = TrackSnapShot{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[2] mi := &file_global_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -160,7 +292,7 @@ func (x *TrackSnapShot) String() string {
func (*TrackSnapShot) ProtoMessage() {} func (*TrackSnapShot) ProtoMessage() {}
func (x *TrackSnapShot) ProtoReflect() protoreflect.Message { func (x *TrackSnapShot) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[2] mi := &file_global_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -173,7 +305,7 @@ func (x *TrackSnapShot) ProtoReflect() protoreflect.Message {
// Deprecated: Use TrackSnapShot.ProtoReflect.Descriptor instead. // Deprecated: Use TrackSnapShot.ProtoReflect.Descriptor instead.
func (*TrackSnapShot) Descriptor() ([]byte, []int) { func (*TrackSnapShot) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{2} return file_global_proto_rawDescGZIP(), []int{5}
} }
func (x *TrackSnapShot) GetSequence() uint32 { func (x *TrackSnapShot) GetSequence() uint32 {
@@ -223,7 +355,7 @@ type StreamSnapShot struct {
func (x *StreamSnapShot) Reset() { func (x *StreamSnapShot) Reset() {
*x = StreamSnapShot{} *x = StreamSnapShot{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[3] mi := &file_global_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -236,7 +368,7 @@ func (x *StreamSnapShot) String() string {
func (*StreamSnapShot) ProtoMessage() {} func (*StreamSnapShot) ProtoMessage() {}
func (x *StreamSnapShot) ProtoReflect() protoreflect.Message { func (x *StreamSnapShot) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[3] mi := &file_global_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -249,7 +381,7 @@ func (x *StreamSnapShot) ProtoReflect() protoreflect.Message {
// Deprecated: Use StreamSnapShot.ProtoReflect.Descriptor instead. // Deprecated: Use StreamSnapShot.ProtoReflect.Descriptor instead.
func (*StreamSnapShot) Descriptor() ([]byte, []int) { func (*StreamSnapShot) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{3} return file_global_proto_rawDescGZIP(), []int{6}
} }
func (x *StreamSnapShot) GetVideoTrack() []*TrackSnapShot { func (x *StreamSnapShot) GetVideoTrack() []*TrackSnapShot {
@@ -277,7 +409,7 @@ type StopSubscribeRequest struct {
func (x *StopSubscribeRequest) Reset() { func (x *StopSubscribeRequest) Reset() {
*x = StopSubscribeRequest{} *x = StopSubscribeRequest{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[4] mi := &file_global_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -290,7 +422,7 @@ func (x *StopSubscribeRequest) String() string {
func (*StopSubscribeRequest) ProtoMessage() {} func (*StopSubscribeRequest) ProtoMessage() {}
func (x *StopSubscribeRequest) ProtoReflect() protoreflect.Message { func (x *StopSubscribeRequest) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[4] mi := &file_global_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -303,7 +435,7 @@ func (x *StopSubscribeRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use StopSubscribeRequest.ProtoReflect.Descriptor instead. // Deprecated: Use StopSubscribeRequest.ProtoReflect.Descriptor instead.
func (*StopSubscribeRequest) Descriptor() ([]byte, []int) { func (*StopSubscribeRequest) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{4} return file_global_proto_rawDescGZIP(), []int{7}
} }
func (x *StopSubscribeRequest) GetId() uint32 { func (x *StopSubscribeRequest) GetId() uint32 {
@@ -324,7 +456,7 @@ type StopSubscribeResponse struct {
func (x *StopSubscribeResponse) Reset() { func (x *StopSubscribeResponse) Reset() {
*x = StopSubscribeResponse{} *x = StopSubscribeResponse{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[5] mi := &file_global_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -337,7 +469,7 @@ func (x *StopSubscribeResponse) String() string {
func (*StopSubscribeResponse) ProtoMessage() {} func (*StopSubscribeResponse) ProtoMessage() {}
func (x *StopSubscribeResponse) ProtoReflect() protoreflect.Message { func (x *StopSubscribeResponse) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[5] mi := &file_global_proto_msgTypes[8]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -350,7 +482,7 @@ func (x *StopSubscribeResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use StopSubscribeResponse.ProtoReflect.Descriptor instead. // Deprecated: Use StopSubscribeResponse.ProtoReflect.Descriptor instead.
func (*StopSubscribeResponse) Descriptor() ([]byte, []int) { func (*StopSubscribeResponse) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{5} return file_global_proto_rawDescGZIP(), []int{8}
} }
func (x *StopSubscribeResponse) GetSuccess() bool { func (x *StopSubscribeResponse) GetSuccess() bool {
@@ -371,7 +503,7 @@ type RequestWithId struct {
func (x *RequestWithId) Reset() { func (x *RequestWithId) Reset() {
*x = RequestWithId{} *x = RequestWithId{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[6] mi := &file_global_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@@ -384,7 +516,7 @@ func (x *RequestWithId) String() string {
func (*RequestWithId) ProtoMessage() {} func (*RequestWithId) ProtoMessage() {}
func (x *RequestWithId) ProtoReflect() protoreflect.Message { func (x *RequestWithId) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[6] mi := &file_global_proto_msgTypes[9]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@@ -397,7 +529,7 @@ func (x *RequestWithId) ProtoReflect() protoreflect.Message {
// Deprecated: Use RequestWithId.ProtoReflect.Descriptor instead. // Deprecated: Use RequestWithId.ProtoReflect.Descriptor instead.
func (*RequestWithId) Descriptor() ([]byte, []int) { func (*RequestWithId) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{6} return file_global_proto_rawDescGZIP(), []int{9}
} }
func (x *RequestWithId) GetId() uint32 { func (x *RequestWithId) GetId() uint32 {
@@ -414,67 +546,80 @@ var file_global_proto_rawDesc = []byte{
0x6d, 0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6d, 0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f,
0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x33, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x20,
0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x0a, 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04,
0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68,
0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x22, 0x13, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65,
0x61, 0x74, 0x68, 0x22, 0x4c, 0x0a, 0x04, 0x57, 0x72, 0x61, 0x70, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x39, 0x0a, 0x12, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4c,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x04, 0x6c,
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x69, 0x73, 0x74, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6d, 0x37, 0x73, 0x2e,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, 0x12, 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x04, 0x6c, 0x69, 0x73, 0x74,
0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x22, 0x33, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65,
0x61, 0x22, 0xa0, 0x01, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50,
0x68, 0x6f, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61,
0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x22, 0x4c, 0x0a, 0x04, 0x57, 0x72, 0x61, 0x70, 0x12, 0x1c, 0x0a,
0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d,
0x28, 0x0d, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x1c, 0x0a, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x73,
0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12,
0x52, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64,
0x61, 0x6e, 0x52, 0x65, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x63, 0x61, 0x61, 0x74, 0x61, 0x22, 0xa0, 0x01, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61,
0x6e, 0x52, 0x65, 0x61, 0x64, 0x12, 0x1d, 0x0a, 0x04, 0x77, 0x72, 0x61, 0x70, 0x18, 0x05, 0x20, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63,
0x01, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x57, 0x72, 0x61, 0x70, 0x52, 0x04, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63,
0x77, 0x72, 0x61, 0x70, 0x22, 0x78, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02,
0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x12, 0x32, 0x0a, 0x0a, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x54, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12,
0x72, 0x61, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x1c, 0x0a, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01,
0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x52, 0x0a, 0x28, 0x04, 0x52, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x18, 0x0a,
0x76, 0x69, 0x64, 0x65, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x12, 0x32, 0x0a, 0x0a, 0x61, 0x75, 0x07, 0x63, 0x61, 0x6e, 0x52, 0x65, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07,
0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x63, 0x61, 0x6e, 0x52, 0x65, 0x61, 0x64, 0x12, 0x1d, 0x0a, 0x04, 0x77, 0x72, 0x61, 0x70, 0x18,
0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x57, 0x72, 0x61, 0x70,
0x6f, 0x74, 0x52, 0x0a, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x22, 0x26, 0x52, 0x04, 0x77, 0x72, 0x61, 0x70, 0x22, 0x78, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x0a, 0x14, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x12, 0x32, 0x0a, 0x0a, 0x76, 0x69, 0x64, 0x65,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d,
0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x22, 0x31, 0x0a, 0x15, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x37, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74,
0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52, 0x0a, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x12, 0x32, 0x0a, 0x0a,
0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x1f, 0x0a, 0x0d, 0x52, 0x65, 0x71, 0x32, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70,
0x75, 0x65, 0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x53, 0x68, 0x6f, 0x74, 0x52, 0x0a, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x32, 0x80, 0x03, 0x0a, 0x06, 0x47, 0x22, 0x26, 0x0a, 0x14, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x12, 0x52, 0x0a, 0x08, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x57, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x22, 0x31, 0x0a, 0x15, 0x53, 0x74, 0x6f, 0x70,
0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x1a, 0x82, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01,
0xd3, 0xe4, 0x93, 0x02, 0x14, 0x22, 0x12, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x75, 0x74, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x1f, 0x0a, 0x0d, 0x52,
0x64, 0x6f, 0x77, 0x6e, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x12, 0x50, 0x0a, 0x07, 0x52, 0x65, 0x73, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x12, 0x0e, 0x0a, 0x02,
0x74, 0x61, 0x72, 0x74, 0x12, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x32, 0xd9, 0x03, 0x0a,
0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x06, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x12, 0x52, 0x0a, 0x08, 0x53, 0x68, 0x75, 0x74, 0x64,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x6f, 0x77, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x22, 0x11, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x72, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x12, 0x63, 0x0a, 0x0a, 0x53, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x12, 0x16, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x1a, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x14, 0x22, 0x12, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x12, 0x50, 0x0a, 0x07, 0x52,
0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x52, 0x65, 0x71,
0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x22, 0x28, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x22, 0x12, 0x20, 0x75, 0x65, 0x73, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f,
0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x6e, 0x61, 0x70, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70,
0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x74, 0x79, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x22, 0x11, 0x2f, 0x61, 0x70, 0x69,
0x12, 0x6b, 0x0a, 0x0d, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x2f, 0x72, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x12, 0x57, 0x0a,
0x65, 0x12, 0x19, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x16, 0x2e, 0x6d, 0x37,
0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6d, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x18, 0x82, 0xd3,
0xe4, 0x93, 0x02, 0x12, 0x12, 0x10, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x2f, 0x6c, 0x69, 0x73, 0x74, 0x12, 0x63, 0x0a, 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x53, 0x6e, 0x61, 0x70, 0x12, 0x16, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d,
0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f,
0x74, 0x22, 0x28, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x22, 0x12, 0x20, 0x2f, 0x61, 0x70, 0x69, 0x2f,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x6e, 0x61, 0x70, 0x2f, 0x7b, 0x73, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x12, 0x6b, 0x0a, 0x0d, 0x53,
0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x19, 0x2e, 0x6d,
0x37, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74,
0x22, 0x18, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x6f, 0x70, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x63, 0x72, 0x69, 0x62, 0x65, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x3a, 0x01, 0x2a, 0x42, 0x14, 0x5a, 0x6e, 0x73, 0x65, 0x22, 0x23, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1d, 0x22, 0x18, 0x2f, 0x61, 0x70,
0x12, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x69, 0x2f, 0x73, 0x74, 0x6f, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x2f, 0x7b, 0x69, 0x64, 0x7d, 0x3a, 0x01, 0x2a, 0x42, 0x14, 0x5a, 0x12, 0x6d, 0x37, 0x73, 0x2e,
0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x62, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@@ -489,34 +634,40 @@ func file_global_proto_rawDescGZIP() []byte {
return file_global_proto_rawDescData return file_global_proto_rawDescData
} }
var file_global_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_global_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_global_proto_goTypes = []interface{}{ var file_global_proto_goTypes = []interface{}{
(*StreamSnapRequest)(nil), // 0: m7s.StreamSnapRequest (*StreamInfo)(nil), // 0: m7s.StreamInfo
(*Wrap)(nil), // 1: m7s.Wrap (*StreamListRequest)(nil), // 1: m7s.StreamListRequest
(*TrackSnapShot)(nil), // 2: m7s.TrackSnapShot (*StreamListResponse)(nil), // 2: m7s.StreamListResponse
(*StreamSnapShot)(nil), // 3: m7s.StreamSnapShot (*StreamSnapRequest)(nil), // 3: m7s.StreamSnapRequest
(*StopSubscribeRequest)(nil), // 4: m7s.StopSubscribeRequest (*Wrap)(nil), // 4: m7s.Wrap
(*StopSubscribeResponse)(nil), // 5: m7s.StopSubscribeResponse (*TrackSnapShot)(nil), // 5: m7s.TrackSnapShot
(*RequestWithId)(nil), // 6: m7s.RequestWithId (*StreamSnapShot)(nil), // 6: m7s.StreamSnapShot
(*emptypb.Empty)(nil), // 7: google.protobuf.Empty (*StopSubscribeRequest)(nil), // 7: m7s.StopSubscribeRequest
(*StopSubscribeResponse)(nil), // 8: m7s.StopSubscribeResponse
(*RequestWithId)(nil), // 9: m7s.RequestWithId
(*emptypb.Empty)(nil), // 10: google.protobuf.Empty
} }
var file_global_proto_depIdxs = []int32{ var file_global_proto_depIdxs = []int32{
1, // 0: m7s.TrackSnapShot.wrap:type_name -> m7s.Wrap 0, // 0: m7s.StreamListResponse.list:type_name -> m7s.StreamInfo
2, // 1: m7s.StreamSnapShot.videoTrack:type_name -> m7s.TrackSnapShot 4, // 1: m7s.TrackSnapShot.wrap:type_name -> m7s.Wrap
2, // 2: m7s.StreamSnapShot.audioTrack:type_name -> m7s.TrackSnapShot 5, // 2: m7s.StreamSnapShot.videoTrack:type_name -> m7s.TrackSnapShot
6, // 3: m7s.Global.Shutdown:input_type -> m7s.RequestWithId 5, // 3: m7s.StreamSnapShot.audioTrack:type_name -> m7s.TrackSnapShot
6, // 4: m7s.Global.Restart:input_type -> m7s.RequestWithId 9, // 4: m7s.Global.Shutdown:input_type -> m7s.RequestWithId
0, // 5: m7s.Global.StreamSnap:input_type -> m7s.StreamSnapRequest 9, // 5: m7s.Global.Restart:input_type -> m7s.RequestWithId
4, // 6: m7s.Global.StopSubscribe:input_type -> m7s.StopSubscribeRequest 1, // 6: m7s.Global.StreamList:input_type -> m7s.StreamListRequest
7, // 7: m7s.Global.Shutdown:output_type -> google.protobuf.Empty 3, // 7: m7s.Global.StreamSnap:input_type -> m7s.StreamSnapRequest
7, // 8: m7s.Global.Restart:output_type -> google.protobuf.Empty 7, // 8: m7s.Global.StopSubscribe:input_type -> m7s.StopSubscribeRequest
3, // 9: m7s.Global.StreamSnap:output_type -> m7s.StreamSnapShot 10, // 9: m7s.Global.Shutdown:output_type -> google.protobuf.Empty
5, // 10: m7s.Global.StopSubscribe:output_type -> m7s.StopSubscribeResponse 10, // 10: m7s.Global.Restart:output_type -> google.protobuf.Empty
7, // [7:11] is the sub-list for method output_type 2, // 11: m7s.Global.StreamList:output_type -> m7s.StreamListResponse
3, // [3:7] is the sub-list for method input_type 6, // 12: m7s.Global.StreamSnap:output_type -> m7s.StreamSnapShot
3, // [3:3] is the sub-list for extension type_name 8, // 13: m7s.Global.StopSubscribe:output_type -> m7s.StopSubscribeResponse
3, // [3:3] is the sub-list for extension extendee 9, // [9:14] is the sub-list for method output_type
0, // [0:3] is the sub-list for field type_name 4, // [4:9] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
} }
func init() { file_global_proto_init() } func init() { file_global_proto_init() }
@@ -526,7 +677,7 @@ func file_global_proto_init() {
} }
if !protoimpl.UnsafeEnabled { if !protoimpl.UnsafeEnabled {
file_global_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { file_global_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamSnapRequest); i { switch v := v.(*StreamInfo); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -538,7 +689,7 @@ func file_global_proto_init() {
} }
} }
file_global_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { file_global_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Wrap); i { switch v := v.(*StreamListRequest); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -550,7 +701,7 @@ func file_global_proto_init() {
} }
} }
file_global_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { file_global_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TrackSnapShot); i { switch v := v.(*StreamListResponse); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -562,7 +713,7 @@ func file_global_proto_init() {
} }
} }
file_global_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { file_global_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamSnapShot); i { switch v := v.(*StreamSnapRequest); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -574,7 +725,7 @@ func file_global_proto_init() {
} }
} }
file_global_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { file_global_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StopSubscribeRequest); i { switch v := v.(*Wrap); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -586,7 +737,7 @@ func file_global_proto_init() {
} }
} }
file_global_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { file_global_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StopSubscribeResponse); i { switch v := v.(*TrackSnapShot); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@@ -598,6 +749,42 @@ func file_global_proto_init() {
} }
} }
file_global_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { file_global_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamSnapShot); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_global_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StopSubscribeRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_global_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StopSubscribeResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_global_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RequestWithId); i { switch v := v.(*RequestWithId); i {
case 0: case 0:
return &v.state return &v.state
@@ -616,7 +803,7 @@ func file_global_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_global_proto_rawDesc, RawDescriptor: file_global_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 7, NumMessages: 10,
NumExtensions: 0, NumExtensions: 0,
NumServices: 1, NumServices: 1,
}, },

View File

@@ -135,6 +135,24 @@ func local_request_Global_Restart_0(ctx context.Context, marshaler runtime.Marsh
} }
func request_Global_StreamList_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StreamListRequest
var metadata runtime.ServerMetadata
msg, err := client.StreamList(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Global_StreamList_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StreamListRequest
var metadata runtime.ServerMetadata
msg, err := server.StreamList(ctx, &protoReq)
return msg, metadata, err
}
func request_Global_StreamSnap_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { func request_Global_StreamSnap_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StreamSnapRequest var protoReq StreamSnapRequest
var metadata runtime.ServerMetadata var metadata runtime.ServerMetadata
@@ -303,6 +321,31 @@ func RegisterGlobalHandlerServer(ctx context.Context, mux *runtime.ServeMux, ser
}) })
mux.Handle("GET", pattern_Global_StreamList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/StreamList", runtime.WithHTTPPathPattern("/api/stream/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Global_StreamList_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_StreamList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Global_StreamSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { mux.Handle("GET", pattern_Global_StreamSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context()) ctx, cancel := context.WithCancel(req.Context())
defer cancel() defer cancel()
@@ -438,6 +481,28 @@ func RegisterGlobalHandlerClient(ctx context.Context, mux *runtime.ServeMux, cli
}) })
mux.Handle("GET", pattern_Global_StreamList_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/StreamList", runtime.WithHTTPPathPattern("/api/stream/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Global_StreamList_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_StreamList_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Global_StreamSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { mux.Handle("GET", pattern_Global_StreamSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context()) ctx, cancel := context.WithCancel(req.Context())
defer cancel() defer cancel()
@@ -490,6 +555,8 @@ var (
pattern_Global_Restart_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"api", "restart", "id"}, "")) pattern_Global_Restart_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"api", "restart", "id"}, ""))
pattern_Global_StreamList_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "stream", "list"}, ""))
pattern_Global_StreamSnap_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "stream", "snap", "streamPath"}, "")) pattern_Global_StreamSnap_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "stream", "snap", "streamPath"}, ""))
pattern_Global_StopSubscribe_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "stop", "subscribe", "id"}, "")) pattern_Global_StopSubscribe_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3}, []string{"api", "stop", "subscribe", "id"}, ""))
@@ -500,6 +567,8 @@ var (
forward_Global_Restart_0 = runtime.ForwardResponseMessage forward_Global_Restart_0 = runtime.ForwardResponseMessage
forward_Global_StreamList_0 = runtime.ForwardResponseMessage
forward_Global_StreamSnap_0 = runtime.ForwardResponseMessage forward_Global_StreamSnap_0 = runtime.ForwardResponseMessage
forward_Global_StopSubscribe_0 = runtime.ForwardResponseMessage forward_Global_StopSubscribe_0 = runtime.ForwardResponseMessage

View File

@@ -15,6 +15,11 @@ service Global {
post: "/api/restart/{id}" post: "/api/restart/{id}"
}; };
} }
rpc StreamList (StreamListRequest) returns (StreamListResponse) {
option (google.api.http) = {
get: "/api/stream/list"
};
}
rpc StreamSnap (StreamSnapRequest) returns (StreamSnapShot) { rpc StreamSnap (StreamSnapRequest) returns (StreamSnapShot) {
option (google.api.http) = { option (google.api.http) = {
get: "/api/stream/snap/{streamPath=**}" get: "/api/stream/snap/{streamPath=**}"
@@ -27,6 +32,15 @@ service Global {
}; };
} }
} }
message StreamInfo {
string path = 1;
}
message StreamListRequest {
}
message StreamListResponse {
repeated StreamInfo list = 1;
}
message StreamSnapRequest { message StreamSnapRequest {
string streamPath = 1; string streamPath = 1;

View File

@@ -25,6 +25,7 @@ const _ = grpc.SupportPackageIsVersion7
type GlobalClient interface { type GlobalClient interface {
Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) Shutdown(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error)
Restart(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error) Restart(ctx context.Context, in *RequestWithId, opts ...grpc.CallOption) (*emptypb.Empty, error)
StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error)
StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error) StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error)
StopSubscribe(ctx context.Context, in *StopSubscribeRequest, opts ...grpc.CallOption) (*StopSubscribeResponse, error) StopSubscribe(ctx context.Context, in *StopSubscribeRequest, opts ...grpc.CallOption) (*StopSubscribeResponse, error)
} }
@@ -55,6 +56,15 @@ func (c *globalClient) Restart(ctx context.Context, in *RequestWithId, opts ...g
return out, nil return out, nil
} }
func (c *globalClient) StreamList(ctx context.Context, in *StreamListRequest, opts ...grpc.CallOption) (*StreamListResponse, error) {
out := new(StreamListResponse)
err := c.cc.Invoke(ctx, "/m7s.Global/StreamList", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *globalClient) StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error) { func (c *globalClient) StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error) {
out := new(StreamSnapShot) out := new(StreamSnapShot)
err := c.cc.Invoke(ctx, "/m7s.Global/StreamSnap", in, out, opts...) err := c.cc.Invoke(ctx, "/m7s.Global/StreamSnap", in, out, opts...)
@@ -79,6 +89,7 @@ func (c *globalClient) StopSubscribe(ctx context.Context, in *StopSubscribeReque
type GlobalServer interface { type GlobalServer interface {
Shutdown(context.Context, *RequestWithId) (*emptypb.Empty, error) Shutdown(context.Context, *RequestWithId) (*emptypb.Empty, error)
Restart(context.Context, *RequestWithId) (*emptypb.Empty, error) Restart(context.Context, *RequestWithId) (*emptypb.Empty, error)
StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error)
StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error) StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error)
StopSubscribe(context.Context, *StopSubscribeRequest) (*StopSubscribeResponse, error) StopSubscribe(context.Context, *StopSubscribeRequest) (*StopSubscribeResponse, error)
mustEmbedUnimplementedGlobalServer() mustEmbedUnimplementedGlobalServer()
@@ -94,6 +105,9 @@ func (UnimplementedGlobalServer) Shutdown(context.Context, *RequestWithId) (*emp
func (UnimplementedGlobalServer) Restart(context.Context, *RequestWithId) (*emptypb.Empty, error) { func (UnimplementedGlobalServer) Restart(context.Context, *RequestWithId) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Restart not implemented") return nil, status.Errorf(codes.Unimplemented, "method Restart not implemented")
} }
func (UnimplementedGlobalServer) StreamList(context.Context, *StreamListRequest) (*StreamListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StreamList not implemented")
}
func (UnimplementedGlobalServer) StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error) { func (UnimplementedGlobalServer) StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error) {
return nil, status.Errorf(codes.Unimplemented, "method StreamSnap not implemented") return nil, status.Errorf(codes.Unimplemented, "method StreamSnap not implemented")
} }
@@ -149,6 +163,24 @@ func _Global_Restart_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Global_StreamList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StreamListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).StreamList(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/StreamList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).StreamList(ctx, req.(*StreamListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Global_StreamSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Global_StreamSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StreamSnapRequest) in := new(StreamSnapRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@@ -200,6 +232,10 @@ var Global_ServiceDesc = grpc.ServiceDesc{
MethodName: "Restart", MethodName: "Restart",
Handler: _Global_Restart_Handler, Handler: _Global_Restart_Handler,
}, },
{
MethodName: "StreamList",
Handler: _Global_StreamList_Handler,
},
{ {
MethodName: "StreamSnap", MethodName: "StreamSnap",
Handler: _Global_StreamSnap_Handler, Handler: _Global_StreamSnap_Handler,

View File

@@ -4,7 +4,7 @@ import (
"io" "io"
) )
const defaultBufSize = 4096 const defaultBufSize = 65536
type BufReader struct { type BufReader struct {
reader io.Reader reader io.Reader
@@ -15,7 +15,7 @@ type BufReader struct {
func NewBufReader(reader io.Reader) (r *BufReader) { func NewBufReader(reader io.Reader) (r *BufReader) {
r = &BufReader{} r = &BufReader{}
r.reader = reader r.reader = reader
r.buf.ScalableMemoryAllocator = NewScalableMemoryAllocator(4096) r.buf.ScalableMemoryAllocator = NewScalableMemoryAllocator(65536)
r.BufLen = defaultBufSize r.BufLen = defaultBufSize
return return
} }
@@ -31,12 +31,11 @@ func (r *BufReader) eat() error {
} }
func (r *BufReader) ReadByte() (byte, error) { func (r *BufReader) ReadByte() (byte, error) {
if r.buf.Length > 0 { for r.buf.Length == 0 {
return r.buf.ReadByte() err := r.eat()
} if err != nil {
err := r.eat() return 0, err
if err != nil { }
return 0, err
} }
return r.buf.ReadByte() return r.buf.ReadByte()
} }
@@ -45,22 +44,46 @@ func (r *BufReader) ReadBE(n int) (num int, err error) {
for i := range n { for i := range n {
b, err := r.ReadByte() b, err := r.ReadByte()
if err != nil { if err != nil {
return -1, err return 0, err
} }
num += int(b) << ((n - i - 1) << 3) num += int(b) << ((n - i - 1) << 3)
} }
return return
} }
func (r *BufReader) ReadLE32(n int) (num uint32, err error) {
for i := range n {
b, err := r.ReadByte()
if err != nil {
return 0, err
}
num += uint32(b) << (i << 3)
}
return
}
func (r *BufReader) ReadBE32(n int) (num uint32, err error) {
for i := range n {
b, err := r.ReadByte()
if err != nil {
return 0, err
}
num += uint32(b) << ((n - i - 1) << 3)
}
return
}
func (r *BufReader) ReadBytes(n int) (mem *RecyclableBuffers, err error) { func (r *BufReader) ReadBytes(n int) (mem *RecyclableBuffers, err error) {
mem = &RecyclableBuffers{ScalableMemoryAllocator: r.buf.ScalableMemoryAllocator} mem = &RecyclableBuffers{ScalableMemoryAllocator: r.buf.ScalableMemoryAllocator}
for r.buf.RecycleFront(); n > 0 && err == nil; err = r.eat() { for r.buf.RecycleFront(); n > 0 && err == nil; err = r.eat() {
if r.buf.Length >= n { if r.buf.Length > 0 {
mem.ReadFromBytes(r.buf.Buffers.Cut(n)...) if r.buf.Length >= n {
return mem.ReadFromBytes(r.buf.Buffers.Cut(n)...)
return
}
n -= r.buf.Length
mem.ReadFromBytes(r.buf.CutAll()...)
} }
n -= r.buf.Length
mem.ReadFromBytes(r.buf.CutAll()...)
} }
return return
} }

View File

@@ -160,6 +160,22 @@ func (buffers *Buffers) ReadBytes(n int) ([]byte, error) {
return b, err return b, err
} }
func (buffers *Buffers) WriteTo(w io.Writer) (n int64, err error) {
var buf net.Buffers
if len(buffers.Buffers) > buffers.offset {
buf = append(buf, buffers.Buffers[buffers.offset:]...)
}
if buffers.curBufLen > 0 {
buf[0] = buffers.curBuf
}
buffers.curBuf = nil
buffers.curBufLen = 0
buffers.offset = len(buffers.Buffers)
buffers.Offset = buffers.Length
buffers.Length = 0
return buf.WriteTo(w)
}
func (buffers *Buffers) WriteNTo(n int, result *net.Buffers) (actual int) { func (buffers *Buffers) WriteNTo(n int, result *net.Buffers) (actual int) {
for actual = n; buffers.Length > 0 && n > 0; buffers.skipBuf() { for actual = n; buffers.Length > 0 && n > 0; buffers.skipBuf() {
if buffers.curBufLen > n { if buffers.curBufLen > n {
@@ -248,7 +264,7 @@ func (buffers *Buffers) ClipBack(n int) []byte {
func (buffers *Buffers) CutAll() (r net.Buffers) { func (buffers *Buffers) CutAll() (r net.Buffers) {
r = append(r, buffers.curBuf) r = append(r, buffers.curBuf)
for i := buffers.offset+1; i < len(buffers.Buffers); i++ { for i := buffers.offset + 1; i < len(buffers.Buffers); i++ {
r = append(r, buffers.Buffers[i]) r = append(r, buffers.Buffers[i])
} }
if len(buffers.Buffers[buffers.offset]) == buffers.curBufLen { if len(buffers.Buffers[buffers.offset]) == buffers.curBufLen {

View File

@@ -1,7 +1,6 @@
package hdl package hdl
import ( import (
"bufio"
"errors" "errors"
"io" "io"
"net/http" "net/http"
@@ -15,17 +14,14 @@ import (
) )
type HDLPuller struct { type HDLPuller struct {
*bufio.Reader *util.BufReader
hasAudio bool hasAudio bool
hasVideo bool hasVideo bool
absTS uint32 //绝对时间戳 absTS uint32 //绝对时间戳
pool *util.ScalableMemoryAllocator
} }
func NewHDLPuller() *HDLPuller { func NewHDLPuller() *HDLPuller {
return &HDLPuller{ return &HDLPuller{}
pool: util.NewScalableMemoryAllocator(1024),
}
} }
func (puller *HDLPuller) Connect(p *m7s.Client) (err error) { func (puller *HDLPuller) Connect(p *m7s.Client) (err error) {
@@ -45,24 +41,28 @@ func (puller *HDLPuller) Connect(p *m7s.Client) (err error) {
return io.EOF return io.EOF
} }
p.Closer = res.Body p.Closer = res.Body
puller.Reader = bufio.NewReader(res.Body) puller.BufReader = util.NewBufReader(res.Body)
} }
} else { } else {
var res *os.File var res *os.File
if res, err = os.Open(p.RemoteURL); err == nil { if res, err = os.Open(p.RemoteURL); err == nil {
p.Closer = res p.Closer = res
puller.Reader = bufio.NewReader(res) puller.BufReader = util.NewBufReader(res)
} }
} }
if err == nil { if err == nil {
header := puller.pool.Malloc(13) var head *util.RecyclableBuffers
defer puller.pool.Free(header) head, err = puller.BufReader.ReadBytes(13)
if _, err = io.ReadFull(puller, header); err == nil { defer head.Recycle()
if header[0] != 'F' || header[1] != 'L' || header[2] != 'V' { if err == nil {
var flvHead [3]byte
var version, flag byte
head.ReadByteTo(&flvHead[0], &flvHead[1], &flvHead[2], &version, &flag)
if flvHead != [...]byte{'F', 'L', 'V'} {
err = errors.New("not flv file") err = errors.New("not flv file")
} else { } else {
puller.hasAudio = header[4]&0x04 != 0 puller.hasAudio = flag&0x04 != 0
puller.hasVideo = header[4]&0x01 != 0 puller.hasVideo = flag&0x01 != 0
} }
} }
} }
@@ -71,7 +71,6 @@ func (puller *HDLPuller) Connect(p *m7s.Client) (err error) {
func (puller *HDLPuller) Pull(p *m7s.Puller) (err error) { func (puller *HDLPuller) Pull(p *m7s.Puller) (err error) {
var startTs uint32 var startTs uint32
var buf15 [15]byte
pubConf := p.GetPublishConfig() pubConf := p.GetPublishConfig()
if !puller.hasAudio { if !puller.hasAudio {
pubConf.PubAudio = false pubConf.PubAudio = false
@@ -79,43 +78,48 @@ func (puller *HDLPuller) Pull(p *m7s.Puller) (err error) {
if !puller.hasVideo { if !puller.hasVideo {
pubConf.PubVideo = false pubConf.PubVideo = false
} }
pubaudio, pubvideo := pubConf.PubAudio, pubConf.PubVideo for offsetTs := puller.absTS; err == nil; _, err = puller.ReadBE(4) {
for offsetTs := puller.absTS; err == nil; _, err = io.ReadFull(puller, buf15[11:]) { t, err := puller.ReadByte()
tmp := util.Buffer(buf15[:11])
_, err = io.ReadFull(puller, tmp)
if err != nil { if err != nil {
return return err
} }
t := tmp.ReadByte() dataSize, err := puller.ReadBE32(3)
dataSize := tmp.ReadUint24() if err != nil {
timestamp := tmp.ReadUint24() | uint32(tmp.ReadByte())<<24 return err
}
timestamp, err := puller.ReadBE32(3)
if err != nil {
return err
}
h, err := puller.ReadByte()
if err != nil {
return err
}
timestamp = timestamp | uint32(h)<<24
if startTs == 0 { if startTs == 0 {
startTs = timestamp startTs = timestamp
} }
tmp.ReadUint24() puller.ReadBE(3) // stream id always 0
var frame rtmp.RTMPData var frame rtmp.RTMPData
frame.ScalableMemoryAllocator = puller.pool frame.RecyclableBuffers, err = puller.ReadBytes(int(dataSize))
mem := frame.Malloc(int(dataSize))
_, err = io.ReadFull(puller, mem)
if err != nil { if err != nil {
frame.Recycle() frame.Recycle()
return return err
} }
frame.ReadFromBytes(mem)
puller.absTS = offsetTs + (timestamp - startTs) puller.absTS = offsetTs + (timestamp - startTs)
frame.Timestamp = puller.absTS frame.Timestamp = puller.absTS
// fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS) // fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS)
switch t { switch t {
case FLV_TAG_TYPE_AUDIO: case FLV_TAG_TYPE_AUDIO:
if pubaudio { if pubConf.PubAudio {
p.WriteAudio(&rtmp.RTMPAudio{frame}) p.WriteAudio(frame.WrapAudio())
} }
case FLV_TAG_TYPE_VIDEO: case FLV_TAG_TYPE_VIDEO:
if pubvideo { if pubConf.PubVideo {
p.WriteVideo(&rtmp.RTMPVideo{frame}) p.WriteVideo(frame.WrapVideo())
} }
case FLV_TAG_TYPE_SCRIPT: case FLV_TAG_TYPE_SCRIPT:
p.Info("script", "data", mem) p.Info("script")
frame.Recycle() frame.Recycle()
} }
} }

View File

@@ -193,17 +193,13 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
} }
case RTMP_MSG_AUDIO: case RTMP_MSG_AUDIO:
if r, ok := receivers[msg.MessageStreamID]; ok { if r, ok := receivers[msg.MessageStreamID]; ok {
r.WriteAudio(&RTMPAudio{msg.AVData}) r.WriteAudio(msg.AVData.WrapAudio())
msg.AVData = RTMPData{}
msg.AVData.ScalableMemoryAllocator = nc.ReadPool
} else { } else {
logger.Warn("ReceiveAudio", "MessageStreamID", msg.MessageStreamID) logger.Warn("ReceiveAudio", "MessageStreamID", msg.MessageStreamID)
} }
case RTMP_MSG_VIDEO: case RTMP_MSG_VIDEO:
if r, ok := receivers[msg.MessageStreamID]; ok { if r, ok := receivers[msg.MessageStreamID]; ok {
r.WriteVideo(&RTMPVideo{msg.AVData}) r.WriteVideo(msg.AVData.WrapVideo())
msg.AVData = RTMPData{}
msg.AVData.ScalableMemoryAllocator = nc.ReadPool
} else { } else {
logger.Warn("ReceiveVideo", "MessageStreamID", msg.MessageStreamID) logger.Warn("ReceiveVideo", "MessageStreamID", msg.MessageStreamID)
} }

View File

@@ -29,6 +29,7 @@ type Chunk struct {
ChunkHeader ChunkHeader
AVData RTMPData AVData RTMPData
MsgData RtmpMessage MsgData RtmpMessage
bufLen int
} }
type ChunkHeader struct { type ChunkHeader struct {
@@ -82,10 +83,10 @@ func (h *ChunkHeader) WriteTo(t byte, b *util.Buffer) {
} }
type ( type (
ChunkHeader8 ChunkHeader ChunkHeader8 ChunkHeader
ChunkHeader12 ChunkHeader ChunkHeader12 ChunkHeader
ChunkHeader1 ChunkHeader ChunkHeader1 ChunkHeader
IChunkHeader interface { IChunkHeader interface {
WriteTo(*util.Buffer) WriteTo(*util.Buffer)
} }
) )

View File

@@ -110,13 +110,9 @@ func (puller *Client) Pull(p *m7s.Puller) (err error) {
} }
switch msg.MessageTypeID { switch msg.MessageTypeID {
case RTMP_MSG_AUDIO: case RTMP_MSG_AUDIO:
p.WriteAudio(&RTMPAudio{msg.AVData}) p.WriteAudio(msg.AVData.WrapAudio())
msg.AVData = RTMPData{}
msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ReadPool
case RTMP_MSG_VIDEO: case RTMP_MSG_VIDEO:
p.WriteVideo(&RTMPVideo{msg.AVData}) p.WriteVideo(msg.AVData.WrapVideo())
msg.AVData = RTMPData{}
msg.AVData.ScalableMemoryAllocator = puller.NetConnection.ReadPool
case RTMP_MSG_AMF0_COMMAND: case RTMP_MSG_AMF0_COMMAND:
cmd := msg.MsgData.(Commander).GetCommand() cmd := msg.MsgData.(Commander).GetCommand()
switch cmd.CommandName { switch cmd.CommandName {

View File

@@ -18,8 +18,7 @@ const (
type RTMPData struct { type RTMPData struct {
Timestamp uint32 Timestamp uint32
util.Buffers *util.RecyclableBuffers
util.RecyclableMemory
} }
func (avcc *RTMPData) GetSize() int { func (avcc *RTMPData) GetSize() int {
@@ -40,3 +39,11 @@ func (avcc *RTMPData) GetTimestamp() time.Duration {
func (avcc *RTMPData) IsIDR() bool { func (avcc *RTMPData) IsIDR() bool {
return false return false
} }
func (avcc *RTMPData) WrapAudio() *RTMPAudio {
return &RTMPAudio{RTMPData: *avcc}
}
func (avcc *RTMPData) WrapVideo() *RTMPVideo {
return &RTMPVideo{RTMPData: *avcc}
}

View File

@@ -66,16 +66,10 @@ var (
// C2 S2 : 参考C1 S1 // C2 S2 : 参考C1 S1
func (nc *NetConnection) ReadBuf(length int) (buf []byte, err error) { func (nc *NetConnection) Handshake() (err error) {
buf = nc.ReadPool.Malloc(length) C0C1 := nc.writePool.Malloc(C1S1_SIZE + 1)
_, err = io.ReadFull(nc.Reader, buf) defer nc.writePool.Recycle()
return if _, err = io.ReadFull(nc.Conn, C0C1); err != nil {
}
func (nc *NetConnection) Handshake() error {
C0C1, err := nc.ReadBuf(C1S1_SIZE + 1)
defer nc.ReadPool.Free(C0C1)
if err != nil {
return err return err
} }
if C0C1[0] != RTMP_HANDSHAKE_VERSION { if C0C1[0] != RTMP_HANDSHAKE_VERSION {
@@ -96,37 +90,36 @@ func (nc *NetConnection) Handshake() error {
} }
func (client *NetConnection) ClientHandshake() (err error) { func (client *NetConnection) ClientHandshake() (err error) {
C0C1 := client.ReadPool.Malloc(C1S1_SIZE + 1) C0C1 := client.writePool.Malloc(C1S1_SIZE + 1)
defer client.ReadPool.Free(C0C1) defer client.writePool.Recycle()
C0C1[0] = RTMP_HANDSHAKE_VERSION C0C1[0] = RTMP_HANDSHAKE_VERSION
if _, err = client.Write(C0C1); err == nil { if _, err = client.Write(C0C1); err == nil {
// read S0 S1 // read S0 S1
if _, err = io.ReadFull(client.Reader, C0C1); err == nil { if _, err = io.ReadFull(client.Conn, C0C1); err == nil {
if C0C1[0] != RTMP_HANDSHAKE_VERSION { if C0C1[0] != RTMP_HANDSHAKE_VERSION {
err = errors.New("S1 C1 Error") err = errors.New("S1 C1 Error")
// C2 // C2
} else if _, err = client.Write(C0C1[1:]); err == nil { } else if _, err = client.Write(C0C1[1:]); err == nil {
_, err = io.ReadFull(client.Reader, C0C1[1:]) // S2 _, err = io.ReadFull(client.Conn, C0C1[1:]) // S2
} }
} }
} }
return return err
} }
func (nc *NetConnection) simple_handshake(C1 []byte) error { func (nc *NetConnection) simple_handshake(C1 []byte) error {
S0S1 := nc.ReadPool.Malloc(C1S1_SIZE + 1) S0S1 := nc.writePool.Malloc(C1S1_SIZE + 1)
S0S1[0] = RTMP_HANDSHAKE_VERSION S0S1[0] = RTMP_HANDSHAKE_VERSION
util.PutBE(S0S1[1:5], time.Now().Unix()&0xFFFFFFFF) util.PutBE(S0S1[1:5], time.Now().Unix()&0xFFFFFFFF)
copy(S0S1[5:], "Monibuca") copy(S0S1[5:], "Monibuca")
nc.Write(S0S1) nc.Write(S0S1)
nc.Write(C1) // S2 nc.Write(C1) // S2
defer nc.ReadPool.Free(S0S1) C2, err := nc.ReadBytes(C1S1_SIZE)
C2, err := nc.ReadBuf(C1S1_SIZE) defer C2.Recycle()
defer nc.ReadPool.Free(C2)
if err != nil { if err != nil {
return err return err
} }
if !bytes.Equal(C2[8:], S0S1[9:]) { if !bytes.Equal(C2.ToBytes()[8:], S0S1[9:]) {
return errors.New("C2 Error") return errors.New("C2 Error")
} }
return nil return nil
@@ -181,8 +174,8 @@ func (nc *NetConnection) complex_handshake(C1 []byte) error {
buffer := net.Buffers{[]byte{RTMP_HANDSHAKE_VERSION}, S1, S2_Random, S2_Digest} buffer := net.Buffers{[]byte{RTMP_HANDSHAKE_VERSION}, S1, S2_Random, S2_Digest}
buffer.WriteTo(nc) buffer.WriteTo(nc)
b, _ := nc.ReadBuf(1536) b, _ := nc.ReadBytes(1536)
nc.ReadPool.Free(b) b.Recycle()
return nil return nil
} }

View File

@@ -1,10 +1,7 @@
package rtmp package rtmp
import ( import (
"bufio"
"encoding/binary"
"errors" "errors"
"io"
"log/slog" "log/slog"
"net" "net"
"runtime" "runtime"
@@ -47,7 +44,7 @@ const (
type NetConnection struct { type NetConnection struct {
*slog.Logger `json:"-" yaml:"-"` *slog.Logger `json:"-" yaml:"-"`
*bufio.Reader `json:"-" yaml:"-"` *util.BufReader `json:"-" yaml:"-"`
net.Conn `json:"-" yaml:"-"` net.Conn `json:"-" yaml:"-"`
bandwidth uint32 bandwidth uint32
readSeqNum uint32 // 当前读的字节 readSeqNum uint32 // 当前读的字节
@@ -61,8 +58,7 @@ type NetConnection struct {
AppName string AppName string
tmpBuf util.Buffer //用来接收/发送小数据,复用内存 tmpBuf util.Buffer //用来接收/发送小数据,复用内存
chunkHeaderBuf util.Buffer chunkHeaderBuf util.Buffer
ReadPool *util.ScalableMemoryAllocator writePool util.RecyclableMemory
WritePool util.RecyclableMemory
writing atomic.Bool // false 可写true 不可写 writing atomic.Bool // false 可写true 不可写
} }
@@ -70,26 +66,18 @@ func NewNetConnection(conn net.Conn, logger *slog.Logger) (ret *NetConnection) {
ret = &NetConnection{ ret = &NetConnection{
Logger: logger, Logger: logger,
Conn: conn, Conn: conn,
Reader: bufio.NewReader(conn), BufReader: util.NewBufReader(conn),
WriteChunkSize: RTMP_DEFAULT_CHUNK_SIZE, WriteChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
incommingChunks: make(map[uint32]*Chunk), incommingChunks: make(map[uint32]*Chunk),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3, bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
tmpBuf: make(util.Buffer, 4), tmpBuf: make(util.Buffer, 4),
chunkHeaderBuf: make(util.Buffer, 0, 20), chunkHeaderBuf: make(util.Buffer, 0, 20),
ReadPool: util.NewScalableMemoryAllocator(2048),
} }
ret.WritePool.ScalableMemoryAllocator = util.NewScalableMemoryAllocator(1024) ret.writePool.ScalableMemoryAllocator = util.NewScalableMemoryAllocator(1024)
return return
} }
func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
n, err = io.ReadFull(conn.Reader, buf)
if err == nil {
conn.readSeqNum += uint32(n)
}
return
}
func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) { func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) {
return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID}) return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID})
} }
@@ -136,41 +124,47 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
// println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType) // println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType)
chunk, ok := conn.incommingChunks[ChunkStreamID] chunk, ok := conn.incommingChunks[ChunkStreamID]
if ChunkType != 3 && ok && chunk.AVData.Length > 0 { if ChunkType != 3 && ok && chunk.bufLen > 0 {
// 如果块类型不为3,那么这个rtmp的body应该为空. // 如果块类型不为3,那么这个rtmp的body应该为空.
return nil, errors.New("incompleteRtmpBody error") return nil, errors.New("incompleteRtmpBody error")
} }
if !ok { if !ok {
chunk = &Chunk{} chunk = &Chunk{}
conn.incommingChunks[ChunkStreamID] = chunk conn.incommingChunks[ChunkStreamID] = chunk
chunk.AVData.ScalableMemoryAllocator = conn.ReadPool
} }
if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil { if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil {
return nil, errors.New("get chunk type error :" + err.Error()) return nil, errors.New("get chunk type error :" + err.Error())
} }
msgLen := int(chunk.MessageLength) msgLen := int(chunk.MessageLength)
var mem *util.RecyclableBuffers
mem := chunk.AVData.Malloc(conn.readChunkSize) if unRead := msgLen - chunk.bufLen; unRead < conn.readChunkSize {
if unRead := msgLen - chunk.AVData.Length; unRead < conn.readChunkSize { mem, err = conn.ReadBytes(unRead)
mem = mem[:unRead]
}
if n, err := conn.ReadFull(mem); err != nil {
chunk.AVData.Recycle()
return nil, err
} else { } else {
conn.readSeqNum += uint32(n) mem, err = conn.ReadBytes(conn.readChunkSize)
} }
if chunk.AVData.ReadFromBytes(mem); chunk.AVData.Length == msgLen { if err != nil {
mem.Recycle()
return nil, err
}
conn.readSeqNum += uint32(mem.Length)
if chunk.bufLen == 0 {
chunk.AVData.RecyclableBuffers = mem
} else {
chunk.AVData.ReadFromBytes(mem.Buffers.Buffers...)
}
chunk.bufLen += mem.Length
if chunk.AVData.Length == msgLen {
chunk.ChunkHeader.ExtendTimestamp += chunk.ChunkHeader.Timestamp chunk.ChunkHeader.ExtendTimestamp += chunk.ChunkHeader.Timestamp
msg = chunk msg = chunk
switch chunk.MessageTypeID { switch chunk.MessageTypeID {
case RTMP_MSG_AUDIO, RTMP_MSG_VIDEO: case RTMP_MSG_AUDIO, RTMP_MSG_VIDEO:
msg.AVData.Timestamp = chunk.ChunkHeader.ExtendTimestamp msg.AVData.Timestamp = chunk.ChunkHeader.ExtendTimestamp
default: default:
err = GetRtmpMessage(msg, msg.AVData.ToBytes())
msg.AVData.Recycle() msg.AVData.Recycle()
err = GetRtmpMessage(msg, msg.AVData.ToBytes())
} }
msg.bufLen = 0
} }
return return
} }
@@ -208,22 +202,18 @@ func (conn *NetConnection) readChunkStreamID(csid uint32) (chunkStreamID uint32,
} }
func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err error) { func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err error) {
conn.tmpBuf.Reset()
b4 := conn.tmpBuf.Malloc(4)
b3 := b4[:3]
if chunkType == 3 { if chunkType == 3 {
// 3个字节的时间戳 // 3个字节的时间戳
} else { } else {
// Timestamp 3 bytes // Timestamp 3 bytes
if _, err = conn.ReadFull(b3); err != nil { if h.Timestamp, err = conn.ReadBE32(3); err != nil {
return err return err
} }
util.GetBE(b3, &h.Timestamp)
if chunkType != 2 { if chunkType != 2 {
if _, err = conn.ReadFull(b3); err != nil { if h.MessageLength, err = conn.ReadBE32(3); err != nil {
return err return err
} }
util.GetBE(b3, &h.MessageLength)
// Message Type ID 1 bytes // Message Type ID 1 bytes
if h.MessageTypeID, err = conn.ReadByte(); err != nil { if h.MessageTypeID, err = conn.ReadByte(); err != nil {
return err return err
@@ -231,20 +221,18 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err er
conn.readSeqNum++ conn.readSeqNum++
if chunkType == 0 { if chunkType == 0 {
// Message Stream ID 4bytes // Message Stream ID 4bytes
if _, err = conn.ReadFull(b4); err != nil { // 读取Message Stream ID if h.MessageStreamID, err = conn.ReadBE32(4); err != nil { // 读取Message Stream ID
return err return err
} }
h.MessageStreamID = binary.LittleEndian.Uint32(b4)
} }
} }
} }
// ExtendTimestamp 4 bytes // ExtendTimestamp 4 bytes
if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求 if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求
if _, err = conn.ReadFull(b4); err != nil { if h.Timestamp, err = conn.ReadBE32(4); err != nil {
return err return err
} }
util.GetBE(b4, &h.Timestamp)
} }
if chunkType == 0 { if chunkType == 0 {
h.ExtendTimestamp = h.Timestamp h.ExtendTimestamp = h.Timestamp
@@ -331,6 +319,5 @@ func (conn *NetConnection) sendChunk(r util.Buffers, head *ChunkHeader, headType
var nw int64 var nw int64
nw, err = chunks.WriteTo(conn.Conn) nw, err = chunks.WriteTo(conn.Conn)
conn.writeSeqNum += uint32(nw) conn.writeSeqNum += uint32(nw)
conn.WritePool.Recycle()
return err return err
} }

View File

@@ -5,9 +5,9 @@ import (
"sync" "sync"
"time" "time"
"m7s.live/m7s/v5/pb"
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pb"
) )
type PublisherState int type PublisherState int
@@ -136,7 +136,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
} }
frame.Timestamp = max(1, p.baseTs+ts) frame.Timestamp = max(1, p.baseTs+ts)
p.lastTs = frame.Timestamp p.lastTs = frame.Timestamp
p.Trace("write", "seq", frame.Sequence) p.Trace("write", "seq", frame.Sequence, "ts", frame.Timestamp, "codec", t.Codec.String(), "size", frame.Wrap.GetSize(), "data", frame.Wrap.String())
t.Step() t.Step()
p.speedControl(p.Publish.Speed, p.lastTs) p.speedControl(p.Publish.Speed, p.lastTs)
} }
@@ -167,7 +167,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) {
if t.IDRing != nil { if t.IDRing != nil {
p.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence) p.GOP = int(t.Value.Sequence - t.IDRing.Value.Sequence)
if t.HistoryRing == nil { if t.HistoryRing == nil {
if l := t.Size - p.GOP; l > 12 { if l := t.Size - p.GOP; l > 12 && t.Size > 100 {
t.Debug("resize", "gop", p.GOP, "before", t.Size, "after", t.Size-5) t.Debug("resize", "gop", p.GOP, "before", t.Size, "after", t.Size-5)
t.Reduce(5) //缩小缓冲环节省内存 t.Reduce(5) //缩小缓冲环节省内存
} }
@@ -233,7 +233,10 @@ func (p *Publisher) GetAudioTrack(dataType reflect.Type) (t *AVTrack) {
return t return t
} }
p.RUnlock() p.RUnlock()
return p.createTransTrack(dataType) if p.AudioTrack != nil {
return p.createTransTrack(dataType)
}
return
} }
func (p *Publisher) GetVideoTrack(dataType reflect.Type) (t *AVTrack) { func (p *Publisher) GetVideoTrack(dataType reflect.Type) (t *AVTrack) {

View File

@@ -105,10 +105,10 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) {
} }
func (s *Server) run(ctx context.Context, conf any) (err error) { func (s *Server) run(ctx context.Context, conf any) (err error) {
mux := runtime.NewServeMux(runtime.WithMarshalerOption("text/plain", &pb.TextPlain{}), runtime.WithRoutingErrorHandler(runtime.RoutingErrorHandlerFunc(func(_ context.Context, _ *runtime.ServeMux, _ runtime.Marshaler, w http.ResponseWriter, r *http.Request, _ int) {
s.config.HTTP.GetHttpMux().ServeHTTP(w, r)
})))
httpConf, tcpConf := &s.config.HTTP, &s.config.TCP httpConf, tcpConf := &s.config.HTTP, &s.config.TCP
mux := runtime.NewServeMux(runtime.WithMarshalerOption("text/plain", &pb.TextPlain{}), runtime.WithRoutingErrorHandler(runtime.RoutingErrorHandlerFunc(func(_ context.Context, _ *runtime.ServeMux, _ runtime.Marshaler, w http.ResponseWriter, r *http.Request, _ int) {
httpConf.GetHttpMux().ServeHTTP(w, r)
})))
httpConf.SetMux(mux) httpConf.SetMux(mux)
s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx) s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx)
s.Info("start") s.Info("start")
@@ -146,7 +146,7 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
} }
s.Logger = slog.New( s.Logger = slog.New(
console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}), console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}),
) ).With("server", s.ID)
s.registerHandler() s.registerHandler()
if httpConf.ListenAddrTLS != "" { if httpConf.ListenAddrTLS != "" {
@@ -322,6 +322,15 @@ func (s *Server) eventLoop() {
v.Fulfill(ErrNotFound) v.Fulfill(ErrNotFound)
} }
continue continue
case *pb.StreamListRequest:
var streams []*pb.StreamInfo
for _, publisher := range s.Streams.Items {
streams = append(streams, &pb.StreamInfo{
Path: publisher.StreamPath,
})
}
v.Resolve(&pb.StreamListResponse{List: streams})
continue
} }
} }
for _, plugin := range s.Plugins { for _, plugin := range s.Plugins {