ffmpeg: Add transmuxing mode for mp4 outputs

Adds a dedicated `transmuxing` mode in the transcode pipeline - useful to convert a bunch of mpegts segments from multiple sessions of a stream, to a single `.mp4` file.
This commit is contained in:
Ivan Tivonenko
2021-05-18 22:07:48 +03:00
committed by GitHub
parent 6e5c36facc
commit f7f2491160
9 changed files with 288 additions and 30 deletions

2
.gitignore vendored
View File

@@ -22,3 +22,5 @@ _testmain.go
*.exe
*.test
*.prof
/.vscode

View File

@@ -52,7 +52,7 @@ int process_in(struct input_ctx *ictx, AVFrame *frame, AVPacket *pkt)
ist = ictx->ic->streams[pkt->stream_index];
if (ist->index == ictx->vi && ictx->vc) decoder = ictx->vc;
else if (ist->index == ictx->ai && ictx->ac) decoder = ictx->ac;
else if (pkt->stream_index == ictx->vi || pkt->stream_index == ictx->ai) break;
else if (pkt->stream_index == ictx->vi || pkt->stream_index == ictx->ai || ictx->transmuxing) break;
else goto drop_packet; // could be an extra stream; skip
if (!ictx->first_pkt && pkt->flags & AV_PKT_FLAG_KEY && decoder == ictx->vc) {
@@ -271,20 +271,24 @@ int open_input(input_params *params, struct input_ctx *ctx)
char *inp = params->fname;
int ret = 0;
ctx->transmuxing = params->transmuxe;
// open demuxer
ret = avformat_open_input(&ic, inp, NULL, NULL);
if (ret < 0) LPMS_ERR(open_input_err, "demuxer: Unable to open input");
ctx->ic = ic;
ret = avformat_find_stream_info(ic, NULL);
if (ret < 0) LPMS_ERR(open_input_err, "Unable to find input info");
ret = open_video_decoder(params, ctx);
if (ret < 0) LPMS_ERR(open_input_err, "Unable to open video decoder")
ret = open_audio_decoder(params, ctx);
if (ret < 0) LPMS_ERR(open_input_err, "Unable to open audio decoder")
ctx->last_frame_v = av_frame_alloc();
if (!ctx->last_frame_v) LPMS_ERR(open_input_err, "Unable to alloc last_frame_v");
ctx->last_frame_a = av_frame_alloc();
if (!ctx->last_frame_a) LPMS_ERR(open_input_err, "Unable to alloc last_frame_a");
if (params->transmuxe == 0) {
ret = open_video_decoder(params, ctx);
if (ret < 0) LPMS_ERR(open_input_err, "Unable to open video decoder")
ret = open_audio_decoder(params, ctx);
if (ret < 0) LPMS_ERR(open_input_err, "Unable to open audio decoder")
ctx->last_frame_v = av_frame_alloc();
if (!ctx->last_frame_v) LPMS_ERR(open_input_err, "Unable to alloc last_frame_v");
ctx->last_frame_a = av_frame_alloc();
if (!ctx->last_frame_a) LPMS_ERR(open_input_err, "Unable to alloc last_frame_a");
}
return 0;

View File

@@ -33,6 +33,19 @@ struct input_ctx {
// Filter flush
AVFrame *last_frame_v, *last_frame_a;
// transmuxing specific fields:
// last non-zero duration
int64_t last_duration;
//
int64_t last_dts;
//
int64_t dts_diff;
//
int discontinuity;
// Transmuxing mode. Close output in lpms_transcode_stop instead of
// at the end of lpms_transcode call.
int transmuxing;
};
// Exported methods

View File

@@ -152,6 +152,37 @@ void free_output(struct output_ctx *octx)
free_filter(&octx->af);
}
int open_remux_output(struct input_ctx *ictx, struct output_ctx *octx)
{
int ret = 0;
octx->oc->flags |= AVFMT_FLAG_FLUSH_PACKETS;
octx->oc->flush_packets = 1;
for (int i = 0; i < ictx->ic->nb_streams; i++) {
ret = 0;
AVStream *st = avformat_new_stream(octx->oc, NULL);
if (!st) LPMS_ERR(open_output_err, "Unable to alloc stream");
if (octx->fps.den)
st->avg_frame_rate = octx->fps;
else
st->avg_frame_rate = ictx->ic->streams[i]->r_frame_rate;
AVStream *ist = ictx->ic->streams[i];
st->time_base = ist->time_base;
ret = avcodec_parameters_copy(st->codecpar, ist->codecpar);
if (ret < 0)
LPMS_ERR(open_output_err, "Error copying params from input stream");
// Sometimes the codec tag is wonky for some reason, so correct it
ret = av_codec_get_tag2(octx->oc->oformat->codec_tag,
st->codecpar->codec_id, &st->codecpar->codec_tag);
avformat_transfer_internal_stream_timing_info(octx->oc->oformat, st, ist,
AVFMT_TBCF_DEMUXER);
}
return 0;
open_output_err:
return ret;
}
int open_output(struct output_ctx *octx, struct input_ctx *ictx)
{
int ret = 0, inp_has_stream;
@@ -202,15 +233,22 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)
octx->hw_type = ictx->hw_type;
}
// add video stream if input contains video
inp_has_stream = ictx->vi >= 0;
if (inp_has_stream && !octx->dv) {
ret = add_video_stream(octx, ictx);
if (ret < 0) LPMS_ERR(open_output_err, "Error adding video stream");
}
if (!ictx->transmuxing) {
// add video stream if input contains video
inp_has_stream = ictx->vi >= 0;
if (inp_has_stream && !octx->dv) {
ret = add_video_stream(octx, ictx);
if (ret < 0) LPMS_ERR(open_output_err, "Error adding video stream");
}
ret = open_audio_output(ictx, octx, fmt);
if (ret < 0) LPMS_ERR(open_output_err, "Error opening audio output");
ret = open_audio_output(ictx, octx, fmt);
if (ret < 0) LPMS_ERR(open_output_err, "Error opening audio output");
} else {
ret = open_remux_output(ictx, octx);
if (ret < 0) {
goto open_output_err;
}
}
if (!(fmt->flags & AVFMT_NOFILE)) {
ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);

