Initial commit, part 1

This commit is contained in:
Dmitrii Okunev
2024-02-12 13:26:48 +00:00
commit b98d94a66d
14 changed files with 1810 additions and 0 deletions

View File

@@ -0,0 +1,269 @@
package streamctl
import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
)
type StreamProfile interface {
}
type StreamControllerCommons interface {
SetTitle(ctx context.Context, title string) error
SetDescription(ctx context.Context, description string) error
InsertAdsCuePoint(ctx context.Context, ts time.Time, duration time.Duration) error
Flush(ctx context.Context) error
EndStream(ctx context.Context) error
}
type StreamController[ProfileType StreamProfile] interface {
StreamControllerCommons
ApplyProfile(ctx context.Context, profile ProfileType) error
StartStream(ctx context.Context, title string, description string, profile ProfileType, customArgs ...any) error
}
type AbstractStreamController interface {
StreamController[StreamProfile]
GetImplementation() StreamControllerCommons
StreamProfileType() reflect.Type
}
type abstractStreamController struct {
StreamController StreamControllerCommons
applyProfile func(ctx context.Context, profile StreamProfile) error
startStream func(ctx context.Context, title string, description string, profile StreamProfile, customArgs ...any) error
StreamProfileTypeValue reflect.Type
}
func (c *abstractStreamController) GetImplementation() StreamControllerCommons {
return c.StreamController
}
func (c *abstractStreamController) ApplyProfile(
ctx context.Context,
profile StreamProfile,
) error {
return c.applyProfile(ctx, profile)
}
func (c *abstractStreamController) SetTitle(
ctx context.Context,
title string,
) error {
return c.GetImplementation().SetTitle(ctx, title)
}
func (c *abstractStreamController) SetDescription(
ctx context.Context,
description string,
) error {
return c.GetImplementation().SetDescription(ctx, description)
}
func (c *abstractStreamController) InsertAdsCuePoint(
ctx context.Context,
ts time.Time,
duration time.Duration,
) error {
return c.GetImplementation().InsertAdsCuePoint(ctx, ts, duration)
}
func (c *abstractStreamController) Flush(
ctx context.Context,
) error {
return c.GetImplementation().Flush(ctx)
}
func (c *abstractStreamController) StartStream(
ctx context.Context,
title string,
description string,
profile StreamProfile,
customArgs ...any,
) error {
return c.startStream(ctx, title, description, profile)
}
func (c *abstractStreamController) EndStream(
ctx context.Context,
) error {
return c.GetImplementation().EndStream(ctx)
}
func (c *abstractStreamController) StreamProfileType() reflect.Type {
return c.StreamProfileTypeValue
}
func ToAbstract[T StreamProfile](c StreamController[T]) AbstractStreamController {
var zeroProfile T
profileType := reflect.TypeOf(zeroProfile)
return &abstractStreamController{
StreamController: c,
applyProfile: func(ctx context.Context, _profile StreamProfile) error {
profile, ok := _profile.(T)
if !ok {
return ErrInvalidStreamProfileType{Expected: zeroProfile, Received: _profile}
}
return c.ApplyProfile(ctx, profile)
},
startStream: func(ctx context.Context, title string, description string, _profile StreamProfile, customArgs ...any) error {
profile, ok := _profile.(T)
if !ok {
return ErrInvalidStreamProfileType{Expected: zeroProfile, Received: _profile}
}
return c.StartStream(ctx, title, description, profile, customArgs...)
},
StreamProfileTypeValue: profileType,
}
}
type StreamControllers []AbstractStreamController
func (s StreamControllers) ApplyProfiles(
ctx context.Context,
profiles []StreamProfile,
) error {
m := map[reflect.Type]AbstractStreamController{}
for _, c := range s {
m[c.StreamProfileType()] = c
}
var wg sync.WaitGroup
errCh := make(chan error)
for _, p := range profiles {
wg.Add(1)
go func(p StreamProfile) {
defer wg.Done()
profileType := reflect.TypeOf(p)
c, ok := m[profileType]
if !ok {
errCh <- ErrNoStreamControllerForProfile{StreamProfile: p}
return
}
if err := c.ApplyProfile(ctx, p); err != nil {
errCh <- fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
return
}
}(p)
}
go func() {
wg.Wait()
close(errCh)
}()
var result error
for err := range errCh {
result = multierror.Append(result, err)
}
return result
}
func (s StreamControllers) SetTitle(
ctx context.Context,
title string,
) error {
return s.concurrently(func(c AbstractStreamController) error {
err := c.SetTitle(ctx, title)
logger.Debugf(ctx, "SetTitle: %T: <%s>: %v", c.GetImplementation(), title, err)
if err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) SetDescription(
ctx context.Context,
description string,
) error {
return s.concurrently(func(c AbstractStreamController) error {
logger.Debugf(ctx, "SetDescription: %T: <%s>", c.GetImplementation(), description)
if err := c.SetDescription(ctx, description); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) InsertAdsCuePoint(
ctx context.Context,
ts time.Time,
duration time.Duration,
) error {
return s.concurrently(func(c AbstractStreamController) error {
if err := c.InsertAdsCuePoint(ctx, ts, duration); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) StartStream(
ctx context.Context,
title string,
description string,
profiles []StreamProfile,
customArgs ...any,
) error {
m := map[reflect.Type]StreamProfile{}
for _, p := range profiles {
m[reflect.TypeOf(p)] = p
}
return s.concurrently(func(c AbstractStreamController) error {
if err := c.StartStream(ctx, title, description, m[c.StreamProfileType()], customArgs...); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) EndStream(
ctx context.Context,
) error {
return s.concurrently(func(c AbstractStreamController) error {
if err := c.EndStream(ctx); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) Flush(
ctx context.Context,
) error {
return s.concurrently(func(c AbstractStreamController) error {
if err := c.Flush(ctx); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) concurrently(callback func(c AbstractStreamController) error) error {
var wg sync.WaitGroup
errCh := make(chan error)
for _, c := range s {
wg.Add(1)
go func(c AbstractStreamController) {
defer wg.Done()
if err := callback(c); err != nil {
errCh <- err
}
}(c)
}
go func() {
wg.Wait()
close(errCh)
}()
var result error
for err := range errCh {
result = multierror.Append(result, err)
}
return result
}