diff --git a/pkg/pcm/handlers.go b/pkg/pcm/handlers.go index 39075199..18a96468 100644 --- a/pkg/pcm/handlers.go +++ b/pkg/pcm/handlers.go @@ -2,6 +2,7 @@ package pcm import ( "sync" + "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" @@ -82,18 +83,27 @@ func TranscodeHandler(dst, src *core.Codec, handler core.HandlerFunc) core.Handl } } -func BytesPerFrame(codec *core.Codec) byte { - channels := byte(codec.Channels) - if channels == 0 { - channels = 1 - } - +func BytesPerSample(codec *core.Codec) int { switch codec.Name { case core.CodecPCML, core.CodecPCM: - return 2 * channels + return 2 case core.CodecPCMU, core.CodecPCMA: - return channels + return 1 } - return 0 } + +func BytesPerFrame(codec *core.Codec) int { + if codec.Channels <= 1 { + return BytesPerSample(codec) + } + return int(codec.Channels) * BytesPerSample(codec) +} + +func FramesPerDuration(codec *core.Codec, duration time.Duration) int { + return int(time.Duration(codec.ClockRate) * duration / time.Second) +} + +func BytesPerDuration(codec *core.Codec, duration time.Duration) int { + return BytesPerFrame(codec) * FramesPerDuration(codec, duration) +} diff --git a/pkg/pcm/pcm.go b/pkg/pcm/pcm.go index 13405ad4..5395621e 100644 --- a/pkg/pcm/pcm.go +++ b/pkg/pcm/pcm.go @@ -1,13 +1,25 @@ package pcm -import "github.com/AlexxIT/go2rtc/pkg/core" +import ( + "math" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func ceil(x float32) int { + d, fract := math.Modf(float64(x)) + if fract == 0.0 { + return int(d) + } + return int(d) + 1 +} func Downsample(k float32) func([]int16) []int16 { var sampleN, sampleSum float32 return func(src []int16) (dst []int16) { var i int - dst = make([]int16, int((float32(len(src))+sampleN)/k)) + dst = make([]int16, ceil((float32(len(src))+sampleN)/k)) for _, sample := range src { sampleSum += float32(sample) sampleN++ @@ -28,7 +40,7 @@ func Upsample(k float32) func([]int16) []int16 { return func(src []int16) (dst []int16) { var i int - dst = make([]int16, int(k*float32(len(src)))) + dst = make([]int16, ceil(k*float32(len(src)))) for _, sample := range src { sampleN += k for sampleN > 0 { diff --git a/pkg/pcm/producer_sync.go b/pkg/pcm/producer_sync.go new file mode 100644 index 00000000..fedef268 --- /dev/null +++ b/pkg/pcm/producer_sync.go @@ -0,0 +1,96 @@ +package pcm + +import ( + "io" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type ProducerSync struct { + core.Connection + src *core.Codec + rd io.Reader + onClose func() +} + +func OpenSync(codec *core.Codec, rd io.Reader) *ProducerSync { + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: ProducerCodecs(), + }, + } + + return &ProducerSync{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "pcm", + Medias: medias, + Transport: rd, + }, + src: codec, + rd: rd, + } +} + +func (p *ProducerSync) OnClose(f func()) { + p.onClose = f +} + +func (p *ProducerSync) Start() error { + if len(p.Receivers) == 0 { + return nil + } + + var pktSeq uint16 + var pktTS uint32 // time in frames + var pktTime time.Duration // time in seconds + + t0 := time.Now() + + dst := p.Receivers[0].Codec + transcode := Transcode(dst, p.src) + + const chunkDuration = 20 * time.Millisecond + chunkBytes := BytesPerDuration(p.src, chunkDuration) + chunkFrames := uint32(FramesPerDuration(dst, chunkDuration)) + + for { + buf := make([]byte, chunkBytes) + n, _ := io.ReadFull(p.rd, buf) + + if n == 0 { + break + } + + pkt := &core.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + SequenceNumber: pktSeq, + Timestamp: pktTS, + }, + Payload: transcode(buf[:n]), + } + + if d := pktTime - time.Since(t0); d > 0 { + time.Sleep(d) + } + + p.Receivers[0].WriteRTP(pkt) + p.Recv += n + + pktSeq++ + pktTS += chunkFrames + pktTime += chunkDuration + } + + if p.onClose != nil { + p.onClose() + } + + return nil +} diff --git a/pkg/wav/producer.go b/pkg/wav/producer.go index b9b3a878..60bdeaa1 100644 --- a/pkg/wav/producer.go +++ b/pkg/wav/producer.go @@ -2,7 +2,6 @@ package wav import ( "bufio" - "encoding/binary" "errors" "io" @@ -17,39 +16,11 @@ func Open(r io.Reader) (*Producer, error) { // https://www.mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html rd := bufio.NewReaderSize(r, core.BufferSize) - // skip Master RIFF chunk - if _, err := rd.Discard(12); err != nil { + codec, err := ReadHeader(r) + if err != nil { return nil, err } - codec := &core.Codec{} - - for { - chunkID, data, err := readChunk(rd) - if err != nil { - return nil, err - } - - if chunkID == "data" { - break - } - - if chunkID == "fmt " { - // https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt - switch data[0] { - case 1: - codec.Name = core.CodecPCML - case 6: - codec.Name = core.CodecPCMA - case 7: - codec.Name = core.CodecPCMU - } - - codec.Channels = data[2] - codec.ClockRate = binary.LittleEndian.Uint32(data[4:]) - } - } - if codec.Name == "" { return nil, errors.New("waw: unsupported codec") } @@ -110,18 +81,3 @@ func (c *Producer) Start() error { ts += PacketSize } } - -func readChunk(r io.Reader) (chunkID string, data []byte, err error) { - b := make([]byte, 8) - if _, err = io.ReadFull(r, b); err != nil { - return - } - - if chunkID = string(b[:4]); chunkID != "data" { - size := binary.LittleEndian.Uint32(b[4:]) - data = make([]byte, size) - _, err = io.ReadFull(r, data) - } - - return -} diff --git a/pkg/wav/wav.go b/pkg/wav/wav.go index bf48fdf9..9fe857d4 100644 --- a/pkg/wav/wav.go +++ b/pkg/wav/wav.go @@ -2,6 +2,7 @@ package wav import ( "encoding/binary" + "io" "github.com/AlexxIT/go2rtc/pkg/core" ) @@ -48,3 +49,55 @@ func Header(codec *core.Codec) []byte { return b } + +func ReadHeader(r io.Reader) (*core.Codec, error) { + // skip Master RIFF chunk + if _, err := io.ReadFull(r, make([]byte, 12)); err != nil { + return nil, err + } + + var codec core.Codec + + for { + chunkID, data, err := readChunk(r) + if err != nil { + return nil, err + } + + if chunkID == "data" { + break + } + + if chunkID == "fmt " { + // https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt + switch data[0] { + case 1: + codec.Name = core.CodecPCML + case 6: + codec.Name = core.CodecPCMA + case 7: + codec.Name = core.CodecPCMU + } + + codec.Channels = data[2] + codec.ClockRate = binary.LittleEndian.Uint32(data[4:]) + } + } + + return &codec, nil +} + +func readChunk(r io.Reader) (chunkID string, data []byte, err error) { + b := make([]byte, 8) + if _, err = io.ReadFull(r, b); err != nil { + return + } + + if chunkID = string(b[:4]); chunkID != "data" { + size := binary.LittleEndian.Uint32(b[4:]) + data = make([]byte, size) + _, err = io.ReadFull(r, data) + } + + return +} diff --git a/pkg/wyoming/expr.go b/pkg/wyoming/expr.go index 1b184cc3..f2f58933 100644 --- a/pkg/wyoming/expr.go +++ b/pkg/wyoming/expr.go @@ -1,11 +1,13 @@ package wyoming import ( + "bytes" "fmt" + "os" "time" "github.com/AlexxIT/go2rtc/pkg/expr" - "golang.org/x/net/context" + "github.com/AlexxIT/go2rtc/pkg/wav" ) type env struct { @@ -109,16 +111,21 @@ func (s *satellite) WriteEvent(args ...string) bool { } func (s *satellite) PlayAudio() bool { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + return s.playAudio(sndCodec, bytes.NewReader(s.sndAudio)) +} - prod := newSndProducer(s.sndAudio, cancel) - if err := s.srv.SndHandler(prod); err != nil { +func (s *satellite) PlayFile(path string) bool { + f, err := os.Open(path) + if err != nil { return false - } else { - <-ctx.Done() - return true } + + codec, err := wav.ReadHeader(f) + if err != nil { + return false + } + + return s.playAudio(codec, f) } func (e *env) Sleep(s string) bool { diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go index c45bc50e..0c0e6f30 100644 --- a/pkg/wyoming/satellite.go +++ b/pkg/wyoming/satellite.go @@ -1,10 +1,11 @@ package wyoming import ( + "context" "fmt" + "io" "net" "sync" - "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/pcm" @@ -55,13 +56,8 @@ func (s *Server) Handle(conn net.Conn) { sat.sndAudio = sat.sndAudio[:0] case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} sat.sndAudio = append(sat.sndAudio, evt.Payload...) - } - - if s.Event == nil || s.Event[evt.Type] == "" { - sat.handleEvent(evt) - } else { - // run async because there may be sleeps - go sat.handleScript(evt) + default: + sat.handleScript(evt) } } } @@ -196,6 +192,21 @@ func (s *satellite) onMicChunk(chunk []byte) { s.micTS += len(chunk) / 2 } +func (s *satellite) playAudio(codec *core.Codec, rd io.Reader) bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + prod := pcm.OpenSync(codec, rd) + prod.OnClose(cancel) + + if err := s.srv.SndHandler(prod); err != nil { + return false + } else { + <-ctx.Done() + return true + } +} + type micConsumer struct { core.Connection onData func(chunk []byte) @@ -247,90 +258,6 @@ func (c *micConsumer) Stop() error { return c.Connection.Stop() } -type sndProducer struct { - core.Connection - data []byte - onClose func() -} - -func newSndProducer(data []byte, onClose func()) *sndProducer { - medias := []*core.Media{ - { - Kind: core.KindAudio, - Direction: core.DirectionRecvonly, - Codecs: pcm.ProducerCodecs(), - }, - } - - return &sndProducer{ - core.Connection{ - ID: core.NewID(), - FormatName: "wyoming", - Protocol: "tcp", - Medias: medias, - }, - data, - onClose, - } -} - -func (s *sndProducer) Start() error { - if len(s.Receivers) == 0 { - return nil - } - - var pts time.Duration - var seq uint16 - - t0 := time.Now() - - src := &core.Codec{Name: core.CodecPCML, ClockRate: 22050} - dst := s.Receivers[0].Codec - f := pcm.Transcode(dst, src) - - bps := uint32(pcm.BytesPerFrame(dst)) - - chunkBytes := int(2 * src.ClockRate / 50) // 20ms - - for { - n := len(s.data) - if n == 0 { - break - } - if chunkBytes > n { - chunkBytes = n - } - - pkt := &core.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - SequenceNumber: seq, - Timestamp: uint32(s.Recv/2) * bps, - }, - Payload: f(s.data[:chunkBytes]), - } - - if d := pts - time.Since(t0); d > 0 { - time.Sleep(d) - } - - s.Receivers[0].WriteRTP(pkt) - - s.Recv += chunkBytes - s.data = s.data[chunkBytes:] - - pts += 20 * time.Millisecond - seq++ - } - - if s.onClose != nil { - s.onClose() - } - - return nil -} - func repack(handler core.HandlerFunc) core.HandlerFunc { const PacketSize = 2 * 16000 / 50 // 20ms diff --git a/pkg/wyoming/snd.go b/pkg/wyoming/snd.go index 822c1ed4..e26ca7ea 100644 --- a/pkg/wyoming/snd.go +++ b/pkg/wyoming/snd.go @@ -1,8 +1,11 @@ package wyoming import ( + "bytes" "net" - "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pcm" ) func (s *Server) HandleSnd(conn net.Conn) { @@ -25,9 +28,7 @@ func (s *Server) HandleSnd(conn net.Conn) { case "audio-chunk": snd = append(snd, evt.Payload...) case "audio-stop": - prod := newSndProducer(snd, func() { - time.Sleep(time.Second) // some extra delay before close - }) + prod := pcm.OpenSync(sndCodec, bytes.NewReader(snd)) if err = s.SndHandler(prod); err != nil { s.Error("snd error: %s", err) return @@ -35,3 +36,5 @@ func (s *Server) HandleSnd(conn net.Conn) { } } } + +var sndCodec = &core.Codec{Name: core.CodecPCML, ClockRate: 22050}