diff --git a/pkg/streamd/image_data_provider.go b/pkg/streamd/image_data_provider.go index 336036f..28b7089 100644 --- a/pkg/streamd/image_data_provider.go +++ b/pkg/streamd/image_data_provider.go @@ -8,40 +8,53 @@ import ( "github.com/xaionaro-go/recoder" "github.com/xaionaro-go/streamctl/pkg/streamd/config" "github.com/xaionaro-go/streamctl/pkg/streamtypes" + "github.com/xaionaro-go/xsync" ) type imageDataProvider struct { - OBSServer obs_grpc.OBSServer - OBSState *OBSState + StreamD *StreamD + OBSServer obs_grpc.OBSServer + StreamImageTakerLocker xsync.Mutex + StreamImageTakes map[streamtypes.StreamID]*streamImageTaker } var _ config.ImageDataProvider = (*imageDataProvider)(nil) func newImageDataProvider( + streamD *StreamD, obsServer obs_grpc.OBSServer, - obsState *OBSState, ) *imageDataProvider { return &imageDataProvider{ + StreamD: streamD, OBSServer: obsServer, - OBSState: obsState, } } -func (img *imageDataProvider) GetOBSServer( +func (p *imageDataProvider) GetOBSServer( ctx context.Context, ) (obs_grpc.OBSServer, error) { - return img.OBSServer, nil + return p.OBSServer, nil } -func (img *imageDataProvider) GetOBSState( +func (p *imageDataProvider) GetOBSState( ctx context.Context, ) (*streamtypes.OBSState, error) { - return img.OBSState, nil + return &p.StreamD.OBSState, nil } -func (img *imageDataProvider) GetCurrentStreamFrame( +func (p *imageDataProvider) GetCurrentStreamFrame( ctx context.Context, streamID streamtypes.StreamID, ) ([]byte, recoder.VideoCodec, error) { - return nil, 0, fmt.Errorf("not implemented") + return xsync.DoR3(ctx, &p.StreamImageTakerLocker, func() ([]byte, recoder.VideoCodec, error) { + streamImageTaker := p.StreamImageTakes[streamID] + if streamImageTaker == nil || !streamImageTaker.Keepalive() { + var err error + streamImageTaker, err = p.newStreamImageTaker(ctx, streamID) + if err != nil { + return nil, 0, fmt.Errorf("unable to initialize a stream image taker: %w", err) + } + } + return streamImageTaker.GetLastFrame(ctx) + }) } diff --git a/pkg/streamd/image_taker.go b/pkg/streamd/image_taker.go index 2054421..3f31862 100644 --- a/pkg/streamd/image_taker.go +++ b/pkg/streamd/image_taker.go @@ -120,7 +120,7 @@ func (d *StreamD) restartImageTakerNoLock(ctx context.Context) error { return } - imageDataProvider := newImageDataProvider(obsServer, &d.OBSState) + imageDataProvider := newImageDataProvider(d, obsServer) for { var ( diff --git a/pkg/streamd/stream_image_taker.go b/pkg/streamd/stream_image_taker.go new file mode 100644 index 0000000..cc8ed39 --- /dev/null +++ b/pkg/streamd/stream_image_taker.go @@ -0,0 +1,97 @@ +package streamd + +import ( + "context" + "fmt" + "net/url" + "sort" + + "github.com/facebookincubator/go-belt/tool/logger" + "github.com/xaionaro-go/recoder" + "github.com/xaionaro-go/recoder/libav" + "github.com/xaionaro-go/streamctl/pkg/streamserver/types/streamportserver" + "github.com/xaionaro-go/streamctl/pkg/streamtypes" +) + +func (p *imageDataProvider) newStreamImageTaker( + ctx context.Context, + streamID streamtypes.StreamID, +) (_ret *streamImageTaker, _err error) { + factory := libav.NewRecoderFactory() + + r, err := factory.New(ctx) + if err != nil { + return nil, fmt.Errorf("unable to get a recoder factory: %w", err) + } + defer func() { + if _err != nil { + r.Close() + } + }() + + myURL, err := getLocalhostEndpoint(ctx, p.StreamD.StreamServer) + if err != nil { + return nil, fmt.Errorf("unable to get an URL to myself: %w", err) + } + myURL.Path = string(streamID) + + input, err := r.NewInputFromURL(ctx, myURL.String(), "", recoder.InputConfig{}) + if err != nil { + return nil, fmt.Errorf("unable to open URL '%s': %w", myURL.String(), err) + } + defer func() { + if _err != nil { + input.Close() + } + }() + + return nil, fmt.Errorf("not implemented") +} + +type streamImageTaker struct { +} + +func (p *streamImageTaker) Keepalive() bool { + panic("not implemented") +} + +func (p *streamImageTaker) GetLastFrame( + ctx context.Context, +) ([]byte, recoder.VideoCodec, error) { + return nil, 0, fmt.Errorf("not implemented") +} + +func getLocalhostEndpoint( + ctx context.Context, + streamServer streamportserver.GetPortServerser, +) (_ret *url.URL, _err error) { + defer func() { logger.Debugf(ctx, "getLocalhostEndpoint result: %v %v", _ret, _err) }() + + portSrvs, err := streamServer.GetPortServers(ctx) + if err != nil { + return nil, fmt.Errorf("unable to get port servers info: %w", err) + } + + sort.Slice(portSrvs, func(i, j int) bool { + a := &portSrvs[i] + b := &portSrvs[j] + if a.IsTLS != b.IsTLS { + return b.IsTLS + } + return false + }) + portSrv := portSrvs[0] + logger.Debugf(ctx, "getLocalhostEndpoint: chosen portSrv == %#+v", portSrv) + + protoString := portSrv.Type.String() + if portSrv.IsTLS { + protoString += "s" + } + urlString := fmt.Sprintf("%s://%s", protoString, portSrv.ListenAddr) + urlParsed, err := url.Parse(urlString) + if err != nil { + return nil, fmt.Errorf("unable to parse '%s': %w", urlString, err) + } + + return urlParsed, nil +} diff --git a/pkg/streampanel/dashboard.go b/pkg/streampanel/dashboard.go index dc13293..12fcb3d 100644 --- a/pkg/streampanel/dashboard.go +++ b/pkg/streampanel/dashboard.go @@ -421,7 +421,7 @@ func (w *dashboardWindow) startUpdatingNoLock( w.renderStreamStatus(ctx) observability.Go(ctx, func() { - t := time.NewTicker(200 * time.Millisecond) + t := time.NewTicker(500 * time.Millisecond) for { select { case <-ctx.Done(): diff --git a/pkg/streamserver/streamforward/stream_forward.go b/pkg/streamserver/streamforward/stream_forward.go index e62bf5e..65ac372 100644 --- a/pkg/streamserver/streamforward/stream_forward.go +++ b/pkg/streamserver/streamforward/stream_forward.go @@ -346,7 +346,7 @@ func (fwd *ActiveStreamForwarding) openInputFor( recoderInstance recoder.Recoder, publisher types.Publisher, ) (recoder.Input, error) { - inputURL, err := fwd.getLocalhostEndpoint(ctx) + inputURL, err := fwd.GetLocalhostEndpoint(ctx) if err != nil { return nil, fmt.Errorf("unable to get a localhost endpoint: %w", err) } diff --git a/pkg/streamserver/streamforward/stream_forwards.go b/pkg/streamserver/streamforward/stream_forwards.go index 3b67cdf..5e8fd02 100644 --- a/pkg/streamserver/streamforward/stream_forwards.go +++ b/pkg/streamserver/streamforward/stream_forwards.go @@ -850,8 +850,8 @@ func (s *StreamForwards) findStreamDestinationByID( ) } -func (s *StreamForwards) getLocalhostEndpoint(ctx context.Context) (_ret *url.URL, _err error) { - defer func() { logger.Debugf(ctx, "getLocalhostEndpoint result: %v %v", _ret, _err) }() +func (s *StreamForwards) GetLocalhostEndpoint(ctx context.Context) (_ret *url.URL, _err error) { + defer func() { logger.Debugf(ctx, "GetLocalhostEndpoint result: %v %v", _ret, _err) }() portSrvs, err := s.StreamServer.GetPortServers(ctx) if err != nil {