Compare commits

...

14 Commits

Author SHA1 Message Date
Alex X
fce41f4fc1 Update wyoming readme about events 2025-04-24 22:06:36 +03:00
Alex X
c50e894a42 Add PlayFile function to wyoming server 2025-04-24 21:23:16 +03:00
Alex X
890fd78a6a Remove errors from wyoming server handlers 2025-04-24 18:32:42 +03:00
Alex X
518cae1476 Add support events to wyoming server 2025-04-24 17:13:51 +03:00
Alex X
545a105ba0 Add support body to expr fetch func 2025-04-22 16:37:10 +03:00
Alex X
70b4bf779e Change wyoming Event.Data type to string 2025-04-22 16:35:44 +03:00
Alex X
7cf672da84 Add readme for exec and wyoming modules 2025-04-22 14:19:40 +03:00
Alex X
80f57a0292 Add support snd mode for wyoming module 2025-04-22 13:16:57 +03:00
Alex X
3b7309d9f7 Add support mic mode for wyoming module 2025-04-22 11:49:08 +03:00
Alex X
6df1e68a5f Update wyoming producer and backchannel 2025-04-22 10:26:00 +03:00
Alex X
df2e982090 Add logs to wyoming module 2025-04-22 10:25:22 +03:00
Alex X
902af5e5d7 Add wyoming module 2025-04-22 06:37:42 +03:00
Alex X
7fe23c7bc5 Add wav backchannel (not used yet) 2025-04-21 20:33:13 +03:00
Alex X
d0c3cb066c Rewrite exec backchannel 2025-04-21 20:33:13 +03:00
30 changed files with 1767 additions and 166 deletions

12
internal/exec/README.md Normal file
View File

@@ -0,0 +1,12 @@
## Backchannel
- You can check audio card names in the **Go2rtc > WebUI > Add**
- You can specify multiple backchannel lines with different codecs
```yaml
sources:
two_way_audio_win:
- exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav -
- exec:ffplay -nodisp -probesize 32 -f s16le -ar 16000 -#backchannel=1#audio=s16le/16000
- exec:ffplay -nodisp -probesize 32 -f alaw -ar 8000 -#backchannel=1#audio=alaw/8000
```

View File

