Initial commit, pt. 73

This commit is contained in:
Dmitrii Okunev
2024-08-04 23:06:49 +01:00
parent 2a7d947b78
commit c00508fc72
2 changed files with 34 additions and 15 deletions

View File

@@ -43,16 +43,39 @@ type StreamProfile interface {
AbstractStreamProfile AbstractStreamProfile
} }
func AssertStreamProfile[T StreamProfile](
ctx context.Context,
v AbstractStreamProfile,
) (*T, error) {
profile, ok := v.(T)
if ok {
return &profile, nil
}
profilePtr, ok := (any)(v).(*T)
if ok {
return profilePtr, nil
}
var zeroProfile T
return nil, ErrInvalidStreamProfileType{Expected: zeroProfile, Received: v}
}
func GetStreamProfile[T StreamProfile]( func GetStreamProfile[T StreamProfile](
ctx context.Context, ctx context.Context,
v AbstractStreamProfile, v AbstractStreamProfile,
) (*T, error) { ) (*T, error) {
var profile T profilePtr, err := AssertStreamProfile[T](ctx, v)
if err == nil {
return profilePtr, nil
}
b, err := json.Marshal(v) b, err := json.Marshal(v)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to serialize: %w: %#+v", err, v) return nil, fmt.Errorf("unable to serialize: %w: %#+v", err, v)
} }
logger.Debugf(ctx, "JSON representation: <%s>", b) logger.Debugf(ctx, "JSON representation: <%s>", b)
var profile T
err = json.Unmarshal(b, &profile) err = json.Unmarshal(b, &profile)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to deserialize: %w: <%s>", err, b) return nil, fmt.Errorf("unable to deserialize: %w: <%s>", err, b)
@@ -192,24 +215,18 @@ func ToAbstract[T StreamProfile](c StreamController[T]) AbstractStreamController
return &abstractStreamController{ return &abstractStreamController{
StreamController: c, StreamController: c,
applyProfile: func(ctx context.Context, _profile AbstractStreamProfile, customArgs ...any) error { applyProfile: func(ctx context.Context, _profile AbstractStreamProfile, customArgs ...any) error {
if _profile == nil { profile, err := AssertStreamProfile[T](ctx, _profile)
return nil if err != nil {
return err
} }
profile, ok := _profile.(T) return c.ApplyProfile(ctx, *profile, customArgs...)
if !ok {
return ErrInvalidStreamProfileType{Expected: zeroProfile, Received: _profile}
}
return c.ApplyProfile(ctx, profile, customArgs...)
}, },
startStream: func(ctx context.Context, title string, description string, _profile AbstractStreamProfile, customArgs ...any) error { startStream: func(ctx context.Context, title string, description string, _profile AbstractStreamProfile, customArgs ...any) error {
if _profile == nil { profile, err := AssertStreamProfile[T](ctx, _profile)
return nil if err != nil {
return err
} }
profile, ok := _profile.(T) return c.StartStream(ctx, title, description, *profile, customArgs...)
if !ok {
return ErrInvalidStreamProfileType{Expected: zeroProfile, Received: _profile}
}
return c.StartStream(ctx, title, description, profile, customArgs...)
}, },
StreamProfileTypeValue: profileType, StreamProfileTypeValue: profileType,
} }

View File

@@ -1804,12 +1804,14 @@ func (p *Panel) subscribeUpdateControlPage(ctx context.Context) {
p.getUpdatedStatus(ctx) p.getUpdatedStatus(ctx)
observability.Go(ctx, func() { observability.Go(ctx, func() {
t := time.NewTicker(time.Second * 5)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-chStreams: case <-chStreams:
case <-chConfigs: case <-chConfigs:
case <-t.C:
} }
p.getUpdatedStatus(ctx) p.getUpdatedStatus(ctx)
} }