fix: config map empty

This commit is contained in:
langhuihui
2024-11-26 13:11:21 +08:00
parent f0d2d8a502
commit 201c779e90
4 changed files with 721 additions and 636 deletions

67
api.go
View File

@@ -261,7 +261,7 @@ func (s *Server) GetSubscribers(context.Context, *pb.SubscribersRequest) (res *p
func (s *Server) AudioTrackSnap(_ context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) { func (s *Server) AudioTrackSnap(_ context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.Streams.Call(func() error { s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasAudioTrack() { if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasAudioTrack() {
res = &pb.TrackSnapShotResponse{} data := &pb.TrackSnapShotData{}
if pub.AudioTrack.Allocator != nil { if pub.AudioTrack.Allocator != nil {
for _, memlist := range pub.AudioTrack.Allocator.GetChildren() { for _, memlist := range pub.AudioTrack.Allocator.GetChildren() {
var list []*pb.MemoryBlock var list []*pb.MemoryBlock
@@ -271,38 +271,33 @@ func (s *Server) AudioTrackSnap(_ context.Context, req *pb.StreamSnapRequest) (r
E: uint32(block.End), E: uint32(block.End),
}) })
} }
res.Memory = append(res.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)}) data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)})
} }
} }
res.Reader = make(map[uint32]uint32)
for sub := range pub.SubscriberRange {
if sub.AudioReader == nil {
continue
}
res.Reader[uint32(sub.ID)] = sub.AudioReader.Value.Sequence
}
pub.AudioTrack.Ring.Do(func(v *pkg.AVFrame) { pub.AudioTrack.Ring.Do(func(v *pkg.AVFrame) {
if v.TryRLock() { if len(v.Wraps) > 0 {
if len(v.Wraps) > 0 { var snap pb.TrackSnapShot
var snap pb.TrackSnapShot snap.Sequence = v.Sequence
snap.Sequence = v.Sequence snap.Timestamp = uint32(v.Timestamp / time.Millisecond)
snap.Timestamp = uint32(v.Timestamp / time.Millisecond) snap.WriteTime = timestamppb.New(v.WriteTime)
snap.WriteTime = timestamppb.New(v.WriteTime) snap.Wrap = make([]*pb.Wrap, len(v.Wraps))
snap.Wrap = make([]*pb.Wrap, len(v.Wraps)) snap.KeyFrame = v.IDR
snap.KeyFrame = v.IDR data.RingDataSize += uint32(v.Wraps[0].GetSize())
res.RingDataSize += uint32(v.Wraps[0].GetSize()) for i, wrap := range v.Wraps {
for i, wrap := range v.Wraps { snap.Wrap[i] = &pb.Wrap{
snap.Wrap[i] = &pb.Wrap{ Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond), Size: uint32(wrap.GetSize()),
Size: uint32(wrap.GetSize()), Data: wrap.String(),
Data: wrap.String(),
}
} }
res.Ring = append(res.Ring, &snap)
} }
v.RUnlock() data.Ring = append(data.Ring, &snap)
} }
}) })
res = &pb.TrackSnapShotResponse{
Code: 0,
Message: "success",
Data: data,
}
} else { } else {
err = pkg.ErrNotFound err = pkg.ErrNotFound
} }
@@ -342,7 +337,7 @@ func (s *Server) api_VideoTrack_SSE(rw http.ResponseWriter, r *http.Request) {
func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) { func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.TrackSnapShotResponse, err error) {
s.Streams.Call(func() error { s.Streams.Call(func() error {
if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() { if pub, ok := s.Streams.Get(req.StreamPath); ok && pub.HasVideoTrack() {
res = &pb.TrackSnapShotResponse{} data := &pb.TrackSnapShotData{}
if pub.VideoTrack.Allocator != nil { if pub.VideoTrack.Allocator != nil {
for _, memlist := range pub.VideoTrack.Allocator.GetChildren() { for _, memlist := range pub.VideoTrack.Allocator.GetChildren() {
var list []*pb.MemoryBlock var list []*pb.MemoryBlock
@@ -352,16 +347,9 @@ func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest)
E: uint32(block.End), E: uint32(block.End),
}) })
} }
res.Memory = append(res.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)}) data.Memory = append(data.Memory, &pb.MemoryBlockGroup{List: list, Size: uint32(memlist.Size)})
} }
} }
res.Reader = make(map[uint32]uint32)
for sub := range pub.SubscriberRange {
if sub.VideoReader == nil {
continue
}
res.Reader[sub.ID] = sub.VideoReader.Value.Sequence
}
pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) { pub.VideoTrack.Ring.Do(func(v *pkg.AVFrame) {
//if v.TryRLock() { //if v.TryRLock() {
if len(v.Wraps) > 0 { if len(v.Wraps) > 0 {
@@ -371,7 +359,7 @@ func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest)
snap.WriteTime = timestamppb.New(v.WriteTime) snap.WriteTime = timestamppb.New(v.WriteTime)
snap.Wrap = make([]*pb.Wrap, len(v.Wraps)) snap.Wrap = make([]*pb.Wrap, len(v.Wraps))
snap.KeyFrame = v.IDR snap.KeyFrame = v.IDR
res.RingDataSize += uint32(v.Wraps[0].GetSize()) data.RingDataSize += uint32(v.Wraps[0].GetSize())
for i, wrap := range v.Wraps { for i, wrap := range v.Wraps {
snap.Wrap[i] = &pb.Wrap{ snap.Wrap[i] = &pb.Wrap{
Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond), Timestamp: uint32(wrap.GetTimestamp() / time.Millisecond),
@@ -379,11 +367,16 @@ func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest)
Data: wrap.String(), Data: wrap.String(),
} }
} }
res.Ring = append(res.Ring, &snap) data.Ring = append(data.Ring, &snap)
} }
//v.RUnlock() //v.RUnlock()
//} //}
}) })
res = &pb.TrackSnapShotResponse{
Code: 0,
Message: "success",
Data: data,
}
} else { } else {
err = pkg.ErrNotFound err = pkg.ErrNotFound
} }