@@ -19,9 +19,9 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/pcm"
pkg "github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/AlexxIT/go2rtc/pkg/stdin"
"github.com/rs/zerolog"
)
@@ -86,7 +86,7 @@ func execHandle(rawURL string) (prod core.Producer, err error) {
}
if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
return pcm.NewBackchannel(cmd, query.Get("audio"))
}
if path == "" {

View File

@@ -12,7 +12,7 @@ func Init() {
log := app.GetLogger("expr")
streams.RedirectFunc("expr", func(url string) (string, error) {
v, err := expr.Run(url[5:])
v, err := expr.Eval(url[5:], nil)
if err != nil {
return "", err
}

View File

@@ -7,7 +7,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (s *Stream) Play(source string) error {
func (s *Stream) Play(urlOrProd any) error {
s.mu.Lock()
for _, producer := range s.producers {
if producer.state == stateInternal && producer.conn != nil {
@@ -16,12 +16,18 @@ func (s *Stream) Play(source string) error {
}
s.mu.Unlock()
if source == "" {
return nil
}
var source string
var src core.Producer
switch urlOrProd.(type) {
case string:
if source = urlOrProd.(string); source == "" {
return nil
}
case core.Producer:
src = urlOrProd.(core.Producer)
}
for _, producer := range s.producers {
if producer.conn == nil {
continue

267
internal/wyoming/README.md Normal file
View File

@@ -0,0 +1,267 @@
# Wyoming
This module provide [Wyoming Protocol](https://www.home-assistant.io/integrations/wyoming/) support to create local voice assistants using [Home Assistant](https://www.home-assistant.io/).
- go2rtc can act as [Wyoming Satellite](https://github.com/rhasspy/wyoming-satellite)
- go2rtc can act as [Wyoming External Microphone](https://github.com/rhasspy/wyoming-mic-external)
- go2rtc can act as [Wyoming External Sound](https://github.com/rhasspy/wyoming-snd-external)
- any supported audio source with PCM codec can be used as audio input
- any supported two-way audio source with PCM codec can be used as audio output
- any desktop/server microphone/speaker can be used as two-way audio source
- supported any OS via FFmpeg or any similar software
- supported Linux via alsa source
- you can change the behavior using the built-in scripting engine
## Typical Voice Pipeline
1. Audio stream (MIC)
- any audio source with PCM codec support (include PCMA/PCMU)
2. Voice Activity Detector (VAD)
3. Wake Word (WAKE)
- [OpenWakeWord](https://www.home-assistant.io/voice_control/create_wake_word/)
4. Speech-to-Text (STT)
- [Whisper](https://github.com/home-assistant/addons/blob/master/whisper/README.md)
- [Vosk](https://github.com/rhasspy/hassio-addons/blob/master/vosk/README.md)
5. Conversation agent (INTENT)
- [Home Assistant](https://www.home-assistant.io/integrations/conversation/)
6. Text-to-speech (TTS)
- [Google Translate](https://www.home-assistant.io/integrations/google_translate/)
- [Piper](https://github.com/home-assistant/addons/blob/master/piper/README.md)
7. Audio stream (SND)
- any source with two-way audio (backchannel) and PCM codec support (include PCMA/PCMU)
You can use a large number of different projects for WAKE, STT, INTENT and TTS thanks to the Home Assistant.
And you can use a large number of different technologies for MIC and SND thanks to Go2rtc.
## Configuration
You can optionally specify WAKE service. So go2rtc will start transmitting audio to Home Assistant only after WAKE word. If the WAKE service cannot be connected to or not specified - go2rtc will pass all audio to Home Assistant. In this case WAKE service must be configured in your Voice Assistant pipeline.
You can optionally specify VAD threshold. So go2rtc will start transmitting audio to WAKE service only after some audio noise.
Your stream must support audio transmission in PCM codec (include PCMA/PCMU).
```yaml
wyoming:
stream_name_from_streams_section:
listen: :10700
name: "My Satellite" # optional name
wake_uri: tcp://192.168.1.23:10400 # optional WAKE service
vad_threshold: 1 # optional VAD threshold (from 0.1 to 3.5)
```
Home Assistant -> Settings -> Integrations -> Add -> Wyoming Protocol -> Host + Port from `go2rtc.yaml`
Select one or multiple wake words:
```yaml
wake_uri: tcp://192.168.1.23:10400?name=alexa_v0.1&name=hey_jarvis_v0.1&name=hey_mycroft_v0.1&name=hey_rhasspy_v0.1&name=ok_nabu_v0.1
```
## Events
You can add wyoming event handling using the [expr](https://github.com/AlexxIT/go2rtc/blob/master/internal/expr/README.md) language. For example, to pronounce TTS on some media player from HA.
Turn on the logs to see what kind of events happens.
This is what the default scripts look like:
```yaml
wyoming:
script_example:
event:
run-satellite: Detect()
pause-satellite: Stop()
voice-stopped: Pause()
audio-stop: PlayAudio() && WriteEvent("played") && Detect()
error: Detect()
internal-run: WriteEvent("run-pipeline", '{"start_stage":"wake","end_stage":"tts"}') && Stream()
internal-detection: WriteEvent("run-pipeline", '{"start_stage":"asr","end_stage":"tts"}') && Stream()
```
If you write a script for an event - the default action is no longer executed. You need to repeat the necessary steps yourself.
In addition to the standard events, there are two additional events:
- `internal-run` - called after `Detect()` when VAD detected, but WAKE service unavailable
- `internal-detection` - called after `Detect()` when WAKE word detected
**Example 1.** You want to play a sound file when a wake word detected (only `wav` supported):
- `PlayFile` and `PlayAudio` functions are executed synchronously, the following steps will be executed only after they are completed
```yaml
wyoming:
script_example:
event:
internal-detection: PlayFile('/media/beep.wav') && WriteEvent("run-pipeline", '{"start_stage":"asr","end_stage":"tts"}') && Stream()
```
**Example 2.** You want to play TTS on a Home Assistant media player:
Each event has a `Type` and `Data` in JSON format. You can use their values in scripts.
- in the `synthesize` step, we get the value of the `text` and call the HA REST API
- in the `audio-stop` step we get the duration of the TTS in seconds, wait for this time and start the pipeline again
```yaml
wyoming:
script_example:
event:
synthesize: |
let text = fromJSON(Data).text;
let token = 'eyJhbGci...';
fetch('http://localhost:8123/api/services/tts/speak', {
method: 'POST',
headers: {'Authorization': 'Bearer '+token,'Content-Type': 'application/json'},
body: toJSON({
entity_id: 'tts.google_translate_com',
media_player_entity_id: 'media_player.google_nest',
message: text,
language: 'en',
}),
}).ok
audio-stop: |
let timestamp = fromJSON(Data).timestamp;
let delay = string(timestamp)+'s';
Sleep(delay) && WriteEvent("played") && Detect()
```
## Config examples
Satellite on Windows server using FFmpeg and FFplay.
```yaml
streams:
satellite_win:
- exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav -
- exec:ffplay -hide_banner -nodisp -probesize 32 -f s16le -ar 22050 -#backchannel=1#audio=s16le/22050
wyoming:
satellite_win:
listen: :10700
name: "Windows Satellite"
wake_uri: tcp://192.168.1.23:10400
vad_threshold: 1
```
Satellite on Dahua camera with two-way audio support.
```yaml
streams:
dahua_camera:
- rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1&unicast=true&proto=Onvif
wyoming:
dahua_camera:
listen: :10700
name: "Dahua Satellite"
wake_uri: tcp://192.168.1.23:10400
vad_threshold: 1
```
Satellite on Dahua camera with two-way audio support.
```yaml
streams:
wyoming_external:
- wyoming://192.168.1.23:10600 # wyoming-mic-external
- wyoming://192.168.1.23:10601?backchannel=1 # wyoming-snd-external
wyoming:
wyoming_external:
listen: :10700
name: "Wyoming Satellite"
wake_uri: tcp://192.168.1.23:10400
vad_threshold: 1
```
## Wyoming External Microphone and Sound
Advanced users, who want to enjoy the [Wyoming Satellite](https://github.com/rhasspy/wyoming-satellite) project, can use go2rtc as a [Wyoming External Microphone](https://github.com/rhasspy/wyoming-mic-external) or [Wyoming External Sound](https://github.com/rhasspy/wyoming-snd-external).
**go2rtc.yaml**
```yaml
streams:
wyoming_mic_external:
- exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav -
wyoming_snd_external:
- exec:ffplay -hide_banner -nodisp -probesize 32 -f s16le -ar 22050 -#backchannel=1#audio=s16le/22050
wyoming:
wyoming_mic_external:
listen: :10600
mode: mic
wyoming_snd_external:
listen: :10601
mode: snd
```
**docker-compose.yml**
```yaml
version: "3.8"
services:
satellite:
build: wyoming-satellite # https://github.com/rhasspy/wyoming-satellite
ports:
- "10700:10700"
command:
- "--name"
- "my satellite"
- "--mic-uri"
- "tcp://192.168.1.23:10600"
- "--snd-uri"
- "tcp://192.168.1.23:10601"
- "--debug"
```
## Wyoming External Source
**go2rtc.yaml**
```yaml
streams:
wyoming_external:
- wyoming://192.168.1.23:10600
- wyoming://192.168.1.23:10601?backchannel=1
```
**docker-compose.yml**
```yaml
version: "3.8"
services:
microphone:
build: wyoming-mic-external # https://github.com/rhasspy/wyoming-mic-external
ports:
- "10600:10600"
devices:
- /dev/snd:/dev/snd
group_add:
- audio
command:
- "--device"
- "sysdefault"
- "--debug"
playback:
build: wyoming-snd-external # https://github.com/rhasspy/wyoming-snd-external
ports:
- "10601:10601"
devices:
- /dev/snd:/dev/snd
group_add:
- audio
command:
- "--device"
- "sysdefault"
- "--debug"
```
## Debug
```yaml
log:
wyoming: trace
```

106
internal/wyoming/wyoming.go Normal file
View File

@@ -0,0 +1,106 @@
package wyoming
import (
"net"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/wyoming"
"github.com/rs/zerolog"
)
func Init() {
streams.HandleFunc("wyoming", wyoming.Dial)
// server
var cfg struct {
Mod map[string]struct {
Listen string `yaml:"listen"`
Name string `yaml:"name"`
Mode string `yaml:"mode"`
Event map[string]string `yaml:"event"`
WakeURI string `yaml:"wake_uri"`
VADThreshold float32 `yaml:"vad_threshold"`
} `yaml:"wyoming"`
}
app.LoadConfig(&cfg)
log = app.GetLogger("wyoming")
for name, cfg := range cfg.Mod {
stream := streams.Get(name)
if stream == nil {
log.Warn().Msgf("[wyoming] missing stream: %s", name)
continue
}
if cfg.Name == "" {
cfg.Name = name
}
srv := &wyoming.Server{
Name: cfg.Name,
Event: cfg.Event,
VADThreshold: int16(1000 * cfg.VADThreshold), // 1.0 => 1000
WakeURI: cfg.WakeURI,
MicHandler: func(cons core.Consumer) error {
if err := stream.AddConsumer(cons); err != nil {
return err
}
// not best solution
if i, ok := cons.(interface{ OnClose(func()) }); ok {
i.OnClose(func() {
stream.RemoveConsumer(cons)
})
}
return nil
},
SndHandler: func(prod core.Producer) error {
return stream.Play(prod)
},
Trace: func(format string, v ...any) {
log.Trace().Msgf("[wyoming] "+format, v...)
},
Error: func(format string, v ...any) {
log.Error().Msgf("[wyoming] "+format, v...)
},
}
go serve(srv, cfg.Mode, cfg.Listen)
}
}
var log zerolog.Logger
func serve(srv *wyoming.Server, mode, address string) {
ln, err := net.Listen("tcp", address)
if err != nil {
log.Warn().Err(err).Msgf("[wyoming] listen")
}
for {
conn, err := ln.Accept()
if err != nil {
return
}
go handle(srv, mode, conn)
}
}
func handle(srv *wyoming.Server, mode string, conn net.Conn) {
addr := conn.RemoteAddr()
log.Trace().Msgf("[wyoming] %s connected", addr)
switch mode {
case "mic":
srv.HandleMic(conn)
case "snd":
srv.HandleSnd(conn)
default:
srv.Handle(conn)
}
log.Trace().Msgf("[wyoming] %s disconnected", addr)
}

View File

@@ -38,6 +38,7 @@ import (
"github.com/AlexxIT/go2rtc/internal/v4l2"
"github.com/AlexxIT/go2rtc/internal/webrtc"
"github.com/AlexxIT/go2rtc/internal/webtorrent"
"github.com/AlexxIT/go2rtc/internal/wyoming"
"github.com/AlexxIT/go2rtc/pkg/shell"
)
@@ -69,6 +70,7 @@ func main() {
hass.Init() // hass source, Hass API server
onvif.Init() // onvif source, ONVIF API server
webtorrent.Init() // webtorrent source, WebTorrent module
wyoming.Init()
// 5. Other sources

View File

@@ -249,3 +249,36 @@ func DecodeH264(fmtp string) (profile string, level byte) {
}
return
}
func ParseCodecString(s string) *Codec {
var codec Codec
ss := strings.Split(s, "/")
switch strings.ToLower(ss[0]) {
case "pcm_s16be", "s16be", "pcm":
codec.Name = CodecPCM
case "pcm_s16le", "s16le", "pcml":
codec.Name = CodecPCML
case "pcm_alaw", "alaw", "pcma":
codec.Name = CodecPCMA
case "pcm_mulaw", "mulaw", "pcmu":
codec.Name = CodecPCMU
case "aac", "mpeg4-generic":
codec.Name = CodecAAC
case "opus":
codec.Name = CodecOpus
case "flac":
codec.Name = CodecFLAC
default:
return nil
}
if len(ss) >= 2 {
codec.ClockRate = uint32(Atoi(ss[1]))
}
if len(ss) >= 3 {
codec.Channels = uint8(Atoi(ss[1]))
}
return &codec
}

View File

@@ -6,17 +6,24 @@ import (
"io"
"net/http"
"regexp"
"strings"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
)
func newRequest(method, url string, headers map[string]any) (*http.Request, error) {
func newRequest(method, url string, headers map[string]any, body string) (*http.Request, error) {
var rd io.Reader
if method == "" {
method = "GET"
}
if body != "" {
rd = strings.NewReader(body)
}
req, err := http.NewRequest(method, url, nil)
req, err := http.NewRequest(method, url, rd)
if err != nil {
return nil, err
}
@@ -55,7 +62,8 @@ var Options = []expr.Option{
options := params[1].(map[string]any)
method, _ := options["method"].(string)
headers, _ := options["headers"].(map[string]any)
req, err = newRequest(method, url, headers)
body, _ := options["body"].(string)
req, err = newRequest(method, url, headers, body)
} else {
req, err = http.NewRequest("GET", url, nil)
}
@@ -105,11 +113,19 @@ var Options = []expr.Option{
),
}
func Run(input string) (any, error) {
program, err := expr.Compile(input, Options...)
func Compile(input string) (*vm.Program, error) {
return expr.Compile(input, Options...)
}
func Eval(input string, env any) (any, error) {
program, err := Compile(input)
if err != nil {
return nil, err
}
return expr.Run(program, nil)
return expr.Run(program, env)
}
func Run(program *vm.Program, env any) (any, error) {
return vm.Run(program, env)
}

View File

@@ -7,11 +7,11 @@ import (
)
func TestMatchHost(t *testing.T) {
v, err := Run(`
v, err := Eval(`
let url = "rtsp://user:pass@192.168.1.123/cam/realmonitor?...";
let host = match(url, "//[^/]+")[0][2:];
host
`)
`, nil)
require.Nil(t, err)
require.Equal(t, "user:pass@192.168.1.123", v)
}

69
pkg/pcm/backchannel.go Normal file
View File

@@ -0,0 +1,69 @@
package pcm
import (
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/pion/rtp"
)
type Backchannel struct {
core.Connection
cmd *shell.Command
}
func NewBackchannel(cmd *shell.Command, audio string) (core.Producer, error) {
var codec *core.Codec
if audio == "" {
// default codec
codec = &core.Codec{Name: core.CodecPCML, ClockRate: 16000}
} else if codec = core.ParseCodecString(audio); codec == nil {
return nil, errors.New("pcm: unsupported audio format: " + audio)
}
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{codec},
},
}
return &Backchannel{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "pcm",
Protocol: "pipe",
Medias: medias,
Transport: cmd,
},
cmd: cmd,
}, nil
}
func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
wr, err := c.cmd.StdinPipe()
if err != nil {
return err
}
sender := core.NewSender(media, track.Codec)
sender.Handler = func(packet *rtp.Packet) {
if n, err := wr.Write(packet.Payload); err != nil {
c.Send += n
}
}
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *Backchannel) Start() error {
return c.cmd.Run()
}

View File

@@ -2,6 +2,7 @@ package pcm
import (
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
@@ -82,18 +83,27 @@ func TranscodeHandler(dst, src *core.Codec, handler core.HandlerFunc) core.Handl
}
}
func BytesPerFrame(codec *core.Codec) byte {
channels := byte(codec.Channels)
if channels == 0 {
channels = 1
}
func BytesPerSample(codec *core.Codec) int {
switch codec.Name {
case core.CodecPCML, core.CodecPCM:
return 2 * channels
return 2
case core.CodecPCMU, core.CodecPCMA:
return channels
return 1
}
return 0
}
func BytesPerFrame(codec *core.Codec) int {
if codec.Channels <= 1 {
return BytesPerSample(codec)
}
return int(codec.Channels) * BytesPerSample(codec)
}
func FramesPerDuration(codec *core.Codec, duration time.Duration) int {
return int(time.Duration(codec.ClockRate) * duration / time.Second)
}
func BytesPerDuration(codec *core.Codec, duration time.Duration) int {
return BytesPerFrame(codec) * FramesPerDuration(codec, duration)
}

View File

@@ -1,13 +1,25 @@
package pcm
import "github.com/AlexxIT/go2rtc/pkg/core"
import (
"math"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func ceil(x float32) int {
d, fract := math.Modf(float64(x))
if fract == 0.0 {
return int(d)
}
return int(d) + 1
}
func Downsample(k float32) func([]int16) []int16 {
var sampleN, sampleSum float32
return func(src []int16) (dst []int16) {
var i int
dst = make([]int16, int((float32(len(src))+sampleN)/k))
dst = make([]int16, ceil((float32(len(src))+sampleN)/k))
for _, sample := range src {
sampleSum += float32(sample)
sampleN++
@@ -28,7 +40,7 @@ func Upsample(k float32) func([]int16) []int16 {
return func(src []int16) (dst []int16) {
var i int
dst = make([]int16, int(k*float32(len(src))))
dst = make([]int16, ceil(k*float32(len(src))))
for _, sample := range src {
sampleN += k
for sampleN > 0 {
@@ -185,3 +197,24 @@ func Transcode(dst, src *core.Codec) func([]byte) []byte {
return writer(samples)
}
}
func ConsumerCodecs() []*core.Codec {
return []*core.Codec{
{Name: core.CodecPCML},
{Name: core.CodecPCM},
{Name: core.CodecPCMA},
{Name: core.CodecPCMU},
}
}
func ProducerCodecs() []*core.Codec {
return []*core.Codec{
{Name: core.CodecPCML, ClockRate: 16000},
{Name: core.CodecPCM, ClockRate: 16000},
{Name: core.CodecPCML, ClockRate: 8000},
{Name: core.CodecPCM, ClockRate: 8000},
{Name: core.CodecPCMA, ClockRate: 8000},
{Name: core.CodecPCMU, ClockRate: 8000},
{Name: core.CodecPCML, ClockRate: 22050}, // wyoming-snd-external
}
}

96
pkg/pcm/producer_sync.go Normal file
View File

@@ -0,0 +1,96 @@
package pcm
import (
"io"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type ProducerSync struct {
core.Connection
src *core.Codec
rd io.Reader
onClose func()
}
func OpenSync(codec *core.Codec, rd io.Reader) *ProducerSync {
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: ProducerCodecs(),
},
}
return &ProducerSync{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "pcm",
Medias: medias,
Transport: rd,
},
src: codec,
rd: rd,
}
}
func (p *ProducerSync) OnClose(f func()) {
p.onClose = f
}
func (p *ProducerSync) Start() error {
if len(p.Receivers) == 0 {
return nil
}
var pktSeq uint16
var pktTS uint32 // time in frames
var pktTime time.Duration // time in seconds
t0 := time.Now()
dst := p.Receivers[0].Codec
transcode := Transcode(dst, p.src)
const chunkDuration = 20 * time.Millisecond
chunkBytes := BytesPerDuration(p.src, chunkDuration)
chunkFrames := uint32(FramesPerDuration(dst, chunkDuration))
for {
buf := make([]byte, chunkBytes)
n, _ := io.ReadFull(p.rd, buf)
if n == 0 {
break
}
pkt := &core.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: pktSeq,
Timestamp: pktTS,
},
Payload: transcode(buf[:n]),
}
if d := pktTime - time.Since(t0); d > 0 {
time.Sleep(d)
}
p.Receivers[0].WriteRTP(pkt)
p.Recv += n
pktSeq++
pktTS += chunkFrames
pktTime += chunkDuration
}
if p.onClose != nil {
p.onClose()
}
return nil
}

42
pkg/pcm/s16le/s16le.go Normal file
View File

@@ -0,0 +1,42 @@
package s16le
func PeaksRMS(b []byte) int16 {
// RMS of sine wave = peak / sqrt2
// https://en.wikipedia.org/wiki/Root_mean_square
// https://www.youtube.com/watch?v=MUDkL4KZi0I
var peaks int32
var peaksSum int32
var prevSample int16
var prevUp bool
var i int
for n := len(b); i < n; {
lo := b[i]
i++
hi := b[i]
i++
sample := int16(hi)<<8 | int16(lo)
up := sample >= prevSample
if i >= 4 {
if up != prevUp {
if prevSample >= 0 {
peaksSum += int32(prevSample)
} else {
peaksSum -= int32(prevSample)
}
peaks++
}
}
prevSample = sample
prevUp = up
}
if peaks == 0 {
return 0
}
return int16(peaksSum / peaks)
}

View File

@@ -1,59 +0,0 @@
package stdin
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
if c.sender == nil {
stdin, err := c.cmd.StdinPipe()
if err != nil {
return err
}
c.sender = core.NewSender(media, track.Codec)
c.sender.Handler = func(packet *rtp.Packet) {
_, _ = stdin.Write(packet.Payload)
c.send += len(packet.Payload)
}
}
c.sender.HandleRTP(track)
return nil
}
func (c *Client) Start() (err error) {
return c.cmd.Run()
}
func (c *Client) Stop() (err error) {
if c.sender != nil {
c.sender.Close()
}
return c.cmd.Close()
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Connection{
ID: core.ID(c),
FormatName: "exec",
Protocol: "pipe",
Medias: c.medias,
Send: c.send,
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}
}
return json.Marshal(info)
}

View File

@@ -1,33 +0,0 @@
package stdin
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
cmd *shell.Command
medias []*core.Media
sender *core.Sender
send int
}
func NewClient(cmd *shell.Command) (*Client, error) {
c := &Client{
cmd: cmd,
medias: []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecPCMA, ClockRate: 8000},
{Name: core.CodecPCM},
},
},
},
}
return c, nil
}

67
pkg/wav/backchannel.go Normal file
View File

@@ -0,0 +1,67 @@
package wav
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/pion/rtp"
)
type Backchannel struct {
core.Connection
cmd *shell.Command
}
func NewBackchannel(cmd *shell.Command) (core.Producer, error) {
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
//{Name: core.CodecPCML},
{Name: core.CodecPCMA},
{Name: core.CodecPCMU},
},
},
}
return &Backchannel{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "wav",
Protocol: "pipe",
Medias: medias,
Transport: cmd,
},
cmd: cmd,
}, nil
}
func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
wr, err := c.cmd.StdinPipe()
if err != nil {
return err
}
b := Header(track.Codec)
if _, err = wr.Write(b); err != nil {
return err
}
sender := core.NewSender(media, track.Codec)
sender.Handler = func(packet *rtp.Packet) {
if n, err := wr.Write(packet.Payload); err != nil {
c.Send += n
}
}
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *Backchannel) Start() error {
return c.cmd.Run()
}

View File

@@ -2,7 +2,6 @@ package wav
import (
"bufio"
"encoding/binary"
"errors"
"io"
@@ -17,39 +16,11 @@ func Open(r io.Reader) (*Producer, error) {
// https://www.mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
rd := bufio.NewReaderSize(r, core.BufferSize)
// skip Master RIFF chunk
if _, err := rd.Discard(12); err != nil {
codec, err := ReadHeader(r)
if err != nil {
return nil, err
}
codec := &core.Codec{}
for {
chunkID, data, err := readChunk(rd)
if err != nil {
return nil, err
}
if chunkID == "data" {
break
}
if chunkID == "fmt " {
// https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt
switch data[0] {
case 1:
codec.Name = core.CodecPCML
case 6:
codec.Name = core.CodecPCMA
case 7:
codec.Name = core.CodecPCMU
}
codec.Channels = data[2]
codec.ClockRate = binary.LittleEndian.Uint32(data[4:])
}
}
if codec.Name == "" {
return nil, errors.New("waw: unsupported codec")
}
@@ -110,18 +81,3 @@ func (c *Producer) Start() error {
ts += PacketSize
}
}
func readChunk(r io.Reader) (chunkID string, data []byte, err error) {
b := make([]byte, 8)
if _, err = io.ReadFull(r, b); err != nil {
return
}
if chunkID = string(b[:4]); chunkID != "data" {
size := binary.LittleEndian.Uint32(b[4:])
data = make([]byte, size)
_, err = io.ReadFull(r, data)
}
return
}

103
pkg/wav/wav.go Normal file
View File

@@ -0,0 +1,103 @@
package wav
import (
"encoding/binary"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func Header(codec *core.Codec) []byte {
var fmt, size, extra byte
switch codec.Name {
case core.CodecPCML:
fmt = 1
size = 2
case core.CodecPCMA:
fmt = 6
size = 1
extra = 2
case core.CodecPCMU:
fmt = 7
size = 1
extra = 2
default:
return nil
}
channels := byte(codec.Channels)
if channels == 0 {
channels = 1
}
b := make([]byte, 0, 46) // cap with extra
b = append(b, "RIFF\xFF\xFF\xFF\xFFWAVEfmt "...)
b = append(b, 0x10+extra, 0, 0, 0)
b = append(b, fmt, 0)
b = append(b, channels, 0)
b = binary.LittleEndian.AppendUint32(b, codec.ClockRate)
b = binary.LittleEndian.AppendUint32(b, uint32(size*channels)*codec.ClockRate)
b = append(b, size*channels, 0)
b = append(b, size*8, 0)
if extra > 0 {
b = append(b, 0, 0) // ExtraParamSize (if PCM, then doesn't exist)
}
b = append(b, "data\xFF\xFF\xFF\xFF"...)
return b
}
func ReadHeader(r io.Reader) (*core.Codec, error) {
// skip Master RIFF chunk
if _, err := io.ReadFull(r, make([]byte, 12)); err != nil {
return nil, err
}
var codec core.Codec
for {
chunkID, data, err := readChunk(r)
if err != nil {
return nil, err
}
if chunkID == "data" {
break
}
if chunkID == "fmt " {
// https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt
switch data[0] {
case 1:
codec.Name = core.CodecPCML
case 6:
codec.Name = core.CodecPCMA
case 7:
codec.Name = core.CodecPCMU
}
codec.Channels = data[2]
codec.ClockRate = binary.LittleEndian.Uint32(data[4:])
}
}
return &codec, nil
}
func readChunk(r io.Reader) (chunkID string, data []byte, err error) {
b := make([]byte, 8)
if _, err = io.ReadFull(r, b); err != nil {
return
}
if chunkID = string(b[:4]); chunkID != "data" {
size := binary.LittleEndian.Uint32(b[4:])
data = make([]byte, size)
_, err = io.ReadFull(r, data)
}
return
}

14
pkg/wyoming/README.md Normal file
View File

@@ -0,0 +1,14 @@
## Default wake words
- alexa_v0.1
- hey_jarvis_v0.1
- hey_mycroft_v0.1
- hey_rhasspy_v0.1
- ok_nabu_v0.1
## Useful Links
- https://github.com/rhasspy/wyoming-satellite
- https://github.com/rhasspy/wyoming-openwakeword
- https://github.com/fwartner/home-assistant-wakewords-collection
- https://github.com/esphome/micro-wake-word-models/tree/main?tab=readme-ov-file

99
pkg/wyoming/api.go Normal file
View File

@@ -0,0 +1,99 @@
package wyoming
import (
"bufio"
"encoding/json"
"io"
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
)
type API struct {
conn net.Conn
rd *bufio.Reader
}
func DialAPI(address string) (*API, error) {
conn, err := net.DialTimeout("tcp", address, core.ConnDialTimeout)
if err != nil {
return nil, err
}
return NewAPI(conn), nil
}
const Version = "1.5.4"
func NewAPI(conn net.Conn) *API {
return &API{conn: conn, rd: bufio.NewReader(conn)}
}
func (w *API) WriteEvent(evt *Event) (err error) {
hdr := EventHeader{
Type: evt.Type,
Version: Version,
DataLength: len(evt.Data),
PayloadLength: len(evt.Payload),
}
buf, err := json.Marshal(hdr)
if err != nil {
return err
}
buf = append(buf, '\n')
buf = append(buf, evt.Data...)
buf = append(buf, evt.Payload...)
_, err = w.conn.Write(buf)
return err
}
func (w *API) ReadEvent() (*Event, error) {
data, err := w.rd.ReadBytes('\n')
if err != nil {
return nil, err
}
var hdr EventHeader
if err = json.Unmarshal(data, &hdr); err != nil {
return nil, err
}
evt := Event{Type: hdr.Type}
if hdr.DataLength > 0 {
data = make([]byte, hdr.DataLength)
if _, err = io.ReadFull(w.rd, data); err != nil {
return nil, err
}
evt.Data = string(data)
}
if hdr.PayloadLength > 0 {
evt.Payload = make([]byte, hdr.PayloadLength)
if _, err = io.ReadFull(w.rd, evt.Payload); err != nil {
return nil, err
}
}
return &evt, nil
}
func (w *API) Close() error {
return w.conn.Close()
}
type Event struct {
Type string
Data string
Payload []byte
}
type EventHeader struct {
Type string `json:"type"`
Version string `json:"version"`
DataLength int `json:"data_length,omitempty"`
PayloadLength int `json:"payload_length,omitempty"`
}

View File

@@ -0,0 +1,63 @@
package wyoming
import (
"fmt"
"net"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Backchannel struct {
core.Connection
api *API
}
func newBackchannel(conn net.Conn) *Backchannel {
return &Backchannel{
core.Connection{
ID: core.NewID(),
FormatName: "wyoming",
Medias: []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecPCML, ClockRate: 22050},
},
},
},
Transport: conn,
},
NewAPI(conn),
}
}
func (b *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (b *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
sender := core.NewSender(media, codec)
sender.Handler = func(pkt *rtp.Packet) {
ts := time.Now().Nanosecond()
evt := &Event{
Type: "audio-chunk",
Data: fmt.Sprintf(`{"rate":22050,"width":2,"channels":1,"timestamp":%d}`, ts),
Payload: pkt.Payload,
}
_ = b.api.WriteEvent(evt)
}
sender.HandleRTP(track)
b.Senders = append(b.Senders, sender)
return nil
}
func (b *Backchannel) Start() error {
for {
if _, err := b.api.ReadEvent(); err != nil {
return err
}
}
}

138
pkg/wyoming/expr.go Normal file
View File

@@ -0,0 +1,138 @@
package wyoming
import (
"bytes"
"fmt"
"os"
"time"
"github.com/AlexxIT/go2rtc/pkg/expr"
"github.com/AlexxIT/go2rtc/pkg/wav"
)
type env struct {
*satellite
Type string
Data string
}
func (s *satellite) handleEvent(evt *Event) {
switch evt.Type {
case "describe":
// {"asr": [], "tts": [], "handle": [], "intent": [], "wake": [], "satellite": {"name": "my satellite", "attribution": {"name": "", "url": ""}, "installed": true, "description": "my satellite", "version": "1.4.1", "area": null, "snd_format": null}}
data := fmt.Sprintf(`{"satellite":{"name":%q,"attribution":{"name":"go2rtc","url":"https://github.com/AlexxIT/go2rtc"},"installed":true}}`, s.srv.Name)
s.WriteEvent("info", data)
case "run-satellite":
s.Detect()
case "pause-satellite":
s.Stop()
case "detect": // WAKE_WORD_START {"names": null}
case "detection": // WAKE_WORD_END {"name": "ok_nabu_v0.1", "timestamp": 17580, "speaker": null}
case "transcribe": // STT_START {"language": "en"}
case "voice-started": // STT_VAD_START {"timestamp": 1160}
case "voice-stopped": // STT_VAD_END {"timestamp": 2470}
s.Pause()
case "transcript": // STT_END {"text": "how are you"}
case "synthesize": // TTS_START {"text": "Sorry, I couldn't understand that", "voice": {"language": "en"}}
case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
case "audio-stop": // {"timestamp": 2.880000000000002}
// run async because PlayAudio takes some time
go func() {
s.PlayAudio()
s.WriteEvent("played")
s.Detect()
}()
case "error":
s.Detect()
case "internal-run":
s.WriteEvent("run-pipeline", `{"start_stage":"wake","end_stage":"tts"}`)
s.Stream()
case "internal-detection":
s.WriteEvent("run-pipeline", `{"start_stage":"asr","end_stage":"tts"}`)
s.Stream()
}
}
func (s *satellite) handleScript(evt *Event) {
var script string
if s.srv.Event != nil {
script = s.srv.Event[evt.Type]
}
s.srv.Trace("event=%s data=%s payload size=%d", evt.Type, evt.Data, len(evt.Payload))
if script == "" {
s.handleEvent(evt)
return
}
// run async because script can have sleeps
go func() {
e := &env{satellite: s, Type: evt.Type, Data: evt.Data}
if res, err := expr.Eval(script, e); err != nil {
s.srv.Trace("event=%s expr error=%s", evt.Type, err)
s.handleEvent(evt)
} else {
s.srv.Trace("event=%s expr result=%v", evt.Type, res)
}
}()
}
func (s *satellite) Detect() bool {
return s.setMicState(stateWaitVAD)
}
func (s *satellite) Stream() bool {
return s.setMicState(stateActive)
}
func (s *satellite) Pause() bool {
return s.setMicState(stateIdle)
}
func (s *satellite) Stop() bool {
s.micStop()
return true
}
func (s *satellite) WriteEvent(args ...string) bool {
if len(args) == 0 {
return false
}
evt := &Event{Type: args[0]}
if len(args) > 1 {
evt.Data = args[1]
}
if err := s.api.WriteEvent(evt); err != nil {
return false
}
return true
}
func (s *satellite) PlayAudio() bool {
return s.playAudio(sndCodec, bytes.NewReader(s.sndAudio))
}
func (s *satellite) PlayFile(path string) bool {
f, err := os.Open(path)
if err != nil {
return false
}
codec, err := wav.ReadHeader(f)
if err != nil {
return false
}
return s.playAudio(codec, f)
}
func (e *env) Sleep(s string) bool {
d, err := time.ParseDuration(s)
if err != nil {
return false
}
time.Sleep(d)
return true
}

35
pkg/wyoming/mic.go Normal file
View File

@@ -0,0 +1,35 @@
package wyoming
import (
"fmt"
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (s *Server) HandleMic(conn net.Conn) {
defer conn.Close()
var closed core.Waiter
var timestamp int
api := NewAPI(conn)
mic := newMicConsumer(func(chunk []byte) {
data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, timestamp)
evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk}
if err := api.WriteEvent(evt); err != nil {
closed.Done(nil)
}
timestamp += len(chunk) / 2
})
mic.RemoteAddr = api.conn.RemoteAddr().String()
if err := s.MicHandler(mic); err != nil {
s.Error("mic error: %s", err)
return
}
_ = closed.Wait()
_ = mic.Stop()
}

65
pkg/wyoming/producer.go Normal file
View File

@@ -0,0 +1,65 @@
package wyoming
import (
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Producer struct {
core.Connection
api *API
}
func newProducer(conn net.Conn) *Producer {
return &Producer{
core.Connection{
ID: core.NewID(),
FormatName: "wyoming",
Medias: []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{Name: core.CodecPCML, ClockRate: 16000},
},
},
},
Transport: conn,
},
NewAPI(conn),
}
}
func (p *Producer) Start() error {
var seq uint16
var ts uint32
for {
evt, err := p.api.ReadEvent()
if err != nil {
return err
}
if evt.Type != "audio-chunk" {
continue
}
p.Recv += len(evt.Payload)
pkt := &core.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: seq,
Timestamp: ts,
},
Payload: evt.Payload,
}
p.Receivers[0].WriteRTP(pkt)
seq++
ts += uint32(len(evt.Payload) / 2)
}
}

275
pkg/wyoming/satellite.go Normal file
View File

@@ -0,0 +1,275 @@
package wyoming
import (
"context"
"fmt"
"io"
"net"
"sync"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/pcm"
"github.com/AlexxIT/go2rtc/pkg/pcm/s16le"
"github.com/pion/rtp"
)
type Server struct {
Name string
Event map[string]string
VADThreshold int16
WakeURI string
MicHandler func(cons core.Consumer) error
SndHandler func(prod core.Producer) error
Trace func(format string, v ...any)
Error func(format string, v ...any)
}
func (s *Server) Serve(l net.Listener) error {
for {
conn, err := l.Accept()
if err != nil {
return err
}
go s.Handle(conn)
}
}
func (s *Server) Handle(conn net.Conn) {
api := NewAPI(conn)
sat := newSatellite(api, s)
defer sat.Close()
for {
evt, err := api.ReadEvent()
if err != nil {
return
}
switch evt.Type {
case "ping": // {"text": null}
_ = api.WriteEvent(&Event{Type: "pong", Data: evt.Data})
case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
sat.sndAudio = sat.sndAudio[:0]
case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
sat.sndAudio = append(sat.sndAudio, evt.Payload...)
default:
sat.handleScript(evt)
}
}
}
// states like http.ConnState
const (
stateError = -2
stateClosed = -1
stateNew = 0
stateIdle = 1
stateWaitVAD = 2 // aka wait VAD
stateWaitWakeWord = 3
stateActive = 4
)
type satellite struct {
api *API
srv *Server
micState int8
micTS int
micMu sync.Mutex
sndAudio []byte
mic *micConsumer
wake *WakeWord
}
func newSatellite(api *API, srv *Server) *satellite {
sat := &satellite{api: api, srv: srv}
return sat
}
func (s *satellite) Close() error {
s.Stop()
return s.api.Close()
}
const wakeTimeout = 5 * 2 * 16000 // 5 seconds
func (s *satellite) setMicState(state int8) bool {
s.micMu.Lock()
defer s.micMu.Unlock()
if s.micState == stateNew {
s.mic = newMicConsumer(s.onMicChunk)
s.mic.RemoteAddr = s.api.conn.RemoteAddr().String()
if err := s.srv.MicHandler(s.mic); err != nil {
s.micState = stateError
s.srv.Error("can't get mic: %w", err)
_ = s.api.Close()
} else {
s.micState = stateIdle
}
}
if s.micState < stateIdle {
return false
}
s.micState = state
s.micTS = 0
return true
}
func (s *satellite) micStop() {
s.micMu.Lock()
s.micState = stateClosed
if s.mic != nil {
_ = s.mic.Stop()
s.mic = nil
}
if s.wake != nil {
_ = s.wake.Close()
s.wake = nil
}
s.micMu.Unlock()
}
func (s *satellite) onMicChunk(chunk []byte) {
s.micMu.Lock()
defer s.micMu.Unlock()
if s.micState == stateIdle {
return
}
if s.micState == stateWaitVAD {
// tests show that values over 1000 are most likely speech
if s.srv.VADThreshold == 0 || s16le.PeaksRMS(chunk) > s.srv.VADThreshold {
if s.wake == nil && s.srv.WakeURI != "" {
s.wake, _ = DialWakeWord(s.srv.WakeURI)
}
if s.wake == nil {
// some problems with wake word - redirect to HA
s.micState = stateIdle
go s.handleScript(&Event{Type: "internal-run"})
} else {
s.micState = stateWaitWakeWord
}
s.micTS = 0
}
}
if s.micState == stateWaitWakeWord {
if s.wake.Detection != "" {
// check if wake word detected
s.micState = stateIdle
go s.handleScript(&Event{Type: "internal-detection", Data: `{"name":"` + s.wake.Detection + `"}`})
} else if err := s.wake.WriteChunk(chunk); err != nil {
// wake word service failed
s.micState = stateWaitVAD
_ = s.wake.Close()
s.wake = nil
} else if s.micTS > wakeTimeout {
// wake word detection timeout
s.micState = stateWaitVAD
}
} else if s.wake != nil {
_ = s.wake.Close()
s.wake = nil
}
if s.micState == stateActive {
data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, s.micTS)
evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk}
_ = s.api.WriteEvent(evt)
}
s.micTS += len(chunk) / 2
}
func (s *satellite) playAudio(codec *core.Codec, rd io.Reader) bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
prod := pcm.OpenSync(codec, rd)
prod.OnClose(cancel)
if err := s.srv.SndHandler(prod); err != nil {
return false
} else {
<-ctx.Done()
return true
}
}
type micConsumer struct {
core.Connection
onData func(chunk []byte)
onClose func()
}
func newMicConsumer(onData func(chunk []byte)) *micConsumer {
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: pcm.ConsumerCodecs(),
},
}
return &micConsumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "wyoming",
Protocol: "tcp",
Medias: medias,
},
onData: onData,
}
}
func (c *micConsumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
src := track.Codec
dst := &core.Codec{
Name: core.CodecPCML,
ClockRate: 16000,
Channels: 1,
}
sender := core.NewSender(media, dst)
sender.Handler = pcm.TranscodeHandler(dst, src,
repack(func(packet *core.Packet) {
c.onData(packet.Payload)
}),
)
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *micConsumer) Stop() error {
if c.onClose != nil {
c.onClose()
}
return c.Connection.Stop()
}
func repack(handler core.HandlerFunc) core.HandlerFunc {
const PacketSize = 2 * 16000 / 50 // 20ms
var buf []byte
return func(pkt *rtp.Packet) {
buf = append(buf, pkt.Payload...)
for len(buf) >= PacketSize {
pkt = &core.Packet{Payload: buf[:PacketSize]}
buf = buf[PacketSize:]
handler(pkt)
}
}
}

