Inject auto-raid/auto-shoutout in appropriate hooking places

This commit is contained in:
Dmitrii Okunev
2025-07-13 21:26:00 +01:00
parent 057eda5c74
commit 74e0bf8124
8 changed files with 231 additions and 3 deletions

View File

@@ -7,5 +7,8 @@ const (
CapabilitySendChatMessage CapabilitySendChatMessage
CapabilityDeleteChatMessage CapabilityDeleteChatMessage
CapabilityBanUser CapabilityBanUser
CapabilityShoutout
CapabilityIsChannelStreaming
CapabilityRaid
EndOfCapability EndOfCapability
) )

View File

@@ -655,6 +655,33 @@ func (k *Kick) IsCapable(
return false return false
case streamcontrol.CapabilityBanUser: case streamcontrol.CapabilityBanUser:
return true return true
case streamcontrol.CapabilityShoutout:
return false
case streamcontrol.CapabilityIsChannelStreaming:
return false
case streamcontrol.CapabilityRaid:
return false
} }
return false return false
} }
func (k *Kick) IsChannelStreaming(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) (bool, error) {
return false, fmt.Errorf("not implemented")
}
func (k *Kick) RaidTo(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) error {
return fmt.Errorf("not implemented")
}
func (k *Kick) Shoutout(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) error {
return fmt.Errorf("not implemented")
}

View File

@@ -295,3 +295,24 @@ func (obs *OBS) IsCapable(
) bool { ) bool {
return false return false
} }
func (obs *OBS) IsChannelStreaming(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) (bool, error) {
return false, fmt.Errorf("not implemented")
}
func (obs *OBS) RaidTo(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) error {
return fmt.Errorf("not implemented")
}
func (obs *OBS) Shoutout(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) error {
return fmt.Errorf("not implemented")
}

View File

@@ -136,6 +136,10 @@ type StreamControllerCommons interface {
BanUser(ctx context.Context, userID ChatUserID, reason string, deadline time.Time) error BanUser(ctx context.Context, userID ChatUserID, reason string, deadline time.Time) error
IsCapable(context.Context, Capability) bool IsCapable(context.Context, Capability) bool
IsChannelStreaming(ctx context.Context, chanID ChatUserID) (bool, error)
Shoutout(ctx context.Context, chanID ChatUserID) error
RaidTo(ctx context.Context, chanID ChatUserID) error
} }
type StreamController[ProfileType StreamProfile] interface { type StreamController[ProfileType StreamProfile] interface {
@@ -250,6 +254,16 @@ func (c *abstractStreamController) IsCapable(ctx context.Context, cap Capability
return c.StreamController.IsCapable(ctx, cap) return c.StreamController.IsCapable(ctx, cap)
} }
func (c *abstractStreamController) IsChannelStreaming(ctx context.Context, chanID ChatUserID) (bool, error) {
return c.StreamController.IsChannelStreaming(ctx, chanID)
}
func (c *abstractStreamController) Shoutout(ctx context.Context, chanID ChatUserID) error {
return c.StreamController.Shoutout(ctx, chanID)
}
func (c *abstractStreamController) RaidTo(ctx context.Context, chanID ChatUserID) error {
return c.StreamController.RaidTo(ctx, chanID)
}
func ToAbstract[T StreamProfile](c StreamController[T]) AbstractStreamController { func ToAbstract[T StreamProfile](c StreamController[T]) AbstractStreamController {
if c == nil { if c == nil {
return nil return nil

View File

@@ -875,6 +875,33 @@ func (t *Twitch) IsCapable(
return true return true
case streamcontrol.CapabilityBanUser: case streamcontrol.CapabilityBanUser:
return true return true
case streamcontrol.CapabilityShoutout:
return false
case streamcontrol.CapabilityIsChannelStreaming:
return false
case streamcontrol.CapabilityRaid:
return false
} }
return false return false
} }
func (t *Twitch) IsChannelStreaming(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) (bool, error) {
return false, fmt.Errorf("not implemented")
}
func (t *Twitch) RaidTo(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) error {
return fmt.Errorf("not implemented")
}
func (t *Twitch) Shoutout(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) error {
return fmt.Errorf("not implemented")
}

View File

@@ -1508,6 +1508,33 @@ func (yt *YouTube) IsCapable(
return true return true
case streamcontrol.CapabilityBanUser: case streamcontrol.CapabilityBanUser:
return false return false
case streamcontrol.CapabilityShoutout:
return false
case streamcontrol.CapabilityIsChannelStreaming:
return false
case streamcontrol.CapabilityRaid:
return false
} }
return false return false
} }
func (yt *YouTube) IsChannelStreaming(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) (bool, error) {
return false, fmt.Errorf("not implemented")
}
func (yt *YouTube) RaidTo(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) error {
return fmt.Errorf("not implemented")
}
func (yt *YouTube) Shoutout(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) error {
return fmt.Errorf("not implemented")
}

View File