File diff suppressed because it is too large Load Diff

View File

@@ -348,13 +348,19 @@ message AudioTrackInfo {
uint32 channels =7; uint32 channels =7;
} }
message TrackSnapShotResponse { message TrackSnapShotData {
repeated TrackSnapShot ring = 1; repeated TrackSnapShot ring = 1;
uint32 ringDataSize = 2; uint32 ringDataSize = 2;
map<uint32, uint32> reader = 3; map<uint32, uint32> reader = 3;
repeated MemoryBlockGroup memory = 4; repeated MemoryBlockGroup memory = 4;
} }
message TrackSnapShotResponse {
int32 code = 1;
string message = 2;
TrackSnapShotData data = 3;
}
message VideoTrackInfo { message VideoTrackInfo {
string codec = 1; string codec = 1;
string delta = 2; string delta = 2;

View File

@@ -332,30 +332,32 @@ func (config *Config) assign(k string, v any) (target reflect.Value) {
default: default:
if ft.Kind() == reflect.Map { if ft.Kind() == reflect.Map {
target = reflect.MakeMap(ft) target = reflect.MakeMap(ft)
tmpStruct := reflect.StructOf([]reflect.StructField{ if v != nil {
{ tmpStruct := reflect.StructOf([]reflect.StructField{
Name: "Key", {
Type: ft.Key(), Name: "Key",
}, Type: ft.Key(),
}) },
tmpValue := reflect.New(tmpStruct) })
for k, v := range v.(map[string]any) { tmpValue := reflect.New(tmpStruct)
_ = yaml.Unmarshal([]byte(fmt.Sprintf("key: %s", k)), tmpValue.Interface()) for k, v := range v.(map[string]any) {
var value reflect.Value _ = yaml.Unmarshal([]byte(fmt.Sprintf("key: %s", k)), tmpValue.Interface())
if ft.Elem().Kind() == reflect.Struct { var value reflect.Value
value = reflect.New(ft.Elem()) if ft.Elem().Kind() == reflect.Struct {
defaults.SetDefaults(value.Interface()) value = reflect.New(ft.Elem())
if reflect.TypeOf(v).Kind() != reflect.Map { defaults.SetDefaults(value.Interface())
value.Elem().Field(0).Set(reflect.ValueOf(v)) if reflect.TypeOf(v).Kind() != reflect.Map {
value.Elem().Field(0).Set(reflect.ValueOf(v))
} else {
out, _ := yaml.Marshal(v)
_ = yaml.Unmarshal(out, value.Interface())
}
value = value.Elem()
} else { } else {
out, _ := yaml.Marshal(v) value = reflect.ValueOf(v)
_ = yaml.Unmarshal(out, value.Interface())
} }
value = value.Elem() target.SetMapIndex(tmpValue.Elem().Field(0), value)
} else {
value = reflect.ValueOf(v)
} }
target.SetMapIndex(tmpValue.Elem().Field(0), value)
} }
} else { } else {
tmpStruct := reflect.StructOf([]reflect.StructField{ tmpStruct := reflect.StructOf([]reflect.StructField{