Files
streamctl/pkg/streamcontrol/stream_control.go
2024-10-31 00:11:04 +00:00

430 lines
11 KiB
Go

package streamcontrol
import (
"context"
"encoding/json"
"fmt"
"io"
"reflect"
"sync"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/streamctl/pkg/observability"
)
type StreamProfileBase struct {
Parent ProfileName
Order int
}
func (profile StreamProfileBase) GetParent() (ProfileName, bool) {
if profile.Parent == "" {
return "", false
}
return profile.Parent, true
}
func (profile StreamProfileBase) GetOrder() int {
return profile.Order
}
func (profile *StreamProfileBase) SetOrder(v int) {
profile.Order = v
}
type AbstractStreamProfile interface {
GetParent() (ProfileName, bool)
GetOrder() int
}
type StreamProfile interface {
AbstractStreamProfile
}
func AssertStreamProfile[T StreamProfile](
ctx context.Context,
v AbstractStreamProfile,
) (*T, error) {
profile, ok := v.(T)
if ok {
return &profile, nil
}
profilePtr, ok := (any)(v).(*T)
if ok {
return profilePtr, nil
}
var zeroProfile T
return nil, ErrInvalidStreamProfileType{Expected: zeroProfile, Received: v}
}
func GetStreamProfile[T StreamProfile](
ctx context.Context,
v AbstractStreamProfile,
) (*T, error) {
profilePtr, err := AssertStreamProfile[T](ctx, v)
if err == nil {
return profilePtr, nil
}
b, err := json.Marshal(v)
if err != nil {
return nil, fmt.Errorf("unable to serialize: %w: %#+v", err, v)
}
logger.Debugf(ctx, "JSON representation: <%s>", b)
var profile T
err = json.Unmarshal(b, &profile)
if err != nil {
return nil, fmt.Errorf("unable to deserialize: %w: <%s>", err, b)
}
logger.Tracef(ctx, "converted %#+v (%s) to %#+v", v, v, profile)
return &profile, nil
}
func ConvertStreamProfiles[T StreamProfile](
ctx context.Context,
m map[ProfileName]AbstractStreamProfile,
) error {
for k, v := range m {
profile, err := GetStreamProfile[T](ctx, v)
if err != nil {
return err
}
m[k] = *profile
logger.Tracef(ctx, "converted %#+v (%s) to %#+v", v, v, profile)
}
return nil
}
type StreamStatus struct {
IsActive bool
ViewersCount *uint `json:",omitempty"`
StartedAt *time.Time `json:",omitempty"`
CustomData any `json:",omitempty"`
}
type ChatUserID string
type ChatMessageID string
type ChatMessage struct {
CreatedAt time.Time
UserID ChatUserID
Username string
MessageID ChatMessageID
Message string
}
type StreamControllerCommons interface {
io.Closer
SetTitle(ctx context.Context, title string) error
SetDescription(ctx context.Context, description string) error
InsertAdsCuePoint(ctx context.Context, ts time.Time, duration time.Duration) error
Flush(ctx context.Context) error
EndStream(ctx context.Context) error
GetStreamStatus(ctx context.Context) (*StreamStatus, error)
GetChatMessagesChan(ctx context.Context) (<-chan ChatMessage, error)
SendChatMessage(ctx context.Context, message string) error
RemoveChatMessage(ctx context.Context, messageID ChatMessageID) error
BanUser(ctx context.Context, userID ChatUserID, reason string, deadline time.Time) error
IsCapable(context.Context, Capability) bool
}
type StreamController[ProfileType StreamProfile] interface {
StreamControllerCommons
ApplyProfile(ctx context.Context, profile ProfileType, customArgs ...any) error
StartStream(
ctx context.Context,
title string,
description string,
profile ProfileType,
customArgs ...any,
) error
}
type AbstractStreamController interface {
StreamController[AbstractStreamProfile]
GetImplementation() StreamControllerCommons
StreamProfileType() reflect.Type
}
type abstractStreamController struct {
StreamController StreamControllerCommons
applyProfile func(ctx context.Context, profile AbstractStreamProfile, customArgs ...any) error
startStream func(ctx context.Context, title string, description string, profile AbstractStreamProfile, customArgs ...any) error
StreamProfileTypeValue reflect.Type
}
func (c *abstractStreamController) Close() error {
return c.StreamController.Close()
}
func (c *abstractStreamController) GetImplementation() StreamControllerCommons {
return c.StreamController
}
func (c *abstractStreamController) ApplyProfile(
ctx context.Context,
profile AbstractStreamProfile,
customArgs ...any,
) error {
return c.applyProfile(ctx, profile, customArgs...)
}
func (c *abstractStreamController) SetTitle(
ctx context.Context,
title string,
) error {
return c.GetImplementation().SetTitle(ctx, title)
}
func (c *abstractStreamController) SetDescription(
ctx context.Context,
description string,
) error {
return c.GetImplementation().SetDescription(ctx, description)
}
func (c *abstractStreamController) InsertAdsCuePoint(
ctx context.Context,
ts time.Time,
duration time.Duration,
) error {
return c.GetImplementation().InsertAdsCuePoint(ctx, ts, duration)
}
func (c *abstractStreamController) Flush(
ctx context.Context,
) error {
return c.GetImplementation().Flush(ctx)
}
func (c *abstractStreamController) StartStream(
ctx context.Context,
title string,
description string,
profile AbstractStreamProfile,
customArgs ...any,
) error {
return c.startStream(ctx, title, description, profile, customArgs...)
}
func (c *abstractStreamController) EndStream(
ctx context.Context,
) error {
return c.GetImplementation().EndStream(ctx)
}
func (c *abstractStreamController) GetStreamStatus(
ctx context.Context,
) (*StreamStatus, error) {
return c.GetImplementation().GetStreamStatus(ctx)
}
func (c *abstractStreamController) StreamProfileType() reflect.Type {
return c.StreamProfileTypeValue
}
func (c *abstractStreamController) GetChatMessagesChan(ctx context.Context) (<-chan ChatMessage, error) {
return c.StreamController.GetChatMessagesChan(ctx)
}
func (c *abstractStreamController) SendChatMessage(ctx context.Context, message string) error {
return c.StreamController.SendChatMessage(ctx, message)
}
func (c *abstractStreamController) RemoveChatMessage(ctx context.Context, messageID ChatMessageID) error {
return c.StreamController.RemoveChatMessage(ctx, messageID)
}
func (c *abstractStreamController) BanUser(ctx context.Context, userID ChatUserID, reason string, deadline time.Time) error {
return c.StreamController.BanUser(ctx, userID, reason, deadline)
}
func (c *abstractStreamController) IsCapable(ctx context.Context, cap Capability) bool {
return c.StreamController.IsCapable(ctx, cap)
}
func ToAbstract[T StreamProfile](c StreamController[T]) AbstractStreamController {
if c == nil {
return nil
}
var zeroProfile T
profileType := reflect.TypeOf(zeroProfile)
return &abstractStreamController{
StreamController: c,
applyProfile: func(ctx context.Context, _profile AbstractStreamProfile, customArgs ...any) error {
profile, err := AssertStreamProfile[T](ctx, _profile)
if err != nil {
return err
}
return c.ApplyProfile(ctx, *profile, customArgs...)
},
startStream: func(ctx context.Context, title string, description string, _profile AbstractStreamProfile, customArgs ...any) error {
profile, err := AssertStreamProfile[T](ctx, _profile)
if err != nil {
return err
}
return c.StartStream(ctx, title, description, *profile, customArgs...)
},
StreamProfileTypeValue: profileType,
}
}
type StreamControllers []AbstractStreamController
func (s StreamControllers) ApplyProfiles(
ctx context.Context,
profiles []AbstractStreamProfile,
) error {
m := map[reflect.Type]AbstractStreamController{}
for _, c := range s {
m[c.StreamProfileType()] = c
}
var wg sync.WaitGroup
errCh := make(chan error)
for _, p := range profiles {
wg.Add(1)
{
p := p
observability.Go(ctx, func() {
defer wg.Done()
profileType := reflect.TypeOf(p)
c, ok := m[profileType]
if !ok {
errCh <- ErrNoStreamControllerForProfile{StreamProfile: p}
return
}
if err := c.ApplyProfile(ctx, p); err != nil {
errCh <- fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
return
}
})
}
}
observability.Go(ctx, func() {
wg.Wait()
close(errCh)
})
var result error
for err := range errCh {
result = multierror.Append(result, err)
}
return result
}
func (s StreamControllers) SetTitle(
ctx context.Context,
title string,
) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
err := c.SetTitle(ctx, title)
logger.Debugf(ctx, "SetTitle: %T: <%s>: %v", c.GetImplementation(), title, err)
if err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) SetDescription(
ctx context.Context,
description string,
) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
logger.Debugf(ctx, "SetDescription: %T: <%s>", c.GetImplementation(), description)
if err := c.SetDescription(ctx, description); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) InsertAdsCuePoint(
ctx context.Context,
ts time.Time,
duration time.Duration,
) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
if err := c.InsertAdsCuePoint(ctx, ts, duration); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) StartStream(
ctx context.Context,
title string,
description string,
profiles []AbstractStreamProfile,
customArgs ...any,
) error {
m := map[reflect.Type]AbstractStreamProfile{}
for _, p := range profiles {
m[reflect.TypeOf(p)] = p
}
return s.concurrently(ctx, func(c AbstractStreamController) error {
profile := m[c.StreamProfileType()]
logger.Debugf(ctx, "profile == %#+v", profile)
if err := c.StartStream(ctx, title, description, profile, customArgs...); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) EndStream(
ctx context.Context,
) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
if err := c.EndStream(ctx); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) Flush(
ctx context.Context,
) error {
return s.concurrently(ctx, func(c AbstractStreamController) error {
if err := c.Flush(ctx); err != nil {
return fmt.Errorf("StreamController %T return error: %w", c.GetImplementation(), err)
}
return nil
})
}
func (s StreamControllers) concurrently(
ctx context.Context,
callback func(c AbstractStreamController) error,
) error {
var wg sync.WaitGroup
errCh := make(chan error)
for _, c := range s {
wg.Add(1)
{
c := c
observability.Go(ctx, func() {
defer wg.Done()
if err := callback(c); err != nil {
errCh <- err
}
})
}
}
observability.Go(ctx, func() {
wg.Wait()
close(errCh)
})
var result error
for err := range errCh {
result = multierror.Append(result, err)
}
return result
}