diff --git a/pkg/streamcontrol/stream_control.go b/pkg/streamcontrol/stream_control.go index bd2c57d..c22a6ed 100644 --- a/pkg/streamcontrol/stream_control.go +++ b/pkg/streamcontrol/stream_control.go @@ -43,16 +43,39 @@ type StreamProfile interface { AbstractStreamProfile } +func AssertStreamProfile[T StreamProfile]( + ctx context.Context, + v AbstractStreamProfile, +) (*T, error) { + profile, ok := v.(T) + if ok { + return &profile, nil + } + + profilePtr, ok := (any)(v).(*T) + if ok { + return profilePtr, nil + } + + var zeroProfile T + return nil, ErrInvalidStreamProfileType{Expected: zeroProfile, Received: v} +} + func GetStreamProfile[T StreamProfile]( ctx context.Context, v AbstractStreamProfile, ) (*T, error) { - var profile T + profilePtr, err := AssertStreamProfile[T](ctx, v) + if err == nil { + return profilePtr, nil + } + b, err := json.Marshal(v) if err != nil { return nil, fmt.Errorf("unable to serialize: %w: %#+v", err, v) } logger.Debugf(ctx, "JSON representation: <%s>", b) + var profile T err = json.Unmarshal(b, &profile) if err != nil { return nil, fmt.Errorf("unable to deserialize: %w: <%s>", err, b) @@ -192,24 +215,18 @@ func ToAbstract[T StreamProfile](c StreamController[T]) AbstractStreamController return &abstractStreamController{ StreamController: c, applyProfile: func(ctx context.Context, _profile AbstractStreamProfile, customArgs ...any) error { - if _profile == nil { - return nil + profile, err := AssertStreamProfile[T](ctx, _profile) + if err != nil { + return err } - profile, ok := _profile.(T) - if !ok { - return ErrInvalidStreamProfileType{Expected: zeroProfile, Received: _profile} - } - return c.ApplyProfile(ctx, profile, customArgs...) + return c.ApplyProfile(ctx, *profile, customArgs...) }, startStream: func(ctx context.Context, title string, description string, _profile AbstractStreamProfile, customArgs ...any) error { - if _profile == nil { - return nil + profile, err := AssertStreamProfile[T](ctx, _profile) + if err != nil { + return err } - profile, ok := _profile.(T) - if !ok { - return ErrInvalidStreamProfileType{Expected: zeroProfile, Received: _profile} - } - return c.StartStream(ctx, title, description, profile, customArgs...) + return c.StartStream(ctx, title, description, *profile, customArgs...) }, StreamProfileTypeValue: profileType, } diff --git a/pkg/streampanel/panel.go b/pkg/streampanel/panel.go index 4bc6060..f99042a 100644 --- a/pkg/streampanel/panel.go +++ b/pkg/streampanel/panel.go @@ -1804,12 +1804,14 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) { p.getUpdatedStatus(ctx) observability.Go(ctx, func() { + t := time.NewTicker(time.Second * 5) for { select { case <-ctx.Done(): return case <-chStreams: case <-chConfigs: + case <-t.C: } p.getUpdatedStatus(ctx) }