@@ -3,12 +3,14 @@ package streamd
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/facebookincubator/go-belt/tool/logger" "github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability" "github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol" "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
"github.com/xaionaro-go/streamctl/pkg/streamd/api" "github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/streamd/config"
) )
const ( const (
@@ -56,12 +58,87 @@ func (d *StreamD) startListeningForChatMessages(
logger.Errorf(ctx, "unable to add the message %#+v to the chat messages storage: %v", msg, err) logger.Errorf(ctx, "unable to add the message %#+v to the chat messages storage: %v", msg, err)
} }
publishEvent(ctx, d.EventBus, msg) publishEvent(ctx, d.EventBus, msg)
d.shoutoutIfNeeded(ctx, msg)
} }
} }
}) })
return nil return nil
} }
func (d *StreamD) shoutoutIfNeeded(
ctx context.Context,
msg api.ChatMessage,
) {
if !msg.IsLive {
logger.Tracef(ctx, "is not a live message")
return
}
d.lastShoutoutAtLocker.Lock()
defer d.lastShoutoutAtLocker.Unlock()
userID := config.ChatUserID{
Platform: msg.Platform,
User: streamcontrol.ChatUserID(strings.ToLower(string(msg.UserID))),
}
lastShoutoutAt := d.lastShoutoutAt[userID]
if v := time.Since(lastShoutoutAt); v < time.Hour {
logger.Tracef(ctx, "the previous shoutout was too soon: %v < %v", v, time.Hour)
return
}
cfg, err := d.GetConfig(ctx)
if err != nil {
logger.Errorf(ctx, "unable to get the config: %v", err)
return
}
found := false
for _, _candidate := range cfg.Shoutout.AutoShoutoutOnMessage {
if _candidate.Platform != msg.Platform {
continue
}
candidate := config.ChatUserID{
Platform: _candidate.Platform,
User: streamcontrol.ChatUserID(strings.ToLower(string(_candidate.User))),
}
if candidate == userID {
found = true
break
}
}
if !found {
logger.Tracef(ctx, "not in the list for auto-shoutout")
return
}
d.shoutoutIfCan(ctx, userID.Platform, userID.User)
}
func (d *StreamD) shoutoutIfCan(
ctx context.Context,
platID streamcontrol.PlatformName,
userID streamcontrol.ChatUserID,
) {
ctrl, err := d.streamController(ctx, platID)
if err != nil {
logger.Errorf(ctx, "unable to get a stream controller '%s': %v", platID, err)
return
}
if !ctrl.IsCapable(ctx, streamcontrol.CapabilityShoutout) {
logger.Errorf(ctx, "the controller '%s' does not support shoutouts", platID)
return
}
err = ctrl.Shoutout(ctx, userID)
if err != nil {
logger.Errorf(ctx, "unable to shoutout '%s' at '%s': %v", userID, platID, err)
return
}
}
func (d *StreamD) RemoveChatMessage( func (d *StreamD) RemoveChatMessage(
ctx context.Context, ctx context.Context,
platID streamcontrol.PlatformName, platID streamcontrol.PlatformName,

View File

@@ -111,6 +111,9 @@ type StreamD struct {
obsRestarter *obsRestarter obsRestarter *obsRestarter
llm *llm llm *llm
lastShoutoutAtLocker sync.Mutex
lastShoutoutAt map[config.ChatUserID]time.Time
} }
type imageHash uint64 type imageHash uint64
@@ -146,9 +149,10 @@ func New(
OBSState: OBSState{ OBSState: OBSState{
VolumeMeters: map[string][][3]float64{}, VolumeMeters: map[string][][3]float64{},
}, },
Timers: map[api.TimerID]*Timer{}, Timers: map[api.TimerID]*Timer{},
Options: Options(options).Aggregate(), Options: Options(options).Aggregate(),
ReadyChan: make(chan struct{}), ReadyChan: make(chan struct{}),
lastShoutoutAt: map[config.ChatUserID]time.Time{},
} }
// TODO: move this to Run() // TODO: move this to Run()
@@ -782,6 +786,11 @@ func (d *StreamD) EndStream(ctx context.Context, platID streamcontrol.PlatformNa
defer publishEvent(ctx, d.EventBus, api.DiffStreams{}) defer publishEvent(ctx, d.EventBus, api.DiffStreams{})
cfg, err := d.GetConfig(ctx)
if err != nil {
return fmt.Errorf("unable to get the config: %w", err)
}
return xsync.RDoR1(ctx, &d.ControllersLocker, func() error { return xsync.RDoR1(ctx, &d.ControllersLocker, func() error {
defer d.StreamStatusCache.InvalidateCache(ctx) defer d.StreamStatusCache.InvalidateCache(ctx)
@@ -794,6 +803,29 @@ func (d *StreamD) EndStream(ctx context.Context, platID streamcontrol.PlatformNa
return fmt.Errorf("'%s' is not initialized", platID) return fmt.Errorf("'%s' is not initialized", platID)
} }
if streamController.IsCapable(ctx, streamcontrol.CapabilityIsChannelStreaming) && streamController.IsCapable(ctx, streamcontrol.CapabilityRaid) {
for _, userID := range cfg.Raid.AutoRaidOnStreamEnd {
if userID.Platform != platID {
continue
}
isStreaming, err := streamController.IsChannelStreaming(ctx, userID.User)
if err != nil {
logger.Errorf(ctx, "unable to check if '%s' is streaming: %v", userID.User, err)
continue
}
if !isStreaming {
logger.Debugf(ctx, "checking if can raid to %v: user is not streaming", userID.User)
continue
}
err = streamController.RaidTo(ctx, userID.User)
if err != nil {
logger.Errorf(ctx, "unable to raid to '%s': %v", userID.User, err)
continue
}
break
}
}
err = streamController.EndStream(ctx) err = streamController.EndStream(ctx)
if err != nil { if err != nil {
return err return err