Add PlayFile function to wyoming server

This commit is contained in:
Alex X
2025-04-24 21:23:16 +03:00
parent 890fd78a6a
commit c50e894a42
8 changed files with 226 additions and 162 deletions

View File

@@ -2,6 +2,7 @@ package pcm
import (
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
@@ -82,18 +83,27 @@ func TranscodeHandler(dst, src *core.Codec, handler core.HandlerFunc) core.Handl
}
}
func BytesPerFrame(codec *core.Codec) byte {
channels := byte(codec.Channels)
if channels == 0 {
channels = 1
}
func BytesPerSample(codec *core.Codec) int {
switch codec.Name {
case core.CodecPCML, core.CodecPCM:
return 2 * channels
return 2
case core.CodecPCMU, core.CodecPCMA:
return channels
return 1
}
return 0
}
func BytesPerFrame(codec *core.Codec) int {
if codec.Channels <= 1 {
return BytesPerSample(codec)
}
return int(codec.Channels) * BytesPerSample(codec)
}
func FramesPerDuration(codec *core.Codec, duration time.Duration) int {
return int(time.Duration(codec.ClockRate) * duration / time.Second)
}
func BytesPerDuration(codec *core.Codec, duration time.Duration) int {
return BytesPerFrame(codec) * FramesPerDuration(codec, duration)
}

View File

@@ -1,13 +1,25 @@
package pcm
import "github.com/AlexxIT/go2rtc/pkg/core"
import (
"math"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func ceil(x float32) int {
d, fract := math.Modf(float64(x))
if fract == 0.0 {
return int(d)
}
return int(d) + 1
}
func Downsample(k float32) func([]int16) []int16 {
var sampleN, sampleSum float32
return func(src []int16) (dst []int16) {
var i int
dst = make([]int16, int((float32(len(src))+sampleN)/k))
dst = make([]int16, ceil((float32(len(src))+sampleN)/k))
for _, sample := range src {
sampleSum += float32(sample)
sampleN++
@@ -28,7 +40,7 @@ func Upsample(k float32) func([]int16) []int16 {
return func(src []int16) (dst []int16) {
var i int
dst = make([]int16, int(k*float32(len(src))))
dst = make([]int16, ceil(k*float32(len(src))))
for _, sample := range src {
sampleN += k
for sampleN > 0 {

96
pkg/pcm/producer_sync.go Normal file
View File

@@ -0,0 +1,96 @@
package pcm
import (
"io"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type ProducerSync struct {
core.Connection
src *core.Codec
rd io.Reader
onClose func()
}
func OpenSync(codec *core.Codec, rd io.Reader) *ProducerSync {
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: ProducerCodecs(),
},
}
return &ProducerSync{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "pcm",
Medias: medias,
Transport: rd,
},
src: codec,
rd: rd,
}
}
func (p *ProducerSync) OnClose(f func()) {
p.onClose = f
}
func (p *ProducerSync) Start() error {
if len(p.Receivers) == 0 {
return nil
}
var pktSeq uint16
var pktTS uint32 // time in frames
var pktTime time.Duration // time in seconds
t0 := time.Now()
dst := p.Receivers[0].Codec
transcode := Transcode(dst, p.src)
const chunkDuration = 20 * time.Millisecond
chunkBytes := BytesPerDuration(p.src, chunkDuration)
chunkFrames := uint32(FramesPerDuration(dst, chunkDuration))
for {
buf := make([]byte, chunkBytes)
n, _ := io.ReadFull(p.rd, buf)
if n == 0 {
break
}
pkt := &core.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: pktSeq,
Timestamp: pktTS,
},
Payload: transcode(buf[:n]),
}
if d := pktTime - time.Since(t0); d > 0 {
time.Sleep(d)
}
p.Receivers[0].WriteRTP(pkt)
p.Recv += n
pktSeq++
pktTS += chunkFrames
pktTime += chunkDuration
}
if p.onClose != nil {
p.onClose()
}
return nil
}

View File

@@ -2,7 +2,6 @@ package wav
import (
"bufio"
"encoding/binary"
"errors"
"io"
@@ -17,39 +16,11 @@ func Open(r io.Reader) (*Producer, error) {
// https://www.mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
rd := bufio.NewReaderSize(r, core.BufferSize)
// skip Master RIFF chunk
if _, err := rd.Discard(12); err != nil {
codec, err := ReadHeader(r)
if err != nil {
return nil, err
}
codec := &core.Codec{}
for {
chunkID, data, err := readChunk(rd)
if err != nil {
return nil, err
}
if chunkID == "data" {
break
}
if chunkID == "fmt " {
// https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt
switch data[0] {
case 1:
codec.Name = core.CodecPCML
case 6:
codec.Name = core.CodecPCMA
case 7:
codec.Name = core.CodecPCMU
}
codec.Channels = data[2]
codec.ClockRate = binary.LittleEndian.Uint32(data[4:])
}
}
if codec.Name == "" {
return nil, errors.New("waw: unsupported codec")
}
@@ -110,18 +81,3 @@ func (c *Producer) Start() error {
ts += PacketSize
}
}
func readChunk(r io.Reader) (chunkID string, data []byte, err error) {
b := make([]byte, 8)
if _, err = io.ReadFull(r, b); err != nil {
return
}
if chunkID = string(b[:4]); chunkID != "data" {
size := binary.LittleEndian.Uint32(b[4:])
data = make([]byte, size)
_, err = io.ReadFull(r, data)
}
return
}

View File

@@ -2,6 +2,7 @@ package wav
import (
"encoding/binary"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
)
@@ -48,3 +49,55 @@ func Header(codec *core.Codec) []byte {
return b
}
func ReadHeader(r io.Reader) (*core.Codec, error) {
// skip Master RIFF chunk
if _, err := io.ReadFull(r, make([]byte, 12)); err != nil {
return nil, err
}
var codec core.Codec
for {
chunkID, data, err := readChunk(r)
if err != nil {
return nil, err
}
if chunkID == "data" {
break
}
if chunkID == "fmt " {
// https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt
switch data[0] {
case 1:
codec.Name = core.CodecPCML
case 6:
codec.Name = core.CodecPCMA
case 7:
codec.Name = core.CodecPCMU
}
codec.Channels = data[2]
codec.ClockRate = binary.LittleEndian.Uint32(data[4:])
}
}
return &codec, nil
}
func readChunk(r io.Reader) (chunkID string, data []byte, err error) {
b := make([]byte, 8)
if _, err = io.ReadFull(r, b); err != nil {
return
}
if chunkID = string(b[:4]); chunkID != "data" {
size := binary.LittleEndian.Uint32(b[4:])
data = make([]byte, size)
_, err = io.ReadFull(r, data)
}
return
}

View File

@@ -1,11 +1,13 @@
package wyoming
import (
"bytes"
"fmt"
"os"
"time"
"github.com/AlexxIT/go2rtc/pkg/expr"
"golang.org/x/net/context"
"github.com/AlexxIT/go2rtc/pkg/wav"
)
type env struct {
@@ -109,16 +111,21 @@ func (s *satellite) WriteEvent(args ...string) bool {
}
func (s *satellite) PlayAudio() bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
return s.playAudio(sndCodec, bytes.NewReader(s.sndAudio))
}
prod := newSndProducer(s.sndAudio, cancel)
if err := s.srv.SndHandler(prod); err != nil {
func (s *satellite) PlayFile(path string) bool {
f, err := os.Open(path)
if err != nil {
return false
} else {
<-ctx.Done()
return true
}
codec, err := wav.ReadHeader(f)
if err != nil {
return false
}
return s.playAudio(codec, f)
}
func (e *env) Sleep(s string) bool {

View File

@@ -1,10 +1,11 @@
package wyoming
import (
"context"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/pcm"
@@ -55,13 +56,8 @@ func (s *Server) Handle(conn net.Conn) {
sat.sndAudio = sat.sndAudio[:0]
case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0}
sat.sndAudio = append(sat.sndAudio, evt.Payload...)
}
if s.Event == nil || s.Event[evt.Type] == "" {
sat.handleEvent(evt)
} else {
// run async because there may be sleeps
go sat.handleScript(evt)
default:
sat.handleScript(evt)
}
}
}
@@ -196,6 +192,21 @@ func (s *satellite) onMicChunk(chunk []byte) {
s.micTS += len(chunk) / 2
}
func (s *satellite) playAudio(codec *core.Codec, rd io.Reader) bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
prod := pcm.OpenSync(codec, rd)
prod.OnClose(cancel)
if err := s.srv.SndHandler(prod); err != nil {
return false
} else {
<-ctx.Done()
return true
}
}
type micConsumer struct {
core.Connection
onData func(chunk []byte)
@@ -247,90 +258,6 @@ func (c *micConsumer) Stop() error {
return c.Connection.Stop()
}
type sndProducer struct {
core.Connection
data []byte
onClose func()
}
func newSndProducer(data []byte, onClose func()) *sndProducer {
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: pcm.ProducerCodecs(),
},
}
return &sndProducer{
core.Connection{
ID: core.NewID(),
FormatName: "wyoming",
Protocol: "tcp",
Medias: medias,
},
data,
onClose,
}
}
func (s *sndProducer) Start() error {
if len(s.Receivers) == 0 {
return nil
}
var pts time.Duration
var seq uint16
t0 := time.Now()
src := &core.Codec{Name: core.CodecPCML, ClockRate: 22050}
dst := s.Receivers[0].Codec
f := pcm.Transcode(dst, src)
bps := uint32(pcm.BytesPerFrame(dst))
chunkBytes := int(2 * src.ClockRate / 50) // 20ms
for {
n := len(s.data)
if n == 0 {
break
}
if chunkBytes > n {
chunkBytes = n
}
pkt := &core.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: seq,
Timestamp: uint32(s.Recv/2) * bps,
},
Payload: f(s.data[:chunkBytes]),
}
if d := pts - time.Since(t0); d > 0 {
time.Sleep(d)
}
s.Receivers[0].WriteRTP(pkt)
s.Recv += chunkBytes
s.data = s.data[chunkBytes:]
pts += 20 * time.Millisecond
seq++
}
if s.onClose != nil {
s.onClose()
}
return nil
}
func repack(handler core.HandlerFunc) core.HandlerFunc {
const PacketSize = 2 * 16000 / 50 // 20ms

View File

@@ -1,8 +1,11 @@
package wyoming
import (
"bytes"
"net"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/pcm"
)
func (s *Server) HandleSnd(conn net.Conn) {
@@ -25,9 +28,7 @@ func (s *Server) HandleSnd(conn net.Conn) {
case "audio-chunk":
snd = append(snd, evt.Payload...)
case "audio-stop":
prod := newSndProducer(snd, func() {
time.Sleep(time.Second) // some extra delay before close
})
prod := pcm.OpenSync(sndCodec, bytes.NewReader(snd))
if err = s.SndHandler(prod); err != nil {
s.Error("snd error: %s", err)
return
@@ -35,3 +36,5 @@ func (s *Server) HandleSnd(conn net.Conn) {
}
}
}
var sndCodec = &core.Codec{Name: core.CodecPCML, ClockRate: 22050}