extract a function for mpeg-ts

This commit is contained in:
Leandro Moreira
2024-02-01 09:15:19 -03:00
parent f83dfa2d62
commit f6f44c9125

View File

@@ -35,22 +35,22 @@ func (c *StreamingController) Stream(sp entities.StreamParameters) {
defer sp.WebRTCConn.Close() defer sp.WebRTCConn.Close()
defer sp.Cancel() defer sp.Cancel()
c.l.Sugar().Infow("start streaming")
// TODO: pick the proper transport? is it possible to get rtp instead? // TODO: pick the proper transport? is it possible to get rtp instead?
go c.readFromSRTIntoWriterPipe(sp.SRTConnection, w) go c.readFromSRTIntoWriterPipe(sp.SRTConnection, w)
// reading from reader pipe into mpeg-ts demuxer // reading from reader pipe to the mpeg-ts demuxer
mpegTSDemuxer := astits.NewDemuxer(sp.Ctx, r) mpegTSDemuxer := astits.NewDemuxer(sp.Ctx, r)
eia608Reader := NewEIA608Reader() eia608Reader := NewEIA608Reader()
h264PID := uint16(0) h264PID := uint16(0)
c.l.Sugar().Infow("streaming has started")
for { for {
select { select {
case <-sp.Ctx.Done(): case <-sp.Ctx.Done():
c.l.Sugar().Errorw("stream was cancelled") c.l.Sugar().Errorw("streaming has stopped")
return return
default: default:
// fetching mpeg-ts data
// ref https://tsduck.io/download/docs/mpegts-introduction.pdf // ref https://tsduck.io/download/docs/mpegts-introduction.pdf
mpegTSDemuxData, err := mpegTSDemuxer.NextData() mpegTSDemuxData, err := mpegTSDemuxer.NextData()
if err != nil { if err != nil {
@@ -61,42 +61,47 @@ func (c *StreamingController) Stream(sp entities.StreamParameters) {
} }
if mpegTSDemuxData.PMT != nil { if mpegTSDemuxData.PMT != nil {
// writing mpeg-ts meida metadata to the metadata webrtc channel // writing mpeg-ts media codec info to the metadata webrtc channel
h264PID = c.captureMediaInfoAndSendToWebRTC(mpegTSDemuxData, sp.MetadataTrack, h264PID) h264PID = c.captureMediaInfoAndSendToWebRTC(mpegTSDemuxData, sp.MetadataTrack, h264PID)
c.captureBitrateAndSendToWebRTC(mpegTSDemuxData, sp.MetadataTrack) c.captureBitrateAndSendToWebRTC(mpegTSDemuxData, sp.MetadataTrack)
} }
if mpegTSDemuxData.PID == h264PID && mpegTSDemuxData.PES != nil { // writing mpeg-ts video/captions to webrtc channels
// writing video from mpeg-ts into webrtc err = c.writeMpegtsToWebRTC(mpegTSDemuxData, h264PID, err, sp, eia608Reader)
if err = sp.VideoTrack.WriteSample(media.Sample{Data: mpegTSDemuxData.PES.Data, Duration: time.Second / 30}); err != nil { if err != nil {
c.l.Sugar().Errorw("failed to write a sample mpeg-ts to web rtc", c.l.Sugar().Errorw("failed to write an mpeg-ts to web rtc",
"error", err, "error", err,
) )
return return
}
captions, err := eia608Reader.Parse(mpegTSDemuxData.PES)
if err != nil {
c.l.Sugar().Errorw("failed to parse eia 608",
"error", err,
)
return
}
if captions != "" {
captionsMsg, err := BuildCaptionsMessage(mpegTSDemuxData.PES.Header.OptionalHeader.PTS, captions)
if err != nil {
c.l.Sugar().Errorw("failed to build captions message",
"error", err,
)
return
}
// writing metadata to the metadata webrtc channel
sp.MetadataTrack.SendText(captionsMsg)
}
} }
} }
} }
} }
func (c *StreamingController) writeMpegtsToWebRTC(mpegTSDemuxData *astits.DemuxerData, h264PID uint16, err error, sp entities.StreamParameters, eia608Reader *EIA608Reader) error {
if mpegTSDemuxData.PID == h264PID && mpegTSDemuxData.PES != nil {
if err = sp.VideoTrack.WriteSample(media.Sample{Data: mpegTSDemuxData.PES.Data, Duration: time.Second / 30}); err != nil {
return err
}
captions, err := eia608Reader.Parse(mpegTSDemuxData.PES)
if err != nil {
return err
}
if captions != "" {
captionsMsg, err := BuildCaptionsMessage(mpegTSDemuxData.PES.Header.OptionalHeader.PTS, captions)
if err != nil {
return err
}
sp.MetadataTrack.SendText(captionsMsg)
}
}
return nil
}
func (*StreamingController) captureBitrateAndSendToWebRTC(d *astits.DemuxerData, metadataTrack *webrtc.DataChannel) { func (*StreamingController) captureBitrateAndSendToWebRTC(d *astits.DemuxerData, metadataTrack *webrtc.DataChannel) {
for _, d := range d.PMT.ProgramDescriptors { for _, d := range d.PMT.ProgramDescriptors {
if d.MaximumBitrate != nil { if d.MaximumBitrate != nil {
@@ -134,14 +139,14 @@ func (c *StreamingController) readFromSRTIntoWriterPipe(srtConnection *astisrt.C
for { for {
n, err := srtConnection.Read(inboundMpegTsPacket) n, err := srtConnection.Read(inboundMpegTsPacket)
if err != nil { if err != nil {
c.l.Sugar().Errorw("str conn failed to read mpeg ts", c.l.Sugar().Errorw("str conn failed to write data to buffer",
"error", err, "error", err,
) )
break break
} }
if _, err := w.Write(inboundMpegTsPacket[:n]); err != nil { if _, err := w.Write(inboundMpegTsPacket[:n]); err != nil {
c.l.Sugar().Errorw("failed to write mpeg ts in the pipe", c.l.Sugar().Errorw("failed to write mpeg-ts into the pipe",
"error", err, "error", err,
) )
break break