40
pkg/wyoming/snd.go Normal file
View File

@@ -0,0 +1,40 @@
package wyoming
import (
"bytes"
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/pcm"
)
func (s *Server) HandleSnd(conn net.Conn) {
defer conn.Close()
var snd []byte
api := NewAPI(conn)
for {
evt, err := api.ReadEvent()
if err != nil {
return
}
s.Trace("event: %s data: %s payload: %d", evt.Type, evt.Data, len(evt.Payload))
switch evt.Type {
case "audio-start":
snd = snd[:0]
case "audio-chunk":
snd = append(snd, evt.Payload...)
case "audio-stop":
prod := pcm.OpenSync(sndCodec, bytes.NewReader(snd))
if err = s.SndHandler(prod); err != nil {
s.Error("snd error: %s", err)
return
}
}
}
}
var sndCodec = &core.Codec{Name: core.CodecPCML, ClockRate: 22050}

120
pkg/wyoming/wakeword.go Normal file
View File

@@ -0,0 +1,120 @@
package wyoming
import (
"encoding/json"
"fmt"
"net/url"
)
type WakeWord struct {
*API
names []string
send int
Detection string
}
func DialWakeWord(rawURL string) (*WakeWord, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
api, err := DialAPI(u.Host)
if err != nil {
return nil, err
}
names := u.Query()["name"]
if len(names) == 0 {
names = []string{"ok_nabu_v0.1"}
}
wake := &WakeWord{API: api, names: names}
if err = wake.Start(); err != nil {
_ = wake.Close()
return nil, err
}
go wake.handle()
return wake, nil
}
func (w *WakeWord) handle() {
defer w.Close()
for {
evt, err := w.ReadEvent()
if err != nil {
return
}
if evt.Type == "detection" {
var data struct {
Name string `json:"name"`
}
if err = json.Unmarshal([]byte(evt.Data), &data); err != nil {
return
}
w.Detection = data.Name
}
}
}
//func (w *WakeWord) Describe() error {
// if err := w.WriteEvent(&Event{Type: "describe"}); err != nil {
// return err
// }
//
// evt, err := w.ReadEvent()
// if err != nil {
// return err
// }
//
// var info struct {
// Wake []struct {
// Models []struct {
// Name string `json:"name"`
// } `json:"models"`
// } `json:"wake"`
// }
// if err = json.Unmarshal(evt.Data, &info); err != nil {
// return err
// }
//
// return nil
//}
func (w *WakeWord) Start() error {
msg := struct {
Names []string `json:"names"`
}{
Names: w.names,
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
evt := &Event{Type: "detect", Data: string(data)}
if err := w.WriteEvent(evt); err != nil {
return err
}
evt = &Event{Type: "audio-start", Data: audioData(0)}
return w.WriteEvent(evt)
}
func (w *WakeWord) Close() error {
return w.conn.Close()
}
func (w *WakeWord) WriteChunk(payload []byte) error {
evt := &Event{Type: "audio-chunk", Data: audioData(w.send), Payload: payload}
w.send += len(payload)
return w.WriteEvent(evt)
}
func audioData(send int) string {
// timestamp in ms = send / 2 * 1000 / 16000 = send / 32
return fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, send/32)
}

26
pkg/wyoming/wyoming.go Normal file
View File

@@ -0,0 +1,26 @@
package wyoming
import (
"net"
"net/url"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func Dial(rawURL string) (core.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", u.Host, core.ConnDialTimeout)
if err != nil {
return nil, err
}
if u.Query().Get("backchannel") != "1" {
return newProducer(conn), nil
} else {
return newBackchannel(conn), nil
}
}