Multiple field-tested bugfixes
Some checks failed
rolling-release / build (push) Has been cancelled
rolling-release / rolling-release (push) Has been cancelled

This commit is contained in:
Dmitrii Okunev
2025-07-27 22:07:43 +01:00
parent d7325ef61b
commit ee80a08343
9 changed files with 276 additions and 65 deletions

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
@@ -136,16 +137,19 @@ func (k *Kick) keepAliveLoop(
for {
if k.Channel == nil { // TODO: fix non-atomicity
logger.Warnf(ctx, "channel info is not set, yet")
time.Sleep(time.Second)
continue
}
client := k.GetClient()
if client == nil {
logger.Errorf(ctx, "client is not initialized")
time.Sleep(time.Second)
continue
}
_, err := client.GetLivestreams(k.CloseCtx, gokick.NewLivestreamListFilter().SetBroadcasterUserIDs(int(k.Channel.UserID)))
if err != nil {
logger.Errorf(ctx, "unable to get my stream status: %v", err)
time.Sleep(time.Second)
continue
}
select {
@@ -822,56 +826,110 @@ func (k *Kick) RaidTo(
return fmt.Errorf("not implemented")
}
func (k *Kick) getChanInfoViaOldClient(
ctx context.Context,
idOrLogin streamcontrol.ChatUserID,
) (_ret *gokick.ChannelResponse, _err error) {
logger.Debugf(ctx, "getChanInfoViaOldClient(ctx, '%s')")
defer func() { logger.Debugf(ctx, "/getChanInfoViaOldClient(ctx, '%s'): %v %v", _ret, _err) }()
chanInfo, err := k.ClientOBSOLETE.GetChannelV1(ctx, string(idOrLogin))
if err != nil {
return nil, fmt.Errorf("unable to get chan info of '%s': %w", idOrLogin, err)
}
result := &gokick.ChannelResponse{
BannerPicture: chanInfo.BannerImage.URL,
BroadcasterUserID: int(chanInfo.UserID),
Slug: chanInfo.Slug,
StreamTitle: chanInfo.Livestream.SessionTitle,
}
if len(chanInfo.RecentCategories) > 0 {
cat := chanInfo.RecentCategories[0]
result.Category = gokick.CategoryResponse{
ID: int(cat.ID),
Name: cat.Name,
Thumbnail: cat.Category.Icon,
}
}
return result, nil
}
func (k *Kick) getChanInfo(
ctx context.Context,
idOrLogin streamcontrol.ChatUserID,
) (_ret *gokick.ChannelResponse, _err error) {
logger.Debugf(ctx, "getChanInfo(ctx, '%s')")
defer func() { logger.Debugf(ctx, "/getChanInfo(ctx, '%s'): %v %v", _ret, _err) }()
id, idConvErr := strconv.ParseInt(string(idOrLogin), 10, 64)
client := k.GetClient()
if client == nil {
err := fmt.Errorf("kick client is not initialized")
if idConvErr != nil {
logger.Errorf(ctx, "%v", err)
return k.getChanInfoViaOldClient(ctx, idOrLogin)
}
return nil, err
}
if idConvErr == nil {
resp, err := client.GetChannels(ctx, gokick.NewChannelListFilter().SetBroadcasterUserIDs([]int{int(id)}))
if err != nil {
return nil, fmt.Errorf("unable to request channel info by id %d: %w", id, err)
}
if len(resp.Result) != 0 {
return &resp.Result[0], nil
}
}
resp, err := client.GetChannels(ctx, gokick.NewChannelListFilter().SetSlug([]string{string(idOrLogin)}))
if err != nil {
logger.Errorf(ctx, "unable to request channel info by slug '%s': %v", idOrLogin, err)
return k.getChanInfoViaOldClient(ctx, idOrLogin) // TODO: use an multierror to combine errors from both variants
}
if len(resp.Result) == 0 {
return nil, fmt.Errorf("user with slug or ID '%s' is not found", idOrLogin)
}
return &resp.Result[0], nil
}
func (k *Kick) Shoutout(
ctx context.Context,
chanID streamcontrol.ChatUserID,
idOrLogin streamcontrol.ChatUserID,
) (_err error) {
logger.Debugf(ctx, "Shoutout(ctx, '%s')", chanID)
defer func() { logger.Debugf(ctx, "/Shoutout(ctx, '%s'): %v", chanID, _err) }()
logger.Debugf(ctx, "Shoutout(ctx, '%s')", idOrLogin)
defer func() { logger.Debugf(ctx, "/Shoutout(ctx, '%s'): %v", idOrLogin, _err) }()
if err := k.prepare(ctx); err != nil {
return fmt.Errorf("unable to get a prepared client: %w", err)
}
reply, err := k.ClientOBSOLETE.GetChannelV1(ctx, string(chanID))
chanInfo, err := k.getChanInfo(ctx, idOrLogin)
if err != nil {
logger.Errorf(ctx, "unable to get channel info ('%s'): %w", chanID, err)
return k.sendShoutoutMessageWithoutChanInfo(ctx, chanID)
}
if len(reply.PreviousLivestreams) == 0 {
return k.sendShoutoutMessageWithoutChanInfo(ctx, chanID)
}
return k.sendShoutoutMessage(ctx, chanID, reply.PreviousLivestreams[0])
return fmt.Errorf("unable to get channel info ('%s'): %w", idOrLogin, err)
}
func (k *Kick) sendShoutoutMessageWithoutChanInfo(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) (_err error) {
logger.Debugf(ctx, "sendShoutoutMessageWithoutChanInfo(ctx, '%s')", chanID)
defer func() { logger.Debugf(ctx, "/sendShoutoutMessageWithoutChanInfo(ctx, '%s'): %v", chanID, _err) }()
if err := k.prepare(ctx); err != nil {
return fmt.Errorf("unable to get a prepared client: %w", err)
}
err := k.SendChatMessage(ctx, fmt.Sprintf("Shoutout to %s! Great creator! Take a look at their channel and click that follow button! https://www.twitch.tv/%s", chanID, chanID))
if err != nil {
return fmt.Errorf("unable to send the message (case #0): %w", err)
}
return nil
return k.sendShoutoutMessage(ctx, *chanInfo)
}
func (k *Kick) sendShoutoutMessage(
ctx context.Context,
chanID streamcontrol.ChatUserID,
stream kickcom.LivestreamV1,
chanInfo gokick.ChannelResponse,
) (_err error) {
logger.Debugf(ctx, "sendShoutoutMessage(ctx, '%s')", chanID)
defer func() { logger.Debugf(ctx, "/sendShoutoutMessage(ctx, '%s'): %v", chanID, _err) }()
logger.Debugf(ctx, "sendShoutoutMessage(ctx, '%s')", spew.Sdump(chanInfo))
defer func() { logger.Debugf(ctx, "/sendShoutoutMessage(ctx, '%s'): %v", spew.Sdump(chanInfo), _err) }()
err := k.SendChatMessage(ctx, fmt.Sprintf("Shoutout to %s! Great creator! Their last stream: '%s'. Take a look at their channel and click that follow button! https://kick.com/%s", chanID, stream.SessionTitle, chanID))
var message []string
message = append(message, fmt.Sprintf("Shoutout to %s!", chanInfo.Slug))
if chanInfo.StreamTitle != "" {
message = append(message, fmt.Sprintf("Their latest stream: '%s'.", chanInfo.StreamTitle))
}
message = append(message, fmt.Sprintf("Take a look at their channel and click that follow button! https://kick.com/%s", chanInfo.Slug))
err := k.SendChatMessage(ctx, strings.Join(message, " "))
if err != nil {
return fmt.Errorf("unable to send the message (case #1): %w", err)
}

View File

@@ -0,0 +1,74 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"github.com/davecgh/go-spew/spew"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/facebookincubator/go-belt/tool/logger/implementation/zap"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/twitch"
)
func assertNoError(err error) {
if err == nil {
return
}
log.Panic(err)
}
func main() {
l := zap.Default().WithLevel(logger.LevelTrace)
ctx := context.Background()
ctx = logger.CtxWithLogger(ctx, l)
ctx = observability.OnInsecureDebug(ctx)
logger.Default = func() logger.Logger {
return l
}
defer belt.Flush(ctx)
oldUsage := flag.Usage
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "syntax: chatlistener [options] <channel_id>\n")
oldUsage()
}
channelID := flag.String("channel-id", "", "")
clientID := flag.String("client-id", "", "")
clientSecret := flag.String("client-secret", "", "")
flag.Parse()
if flag.NArg() != 1 {
flag.Usage()
os.Exit(1)
}
user := flag.Arg(0)
cfg := twitch.Config{
Enable: new(bool),
Config: twitch.PlatformSpecificConfig{
Channel: *channelID,
ClientID: *clientID,
GetOAuthListenPorts: func() []uint16 {
return []uint16{8092}
},
},
}
cfg.Config.ClientSecret.Set(*clientSecret)
c, err := twitch.New(ctx, cfg, func(c twitch.Config) error {
return nil
})
if err != nil {
panic(err)
}
userInfo, err := c.GetUser(user)
if err != nil {
panic(err)
}
spew.Dump(userInfo)
}

View File

@@ -9,6 +9,7 @@ import (
"unicode"
"unicode/utf8"
"github.com/davecgh/go-spew/spew"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
@@ -894,7 +895,10 @@ func (t *Twitch) IsCapable(
func (t *Twitch) IsChannelStreaming(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) (bool, error) {
) (_ret bool, _err error) {
logger.Debugf(ctx, "IsChannelStreaming")
defer func() { logger.Debugf(ctx, "/IsChannelStreaming: %v %v", _ret, _err) }()
reply, err := t.client.GetStreams(&helix.StreamsParams{
UserIDs: []string{string(chanID)},
})
@@ -912,15 +916,19 @@ func (t *Twitch) IsChannelStreaming(
func (t *Twitch) RaidTo(
ctx context.Context,
chanID streamcontrol.ChatUserID,
idOrLogin streamcontrol.ChatUserID,
) (_err error) {
logger.Debugf(ctx, "RaidTo(ctx, '%s')", chanID)
defer func() { logger.Debugf(ctx, "/RaidTo(ctx, '%s'): %v", chanID, _err) }()
logger.Debugf(ctx, "RaidTo(ctx, '%s')", idOrLogin)
defer func() { logger.Debugf(ctx, "/RaidTo(ctx, '%s'): %v", idOrLogin, _err) }()
user, err := t.GetUser(string(idOrLogin))
if err != nil {
return fmt.Errorf("unable to get user '%s': %w", idOrLogin, err)
}
params := &helix.StartRaidParams{
FromBroadcasterID: t.broadcasterID,
ToBroadcasterID: string(chanID),
ToBroadcasterID: string(user.ID),
}
logger.Debugf(ctx, "RaidTo(ctx, '%s'): %#+v", chanID, params)
logger.Debugf(ctx, "RaidTo(ctx, '%s'): %#+v", idOrLogin, params)
resp, err := t.client.StartRaid(params)
if err != nil {
return fmt.Errorf("unable to raid %#+v: %v", params, err)
@@ -929,43 +937,71 @@ func (t *Twitch) RaidTo(
return nil
}
func (t *Twitch) GetUser(idOrLogin string) (*helix.User, error) {
users, err := t.client.GetUsers(&helix.UsersParams{
IDs: []string{string(idOrLogin)},
})
if err != nil {
return nil, fmt.Errorf("unable to get user info for userID '%s': %w", idOrLogin, err)
}
if len(users.Data.Users) == 0 {
users, err = t.client.GetUsers(&helix.UsersParams{
Logins: []string{string(idOrLogin)},
})
if err != nil {
return nil, fmt.Errorf("unable to get user info for login '%s': %w", idOrLogin, err)
}
}
if len(users.Data.Users) == 0 {
return nil, fmt.Errorf("user with ID-or-login '%s' not found", idOrLogin)
}
return &users.Data.Users[0], nil
}
func (t *Twitch) Shoutout(
ctx context.Context,
chanID streamcontrol.ChatUserID,
userIDOrLogin streamcontrol.ChatUserID,
) (_err error) {
logger.Debugf(ctx, "Shoutout(ctx, '%s')", chanID)
defer func() { logger.Debugf(ctx, "/Shoutout(ctx, '%s'): %v", chanID, _err) }()
logger.Debugf(ctx, "Shoutout(ctx, '%s')", userIDOrLogin)
defer func() { logger.Debugf(ctx, "/Shoutout(ctx, '%s'): %v", userIDOrLogin, _err) }()
params := &helix.SendShoutoutParams{
FromBroadcasterID: t.broadcasterID,
ToBroadcasterID: string(chanID),
ToBroadcasterID: string(userIDOrLogin),
ModeratorID: t.broadcasterID,
}
logger.Debugf(ctx, "Shoutout(ctx, '%s'): %#+v", chanID, params)
logger.Debugf(ctx, "Shoutout(ctx, '%s'): %#+v", userIDOrLogin, params)
_, err := t.client.SendShoutout(params)
if err != nil {
return fmt.Errorf("unable to send the shoutout (%#+v): %w", params, err)
}
user, err := t.GetUser(string(userIDOrLogin))
if err != nil {
return fmt.Errorf("unable to get user '%s': %w", userIDOrLogin, err)
}
reply, err := t.client.GetStreams(&helix.StreamsParams{
UserIDs: []string{string(chanID)},
UserIDs: []string{string(user.ID)},
})
if err != nil {
logger.Errorf(ctx, "unable to get channel info ('%s'): %w", chanID, err)
return t.sendShoutoutMessageWithoutChanInfo(ctx, chanID)
logger.Errorf(ctx, "unable to get streams info (userID: %v): %w", user.ID, err)
return t.sendShoutoutMessageWithoutChanInfo(ctx, *user)
}
if len(reply.Data.Streams) == 0 {
return t.sendShoutoutMessageWithoutChanInfo(ctx, chanID)
return t.sendShoutoutMessageWithoutChanInfo(ctx, *user)
}
return t.sendShoutoutMessage(ctx, chanID, reply.Data.Streams[0])
return t.sendShoutoutMessage(ctx, *user, reply.Data.Streams[0])
}
func (t *Twitch) sendShoutoutMessageWithoutChanInfo(
ctx context.Context,
chanID streamcontrol.ChatUserID,
user helix.User,
) (_err error) {
logger.Debugf(ctx, "sendShoutoutMessageWithoutChanInfo(ctx, '%s')", chanID)
defer func() { logger.Debugf(ctx, "/sendShoutoutMessageWithoutChanInfo(ctx, '%s'): %v", chanID, _err) }()
err := t.SendChatMessage(ctx, fmt.Sprintf("Shoutout to %s! Great creator! Take a look at their channel and click that follow button! https://www.twitch.tv/%s", chanID, chanID))
logger.Debugf(ctx, "sendShoutoutMessageWithoutChanInfo(ctx, '%s')", spew.Sdump(user))
defer func() {
logger.Debugf(ctx, "/sendShoutoutMessageWithoutChanInfo(ctx, '%s'): %v", spew.Sdump(user), _err)
}()
yearsExists := float64(int(time.Since(user.CreatedAt.Time).Hours()/24/364*10)) / 10
err := t.SendChatMessage(ctx, fmt.Sprintf("Shoutout to %s! A great creator (%.1f years on Twitch)! Their self-description: '%s'. Take a look at their channel and click that follow button! https://www.twitch.tv/%s", user.DisplayName, yearsExists, user.Description, user.Login))
if err != nil {
return fmt.Errorf("unable to send the message (case #0): %w", err)
}
@@ -974,12 +1010,13 @@ func (t *Twitch) sendShoutoutMessageWithoutChanInfo(
func (t *Twitch) sendShoutoutMessage(
ctx context.Context,
chanID streamcontrol.ChatUserID,
user helix.User,
stream helix.Stream,
) (_err error) {
logger.Debugf(ctx, "sendShoutoutMessage(ctx, '%s')", chanID)
defer func() { logger.Debugf(ctx, "/sendShoutoutMessage(ctx, '%s'): %v", chanID, _err) }()
err := t.SendChatMessage(ctx, fmt.Sprintf("Shoutout to %s! Great creator! Their last stream: '%s'. Take a look at their channel and click that follow button! https://www.twitch.tv/%s", chanID, stream.Title, chanID))
logger.Debugf(ctx, "sendShoutoutMessage(ctx, '%s')", spew.Sdump(user))
defer func() { logger.Debugf(ctx, "/sendShoutoutMessage(ctx, '%s'): %v", spew.Sdump(user), _err) }()
yearsExists := float64(int(time.Since(user.CreatedAt.Time).Hours()/24/364*10)) / 10
err := t.SendChatMessage(ctx, fmt.Sprintf("Shoutout to %s! A great creator (%.1f years on Twitch)! Their last stream: '%s'. Their self-description: '%s'. Take a look at their channel and click that follow button! https://www.twitch.tv/%s", user.DisplayName, yearsExists, stream.Title, user.Description, user.Login))
if err != nil {
return fmt.Errorf("unable to send the message (case #1): %w", err)
}

View File

@@ -15,6 +15,7 @@ import (
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/facebookincubator/go-belt"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
@@ -1425,15 +1426,15 @@ func (yt *YouTube) GetChatMessagesChan(
func (yt *YouTube) SendChatMessage(
ctx context.Context,
message string,
) error {
) (_err error) {
logger.Debugf(ctx, "SendChatMessage(ctx, '%s')", message)
defer func() { logger.Debugf(ctx, "/SendChatMessage(ctx, '%s'): %v", message, _err) }()
return xsync.DoR1(ctx, &yt.currentLiveBroadcastsLocker, func() error {
var result *multierror.Error
for _, broadcast := range yt.currentLiveBroadcasts {
err := yt.YouTubeClient.InsertCommentThread(ctx, &youtube.CommentThread{
commentInfo := &youtube.CommentThread{
Snippet: &youtube.CommentThreadSnippet{
CanReply: true,
ChannelId: yt.Config.Config.ChannelID,
IsPublic: true,
TopLevelComment: &youtube.Comment{
Snippet: &youtube.CommentSnippet{
TextOriginal: message,
@@ -1441,9 +1442,11 @@ func (yt *YouTube) SendChatMessage(
},
VideoId: broadcast.Id,
},
}, []string{"snippet"})
}
logger.Tracef(ctx, "commentInfo: %s", spew.Sdump(commentInfo))
err := yt.YouTubeClient.InsertCommentThread(ctx, commentInfo, []string{"snippet"})
if err != nil {
result = multierror.Append(result, fmt.Errorf("unable to post the comment under video '%s': %w", broadcast.Id, err))
result = multierror.Append(result, fmt.Errorf("unable to post the comment under video '%s': %w: %s", broadcast.Id, err, spew.Sdump(commentInfo)))
}
}
return result.ErrorOrNil()
@@ -1453,7 +1456,9 @@ func (yt *YouTube) SendChatMessage(
func (yt *YouTube) RemoveChatMessage(
ctx context.Context,
messageID streamcontrol.ChatMessageID,
) error {
) (_err error) {
logger.Debugf(ctx, "RemoveChatMessage(ctx, '%s')", messageID)
defer func() { logger.Debugf(ctx, "/RemoveChatMessage(ctx, '%s'): %v", messageID, _err) }()
// TODO: The `messageID` value below is not a message ID, unfortunately.
// It just contains the author and the message as a temporary solution.
// Find a way to extract the message ID.
@@ -1525,7 +1530,10 @@ func (yt *YouTube) IsCapable(
func (yt *YouTube) IsChannelStreaming(
ctx context.Context,
chanID streamcontrol.ChatUserID,
) (bool, error) {
) (_ret bool, _err error) {
logger.Debugf(ctx, "IsChannelStreaming")
defer func() { logger.Debugf(ctx, "/IsChannelStreaming: %v %v", _ret, _err) }()
resp, err := yt.YouTubeClient.Search(ctx, string(chanID), EventTypeLive, []string{"snippet"})
if err != nil {
return false, fmt.Errorf("unable to search: %w", err)

View File

@@ -87,6 +87,10 @@ func (d *StreamD) shoutoutIfNeeded(
Platform: msg.Platform,
User: streamcontrol.ChatUserID(strings.ToLower(string(msg.UserID))),
}
userIDByName := config.ChatUserID{
Platform: msg.Platform,
User: streamcontrol.ChatUserID(strings.ToLower(string(msg.Username))),
}
lastShoutoutAt := d.lastShoutoutAt[userID]
logger.Debugf(ctx, "lastShoutoutAt(%#+v): %v", userID, lastShoutoutAt)
if v := time.Since(lastShoutoutAt); v < time.Hour {
@@ -113,6 +117,10 @@ func (d *StreamD) shoutoutIfNeeded(
found = true
break
}
if candidate == userIDByName {
found = true
break
}
}
if !found {

View File

@@ -305,7 +305,7 @@ func (d *StreamD) initYouTubeBackend(ctx context.Context) error {
return d.setPlatformConfig(ctx, youtube.ID, cfg)
},
d.UI.OAuthHandlerYouTube,
d.GetOAuthListenPorts,
func() []uint16 { return []uint16{8091} }, // TODO: replace with: d.GetOAuthListenPorts,
)
if err != nil {
return fmt.Errorf("unable to initialize the backend 'YouTube': %w", err)

View File

@@ -6,6 +6,7 @@ import (
"net"
"os"
"reflect"
"slices"
"sort"
"sync"
"sync/atomic"
@@ -822,6 +823,9 @@ func (d *StreamD) EndStream(ctx context.Context, platID streamcontrol.PlatformNa
logger.Errorf(ctx, "unable to raid to '%s': %v", userID.User, err)
continue
}
logger.Debugf(ctx, "sleeping for 2 seconds, just in case")
time.Sleep(2 * time.Second)
break
}
}
@@ -1146,9 +1150,7 @@ func (d *StreamD) getOAuthListenPorts() []uint16 {
ports = append(ports, k)
}
sort.Slice(ports, func(i, j int) bool {
return ports[i] < ports[j]
})
slices.Sort(ports)
logger.Default().Debugf("oauth ports: %#+v", ports)
return ports

View File

@@ -56,6 +56,8 @@ import (
// https://developers.google.com/youtube/v3/docs/videos
const youtubeTitleLength = 100
const browserDedupTimeout = time.Second * 5
type Panel struct {
StreamD api.StreamD
Screenshoter Screenshoter
@@ -178,6 +180,10 @@ type Panel struct {
currentlyPlayingChatMessageSoundCount int32
chatUIsLocker xsync.Mutex
chatUIs []chatUIInterface
lastOpenedBrowserURL string
lastOpenedBrowserURLAt time.Time
lastOpenedBrowserURLLocker xsync.Mutex
}
func New(
@@ -810,7 +816,24 @@ func (p *Panel) openBrowser(
urlString string,
reason string,
) (_err error) {
return newBrowser(p).openBrowser(ctx, urlString, reason)
logger.Debugf(ctx, "openBrowser(ctx, '%s', '%s')")
defer func() { logger.Debugf(ctx, "/openBrowser(ctx, '%s', '%s'): %v", _err) }()
return xsync.DoR1(ctx, &p.lastOpenedBrowserURLLocker, func() error {
now := time.Now()
if now.Sub(p.lastOpenedBrowserURLAt) <= browserDedupTimeout {
if p.lastOpenedBrowserURL == urlString {
logger.Debugf(ctx, "the URL was already opened recently, skipping")
return nil
}
}
err := newBrowser(p).openBrowser(ctx, urlString, reason)
if err != nil {
return err
}
p.lastOpenedBrowserURL = urlString
p.lastOpenedBrowserURLAt = now
return nil
})
}
var twitchAppsCreateLink = must(url.Parse("https://dev.twitch.tv/console/apps/create"))

View File

@@ -925,6 +925,7 @@ func (p *StreamPlayerHandler) controllerLoop(
logger.Errorf(ctx, "unable to slow down to %f: %v", curSpeed, err)
return
}
time.Sleep(100 * time.Millisecond) // let it catch up at least a bit, before changing the speed back (to avoid flickering)
return
}
if lag <= p.Config.JitterBufDuration {