Add support of SRT to the StreamPlayer
Some checks are pending
rolling-release / build (push) Waiting to run
rolling-release / rolling-release (push) Blocked by required conditions

This commit is contained in:
Dmitrii Okunev
2025-04-20 16:13:56 +01:00
parent de4c5302dd
commit 72e481b9b9
4 changed files with 165 additions and 21 deletions

10
go.mod
View File

@@ -25,6 +25,8 @@ replace github.com/nicklaw5/helix/v2 v2.30.1-0.20240715193454-0151ccccf980 => gi
replace github.com/asticode/go-astiav v0.35.1 => github.com/xaionaro-go/astiav v0.0.0-20250406220418-87d14d2908f9
replace github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d => github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7
require (
github.com/facebookincubator/go-belt v0.0.0-20250308011339-62fb7027b11f
github.com/go-git/go-billy/v5 v5.6.2
@@ -273,7 +275,7 @@ require (
github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.10.0
github.com/xaionaro-go/audio v0.0.0-20250210102901-abfced9d5ef3
github.com/xaionaro-go/avpipeline v0.0.0-20250419205947-7bb87766fce5
github.com/xaionaro-go/avpipeline v0.0.0-20250420145942-aa8f9c9b9d23
github.com/xaionaro-go/datacounter v1.0.4
github.com/xaionaro-go/go-rtmp v0.0.0-20241009130244-1e3160f27f42
github.com/xaionaro-go/grpcproxy v0.0.0-20241103205849-a8fef42e72f9
@@ -284,8 +286,8 @@ require (
github.com/xaionaro-go/mediamtx v0.0.0-20250406132618-79ecbc3e138f
github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a
github.com/xaionaro-go/observability v0.0.0-20250309200059-047fad8d76e4
github.com/xaionaro-go/player v0.0.0-20250419161659-14f8c81dc9e3
github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932
github.com/xaionaro-go/player v0.0.0-20250420151227-ba7f6a6f220c
github.com/xaionaro-go/recoder v0.0.0-20250419161432-f34684c4ea4a
github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2
github.com/xaionaro-go/serializable v0.0.0-20250412140540-5ac572306599
@@ -296,7 +298,7 @@ require (
github.com/xaionaro-go/xfyne v0.0.0-20241018233531-26123724a6c6
github.com/xaionaro-go/xlogrus v0.0.0-20250111150201-60557109545a
github.com/xaionaro-go/xpath v0.0.0-20250111145115-55f5728f643f
github.com/xaionaro-go/xsync v0.0.0-20250113213958-aedf9c8786f5
github.com/xaionaro-go/xsync v0.0.0-20250420144932-1e27f4332d4d
github.com/yutopp/go-flv v0.3.1
golang.org/x/crypto v0.37.0
google.golang.org/grpc v1.71.1

14
go.sum
View File

@@ -165,8 +165,6 @@ github.com/bluenviron/gohlslib/v2 v2.1.4-0.20250210133907-d3dddacbb9fc h1:t1i9fo
github.com/bluenviron/gohlslib/v2 v2.1.4-0.20250210133907-d3dddacbb9fc/go.mod h1:soTVqoidOT+L08hUSDreM7DebNyjjViUiEvpWlr7EIs=
github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800 h1:WK8ynLNe5UNxAkB5je95vhwifCWe/GK+ZjW3ybO7rAY=
github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800/go.mod h1:rEwUB2wda1rjnStH/mMu4SVHTLAAkZBalBp/zDlUbPc=
github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d h1:AlIFt4i8ex3cGfoxLS3JoYVzSP4MgL9aMH/rp6kiYN4=
github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d/go.mod h1:iHEz1SFIet6zBwAQoh1a92vTQ3dV3LpVFbom6/SLz3k=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
@@ -1055,6 +1053,8 @@ github.com/xaionaro-go/avmediacodec v0.0.0-20250419184228-96422828753a h1:MLcfwY
github.com/xaionaro-go/avmediacodec v0.0.0-20250419184228-96422828753a/go.mod h1:IZ3ZQcDrwrIKV4jmSdUW3PZH9ALcxNKhzmrcIt4jNUY=
github.com/xaionaro-go/avpipeline v0.0.0-20250419205947-7bb87766fce5 h1:gNqO7gVmLMmXgitJJIuGBsjFn70I0XlX73esFRaFB1w=
github.com/xaionaro-go/avpipeline v0.0.0-20250419205947-7bb87766fce5/go.mod h1:A69E7yPb5YCxGXC3uIQAKEo48VhVv9FS0N7gp2EQZp4=
github.com/xaionaro-go/avpipeline v0.0.0-20250420145942-aa8f9c9b9d23 h1:78vwXYNNv6JBY74zLgjoKFCFsIUczjzehWnye4W3QcI=
github.com/xaionaro-go/avpipeline v0.0.0-20250420145942-aa8f9c9b9d23/go.mod h1:a3cpzKODCtLlcIEY/Jb4yKOAwohGSfV96I6OTzT1/yY=
github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8=
github.com/xaionaro-go/datacounter v1.0.4/go.mod h1:Sf9vBevuV6w5iE6K3qJ9pWVKcyS60clWBUSQLjt5++c=
github.com/xaionaro-go/fyne/v2 v2.0.0-20250215180758-399edb421067 h1:58GgTbQcOCjv1ZZ46m6WQ8zqv0KEJe5C6D5Ls1oSHvc=
@@ -1081,6 +1081,8 @@ github.com/xaionaro-go/logrustash v0.0.0-20240804141650-d48034780a5f h1:mMrVrYtH
github.com/xaionaro-go/logrustash v0.0.0-20240804141650-d48034780a5f/go.mod h1:aszOZHoPPSgKwdbJUgonps3MSODqctkNhwQDDwlw0Eg=
github.com/xaionaro-go/logwriter v0.0.0-20250111154941-c3f7a1a2d567 h1:0b0VfuC0rFdxdBItqtDuQcyxFrwLNnC7PcqZWqzHadw=
github.com/xaionaro-go/logwriter v0.0.0-20250111154941-c3f7a1a2d567/go.mod h1:chl9gIPqx9bvO9isZwNg4LaWBJelX6zCaAdQUcDU0zE=
github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7 h1:pZUl2TLAO2884WKuJ+M069MZJqE7TiyERTKi/MuNRzY=
github.com/xaionaro-go/mediacommon/v2 v2.0.0-20250420012906-03d6d69ac3b7/go.mod h1:iHEz1SFIet6zBwAQoh1a92vTQ3dV3LpVFbom6/SLz3k=
github.com/xaionaro-go/mediamtx v0.0.0-20250406132618-79ecbc3e138f h1:JnpJAENtSM91wovcDtSz1reG33GRVbcDHGpz+2xg9dU=
github.com/xaionaro-go/mediamtx v0.0.0-20250406132618-79ecbc3e138f/go.mod h1:ZlYASAIfB/b48MidKYXO4Zq1MiQaV5gFK1qzeGbHLRg=
github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c h1:2CIIxTRox9auImHyfbfrqSyrvPaWrw5w2Yw5TkOioZw=
@@ -1089,8 +1091,10 @@ github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a h1:PyX7
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a/go.mod h1:exSKIlCibB0ww+ABDwH+YG/iNdqVfdzXBBg5LYxkxGw=
github.com/xaionaro-go/observability v0.0.0-20250309200059-047fad8d76e4 h1:IhTCWOXWLz7Y45tjm/s6Kcd+jAqV8hbpOYl7IglgEIc=
github.com/xaionaro-go/observability v0.0.0-20250309200059-047fad8d76e4/go.mod h1:j5y9LVYd0v8sJa9Ks7ZyuwFxAUpaNFHBNKBkiYipxPM=
github.com/xaionaro-go/player v0.0.0-20250419161659-14f8c81dc9e3 h1:5j6YyST7aOYQFlS+jSVHkloa0rESQRH4J8gRTrm7vsY=
github.com/xaionaro-go/player v0.0.0-20250419161659-14f8c81dc9e3/go.mod h1:sN3M+AZzf04ivl+fmZNpZgspWSmkcltAzmI+sOVp21A=
github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932 h1:uDPBczg4UmPPig0l7DLlIj5XCCVIlW+7KQ4THVtqOU8=
github.com/xaionaro-go/observability v0.0.0-20250420133500-5c4d2e045932/go.mod h1:j5y9LVYd0v8sJa9Ks7ZyuwFxAUpaNFHBNKBkiYipxPM=
github.com/xaionaro-go/player v0.0.0-20250420151227-ba7f6a6f220c h1:8/yiKk07hQ5R4AqIXaNsG4dfJdh9k6sW4JuDlt9kDXc=
github.com/xaionaro-go/player v0.0.0-20250420151227-ba7f6a6f220c/go.mod h1:kPOVXcY1+M21CLRzfKks4NSUmBFTNJq1ZCBkgUC2QGc=
github.com/xaionaro-go/proxy v0.0.0-20250111150848-1f0e7b262638 h1:w7Dt6Mpj36S2cWm0PkT2+D4kxrQbfCwjXZs1HqiILpE=
github.com/xaionaro-go/proxy v0.0.0-20250111150848-1f0e7b262638/go.mod h1:hOkJBFoMsnCDoZgpSPTHYbnevPgtpD16d9Xga91U+Eo=
github.com/xaionaro-go/pulse v0.0.0-20241023202712-7151fa00d4bb h1:9iHPI27CYbmJDhzEuCABQthE/DGVNvT60ybWvv3BV8w=
@@ -1120,6 +1124,8 @@ github.com/xaionaro-go/xpath v0.0.0-20250111145115-55f5728f643f h1:ofxY1akRlVdJ/
github.com/xaionaro-go/xpath v0.0.0-20250111145115-55f5728f643f/go.mod h1:f0DVcqddOy1RALOgXJHwpQnkp1u1yeBX/+A2+Bf4EGc=
github.com/xaionaro-go/xsync v0.0.0-20250113213958-aedf9c8786f5 h1:l18KTPxJCTppjRNVRtE5PcKCqFBSzVPcwAw2x0vxAdo=
github.com/xaionaro-go/xsync v0.0.0-20250113213958-aedf9c8786f5/go.mod h1:FCpywNAl4a4hgzE8j7Z+TpdhBQi5WHxnI35jOrFpoQw=
github.com/xaionaro-go/xsync v0.0.0-20250420144932-1e27f4332d4d h1:paN6zI8tpStL7gEbt4m24vbOFdkNUwnDqXtT3B2dzSo=
github.com/xaionaro-go/xsync v0.0.0-20250420144932-1e27f4332d4d/go.mod h1:FCpywNAl4a4hgzE8j7Z+TpdhBQi5WHxnI35jOrFpoQw=
github.com/xaionaro-go/zerolog2belt v0.0.0-20241103164018-a3bc1ea487e5 h1:jAy7VLg8y8XE1R8jBte4PRDJzOaAE+sUfmttfB9ZcAY=
github.com/xaionaro-go/zerolog2belt v0.0.0-20241103164018-a3bc1ea487e5/go.mod h1:KJuX7yl27vU+KV6CpsWOe5ZMY4zSg70hvEhwoYdr17w=
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=

View File

@@ -11,12 +11,17 @@ import (
"sync"
"time"
"github.com/asticode/go-astiav"
"github.com/davecgh/go-spew/spew"
"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/hashicorp/go-multierror"
"github.com/xaionaro-go/avpipeline/kernel"
"github.com/xaionaro-go/avpipeline/packet"
"github.com/xaionaro-go/avpipeline/processor"
"github.com/xaionaro-go/observability"
player "github.com/xaionaro-go/player/pkg/player/types"
"github.com/xaionaro-go/secret"
"github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver"
"github.com/xaionaro-go/streamctl/pkg/streamtypes"
"github.com/xaionaro-go/xcontext"
@@ -84,6 +89,12 @@ type StreamPlayerHandler struct {
StreamPlayer
Parent *StreamPlayers
Cancel context.CancelFunc
CurrentVideoTrackID int
CurrentAudioTrackID int
NextVideoTrackID int
NextVideoTrackIDCount int
}
func (sp *StreamPlayers) Create(
@@ -203,7 +214,7 @@ func (p *StreamPlayerHandler) startU(ctx context.Context) error {
StreamID2Title(p.StreamID),
playerType,
player.OptionLowLatency(true),
player.OptionCacheDuration(0),
//player.OptionCacheDuration(0),
)
if err != nil {
errmon.ObserveErrorCtx(ctx, p.Close())
@@ -290,7 +301,7 @@ func (p *StreamPlayerHandler) getInternalURL(ctx context.Context) (*url.URL, err
}
switch portSrv.Type {
case streamtypes.ServerTypeSRT:
u.RawQuery = fmt.Sprintf("streamid=publish:%s&latency=%d",
u.RawQuery = fmt.Sprintf("streamid=read:%s&latency=%d",
p.StreamID,
500_000,
)
@@ -300,7 +311,102 @@ func (p *StreamPlayerHandler) getInternalURL(ctx context.Context) (*url.URL, err
return &u, nil
}
func (p *StreamPlayerHandler) openStream(ctx context.Context) (_err error) {
func (p *StreamPlayerHandler) startObserver(
ctx context.Context,
url *url.URL,
restartFn context.CancelFunc,
) {
observability.Go(ctx, func() {
defer restartFn()
logger.Debugf(ctx, "observer started")
defer func() { logger.Debugf(ctx, "observer ended") }()
inputNode, err := processor.NewInputFromURL(ctx, url.String(), secret.New(""), kernel.InputConfig{})
if err != nil {
logger.Errorf(ctx, "unable initialize the input node: %v", err)
return
}
defer func() {
err := inputNode.Close(ctx)
if err != nil {
logger.Errorf(ctx, "unable to close the input: %v", err)
}
}()
for {
select {
case pkt, ok := <-inputNode.OutputPacketCh:
if !ok {
return
}
if err := p.acknowledgeInputPacket(ctx, pkt); err != nil {
logger.Errorf(ctx, "unable to acknowledge a packet: %v", err)
return
}
case <-ctx.Done():
return
}
}
})
}
func (p *StreamPlayerHandler) acknowledgeInputPacket(
ctx context.Context,
pkt packet.Output,
) (_err error) {
logger.Tracef(ctx, "acknowledgeInputPacket")
defer func() { logger.Tracef(ctx, "/acknowledgeInputPacket: %v", _err) }()
if pkt.Stream.CodecParameters().MediaType() != astiav.MediaTypeVideo {
return nil
}
if pkt.Flags().Has(astiav.PacketFlagKey) {
return nil
}
var trackID int
if pkt.StreamIndex() == 0 {
trackID = 1
} else {
trackID = 2
}
if trackID != p.NextVideoTrackID {
p.NextVideoTrackID = trackID
p.NextVideoTrackIDCount = 0
}
p.NextVideoTrackIDCount++
if trackID == p.CurrentVideoTrackID {
return nil
}
if p.NextVideoTrackIDCount > 10 {
p.changeTrackTo(ctx, trackID)
}
return nil
}
func (p *StreamPlayerHandler) changeTrackTo(
ctx context.Context,
trackID int,
) {
p.withPlayer(ctx, func(ctx context.Context, player player.Player) {
if err := player.SetVideoTrack(ctx, int64(trackID)); err != nil {
logger.Errorf(ctx, "unable to rotate the video track: %w", err)
}
if err := player.SetAudioTrack(ctx, int64(trackID)); err != nil {
logger.Errorf(ctx, "unable to rotate the audio track: %w", err)
}
p.CurrentVideoTrackID = trackID
p.CurrentAudioTrackID = trackID
p.NextVideoTrackIDCount = 0
})
}
func (p *StreamPlayerHandler) openStream(
ctx context.Context,
restartFn context.CancelFunc,
) (_err error) {
logger.Debugf(ctx, "openStream")
defer func() { logger.Debugf(ctx, "/openStream: %v", _err) }()
@@ -311,6 +417,9 @@ func (p *StreamPlayerHandler) openStream(ctx context.Context) (_err error) {
return fmt.Errorf("unable to get URL: %w", err)
}
logger.Debugf(ctx, "opening '%s'", u.String())
p.startObserver(ctx, u, restartFn)
err = p.withPlayer(ctx, func(ctx context.Context, player player.Player) {
ctx, cancelFn := context.WithTimeout(ctx, openURLTimeout)
defer cancelFn()
@@ -421,7 +530,7 @@ func (p *StreamPlayerHandler) controllerLoop(
logger.Debugf(ctx, "a stream started, let's open it in the player")
}
logger.Debugf(ctx, "opening the stream")
err = p.openStream(ctx)
err = p.openStream(ctx, restart)
logger.Debugf(ctx, "opened the stream: %v", err)
errmon.ObserveErrorCtx(ctx, err)
time.Sleep(2 * time.Second)
@@ -435,7 +544,7 @@ func (p *StreamPlayerHandler) controllerLoop(
case <-t.C:
}
logger.Debugf(ctx, "opening the external stream")
err := p.openStream(ctx)
err := p.openStream(ctx, restart)
logger.Debugf(ctx, "opened the external stream: %v", err)
if err != nil {
logger.Debugf(ctx, "unable to open the stream: %v", err)
@@ -492,7 +601,7 @@ func (p *StreamPlayerHandler) controllerLoop(
if link, _ := player.GetLink(ctx); link == "" {
logger.Debugf(ctx, "the link is empty for some reason, reopening the link")
observability.Go(ctx, func() {
if err := p.openStream(ctx); err != nil {
if err := p.openStream(ctx, restart); err != nil {
logger.Errorf(ctx, "unable to open link '%s': %v", link, err)
}
})
@@ -533,15 +642,15 @@ func (p *StreamPlayerHandler) controllerLoop(
restart()
return false
}
if l > time.Hour {
logger.Debugf(ctx, "StreamPlayer[%s].controllerLoop: the length is more than an hour: %v (we expect only like a second, not an hour)", l, p.StreamID)
if l > 48*time.Hour {
logger.Debugf(ctx, "StreamPlayer[%s].controllerLoop: the length is more than 48 hours: %v (we expect only like a second, not 2 days)", l, p.StreamID)
if triedToFixBadLengthViaReopen {
logger.Debugf(ctx, "StreamPlayer[%s].controllerLoop: already tried reopening the stream, did not help, so restarting")
restart()
return false
}
observability.Go(ctx, func() {
if err := p.openStream(ctx); err != nil {
if err := p.openStream(ctx, restart); err != nil {
logger.Error(ctx, "unable to re-open the stream: %v", err)
restart()
}
@@ -654,6 +763,11 @@ func (p *StreamPlayerHandler) controllerLoop(
"StreamPlayer[%s].controllerLoop: unable to get the current position: %v",
p.StreamID, err,
)
if now.Sub(posUpdatedAt) > p.Config.ReadTimeout {
logger.Debugf(ctx, "StreamPlayer[%s].controllerLoop: now == %v, posUpdatedAt == %v, pos == %v; readTimeout == %v, restarting (cannot even get a position)", p.StreamID, now, posUpdatedAt, pos, p.Config.ReadTimeout)
restart()
return
}
time.Sleep(time.Second)
return
}
@@ -723,7 +837,6 @@ func (p *StreamPlayerHandler) controllerLoop(
posUpdatedAt = now
prevPos = pos
} else {
// PUT TRACK SELECTION HERE
if now.Sub(posUpdatedAt) > p.Config.ReadTimeout {
logger.Debugf(ctx, "StreamPlayer[%s].controllerLoop: now == %v, posUpdatedAt == %v, len == %v; pos == %v; readTimeout == %v, restarting", p.StreamID, now, posUpdatedAt, l, pos, p.Config.ReadTimeout)
restart()
@@ -733,12 +846,35 @@ func (p *StreamPlayerHandler) controllerLoop(
lag := l - pos
logger.Tracef(ctx, "StreamPlayer[%s].controllerLoop: lag == %v", p.StreamID, lag)
if p.Config.JitterBufDuration > time.Second && lag < p.Config.JitterBufDuration/2 {
speed := lag.Seconds() / (p.Config.JitterBufDuration / 2).Seconds()
if speed <= 0 {
return
}
speed = float64(uint(speed*10)) / 10 // to avoid flickering (for example between 1.0001 and 1.0)
if speed < 0.5 {
speed = 0.5
}
if speed == curSpeed {
return
}
curSpeed = speed
logger.Debugf(ctx,
"StreamPlayer[%s].controllerLoop: slowing down to %f",
p.StreamID, curSpeed,
)
err := player.SetSpeed(ctx, curSpeed)
if err != nil {
logger.Errorf(ctx, "unable to slow down to %f: %v", curSpeed, err)
return
}
return
}
if lag <= p.Config.JitterBufDuration {
if curSpeed == 1 {
return
}
logger.Debugf(
ctx,
logger.Debugf(ctx,
"StreamPlayer[%s].controllerLoop: resetting the speed to 1",
p.StreamID,
)

View File

@@ -16,8 +16,8 @@ type ServerType int
const (
ServerTypeUndefined = ServerType(iota)
ServerTypeRTSP
ServerTypeSRT
ServerTypeRTSP
ServerTypeRTMP
ServerTypeHLS
ServerTypeWebRTC