From 49a0465523cbe95d671e323a4e4e340ccf7e9147 Mon Sep 17 00:00:00 2001 From: Han Gyoung-Su Date: Mon, 17 Mar 2025 02:35:41 +0900 Subject: [PATCH] Fix whep's memory leak (#17) Adopt address sanitizer for ffmpeg --- Dockerfile | 20 ++-- docker-compose.yaml | 8 +- lsan.Dockerfile | 20 ++-- main.go | 29 +++++- media/hub/hub.go | 13 +-- media/streamer/egress/whep/whep.go | 128 +++++++++++++------------ media/streamer/ingress/whip/handler.go | 9 +- media/streamer/ingress/whip/serve.go | 5 +- media/streamer/processes/transcoder.go | 58 +++++------ 9 files changed, 159 insertions(+), 131 deletions(-) diff --git a/Dockerfile b/Dockerfile index cfc1145..3298743 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,4 @@ -# ubuntu -#FROM ubuntu:latest -FROM golang:1.21-bullseye +FROM golang:1.23-bullseye RUN apt-get update RUN apt-get upgrade -y RUN apt-get install -y build-essential git pkg-config libunistring-dev libaom-dev libdav1d-dev bzip2 nasm wget yasm ca-certificates @@ -8,12 +6,14 @@ COPY install-ffmpeg.sh /install-ffmpeg.sh RUN chmod +x /install-ffmpeg.sh && /install-ffmpeg.sh ENV PKG_CONFIG_PATH=/ffmpeg_build/lib/pkgconfig:${PKG_CONFIG_PATH} ENV PATH="/usr/local/go/bin:${PATH}" -COPY ./ /app WORKDIR /app -RUN go mod download -RUN go build -o /app/bin/liveflow -RUN cp config.toml /app/bin/config.toml -RUN cp -r static /app/bin/static +COPY go.mod go.sum ./ +RUN --mount=type=cache,target=/go/pkg/mod \ + go mod download +COPY ./ /app RUN mkdir -p /app/bin/videos -WORKDIR /app/bin -ENTRYPOINT ["/app/bin/liveflow"] +RUN --mount=type=cache,target=/root/.cache/go-build \ + go build -o /app +WORKDIR /app +ENV GOGC=10 +ENTRYPOINT ["/app/liveflow"] diff --git a/docker-compose.yaml b/docker-compose.yaml index bde8fc4..199bbc2 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,16 +1,16 @@ -# docker-compose -version: "3.4" +version: "2.1" services: liveflow: image: liveflow_custom:latest + mem_limit: 350m stdin_open: true # docker run -i tty: true # docker run -t volumes: - - "~/.store:/app/bin/videos" + - "~/.store:/app/videos" ports: - "8044:8044" - "1930:1930" - - "40000-41000:40000-41000/udp" + - "40000-40010:40000-40010/udp" environment: DOCKER_MODE: "true" build: diff --git a/lsan.Dockerfile b/lsan.Dockerfile index 7b9080d..71b062d 100644 --- a/lsan.Dockerfile +++ b/lsan.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21-bullseye +FROM golang:1.23-bullseye RUN apt-get update && \ apt-get upgrade -y && \ @@ -10,15 +10,17 @@ RUN chmod +x /install-ffmpeg-lsan.sh && /install-ffmpeg-lsan.sh ENV PKG_CONFIG_PATH=/ffmpeg_build/lib/pkgconfig:${PKG_CONFIG_PATH} ENV PATH="/usr/local/go/bin:${PATH}" -COPY ./ /app WORKDIR /app - +COPY go.mod go.sum ./ ENV CGO_LDFLAGS='-fsanitize=address' ENV CGO_CFLAGS='-fsanitize=address' -RUN go mod download -RUN go build -o /app/bin/liveflow -RUN cp config.toml /app/bin/config.toml -RUN cp -r static /app/bin/static +RUN --mount=type=cache,target=/go/pkg/mod \ + go mod download +COPY ./ /app RUN mkdir -p /app/bin/videos -WORKDIR /app/bin -ENTRYPOINT ["/app/bin/liveflow"] \ No newline at end of file +RUN --mount=type=cache,target=/root/.cache/go-build \ + go build -o /app +WORKDIR /app +ENV GOGC 10 +ENV ASAN_OPTIONS quarantine_size_mb=32 +ENTRYPOINT ["/app/liveflow"] \ No newline at end of file diff --git a/main.go b/main.go index c92d168..822ccb7 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,10 @@ import ( "fmt" "net/http" _ "net/http/pprof" // pprof을 사용하기 위한 패키지 + "os" + "os/signal" "strconv" + "syscall" "liveflow/config" "liveflow/media/streamer/egress/hls" @@ -28,9 +31,18 @@ import ( "liveflow/media/streamer/ingress/rtmp" ) +/* +#include +#include +void __lsan_do_leak_check(void); +void leak_bit() { + int *p = (int *)malloc(sizeof(int)); +} +*/ +import "C" + // RTMP 받으면 자동으로 Service 서비스 동작, 녹화 서비스까지~? func main() { - ctx := context.Background() viper.SetConfigName("config") // name of config file (without extension) viper.SetConfigType("toml") // REQUIRED if the config file does not have the extension in the name viper.AddConfigPath(".") // optionally look for config in the working directory @@ -46,10 +58,16 @@ func main() { } fmt.Printf("Config: %+v\n", conf) log.Init() - //log.SetCaller(ctx, true) - //log.SetFormatter(ctx, &logrus.JSONFormatter{ - // TimestampFormat: "2006-01-02 15:04:05", - //}) + ctx, cancel := context.WithCancel(context.Background()) + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigs + log.Info(ctx, "Received signal:", sig) + log.Info(ctx, "Initiating shutdown...") + //C.__lsan_do_leak_check() + cancel() + }() ctx = log.WithFields(ctx, logrus.Fields{ "app": "liveflow", }) @@ -124,6 +142,7 @@ func main() { Tracks: tracks, Hub: hub, }) + _ = whep err = whep.Start(ctx, source) if err != nil { log.Errorf(ctx, "failed to start whep: %v", err) diff --git a/media/hub/hub.go b/media/hub/hub.go index b64371f..dcba0d0 100644 --- a/media/hub/hub.go +++ b/media/hub/hub.go @@ -106,7 +106,8 @@ func (h *Hub) Publish(streamID string, data *FrameData) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - for _, ch := range h.streams[streamID] { + channels := h.streams[streamID] + for _, ch := range channels { select { case ch <- data: case <-ctx.Done(): @@ -135,7 +136,7 @@ func (h *Hub) Subscribe(streamID string) <-chan *FrameData { h.mu.RLock() defer h.mu.RUnlock() - ch := make(chan *FrameData) + ch := make(chan *FrameData, 1000) h.streams[streamID] = append(h.streams[streamID], ch) return ch } @@ -157,11 +158,3 @@ func (h *Hub) RemoveStream(streamID string) { delete(h.streams, streamID) } } - -//func checkLeak() { -// go func() { -// fmt.Println("will check leak") -// time.Sleep(3 * time.Second) -// C.__lsan_do_leak_check() -// }() -//} diff --git a/media/streamer/egress/whep/whep.go b/media/streamer/egress/whep/whep.go index 06f6d08..bfbda51 100644 --- a/media/streamer/egress/whep/whep.go +++ b/media/streamer/egress/whep/whep.go @@ -1,5 +1,10 @@ package whep +// #include +// #include +// +// void __lsan_do_leak_check(void); +import "C" import ( "context" "errors" @@ -84,40 +89,68 @@ func (w *WHEP) Start(ctx context.Context, source hub.Source) error { if audioTranscodingProcess == nil { audioTranscodingProcess = processes.NewTranscodingProcess(astiav.CodecIDAac, astiav.CodecIDOpus, audioSampleRate) audioTranscodingProcess.Init() - defer audioTranscodingProcess.Close() } + w.addAudioTrack(ctx, source.StreamID()) err := w.onAACAudio(ctx, source, data.AACAudio, audioTranscodingProcess) if err != nil { log.Error(ctx, err, "failed to process AAC audio") } - } else { - if data.OPUSAudio != nil { - err := w.onAudio(source, data.OPUSAudio) - if err != nil { - log.Error(ctx, err, "failed to process OPUS audio") - } + } else if data.OPUSAudio != nil { + w.addAudioTrack(ctx, source.StreamID()) + err := w.onAudio(source, data.OPUSAudio) + if err != nil { + log.Error(ctx, err, "failed to process OPUS audio") } } } + + if audioTranscodingProcess != nil { + audioTranscodingProcess.Close() + } + log.Info(ctx, "end whep") + //C.__lsan_do_leak_check() }() + return nil } -func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error { +func (w *WHEP) addVideoTrack(streamID string, videoClockRate uint32) error { if w.videoTrack == nil { var err error w.videoTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") if err != nil { return err } - w.tracks[source.StreamID()] = append(w.tracks[source.StreamID()], w.videoTrack) + w.tracks[streamID] = append(w.tracks[streamID], w.videoTrack) ssrc := uint32(110) const ( h264PayloadType = 96 mtu = 1400 ) - w.videoPacketizer = rtp.NewPacketizer(mtu, h264PayloadType, ssrc, &codecs.H264Payloader{}, rtp.NewRandomSequencer(), h264Video.VideoClockRate) + w.videoPacketizer = rtp.NewPacketizer(mtu, h264PayloadType, ssrc, &codecs.H264Payloader{}, rtp.NewRandomSequencer(), videoClockRate) } + return nil +} + +func (w *WHEP) addAudioTrack(ctx context.Context, streamID string) error { + if w.audioTrack == nil { + var err error + w.audioTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") + if err != nil { + log.Error(ctx, err, "failed to create audio track") + } + w.tracks[streamID] = append(w.tracks[streamID], w.audioTrack) + ssrc := uint32(111) + const ( + opusPayloadType = 111 + mtu = 1400 + ) + w.audioPacketizer = rtp.NewPacketizer(mtu, opusPayloadType, ssrc, &codecs.OpusPayloader{}, rtp.NewRandomSequencer(), 48000) + } + return nil +} +func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error { + w.addVideoTrack(source.StreamID(), h264Video.VideoClockRate) videoDuration := h264Video.DTS - w.lastVideoTimestamp videoPackets := w.videoPacketizer.Packetize(h264Video.Data, uint32(videoDuration)) @@ -132,21 +165,6 @@ func (w *WHEP) onVideo(source hub.Source, h264Video *hub.H264Video) error { } func (w *WHEP) onAudio(source hub.Source, opusAudio *hub.OPUSAudio) error { - if w.audioTrack == nil { - var err error - w.audioTrack, err = webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") - if err != nil { - return err - } - w.tracks[source.StreamID()] = append(w.tracks[source.StreamID()], w.audioTrack) - ssrc := uint32(111) - const ( - opusPayloadType = 111 - mtu = 1400 - ) - w.audioPacketizer = rtp.NewPacketizer(mtu, opusPayloadType, ssrc, &codecs.OpusPayloader{}, rtp.NewRandomSequencer(), opusAudio.AudioClockRate) - } - audioDuration := opusAudio.DTS - w.lastAudioTimestamp audioPackets := w.audioPacketizer.Packetize(opusAudio.Data, uint32(audioDuration)) @@ -162,42 +180,6 @@ func (w *WHEP) onAudio(source hub.Source, opusAudio *hub.OPUSAudio) error { return nil } -func (w *WHEP) syncAndSendPackets() error { - for len(w.videoBuffer) > 0 && len(w.audioBuffer) > 0 { - videoPacket := w.videoBuffer[0] - audioPacket := w.audioBuffer[0] - // Remove lagging packet from buffer - if videoPacket.timestamp <= audioPacket.timestamp { - // If audio is ahead, remove video from buffer - if len(w.videoBuffer) > 100 { - w.videoBuffer = append([]*packetWithTimestamp{}, w.videoBuffer[1:]...) - } else { - w.videoBuffer = w.videoBuffer[1:] - } - if err := w.videoTrack.WriteRTP(videoPacket.packet); err != nil { - return err - } - } else { - // If video is ahead, remove audio from buffer - if len(w.audioBuffer) > 100 { - w.audioBuffer = append([]*packetWithTimestamp{}, w.audioBuffer[1:]...) - } else { - w.audioBuffer = w.audioBuffer[1:] - } - if err := w.audioTrack.WriteRTP(audioPacket.packet); err != nil { - return err - } - } - } - return nil -} - -func abs(x int64) int64 { - if x < 0 { - return -x - } - return x -} func (w *WHEP) onAACAudio(ctx context.Context, source hub.Source, aac *hub.AACAudio, transcodingProcess *processes.AudioTranscodingProcess) error { if len(aac.Data) == 0 { log.Warn(ctx, "no data") @@ -233,3 +215,27 @@ func (w *WHEP) onAACAudio(ctx context.Context, source hub.Source, aac *hub.AACAu } return nil } + +func (w *WHEP) syncAndSendPackets() error { + for len(w.videoBuffer) > 0 && len(w.audioBuffer) > 0 { + videoPacket := w.videoBuffer[0] + audioPacket := w.audioBuffer[0] + // Remove lagging packet from buffer + if videoPacket.timestamp <= audioPacket.timestamp { + // If audio is ahead, remove video from buffer + w.videoBuffer[0] = nil + w.videoBuffer = w.videoBuffer[1:] + if err := w.videoTrack.WriteRTP(videoPacket.packet); err != nil { + return err + } + } else { + // If video is ahead, remove audio from buffer + w.audioBuffer[0] = nil + w.audioBuffer = w.audioBuffer[1:] + if err := w.audioTrack.WriteRTP(audioPacket.packet); err != nil { + return err + } + } + } + return nil +} diff --git a/media/streamer/ingress/whip/handler.go b/media/streamer/ingress/whip/handler.go index 882eb34..27f8766 100644 --- a/media/streamer/ingress/whip/handler.go +++ b/media/streamer/ingress/whip/handler.go @@ -4,11 +4,12 @@ import ( "context" "fmt" "io" - "liveflow/media/streamer/ingress" "net/http" "strings" "time" + "liveflow/media/streamer/ingress" + "github.com/labstack/echo/v4" "github.com/pion/rtp" "github.com/pion/rtp/codecs" @@ -282,6 +283,7 @@ func (r *WHIP) whepHandler(c echo.Context) error { } streamKey, err := r.bearerToken(c) if err != nil { + log.Error(context.Background(), err, "failed to get stream key") return c.JSON(http.StatusInternalServerError, err.Error()) } @@ -289,11 +291,12 @@ func (r *WHIP) whepHandler(c echo.Context) error { m := &webrtc.MediaEngine{} err = registerCodec(m) if err != nil { + log.Error(context.Background(), err, "failed to register codec") return c.JSON(http.StatusInternalServerError, err.Error()) } se := webrtc.SettingEngine{} - se.SetEphemeralUDPPortRange(30000, 30500) + se.SetEphemeralUDPPortRange(40000, 40010) if r.dockerMode { se.SetNAT1To1IPs([]string{"127.0.0.1"}, webrtc.ICECandidateTypeHost) } @@ -301,6 +304,7 @@ func (r *WHIP) whepHandler(c echo.Context) error { api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithSettingEngine(se)) peerConnection, err := api.NewPeerConnection(peerConnectionConfiguration) if err != nil { + log.Error(context.Background(), err, "failed to create peer connection") return c.JSON(http.StatusInternalServerError, err.Error()) } @@ -309,6 +313,7 @@ func (r *WHIP) whepHandler(c echo.Context) error { for _, track := range r.tracks[streamKey] { sender, err := peerConnection.AddTrack(track) if err != nil { + log.Error(context.Background(), err, "failed to add track") return c.JSON(http.StatusInternalServerError, err.Error()) } rtpSenders = append(rtpSenders, sender) diff --git a/media/streamer/ingress/whip/serve.go b/media/streamer/ingress/whip/serve.go index 5cf55c3..fdda137 100644 --- a/media/streamer/ingress/whip/serve.go +++ b/media/streamer/ingress/whip/serve.go @@ -4,10 +4,11 @@ import ( "context" "fmt" "io" - "liveflow/log" "net/http" "strings" + "liveflow/log" + "github.com/labstack/echo/v4" "github.com/pion/interceptor" "github.com/pion/interceptor/pkg/intervalpli" @@ -125,7 +126,7 @@ func (r *WHIP) whipHandler(c echo.Context) error { // Create the API object with the MediaEngine se := webrtc.SettingEngine{} - se.SetEphemeralUDPPortRange(30000, 30500) + se.SetEphemeralUDPPortRange(40000, 40010) if r.dockerMode { se.SetNAT1To1IPs([]string{"127.0.0.1"}, webrtc.ICECandidateTypeHost) se.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeUDP4}) diff --git a/media/streamer/processes/transcoder.go b/media/streamer/processes/transcoder.go index cc47055..2515b2f 100644 --- a/media/streamer/processes/transcoder.go +++ b/media/streamer/processes/transcoder.go @@ -5,10 +5,9 @@ import ( "errors" "fmt" - "liveflow/log" - "liveflow/media/streamer/pipe" - astiav "github.com/asticode/go-astiav" + + "liveflow/log" ) type MediaPacket struct { @@ -18,7 +17,7 @@ type MediaPacket struct { SampleRate int } type AudioTranscodingProcess struct { - pipe.BaseProcess[*MediaPacket, []*MediaPacket] + //pipe.BaseProcess[*MediaPacket, []*MediaPacket] decCodecID astiav.CodecID encCodecID astiav.CodecID decCodec *astiav.Codec @@ -110,28 +109,32 @@ func (t *AudioTranscodingProcess) Close() { func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, error) { ctx := context.Background() + frame := astiav.AllocFrame() + defer frame.Free() packet := astiav.AllocPacket() defer packet.Free() - err := packet.FromData(data.Data) - if err != nil { + if err := packet.FromData(data.Data); err != nil { log.Error(ctx, err, "failed to create packet") } packet.SetPts(data.PTS) packet.SetDts(data.DTS) - err = t.decCodecContext.SendPacket(packet) - if err != nil { + if err := t.decCodecContext.SendPacket(packet); err != nil { log.Error(ctx, err, "failed to send packet") } + + frameToSend := astiav.AllocFrame() + defer frameToSend.Free() if t.audioFifo == nil { t.audioFifo = astiav.AllocAudioFifo( t.encCodecContext.SampleFormat(), t.encCodecContext.ChannelLayout().Channels(), - t.encCodecContext.SampleRate()) + t.encCodecContext.SampleRate(), + ) } + var opusAudio []*MediaPacket + for { - frame := astiav.AllocFrame() - defer frame.Free() err := t.decCodecContext.ReceiveFrame(frame) if errors.Is(err, astiav.ErrEof) { fmt.Println("EOF: ", err.Error()) @@ -140,21 +143,21 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er break } t.audioFifo.Write(frame) - nbSamples := 0 + + // check whether we have enough samples to encode for t.audioFifo.Size() >= t.encCodecContext.FrameSize() { - frameToSend := astiav.AllocFrame() - defer frameToSend.Free() + // initialize frameToSend before reusing it + frameToSend.Unref() frameToSend.SetNbSamples(t.encCodecContext.FrameSize()) - frameToSend.SetChannelLayout(t.encCodecContext.ChannelLayout()) // t.encCodecContext.ChannelLayout()) + frameToSend.SetChannelLayout(t.encCodecContext.ChannelLayout()) frameToSend.SetSampleFormat(t.encCodecContext.SampleFormat()) frameToSend.SetSampleRate(t.encCodecContext.SampleRate()) frameToSend.SetPts(t.lastPts + int64(t.encCodecContext.FrameSize())) - t.lastPts = frameToSend.Pts() - nbSamples += frame.NbSamples() - err := frameToSend.AllocBuffer(0) - if err != nil { + if err := frameToSend.AllocBuffer(0); err != nil { log.Error(ctx, err, "failed to alloc buffer") } + t.lastPts = frameToSend.Pts() + read, err := t.audioFifo.Read(frameToSend) if err != nil { log.Error(ctx, err, "failed to read fifo") @@ -162,14 +165,13 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er if read < frameToSend.NbSamples() { log.Error(ctx, err, "failed to read fifo") } - // Encode the frame - err = t.encCodecContext.SendFrame(frameToSend) - if err != nil { + + if err := t.encCodecContext.SendFrame(frameToSend); err != nil { log.Error(ctx, err, "failed to send frame") } + + pkt := astiav.AllocPacket() for { - pkt := astiav.AllocPacket() - defer pkt.Free() err := t.encCodecContext.ReceivePacket(pkt) if errors.Is(err, astiav.ErrEof) { fmt.Println("EOF: ", err.Error()) @@ -183,12 +185,12 @@ func (t *AudioTranscodingProcess) Process(data *MediaPacket) ([]*MediaPacket, er DTS: pkt.Dts(), SampleRate: t.encCodecContext.SampleRate(), }) + pkt.Unref() } + pkt.Free() } + frame.Unref() } - select { - case t.ResultChan() <- opusAudio: - default: - } + return opusAudio, nil }