View File

@@ -3,13 +3,14 @@ package ffmpeg
import (
"errors"
"fmt"
"github.com/golang/glog"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"unsafe"
"github.com/golang/glog"
)
// #cgo pkg-config: libavformat libavfilter libavcodec libavutil libswscale gnutls
@@ -49,9 +50,10 @@ type Transcoder struct {
}
type TranscodeOptionsIn struct {
Fname string
Accel Acceleration
Device string
Fname string
Accel Acceleration
Device string
Transmuxing bool
}
type TranscodeOptions struct {
@@ -87,7 +89,7 @@ func RTMPToHLS(localRTMPUrl string, outM3U8 string, tmpl string, seglen_secs str
C.free(unsafe.Pointer(ts_tmpl))
C.free(unsafe.Pointer(seglen))
C.free(unsafe.Pointer(segstart))
if 0 != ret {
if ret != 0 {
glog.Infof("RTMP2HLS Transmux Return : %v\n", Strerror(ret))
return ErrorMap[ret]
}
@@ -190,6 +192,9 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
}
fname := C.CString(input.Fname)
defer C.free(unsafe.Pointer(fname))
if input.Transmuxing {
t.started = true
}
if !t.started {
ret := int(C.lpms_is_bypass_needed(fname))
if ret != 1 {
@@ -208,14 +213,14 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
param := p.Profile
w, h, err := VideoProfileResolution(param)
if err != nil {
if "drop" != p.VideoEncoder.Name && "copy" != p.VideoEncoder.Name {
if p.VideoEncoder.Name != "drop" && p.VideoEncoder.Name != "copy" {
return nil, err
}
}
br := strings.Replace(param.Bitrate, "k", "000", 1)
bitrate, err := strconv.Atoi(br)
if err != nil {
if "drop" != p.VideoEncoder.Name && "copy" != p.VideoEncoder.Name {
if p.VideoEncoder.Name != "drop" && p.VideoEncoder.Name != "copy" {
return nil, err
}
}
@@ -351,6 +356,9 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
}
inp := &C.input_params{fname: fname, hw_type: hw_type, device: device,
handle: t.handle}
if input.Transmuxing {
inp.transmuxe = 1
}
results := make([]C.output_results, len(ps))
decoded := &C.output_results{}
var (
@@ -362,7 +370,7 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
resultsPointer = (*C.output_results)(&results[0])
}
ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded))
if 0 != ret {
if ret != 0 {
glog.Error("Transcoder Return : ", ErrorMap[ret])
return nil, ErrorMap[ret]
}
@@ -380,6 +388,12 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
return &TranscodeResults{Encoded: tr, Decoded: dec}, nil
}
func (t *Transcoder) Discontinuity() {
t.mu.Lock()
defer t.mu.Unlock()
C.lpms_transcode_discontinuity(t.handle)
}
func NewTranscoder() *Transcoder {
return &Transcoder{
handle: C.lpms_transcode_new(),

6
ffmpeg/ffmpeg_darwin.go Normal file
View File

@@ -0,0 +1,6 @@
// +build darwin
package ffmpeg
// #cgo LDFLAGS: -framework Foundation -framework Security
import "C"

View File

@@ -116,8 +116,8 @@ int transcode(struct transcode_thread *h,
output_results *results, output_results *decoded_results)
{
int ret = 0, i = 0;
int reopen_decoders = 1;
struct input_ctx *ictx = &h->ictx;
int reopen_decoders = !ictx->transmuxing;
struct output_ctx *outputs = h->outputs;
int nb_outputs = h->nb_outputs;
AVPacket ipkt = {0};
@@ -168,15 +168,24 @@ int transcode(struct transcode_thread *h,
// first segment of a stream, need to initalize output HW context
// XXX valgrind this line up
if (!h->initialized || AV_HWDEVICE_TYPE_NONE == octx->hw_type) {
// when transmuxing we're opening output with first segment, but closing it
// only when lpms_transcode_stop called, so we don't want to re-open it
// on subsequent segments
if (!h->initialized || (AV_HWDEVICE_TYPE_NONE == octx->hw_type && !ictx->transmuxing)) {
ret = open_output(octx, ictx);
if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to open output");
if (ictx->transmuxing) {
octx->oc->flags |= AVFMT_FLAG_FLUSH_PACKETS;
octx->oc->flush_packets = 1;
}
continue;
}
// non-first segment of a HW session
ret = reopen_output(octx, ictx);
if (!ictx->transmuxing) {
// non-first segment of a HW session
ret = reopen_output(octx, ictx);
if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to re-open output for HW session");
}
}
av_init_packet(&ipkt);
@@ -223,6 +232,28 @@ int transcode(struct transcode_thread *h,
av_frame_unref(last_frame);
av_frame_ref(last_frame, dframe);
}
if (ictx->transmuxing) {
// decoded_results->frames++;
ist = ictx->ic->streams[ipkt.stream_index];
if (AVMEDIA_TYPE_VIDEO == ist->codecpar->codec_type) {
decoded_results->frames++;
}
if (ictx->discontinuity) {
// calc dts diff
ictx->dts_diff = ictx->last_dts + ictx->last_duration - ipkt.dts;
ictx->discontinuity = 0;
}
ipkt.pts += ictx->dts_diff;
ipkt.dts += ictx->dts_diff;
if (ipkt.stream_index == 0) {
ictx->last_dts = ipkt.dts;
if (ipkt.duration) {
ictx->last_duration = ipkt.duration;
}
}
}
// ENCODING & MUXING OF ALL OUTPUT RENDITIONS
for (i = 0; i < nb_outputs; i++) {
@@ -232,7 +263,9 @@ int transcode(struct transcode_thread *h,
AVCodecContext *encoder = NULL;
ret = 0; // reset to avoid any carry-through
if (ist->index == ictx->vi) {
if (ictx->transmuxing)
ost = octx->oc->streams[ipkt.stream_index];
else if (ist->index == ictx->vi) {
if (octx->dv) continue; // drop video stream for this output
ost = octx->oc->streams[0];
if (ictx->vc) {
@@ -270,6 +303,17 @@ whileloop_end:
av_packet_unref(&ipkt);
}
if (ictx->transmuxing) {
for (i = 0; i < nb_outputs; i++) {
av_interleaved_write_frame(outputs[i].oc, NULL); // flush muxer
}
if (ictx->ic) {
avformat_close_input(&ictx->ic);
ictx->ic = NULL;
}
return 0;
}
// flush outputs
for (i = 0; i < nb_outputs; i++) {
ret = flush_outputs(ictx, &outputs[i]);
@@ -356,8 +400,17 @@ void lpms_transcode_stop(struct transcode_thread *handle) {
free_input(&handle->ictx);
for (i = 0; i < MAX_OUTPUT_SIZE; i++) {
if (handle->ictx.transmuxing && handle->outputs[i].oc) {
av_write_trailer(handle->outputs[i].oc);
}
free_output(&handle->outputs[i]);
}
free(handle);
}
void lpms_transcode_discontinuity(struct transcode_thread *handle) {
if (!handle)
return;
handle->ictx.discontinuity = 1;
}

View File

@@ -46,6 +46,8 @@ typedef struct {
// Optional hardware acceleration
enum AVHWDeviceType hw_type;
char *device;
int transmuxe;
} input_params;
typedef struct {
@@ -69,5 +71,6 @@ void lpms_init(enum LPMSLogLevel max_level);
int lpms_transcode(input_params *inp, output_params *params, output_results *results, int nb_outputs, output_results *decoded_results);
struct transcode_thread* lpms_transcode_new();
void lpms_transcode_stop(struct transcode_thread* handle);
void lpms_transcode_discontinuity(struct transcode_thread *handle);
#endif // _LPMS_TRANSCODER_H_

125
ffmpeg/transmuxer_test.go Normal file
View File

@@ -0,0 +1,125 @@
package ffmpeg
import (
"fmt"
"os"
"testing"
)
func TestTransmuxer_Join(t *testing.T) {
run, dir := setupTest(t)
defer os.RemoveAll(dir)
cmd := `
# run segmenter and sanity check frame counts . Hardcode for now.
ffmpeg -loglevel warning -i "$1"/../transcoder/test.ts -c:a copy -c:v copy -f hls test.m3u8
ffprobe -loglevel warning -select_streams v -count_frames -show_streams test0.ts | grep nb_read_frames=120
ffprobe -loglevel warning -select_streams v -count_frames -show_streams test1.ts | grep nb_read_frames=120
ffprobe -loglevel warning -select_streams v -count_frames -show_streams test2.ts | grep nb_read_frames=120
ffprobe -loglevel warning -select_streams v -count_frames -show_streams test3.ts | grep nb_read_frames=120
`
run(cmd)
tc := NewTranscoder()
out := []TranscodeOptions{
{
Oname: fmt.Sprintf("%s/out.mp4", dir),
VideoEncoder: ComponentOptions{
Name: "copy",
},
AudioEncoder: ComponentOptions{
Name: "copy",
},
Profile: VideoProfile{Format: FormatNone},
Muxer: ComponentOptions{
Name: "mp4",
Opts: map[string]string{"movflags": "frag_keyframe+negative_cts_offsets+omit_tfhd_offset+disable_chpl+default_base_moof"},
},
},
}
for i := 0; i < 4; i++ {
in := &TranscodeOptionsIn{
Fname: fmt.Sprintf("%s/test%d.ts", dir, i),
Transmuxing: true,
}
res, err := tc.Transcode(in, out)
if err != nil {
t.Fatal(err)
}
if res.Decoded.Frames != 120 {
t.Error(in.Fname, " Mismatched frame count: expected 120 got ", res.Decoded.Frames)
}
}
tc.StopTranscoder()
cmd = `
ffprobe -loglevel warning -select_streams v -count_frames -show_streams out.mp4 | grep nb_read_frames=480
`
run(cmd)
}
func TestTransmuxer_Discontinuity(t *testing.T) {
run, dir := setupTest(t)
defer os.RemoveAll(dir)
cmd := `
# run segmenter and sanity check frame counts . Hardcode for now.
ffmpeg -loglevel warning -i "$1"/../transcoder/test.ts -c:a copy -c:v copy -f hls test.m3u8
ffprobe -loglevel warning -select_streams v -count_frames -show_streams test0.ts | grep nb_read_frames=120
ffprobe -loglevel warning -select_streams v -count_frames -show_streams test1.ts | grep nb_read_frames=120
ffprobe -loglevel warning -select_streams v -count_frames -show_streams test2.ts | grep nb_read_frames=120
ffprobe -loglevel warning -select_streams v -count_frames -show_streams test3.ts | grep nb_read_frames=120
`
run(cmd)
tc := NewTranscoder()
out := []TranscodeOptions{
{
Oname: fmt.Sprintf("%s/out.mp4", dir),
VideoEncoder: ComponentOptions{
Name: "copy",
},
AudioEncoder: ComponentOptions{
Name: "copy",
},
Profile: VideoProfile{Format: FormatNone},
Muxer: ComponentOptions{
Name: "mp4",
Opts: map[string]string{"movflags": "frag_keyframe+negative_cts_offsets+omit_tfhd_offset+disable_chpl+default_base_moof"},
},
},
}
for i := 0; i < 4; i++ {
in := &TranscodeOptionsIn{
Fname: fmt.Sprintf("%s/test%d.ts", dir, i),
Transmuxing: true,
}
res, err := tc.Transcode(in, out)
if err != nil {
t.Fatal(err)
}
if res.Decoded.Frames != 120 {
t.Error(in.Fname, " Mismatched frame count: expected 120 got ", res.Decoded.Frames)
}
}
tc.Discontinuity()
for i := 0; i < 4; i++ {
in := &TranscodeOptionsIn{
Fname: fmt.Sprintf("%s/test%d.ts", dir, i),
Transmuxing: true,
}
res, err := tc.Transcode(in, out)
if err != nil {
t.Fatal(err)
}
if res.Decoded.Frames != 120 {
t.Error(in.Fname, " Mismatched frame count: expected 120 got ", res.Decoded.Frames)
}
}
tc.StopTranscoder()
cmd = `
ffprobe -loglevel warning -select_streams v -count_frames -show_streams out.mp4 | grep nb_read_frames=960
ffprobe -loglevel warning -select_streams v -count_frames -show_streams -show_frames out.mp4 | grep pkt_pts=1444380
`
run(cmd)
}