From d0c3cb066ce3762dfdb84c406a1e598ed62c6e6c Mon Sep 17 00:00:00 2001 From: Alex X Date: Mon, 21 Apr 2025 20:08:16 +0300 Subject: [PATCH] Rewrite exec backchannel --- internal/exec/exec.go | 4 +-- pkg/core/codec.go | 33 +++++++++++++++++++ pkg/pcm/backchannel.go | 69 ++++++++++++++++++++++++++++++++++++++++ pkg/stdin/backchannel.go | 59 ---------------------------------- pkg/stdin/client.go | 33 ------------------- 5 files changed, 104 insertions(+), 94 deletions(-) create mode 100644 pkg/pcm/backchannel.go delete mode 100644 pkg/stdin/backchannel.go delete mode 100644 pkg/stdin/client.go diff --git a/internal/exec/exec.go b/internal/exec/exec.go index 89add393..711be8a2 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -19,9 +19,9 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/magic" + "github.com/AlexxIT/go2rtc/pkg/pcm" pkg "github.com/AlexxIT/go2rtc/pkg/rtsp" "github.com/AlexxIT/go2rtc/pkg/shell" - "github.com/AlexxIT/go2rtc/pkg/stdin" "github.com/rs/zerolog" ) @@ -86,7 +86,7 @@ func execHandle(rawURL string) (prod core.Producer, err error) { } if query.Get("backchannel") == "1" { - return stdin.NewClient(cmd) + return pcm.NewBackchannel(cmd, query.Get("audio")) } if path == "" { diff --git a/pkg/core/codec.go b/pkg/core/codec.go index 708839b3..c7791df9 100644 --- a/pkg/core/codec.go +++ b/pkg/core/codec.go @@ -249,3 +249,36 @@ func DecodeH264(fmtp string) (profile string, level byte) { } return } + +func ParseCodecString(s string) *Codec { + var codec Codec + + ss := strings.Split(s, "/") + switch strings.ToLower(ss[0]) { + case "pcm_s16be", "s16be", "pcm": + codec.Name = CodecPCM + case "pcm_s16le", "s16le", "pcml": + codec.Name = CodecPCML + case "pcm_alaw", "alaw", "pcma": + codec.Name = CodecPCMA + case "pcm_mulaw", "mulaw", "pcmu": + codec.Name = CodecPCMU + case "aac", "mpeg4-generic": + codec.Name = CodecAAC + case "opus": + codec.Name = CodecOpus + case "flac": + codec.Name = CodecFLAC + default: + return nil + } + + if len(ss) >= 2 { + codec.ClockRate = uint32(Atoi(ss[1])) + } + if len(ss) >= 3 { + codec.Channels = uint16(Atoi(ss[1])) + } + + return &codec +} diff --git a/pkg/pcm/backchannel.go b/pkg/pcm/backchannel.go new file mode 100644 index 00000000..99b6e3aa --- /dev/null +++ b/pkg/pcm/backchannel.go @@ -0,0 +1,69 @@ +package pcm + +import ( + "errors" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/shell" + "github.com/pion/rtp" +) + +type Backchannel struct { + core.Connection + cmd *shell.Command +} + +func NewBackchannel(cmd *shell.Command, audio string) (core.Producer, error) { + var codec *core.Codec + + if audio == "" { + // default codec + codec = &core.Codec{Name: core.CodecPCML, ClockRate: 16000} + } else if codec = core.ParseCodecString(audio); codec == nil { + return nil, errors.New("pcm: unsupported audio format: " + audio) + } + + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{codec}, + }, + } + + return &Backchannel{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "pcm", + Protocol: "pipe", + Medias: medias, + Transport: cmd, + }, + cmd: cmd, + }, nil +} + +func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return nil, core.ErrCantGetTrack +} + +func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + wr, err := c.cmd.StdinPipe() + if err != nil { + return err + } + + sender := core.NewSender(media, track.Codec) + sender.Handler = func(packet *rtp.Packet) { + if n, err := wr.Write(packet.Payload); err != nil { + c.Send += n + } + } + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + return nil +} + +func (c *Backchannel) Start() error { + return c.cmd.Run() +} diff --git a/pkg/stdin/backchannel.go b/pkg/stdin/backchannel.go deleted file mode 100644 index b154a291..00000000 --- a/pkg/stdin/backchannel.go +++ /dev/null @@ -1,59 +0,0 @@ -package stdin - -import ( - "encoding/json" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/pion/rtp" -) - -func (c *Client) GetMedias() []*core.Media { - return c.medias -} - -func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - return nil, core.ErrCantGetTrack -} - -func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { - if c.sender == nil { - stdin, err := c.cmd.StdinPipe() - if err != nil { - return err - } - - c.sender = core.NewSender(media, track.Codec) - c.sender.Handler = func(packet *rtp.Packet) { - _, _ = stdin.Write(packet.Payload) - c.send += len(packet.Payload) - } - } - - c.sender.HandleRTP(track) - return nil -} - -func (c *Client) Start() (err error) { - return c.cmd.Run() -} - -func (c *Client) Stop() (err error) { - if c.sender != nil { - c.sender.Close() - } - return c.cmd.Close() -} - -func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Connection{ - ID: core.ID(c), - FormatName: "exec", - Protocol: "pipe", - Medias: c.medias, - Send: c.send, - } - if c.sender != nil { - info.Senders = []*core.Sender{c.sender} - } - return json.Marshal(info) -} diff --git a/pkg/stdin/client.go b/pkg/stdin/client.go deleted file mode 100644 index a77d4459..00000000 --- a/pkg/stdin/client.go +++ /dev/null @@ -1,33 +0,0 @@ -package stdin - -import ( - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/shell" -) - -// Deprecated: should be rewritten to core.Connection -type Client struct { - cmd *shell.Command - - medias []*core.Media - sender *core.Sender - send int -} - -func NewClient(cmd *shell.Command) (*Client, error) { - c := &Client{ - cmd: cmd, - medias: []*core.Media{ - { - Kind: core.KindAudio, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecPCMA, ClockRate: 8000}, - {Name: core.CodecPCM}, - }, - }, - }, - } - - return c, nil -}