send stream info as meta data

This commit is contained in:
Leandro Moreira
2024-02-24 23:45:04 -03:00
parent a3cec908c8
commit 8dfe08f238
5 changed files with 46 additions and 17 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/asticode/go-astiav"
"github.com/asticode/go-astikit"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"go.uber.org/fx"
"go.uber.org/zap"
)
@@ -17,6 +18,7 @@ import (
type LibAVFFmpegStreamer struct {
c *entities.Config
l *zap.SugaredLogger
m *mapper.Mapper
lastAudioFrameDTS float64
currentAudioFrameSize float64
@@ -26,6 +28,7 @@ type LibAVFFmpegStreamerParams struct {
fx.In
C *entities.Config
L *zap.SugaredLogger
M *mapper.Mapper
}
type ResultLibAVFFmpegStreamer struct {
@@ -38,6 +41,7 @@ func NewLibAVFFmpegStreamer(p LibAVFFmpegStreamerParams) ResultLibAVFFmpegStream
LibAVFFmpegStreamer: &LibAVFFmpegStreamer{
c: p.C,
l: p.L,
m: p.M,
},
}
}
@@ -153,13 +157,13 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, do
}
inputOptions := c.defineInputOptions(donut, closer)
if err := p.inputFormatContext.OpenInput(donut.StreamURL, inputFormat, inputOptions); err != nil {
return errors.New(fmt.Sprintf("ffmpeg/libav: opening input failed %s", err.Error()))
return fmt.Errorf("ffmpeg/libav: opening input failed %w", err)
}
closer.Add(p.inputFormatContext.CloseInput)
if err := p.inputFormatContext.FindStreamInfo(nil); err != nil {
return errors.New(fmt.Sprintf("ffmpeg/libav: finding stream info failed %s", err.Error()))
return fmt.Errorf("ffmpeg/libav: finding stream info failed %w", err)
}
for _, is := range p.inputFormatContext.Streams() {
@@ -181,7 +185,7 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, do
closer.Add(s.decCodecContext.Free)
if err := is.CodecParameters().ToCodecContext(s.decCodecContext); err != nil {
return errors.New(fmt.Sprintf("ffmpeg/libav: updating codec context failed %s", err.Error()))
return fmt.Errorf("ffmpeg/libav: updating codec context failed %w", err)
}
if is.CodecParameters().MediaType() == astiav.MediaTypeVideo {
@@ -189,13 +193,18 @@ func (c *LibAVFFmpegStreamer) prepareInput(p *params, closer *astikit.Closer, do
}
if err := s.decCodecContext.Open(s.decCodec, nil); err != nil {
return errors.New(fmt.Sprintf("ffmpeg/libav: opening codec context failed %s", err.Error()))
return fmt.Errorf("ffmpeg/libav: opening codec context failed %w", err)
}
s.decFrame = astiav.AllocFrame()
closer.Add(s.decFrame.Free)
p.streams[is.Index()] = s
if donut.OnStream != nil {
stream := c.m.FromLibAVStreamToEntityStream(is)
donut.OnStream(&stream)
}
}
return nil
}
@@ -204,7 +213,7 @@ func (c *LibAVFFmpegStreamer) defineInputFormat(streamFormat string) (*astiav.In
if streamFormat != "" {
inputFormat := astiav.FindInputFormat(streamFormat)
if inputFormat == nil {
return nil, errors.New(fmt.Sprintf("ffmpeg/libav: could not find %s input format", streamFormat))
return nil, fmt.Errorf("ffmpeg/libav: could not find %s input format", streamFormat)
}
}
return nil, nil
@@ -236,8 +245,12 @@ func (c *LibAVFFmpegStreamer) defineAudioDuration(s *streamContext, pkt *astiav.
// 1s = dur * (sample/frameSize)
// ref https://developer.apple.com/documentation/coreaudiotypes/audiostreambasicdescription/1423257-mframesperpacket
// TODO: handle wraparound
// TODO: properly handle wraparound / roll over
c.currentAudioFrameSize = float64(pkt.Dts()) - c.lastAudioFrameDTS
if c.currentAudioFrameSize < 0 {
c.currentAudioFrameSize = c.lastAudioFrameDTS*2 - c.lastAudioFrameDTS
}
c.lastAudioFrameDTS = float64(pkt.Dts())
sampleRate := float64(s.inputStream.CodecParameters().SampleRate())
audioDuration = time.Duration((c.currentAudioFrameSize / sampleRate) * float64(time.Second))

View File

@@ -2,6 +2,7 @@ package controllers
import (
"context"
"encoding/json"
"net"
"github.com/flavioribeiro/donut/internal/entities"
@@ -126,6 +127,19 @@ func (c *WebRTCController) SendVideoSample(videoTrack *webrtc.TrackLocalStaticSa
return nil
}
func (c *WebRTCController) SendMetadata(metaTrack *webrtc.DataChannel, st *entities.Stream) error {
msg := c.m.FromStreamToEntityMessage(*st)
msgBytes, err := json.Marshal(msg)
if err != nil {
return err
}
err = metaTrack.SendText(string(msgBytes))
if err != nil {
return err
}
return nil
}
func NewWebRTCSettingsEngine(c *entities.Config, tcpListener net.Listener, udpListener net.PacketConn) webrtc.SettingEngine {
settingEngine := webrtc.SettingEngine{}

View File

@@ -64,6 +64,7 @@ const (
UnknownCodec Codec = "unknownCodec"
H264 Codec = "h264"
H265 Codec = "h265"
VP8 Codec = "vp8"
VP9 Codec = "vp9"
AV1 Codec = "av1"
AAC Codec = "aac"

View File

@@ -114,6 +114,11 @@ func (m *Mapper) FromWebRTCSessionDescriptionToStreamInfo(desc webrtc.SessionDes
Codec: entities.H265,
Type: mediaType,
}
} else if strings.Contains(a.Value, "VP8") {
unique[entities.VP8] = entities.Stream{
Codec: entities.VP8,
Type: mediaType,
}
} else if strings.Contains(a.Value, "VP9") {
unique[entities.VP9] = entities.Stream{
Codec: entities.VP9,
@@ -182,6 +187,10 @@ func (m *Mapper) FromLibAVStreamToEntityStream(libavStream *astiav.Stream) entit
st.Codec = entities.AV1
} else if libavStream.CodecParameters().CodecID().Name() == "aac" {
st.Codec = entities.AAC
} else if libavStream.CodecParameters().CodecID().Name() == "vp8" {
st.Codec = entities.VP8
} else if libavStream.CodecParameters().CodecID().Name() == "vp9" {
st.Codec = entities.VP9
} else if libavStream.CodecParameters().CodecID().Name() == "opus" {
st.Codec = entities.Opus
} else {

View File

@@ -137,7 +137,9 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
h.l.Errorw("error while streaming", "error", err)
},
OnStream: func(st *entities.Stream) {
h.sendStreamInfoToMetadata(st, metadataSender)
if err := h.webRTCController.SendMetadata(metadataSender, st); err != nil {
h.l.Errorw("error while sending metadata", "error", err)
}
},
OnVideoFrame: func(data []byte, c entities.MediaFrameContext) error {
return h.webRTCController.SendVideoSample(videoTrack, data, c)
@@ -159,16 +161,6 @@ func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) err
return nil
}
func (h *SignalingHandler) sendStreamInfoToMetadata(st *entities.Stream, md *webrtc.DataChannel) {
msg := h.mapper.FromStreamToEntityMessage(*st)
msgBytes, err := json.Marshal(msg)
if err != nil {
h.l.Errorw("error marshalling stream message", "error", err)
return
}
md.SendText(string(msgBytes))
}
func (h *SignalingHandler) createAndValidateParams(w http.ResponseWriter, r *http.Request) (entities.RequestParams, error) {
if r.Method != http.MethodPost {
return entities.RequestParams{}, entities.ErrHTTPPostOnly