mirror of
https://github.com/hsnks100/liveflow.git
synced 2025-09-26 20:21:12 +08:00
Fix whep's memory leak
fix3
This commit is contained in:
20
Dockerfile
20
Dockerfile
@@ -1,6 +1,4 @@
|
||||
# ubuntu
|
||||
#FROM ubuntu:latest
|
||||
FROM golang:1.21-bullseye
|
||||
FROM golang:1.23-bullseye
|
||||
RUN apt-get update
|
||||
RUN apt-get upgrade -y
|
||||
RUN apt-get install -y build-essential git pkg-config libunistring-dev libaom-dev libdav1d-dev bzip2 nasm wget yasm ca-certificates
|
||||
@@ -8,12 +6,14 @@ COPY install-ffmpeg.sh /install-ffmpeg.sh
|
||||
RUN chmod +x /install-ffmpeg.sh && /install-ffmpeg.sh
|
||||
ENV PKG_CONFIG_PATH=/ffmpeg_build/lib/pkgconfig:${PKG_CONFIG_PATH}
|
||||
ENV PATH="/usr/local/go/bin:${PATH}"
|
||||
COPY ./ /app
|
||||
WORKDIR /app
|
||||
RUN go mod download
|
||||
RUN go build -o /app/bin/liveflow
|
||||
RUN cp config.toml /app/bin/config.toml
|
||||
RUN cp -r static /app/bin/static
|
||||
COPY go.mod go.sum ./
|
||||
RUN --mount=type=cache,target=/go/pkg/mod \
|
||||
go mod download
|
||||
COPY ./ /app
|
||||
RUN mkdir -p /app/bin/videos
|
||||
WORKDIR /app/bin
|
||||
ENTRYPOINT ["/app/bin/liveflow"]
|
||||
RUN --mount=type=cache,target=/root/.cache/go-build \
|
||||
go build -o /app
|
||||
WORKDIR /app
|
||||
ENV GOGC=10
|
||||
ENTRYPOINT ["/app/liveflow"]
|
||||
|
@@ -1,16 +1,16 @@
|
||||
# docker-compose
|
||||
version: "3.4"
|
||||
version: "2.1"
|
||||
services:
|
||||
liveflow:
|
||||
image: liveflow_custom:latest
|
||||
mem_limit: 350m
|
||||
stdin_open: true # docker run -i
|
||||
tty: true # docker run -t
|
||||
volumes:
|
||||
- "~/.store:/app/bin/videos"
|
||||
- "~/.store:/app/videos"
|
||||
ports:
|
||||
- "8044:8044"
|
||||
- "1930:1930"
|
||||
- "40000-41000:40000-41000/udp"
|
||||
- "40000-40010:40000-40010/udp"
|
||||
environment:
|
||||
DOCKER_MODE: "true"
|
||||
build:
|
||||
|
@@ -1,4 +1,4 @@
|
||||
FROM golang:1.21-bullseye
|
||||
FROM golang:1.23-bullseye
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get upgrade -y && \
|
||||
@@ -10,15 +10,17 @@ RUN chmod +x /install-ffmpeg-lsan.sh && /install-ffmpeg-lsan.sh
|
||||
ENV PKG_CONFIG_PATH=/ffmpeg_build/lib/pkgconfig:${PKG_CONFIG_PATH}
|
||||
ENV PATH="/usr/local/go/bin:${PATH}"
|
||||
|
||||
COPY ./ /app
|
||||
WORKDIR /app
|
||||
|
||||
COPY go.mod go.sum ./
|
||||
ENV CGO_LDFLAGS='-fsanitize=address'
|
||||
ENV CGO_CFLAGS='-fsanitize=address'
|
||||
RUN go mod download
|
||||
RUN go build -o /app/bin/liveflow
|
||||
RUN cp config.toml /app/bin/config.toml
|
||||
RUN cp -r static /app/bin/static
|
||||
RUN --mount=type=cache,target=/go/pkg/mod \
|
||||
go mod download
|
||||
COPY ./ /app
|
||||
RUN mkdir -p /app/bin/videos
|
||||
WORKDIR /app/bin
|
||||
ENTRYPOINT ["/app/bin/liveflow"]
|
||||
RUN --mount=type=cache,target=/root/.cache/go-build \
|
||||
go build -o /app
|
||||
WORKDIR /app
|
||||
ENV GOGC 10
|
||||
ENV ASAN_OPTIONS quarantine_size_mb=32
|
||||
ENTRYPOINT ["/app/liveflow"]
|
29
main.go
29
main.go
@@ -5,7 +5,10 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
_ "net/http/pprof" // pprof을 사용하기 위한 패키지
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
|
||||
"liveflow/config"
|
||||
"liveflow/media/streamer/egress/hls"
|
||||
@@ -28,9 +31,18 @@ import (
|
||||
"liveflow/media/streamer/ingress/rtmp"
|
||||
)
|
||||
|
||||
/*
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
void __lsan_do_leak_check(void);
|
||||
void leak_bit() {
|
||||
int *p = (int *)malloc(sizeof(int));
|
||||
}
|
||||
*/
|
||||
import "C"
|
||||
|
||||
// RTMP 받으면 자동으로 Service 서비스 동작, 녹화 서비스까지~?
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
viper.SetConfigName("config") // name of config file (without extension)
|
||||
viper.SetConfigType("toml") // REQUIRED if the config file does not have the extension in the name
|
||||
viper.AddConfigPath(".") // optionally look for config in the working directory
|
||||
@@ -46,10 +58,16 @@ func main() {
|
||||
}
|
||||
fmt.Printf("Config: %+v\n", conf)
|
||||
log.Init()
|
||||
//log.SetCaller(ctx, true)
|
||||
//log.SetFormatter(ctx, &logrus.JSONFormatter{
|
||||
// TimestampFormat: "2006-01-02 15:04:05",
|
||||
//})
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
sig := <-sigs
|
||||
log.Info(ctx, "Received signal:", sig)
|
||||
log.Info(ctx, "Initiating shutdown...")
|
||||
//C.__lsan_do_leak_check()
|
||||
cancel()
|
||||
}()
|
||||
ctx = log.WithFields(ctx, logrus.Fields{
|
||||
"app": "liveflow",
|
||||
})
|
||||
@@ -124,6 +142,7 @@ func main() {
|
||||
Tracks: tracks,
|
||||
Hub: hub,
|
||||
})
|
||||
_ = whep
|
||||
err = whep.Start(ctx, source)
|
||||
if err != nil {
|
||||
log.Errorf(ctx, "failed to start whep: %v", err)
|
||||
|
@@ -106,7 +106,8 @@ func (h *Hub) Publish(streamID string, data *FrameData) {
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
for _, ch := range h.streams[streamID] {
|
||||
channels := h.streams[streamID]
|
||||
for _, ch := range channels {
|
||||
select {
|
||||
case ch <- data:
|
||||
case <-ctx.Done():
|
||||
@@ -135,7 +136,7 @@ func (h *Hub) Subscribe(streamID string) <-chan *FrameData {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
ch := make(chan *FrameData)
|
||||
ch := make(chan *FrameData, 1000)
|
||||
h.streams[streamID] = append(h.streams[streamID], ch)
|
||||
return ch
|
||||
}
|
||||
@@ -157,11 +158,3 @@ func (h *Hub) RemoveStream(streamID string) {
|
||||
delete(h.streams, streamID)
|
||||
}
|
||||
}
|
||||
|
||||
//func checkLeak() {
|
||||
// go func() {
|
||||
// fmt.Println("will check leak")
|
||||
// time.Sleep(3 * time.Second)
|
||||
// C.__lsan_do_leak_check()
|
||||
// }()
|
||||
//}
|
||||
|
@@ -1,5 +1,10 @@
|
||||
package whep
|
||||
|
||||
// #include <stdio.h>
|
||||
// #include <stdlib.h>
|
||||
//
|
||||
// void __lsan_do_leak_check(void);
|
||||
import "C"
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
@@ -84,40 +89,68 @@ func (w *WHEP) Start(ctx context.Context, source hub.Source) error {
|
||||
if audioTranscodingProcess == nil {
|
||||
audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate)
|
||||
audioTranscodingProcess.Init()
|
||||
defer audioTranscodingProcess.Close()
|
||||
}
|
||||
w.addAudioTrack(ctx, source.StreamID())
|
||||
err := w.onAACAudio(ctx, source, data.AACAudio, audioTranscodingProcess)
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to process AAC audio")
|
||||
}
|
||||
} else {
|
||||
if data.OPUSAudio != nil {
|
||||
err := w.onAudio(source, data.OPUSAudio)
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to process OPUS audio")
|
||||
}
|
||||
} else if data.OPUSAudio != nil {
|
||||
w.addAudioTrack(ctx, source.StreamID())
|
||||
err := w.onAudio(source, data.OPUSAudio)
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to process OPUS audio")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if audioTranscodingProcess != nil {
|
||||
audioTranscodingProcess.Close()
|
||||
}
|
||||
log.Info(ctx, "end whep")
|
||||
//C.__lsan_do_leak_check()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error {
|
||||
func (w *WHEP) addVideoTrack(streamID string, videoClockRate uint32) error {
|
||||
if w.videoTrack == nil {
|
||||
var err error
|
||||
w.videoTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.tracks[source.StreamID()] = append(w.tracks[source.StreamID()], w.videoTrack)
|
||||
w.tracks[streamID] = append(w.tracks[streamID], w.videoTrack)
|
||||
ssrc := uint32(110)
|
||||
const (
|
||||
h264PayloadType = 96
|
||||
mtu = 1400
|
||||
)
|
||||
w.videoPacketizer = rtp.NewPacketizer(mtu, h264PayloadType, ssrc, &codecs.H264Payloader{}, rtp.NewRandomSequencer(), h264Video.VideoClockRate)
|
||||
w.videoPacketizer = rtp.NewPacketizer(mtu, h264PayloadType, ssrc, &codecs.H264Payloader{}, rtp.NewRandomSequencer(), videoClockRate)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WHEP) addAudioTrack(ctx context.Context, streamID string) error {
|
||||
if w.audioTrack == nil {
|
||||
var err error
|
||||
w.audioTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to create audio track")
|
||||
}
|
||||
w.tracks[streamID] = append(w.tracks[streamID], w.audioTrack)
|
||||
ssrc := uint32(111)
|
||||
const (
|
||||
opusPayloadType = 111
|
||||
mtu = 1400
|
||||
)
|
||||
w.audioPacketizer = rtp.NewPacketizer(mtu, opusPayloadType, ssrc, &codecs.OpusPayloader{}, rtp.NewRandomSequencer(), 48000)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error {
|
||||
w.addVideoTrack(source.StreamID(), h264Video.VideoClockRate)
|
||||
|
||||
videoDuration := h264Video.DTS - w.lastVideoTimestamp
|
||||
videoPackets := w.videoPacketizer.Packetize(h264Video.Data, uint32(videoDuration))
|
||||
@@ -132,21 +165,6 @@ func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error {
|
||||
}
|
||||
|
||||
func (w *WHEP) onAudio(source hub.Source, opusAudio *hub.OPUSAudio) error {
|
||||
if w.audioTrack == nil {
|
||||
var err error
|
||||
w.audioTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.tracks[source.StreamID()] = append(w.tracks[source.StreamID()], w.audioTrack)
|
||||
ssrc := uint32(111)
|
||||
const (
|
||||
opusPayloadType = 111
|
||||
mtu = 1400
|
||||
)
|
||||
w.audioPacketizer = rtp.NewPacketizer(mtu, opusPayloadType, ssrc, &codecs.OpusPayloader{}, rtp.NewRandomSequencer(), opusAudio.AudioClockRate)
|
||||
}
|
||||
|
||||
audioDuration := opusAudio.DTS - w.lastAudioTimestamp
|
||||
audioPackets := w.audioPacketizer.Packetize(opusAudio.Data, uint32(audioDuration))
|
||||
|
||||
@@ -162,42 +180,6 @@ func (w *WHEP) onAudio(source hub.Source, opusAudio *hub.OPUSAudio) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WHEP) syncAndSendPackets() error {
|
||||
for len(w.videoBuffer) > 0 && len(w.audioBuffer) > 0 {
|
||||
videoPacket := w.videoBuffer[0]
|
||||
audioPacket := w.audioBuffer[0]
|
||||
// Remove lagging packet from buffer
|
||||
if videoPacket.timestamp <= audioPacket.timestamp {
|
||||
// If audio is ahead, remove video from buffer
|
||||
if len(w.videoBuffer) > 100 {
|
||||
w.videoBuffer = append([]*packetWithTimestamp{}, w.videoBuffer[1:]...)
|
||||
} else {
|
||||
w.videoBuffer = w.videoBuffer[1:]
|
||||
}
|
||||
if err := w.videoTrack.WriteRTP(videoPacket.packet); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// If video is ahead, remove audio from buffer
|
||||
if len(w.audioBuffer) > 100 {
|
||||
w.audioBuffer = append([]*packetWithTimestamp{}, w.audioBuffer[1:]...)
|
||||
} else {
|
||||
w.audioBuffer = w.audioBuffer[1:]
|
||||
}
|
||||
if err := w.audioTrack.WriteRTP(audioPacket.packet); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func abs(x int64) int64 {
|
||||
if x < 0 {
|
||||
return -x
|
||||
}
|
||||
return x
|
||||
}
|
||||
func (w *WHEP) onAACAudio(ctx context.Context, source hub.Source, aac *hub.AACAudio, transcodingProcess *processes.AudioTranscodingProcess) error {
|
||||
if len(aac.Data) == 0 {
|
||||
log.Warn(ctx, "no data")
|
||||
@@ -233,3 +215,27 @@ func (w *WHEP) onAACAudio(ctx context.Context, source hub.Source, aac *hub.AACAu
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WHEP) syncAndSendPackets() error {
|
||||
for len(w.videoBuffer) > 0 && len(w.audioBuffer) > 0 {
|
||||
videoPacket := w.videoBuffer[0]
|
||||
audioPacket := w.audioBuffer[0]
|
||||
// Remove lagging packet from buffer
|
||||
if videoPacket.timestamp <= audioPacket.timestamp {
|
||||
// If audio is ahead, remove video from buffer
|
||||
w.videoBuffer[0] = nil
|
||||
w.videoBuffer = w.videoBuffer[1:]
|
||||
if err := w.videoTrack.WriteRTP(videoPacket.packet); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// If video is ahead, remove audio from buffer
|
||||
w.audioBuffer[0] = nil
|
||||
w.audioBuffer = w.audioBuffer[1:]
|
||||
if err := w.audioTrack.WriteRTP(audioPacket.packet); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -4,11 +4,12 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"liveflow/media/streamer/ingress"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"liveflow/media/streamer/ingress"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/rtp/codecs"
|
||||
@@ -282,6 +283,7 @@ func (r *WHIP) whepHandler(c echo.Context) error {
|
||||
}
|
||||
streamKey, err := r.bearerToken(c)
|
||||
if err != nil {
|
||||
log.Error(context.Background(), err, "failed to get stream key")
|
||||
return c.JSON(http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
|
||||
@@ -289,11 +291,12 @@ func (r *WHIP) whepHandler(c echo.Context) error {
|
||||
m := &webrtc.MediaEngine{}
|
||||
err = registerCodec(m)
|
||||
if err != nil {
|
||||
log.Error(context.Background(), err, "failed to register codec")
|
||||
return c.JSON(http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
|
||||
se := webrtc.SettingEngine{}
|
||||
se.SetEphemeralUDPPortRange(30000, 30500)
|
||||
se.SetEphemeralUDPPortRange(40000, 40010)
|
||||
if r.dockerMode {
|
||||
se.SetNAT1To1IPs([]string{"127.0.0.1"}, webrtc.ICECandidateTypeHost)
|
||||
}
|
||||
@@ -301,6 +304,7 @@ func (r *WHIP) whepHandler(c echo.Context) error {
|
||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithSettingEngine(se))
|
||||
peerConnection, err := api.NewPeerConnection(peerConnectionConfiguration)
|
||||
if err != nil {
|
||||
log.Error(context.Background(), err, "failed to create peer connection")
|
||||
return c.JSON(http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
|
||||
@@ -309,6 +313,7 @@ func (r *WHIP) whepHandler(c echo.Context) error {
|
||||
for _, track := range r.tracks[streamKey] {
|
||||
sender, err := peerConnection.AddTrack(track)
|
||||
if err != nil {
|
||||
log.Error(context.Background(), err, "failed to add track")
|
||||
return c.JSON(http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
rtpSenders = append(rtpSenders, sender)
|
||||
|
@@ -4,10 +4,11 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"liveflow/log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"liveflow/log"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/interceptor/pkg/intervalpli"
|
||||
@@ -125,7 +126,7 @@ func (r *WHIP) whipHandler(c echo.Context) error {
|
||||
|
||||
// Create the API object with the MediaEngine
|
||||
se := webrtc.SettingEngine{}
|
||||
se.SetEphemeralUDPPortRange(30000, 30500)
|
||||
se.SetEphemeralUDPPortRange(40000, 40010)
|
||||
if r.dockerMode {
|
||||
se.SetNAT1To1IPs([]string{"127.0.0.1"}, webrtc.ICECandidateTypeHost)
|
||||
se.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeUDP4})
|
||||
|
@@ -5,10 +5,9 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"liveflow/log"
|
||||
"liveflow/media/streamer/pipe"
|
||||
|
||||
astiav "github.com/asticode/go-astiav"
|
||||
|
||||
"liveflow/log"
|
||||
)
|
||||
|
||||
type MediaPacket struct {
|
||||
@@ -18,7 +17,7 @@ type MediaPacket struct {
|
||||
SampleRate int
|
||||
}
|
||||
type AudioTranscodingProcess struct {
|
||||
pipe.BaseProcess[*MediaPacket, []*MediaPacket]
|
||||
//pipe.BaseProcess[*MediaPacket, []*MediaPacket]
|
||||
decCodecID astiav.CodecID
|
||||
encCodecID astiav.CodecID
|
||||
decCodec *astiav.Codec
|
||||
@@ -110,28 +109,32 @@ func (t *AudioTranscodingProcess) Close() {
|
||||
|
||||
func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, error) {
|
||||
ctx := context.Background()
|
||||
frame := astiav.AllocFrame()
|
||||
defer frame.Free()
|
||||
packet := astiav.AllocPacket()
|
||||
defer packet.Free()
|
||||
err := packet.FromData(data.Data)
|
||||
if err != nil {
|
||||
if err := packet.FromData(data.Data); err != nil {
|
||||
log.Error(ctx, err, "failed to create packet")
|
||||
}
|
||||
packet.SetPts(data.PTS)
|
||||
packet.SetDts(data.DTS)
|
||||
err = t.decCodecContext.SendPacket(packet)
|
||||
if err != nil {
|
||||
if err := t.decCodecContext.SendPacket(packet); err != nil {
|
||||
log.Error(ctx, err, "failed to send packet")
|
||||
}
|
||||
|
||||
frameToSend := astiav.AllocFrame()
|
||||
defer frameToSend.Free()
|
||||
if t.audioFifo == nil {
|
||||
t.audioFifo = astiav.AllocAudioFifo(
|
||||
t.encCodecContext.SampleFormat(),
|
||||
t.encCodecContext.ChannelLayout().Channels(),
|
||||
t.encCodecContext.SampleRate())
|
||||
t.encCodecContext.SampleRate(),
|
||||
)
|
||||
}
|
||||
|
||||
var opusAudio []*MediaPacket
|
||||
|
||||
for {
|
||||
frame := astiav.AllocFrame()
|
||||
defer frame.Free()
|
||||
err := t.decCodecContext.ReceiveFrame(frame)
|
||||
if errors.Is(err, astiav.ErrEof) {
|
||||
fmt.Println("EOF: ", err.Error())
|
||||
@@ -140,21 +143,21 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er
|
||||
break
|
||||
}
|
||||
t.audioFifo.Write(frame)
|
||||
nbSamples := 0
|
||||
|
||||
// check whether we have enough samples to encode
|
||||
for t.audioFifo.Size() >= t.encCodecContext.FrameSize() {
|
||||
frameToSend := astiav.AllocFrame()
|
||||
defer frameToSend.Free()
|
||||
// initialize frameToSend before reusing it
|
||||
frameToSend.Unref()
|
||||
frameToSend.SetNbSamples(t.encCodecContext.FrameSize())
|
||||
frameToSend.SetChannelLayout(t.encCodecContext.ChannelLayout()) // t.encCodecContext.ChannelLayout())
|
||||
frameToSend.SetChannelLayout(t.encCodecContext.ChannelLayout())
|
||||
frameToSend.SetSampleFormat(t.encCodecContext.SampleFormat())
|
||||
frameToSend.SetSampleRate(t.encCodecContext.SampleRate())
|
||||
frameToSend.SetPts(t.lastPts + int64(t.encCodecContext.FrameSize()))
|
||||
t.lastPts = frameToSend.Pts()
|
||||
nbSamples += frame.NbSamples()
|
||||
err := frameToSend.AllocBuffer(0)
|
||||
if err != nil {
|
||||
if err := frameToSend.AllocBuffer(0); err != nil {
|
||||
log.Error(ctx, err, "failed to alloc buffer")
|
||||
}
|
||||
t.lastPts = frameToSend.Pts()
|
||||
|
||||
read, err := t.audioFifo.Read(frameToSend)
|
||||
if err != nil {
|
||||
log.Error(ctx, err, "failed to read fifo")
|
||||
@@ -162,14 +165,13 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er
|
||||
if read < frameToSend.NbSamples() {
|
||||
log.Error(ctx, err, "failed to read fifo")
|
||||
}
|
||||
// Encode the frame
|
||||
err = t.encCodecContext.SendFrame(frameToSend)
|
||||
if err != nil {
|
||||
|
||||
if err := t.encCodecContext.SendFrame(frameToSend); err != nil {
|
||||
log.Error(ctx, err, "failed to send frame")
|
||||
}
|
||||
|
||||
pkt := astiav.AllocPacket()
|
||||
for {
|
||||
pkt := astiav.AllocPacket()
|
||||
defer pkt.Free()
|
||||
err := t.encCodecContext.ReceivePacket(pkt)
|
||||
if errors.Is(err, astiav.ErrEof) {
|
||||
fmt.Println("EOF: ", err.Error())
|
||||
@@ -183,12 +185,12 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er
|
||||
DTS: pkt.Dts(),
|
||||
SampleRate: t.encCodecContext.SampleRate(),
|
||||
})
|
||||
pkt.Unref()
|
||||
}
|
||||
pkt.Free()
|
||||
}
|
||||
frame.Unref()
|
||||
}
|
||||
select {
|
||||
case t.ResultChan() <- opusAudio:
|
||||
default:
|
||||
}
|
||||
|
||||
return opusAudio, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user