Merge pull request #31 from flavioribeiro/leandromoreira/add-audio

Add audio, RTMP, FFmpeg/LibAV and more
This commit is contained in:
Leandro Moreira
2024-06-04 22:08:47 -03:00
committed by GitHub
46 changed files with 2714 additions and 591 deletions

19
.dockerignore Normal file
View File

@@ -0,0 +1,19 @@
# The .dockerignore file excludes files from the container build process.
#
# https://docs.docker.com/engine/reference/builder/#dockerignore-file
# Exclude locally vendored dependencies.
vendor/
# Exclude "build-time" ignore files.
.dockerignore
.gcloudignore
.github
.vscode
.git
# Exclude git history and configuration.
.gitignore
tags
probers.test
coverage.out

29
.gitattributes vendored Normal file
View File

@@ -0,0 +1,29 @@
# Auto detect text files and perform LF normalization
* text=auto
# Collapse generated and vendored files on GitHub
AUTHORS linguist-generated merge=union
*.gen.* linguist-generated merge=ours
*.pb.go linguist-generated merge=ours
*.pb.gw.go linguist-generated merge=ours
go.sum linguist-generated merge=ours
go.mod linguist-generated
gen.sum linguist-generated merge=ours
vendor/* linguist-vendored
rules.mk linguist-vendored
*/vendor/* linguist-vendored
# doc
doc/* linguist-documentation
doc/Makefile linguist-documentation=false
# Reduce conflicts on markdown files
*.md merge=union
# A set of files you probably don't want in distribution
/.github export-ignore
/.githooks export-ignore
.gitattributes export-ignore
.gitignore export-ignore
.gitmodules export-ignore
/tool/lint export-ignore

Binary file not shown.

Before

Width:  |  Height:  |  Size: 33 KiB

After

Width:  |  Height:  |  Size: 41 KiB

5
.gitignore vendored
View File

@@ -8,6 +8,7 @@ donut
# Test binary, built with `go test -c`
*.test
coverage.out
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
@@ -16,6 +17,8 @@ donut
# vendor/
tags
tmp/
.DS_Store
.vscode
.vscode

17
DOCKER_DEVELOPMENT.md Normal file
View File

@@ -0,0 +1,17 @@
# Developing using docker
To run/test/develop using docker, make sure you have `docker` installed. Run the following command to start simulations of SRT and RTMP streamings, and **a bash session where you can run commands** such as you'd in your local machine.
```bash
make run-docker-dev
```
While you're inside the container, you can start/stop/restart the donut server.
```bash
make run-server-inside-docker
```
You can access [http://localhost:8080/demo/](http://localhost:8080/demo/), using preferable **the Chrome browser**. You can connect to the simulated SRT and see donut working in practice.
You can work and change files locally, on your OS, and restart `CTRL+C + make run-server-inside-docker` the donut server in the container. It's fast since it avoids rebuilding all images. It'll offer a faster feedback cycle while developing.

View File

@@ -1,8 +1,15 @@
FROM golang:1.19
FROM jrottenberg/ffmpeg:5.1.4-ubuntu2204 AS base
FROM golang:1.22
# TODO: copy only required files
COPY --from=base / /
# ffmpeg/libav libraries
ENV LD_LIBRARY_PATH="/usr/local/lib:/usr/lib:/usr/lib/x86_64-linux-gnu/"
ENV CGO_CFLAGS="-I/usr/local/include/"
ENV CGO_LDFLAGS="-L/usr/local/lib"
ENV WD=/usr/src/app
ENV SRT_VERSION="v1.5.3"
ENV SRT_FOLDER="/opt/srt_lib"
WORKDIR ${WD}
RUN apt-get clean && apt-get update && \
@@ -10,22 +17,7 @@ RUN apt-get clean && apt-get update && \
tclsh pkg-config cmake libssl-dev build-essential git \
&& apt-get clean
RUN \
mkdir -p "${SRT_FOLDER}" && \
git clone --depth 1 --branch "${SRT_VERSION}" https://github.com/Haivision/srt && \
cd srt && \
./configure --prefix=${SRT_FOLDER} $(configure) && \
make && \
make install
# To find where the srt.h and libsrt.so were you can
# find / -name srt.h
# find / -name libsrt.so
# inside the container docker run -it --rm -t <TAG_YOU_BUILT> bash
ENV GOPROXY=direct
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:${SRT_FOLDER}/lib/"
ENV CGO_CFLAGS="-I${SRT_FOLDER}/include/"
ENV CGO_LDFLAGS="-L${SRT_FOLDER}/lib/"
COPY . ./donut
WORKDIR ${WD}/donut

View File

@@ -1,30 +1,18 @@
FROM golang:1.19
FROM jrottenberg/ffmpeg:5.1.4-ubuntu2204 AS base
FROM golang:1.22
ENV WD=/usr/src/app
ENV SRT_VERSION="v1.5.3"
ENV SRT_FOLDER="/opt/srt_lib"
WORKDIR ${WD}
# TODO: copy only required files
COPY --from=base / /
# ffmpeg/libav libraries
ENV LD_LIBRARY_PATH="/usr/local/lib:/usr/lib:/usr/lib/x86_64-linux-gnu/"
ENV CGO_CFLAGS="-I/usr/local/include/"
ENV CGO_LDFLAGS="-L/usr/local/lib"
RUN apt-get clean && apt-get update && \
DEBIAN_FRONTEND=noninteractive apt-get install -y \
tclsh pkg-config cmake libssl-dev build-essential git \
&& apt-get clean
RUN \
mkdir -p "${SRT_FOLDER}" && \
git clone --depth 1 --branch "${SRT_VERSION}" https://github.com/Haivision/srt && \
cd srt && \
./configure --prefix=${SRT_FOLDER} $(configure) && \
make && \
make install
# To find where the srt.h and libsrt.so were you can
# find / -name srt.h
# find / -name libsrt.so
# inside the container docker run -it --rm -t <TAG_YOU_BUILT> bash
ENV GOPROXY=direct
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:${SRT_FOLDER}/lib/"
ENV CGO_CFLAGS="-I${SRT_FOLDER}/include/"
ENV CGO_LDFLAGS="-L${SRT_FOLDER}/lib/"
RUN curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.55.2
ENV WD=/usr/src/app
WORKDIR ${WD}

43
FAQ.md
View File

@@ -1,4 +1,4 @@
# FAQ
# FAQ & Dev Troubleshooting
## I can't connect two tabs or browser at the same for the SRT
@@ -48,11 +48,38 @@ clang: error: linker command failed with exit code 1 (use -v to see invocation)
You can try to use the [docker-compose](/README.md#run-using-docker-compose), but if you want to run it locally you must provide path to the linker.
```bash
# Find where the headers and libraries files are located at. If you can't find, install them with brew or apt-get.
# Feel free to replace the path after the find to roo[/] or any other suspicious place.
sudo find /opt/homebrew/ -name srt.h
sudo find /opt/ -name libsrt.a # libsrt.so for linux
# Add the required flags so the compiler/linker can find the needed files.
CGO_LDFLAGS="-L/opt/homebrew/Cellar/srt/1.5.3/lib -lsrt" CGO_CFLAGS="-I/opt/homebrew//Cellar/srt/1.5.3/include/" go run main.go helpers.go
# For MacOS
CGO_LDFLAGS="-L$(brew --prefix srt)/lib -lsrt" CGO_CFLAGS="-I$(brew --prefix srt)/include/" go run main.go
```
## If you're seeing the error "At least one invalid signature was encountered ... GPG error: http://security." when running the app
If you see the error "At least one invalid signature was encountered." when running `make run` Or "failed to copy files: userspace copy failed: write":
```
3.723 W: GPG error: http://deb.debian.org/debian bookworm InRelease: At least one invalid signature was encountered.
3.723 E: The repository 'http://deb.debian.org/debian bookworm InRelease' is not signed.
3.723 W: GPG error: http://deb.debian.org/debian bookworm-updates InRelease: At least one invalid signature was encountered.
3.723 E: The repository 'http://deb.debian.org/debian bookworm-updates InRelease' is not signed.
3.723 W: GPG error: http://deb.debian.org/debian-security bookworm-security InRelease: At least one invalid signature was encountered.
3.723 E: The repository 'http://deb.debian.org/debian-security bookworm-security InRelease' is not signed.
3.723 W: An error occurred during the signature verification. The repository is not updated and the previous index files will be used. GPG error: http://archive.ubuntu.com/ubuntu focal InRelease: At least one invalid signature was encountered.
3.723 W: An error occurred during the signature verification. The repository is not updated and the previous index files will be used. GPG error: http://security.ubuntu.com/ubuntu focal-security InRelease: At least one invalid signature was encountered.
```
```
=> CANCELED [test stage-1 6/6] RUN curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.55.2 4.4s
=> ERROR [app stage-1 6/8] COPY . ./donut 4.1s
------
> [app stage-1 6/8] COPY . ./donut:
------
failed to solve: failed to copy files: userspace copy failed: write /var/lib/docker/overlay2/30zm6uywrtfed4z4wfzbf1ema/merged/usr/src/app/donut/tmp/n5.1.2/src/tests/reference.pnm: no space left on device
make: *** [run] Error 17
```
Please try to run:
```
# PLEASE be aware that the following command will erase all your docker images, containers, volumes, etc.
make clean-docker
```

View File

@@ -1,34 +1,97 @@
# Data flow diagram
# INTRODUCTION
```golang
// It builds an engine based on user inputs
// {url: url, id: id, sdp: webRTCOffer}
donutEngine := donut.EngineFor(reqParams)
// It fetches the server-side (streaming server) stream info (codec, ...)
serverStreamInfo := donutEngine.ServerIngredients(reqParams)
// It gets the client side (browser) media support (codec, ...)
clientStreamInfo := donutEngine.ClientIngredients(reqParams)
// Given the client's restrictions and the server's availability, it builds the right recipe.
donutRecipe := donutEngine.RecipeFor(reqParams, serverStreamInfo, clientStreamInfo)
// It streams the media from the backend server to the client while there's data.
go donutEngine.Serve(DonutParameters{
Recipe: donutRecipe,
OnVideoFrame: func(data []byte, c MediaFrameContext) error {
return SendMediaSample(VIDEO_TYPE, data, c)
},
OnAudioFrame: func(data []byte, c MediaFrameContext) error {
return SendMediaSample(AUDIO_TYPE, data, c)
},
})
```
# DATA FLOW DIAGRAM
```mermaid
sequenceDiagram
actor User
box Navy Browser
box Navy
participant browser
end
browser->>+server: GET /
server->>+browser: 200 /index.html
User->>+browser: feed SRT host, port, and id
User->>+browser: click on [connect]
Note over server,browser: WebRTC connection setup
browser->>+browser: create offer
browser--)browser: WebRTC.ontrack(video)
browser->>+server: POST /doSignaling {offer}
server->>+server: set remote {offer}
server->>+browser: reply {answer}
browser->>+browser: set remote {answer}
Note over server,browser: WebRTC connection setup
User->>+browser: input protocol, host, port, id, and opts
User->>+browser: click on [Connect]
Note over donut,browser: WebRTC connection setup
browser->>+browser: create WebRTC browserOffer
browser->>+donut: POST /doSignaling {browserOffer}
loop Async SRT to WebRTC
server--)SRT: mpegFrom(SRT)
server--)browser: WebRTC.WriteSample(mpegts.PES.Data)
donut->>+browser: reply WebRTC {serverOffer}
Note over donut,browser: WebRTC connection setup
loop Async streaming
donut--)streaming server: fetchMedia
donut--)donut: ffmpeg::libav demux/transcode
donut--)browser: sendWebRTCMedia
browser--)browser: render audio/video frames
User--)browser: watch media
end
browser--)User: render frames
```
# CORE COMPONENTS
```mermaid
classDiagram
class Signaling{
+ServeHTTP()
}
class WebRTC{
+Setup()
+CreatePeerConnection()
+CreateTrack()
+CreateDataChannel()
+SendMediaSample(track)
+SendMetadata(track)
}
class DonutEngine{
+EngineFor(params)
+ServerIngredients()
+ClientIngredients()
+RecipeFor(server, client)
+Serve(donutParams)
+Appetizer()
}
class Prober {
+StreamInfo(appetizer)
+Match(params)
}
class Streamer {
+Stream(donutParams)
+Match(params)
}
DonutEngine *-- Signaling
WebRTC *-- Signaling
Prober *-- DonutEngine
Streamer *-- DonutEngine
```

50
MAC_DEVELOPMENT.md Normal file
View File

@@ -0,0 +1,50 @@
# Running on MacOS
To run Donut locally using MacOS, make sure you have `ffmpeg@5` installed:
```bash
brew install ffmpeg@5
```
You can have multiple versions of ffmpeg installed in your mac. To find where the specific `ffmpeg@5`` was installed, run:
```bash
sudo find /opt/homebrew -name avcodec.h
```
Let's assume the prior command showed two entries:
```bash
sudo find /opt/homebrew -name avcodec.h
/opt/homebrew/Cellar/ffmpeg/7.0_1/include/libavcodec/avcodec.h
/opt/homebrew/Cellar/ffmpeg@5/5.1.4_6/include/libavcodec/avcodec.h
```
You must configure the CGO library path pointing it to ffmpeg 5 (`5.1.4_6`) folder not the newest (`7.0_1`).
```bash
export CGO_LDFLAGS="-L/opt/homebrew/Cellar/ffmpeg@5/5.1.4_6/lib/"
export CGO_CFLAGS="-I/opt/homebrew/Cellar/ffmpeg@5/5.1.4_6/include/"
export PKG_CONFIG_PATH="/opt/homebrew/Cellar/ffmpeg@5/5.1.4_6/lib/pkgconfig"
```
After you set the proper cgo paths, you can run it locally:
```bash
go run main.go -- --enable-ice-mux=true
go test -v ./...
```
# Simulating SRT and RTMP live streaming
You can use docker to simulate `SRT` and `RTMP` streaming:
```bash
# docker compose stop && docker compose down && docker compose up nginx_rtmp haivision_srt
make run-srt-rtmp-streaming-alone
```
They're both now exposed `RTMP/1935` and `SRT/40052` in your `localhost`. You can use VLC to test both streams:
* vlc rtmp://localhost/live/app
* vlc srt://localhost:40052

View File

@@ -1,10 +1,23 @@
run:
docker-compose stop && docker-compose down && docker-compose build && docker-compose up
docker compose stop && docker compose up app
test:
docker compose stop test && docker compose down test && docker compose run --rm test
run-dev:
docker compose stop && docker compose down && docker compose build app && docker compose up app
run-dev-total-rebuild:
docker compose stop && docker compose down && docker compose build && docker compose up app
clean-docker:
docker-compose down -v --rmi all --remove-orphans && docker volume prune -a -f && docker system prune -a -f && docker builder prune -a -f
run-docker-dev:
docker compose run --rm --service-ports dev
run-server-inside-docker:
go run main.go -- --enable-ice-mux=true
run-srt-rtmp-streaming-alone:
docker compose stop && docker compose down && docker compose up nginx_rtmp haivision_srt
lint:
docker compose stop lint && docker compose down lint && docker compose run --rm lint
.PHONY: run

View File

@@ -1,28 +1,72 @@
<img src="https://user-images.githubusercontent.com/244265/200068510-7c24d5c7-6ba0-44ee-8e60-0f157f990b90.png" width="350" />
donut is a zero setup required SRT+MPEG-TS -> WebRTC Bridge powered by [Pion](http://pion.ly/).
**donut** is a zero setup required [SRT](https://en.wikipedia.org/wiki/Secure_Reliable_Transport) (_MPEG-TS_) and [RTMP](https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol) to [WebRTC](https://webrtc.org/) bridge powered by [Pion](http://pion.ly/).
### Install & Run Locally
# HOW IT WORKS
Make sure you have the `libsrt` installed in your system. If not, follow their [build instructions](https://github.com/Haivision/srt#build-instructions).
Once you finish installing it, execute:
```mermaid
sequenceDiagram
actor User
```
$ go install github.com/flavioribeiro/donut@latest
```
Once installed, execute `donut`. This will be in your `$GOPATH/bin`. The default will be `~/go/bin/donut`
box Navy
participant browser
end
### Run using docker-compose
User->>+browser: input protocol, host, port, id, and opts
User->>+browser: click on [Connect]
Note over donut,browser: WebRTC connection setup
browser->>+browser: create WebRTC browserOffer
browser->>+donut: POST /doSignaling {browserOffer}
Alternatively, you can use `docker-compose` to simulate an SRT live transmission and run the donut effortless.
donut->>+browser: reply WebRTC {serverOffer}
```
$ make run
Note over donut,browser: WebRTC connection setup
loop Async streaming
donut--)streaming server: fetchMedia
donut--)donut: ffmpeg::libav demux/transcode
donut--)browser: sendWebRTCMedia
browser--)browser: render audio/video frames
User--)browser: watch media
end
```
#### Open the Web UI
Open [http://localhost:8080](http://localhost:8080). You will see three text boxes. Fill in with the SRT listener configuration and hit connect.
![donut docker-compose setup](/.github/docker-compose-donut-setup.webp "donut docker-compose setup")
ref: [how donut works](/HOW_IT_WORKS.md)
# QUICK START
Make sure you have the `ffmpeg 5.x.x`. You must configure the CGO library path pointing it to **ffmpeg 5**.
```bash
export CGO_LDFLAGS="-L/opt/homebrew/Cellar/ffmpeg@5/5.1.4_6/lib/"
export CGO_CFLAGS="-I/opt/homebrew/Cellar/ffmpeg@5/5.1.4_6/include/"
export PKG_CONFIG_PATH="/opt/homebrew/Cellar/ffmpeg@5/5.1.4_6/lib/pkgconfig"
```
Now you can install and run it:
```bash
go install github.com/flavioribeiro/donut@latest
donut
```
Here are specific instructions [to run on MacOS](/MAC_DEVELOPMENT.md).
# RUN USING DOCKER-COMPOSE
Alternatively, you can use `docker-compose` to simulate an [SRT live transmission and run the donut effortless](/DOCKER_DEVELOPMENT.md).
```bash
make run
```
## OPEN THE WEB UI
Open [http://localhost:8080/demo](http://localhost:8080/demo). You will see two text fields. Fill them with the your streaming info and hit connect.
![donut docker-compose setup](/.github/docker-compose-donut-setup.webp "donut docker-compose setup")

View File

@@ -1,9 +1,58 @@
# Adding audio
# Add tests/CI/linter
* add ffmpeg test utility
* add run test locally
* add run linter on docker
* tag integration tests // +build integration || go test -tags integration -v ./...
# Adding audio (WIP)
```golang
// potential api for stream libav
go donutEngine.Stream(
StreamParamter{
OnVideoData: func(d data[]) error {
},
OnAudioData: func(d data[], stream) error {
channel.setData(d,stream.duration)
},
OnError: func(d data[]) error {
},
Observe: func(st stream) error {
metadata.send
},
}
)
```
ref https://wiki.xiph.org/Opus_Recommended_Settings 48000 webrtc
ref https://ffmpeg.org/ffmpeg-codecs.html#libopus-1 opus
## Date: 2/4/24
### Summary: Adding audio track
* add support to detect server side streams information
* add support to intercept stream as it goes (middleware: useful for gathering data such as media info, bitrate, eia608, etc)
* TODO: test push directly vp8 and ogg through rtc (is it possible through SRT?)
* TODO: test push directly h264 and aac through
* TODO: test transcode server side stream (h264 and aac) to client side stream support (vp8/vp9/ogg) through libav/ffmpeg.
// selects proper media that client and server has adverted.
// donutEngine preferable vp8, ogg???
// From: [] To: [] or Transcode:[], Bypass: []
// libav_streamer.go, libav_streamer_format.go, libav_streamer_codec.go...
// reads from Server (input) and generates h264 raw, and ogg and send it with timing attributes
refs:
* binding go https://github.com/asticode/go-astiav
* transcoding https://github.com/asticode/go-astiav/blob/master/examples/transcoding/main.go
* using buffer streaming https://github.com/bubbajoe/go-astiav-contr/blob/misc-update/examples/gocv/main.go#L167
* (working) go webrtc same stream https://github.com/pion/webrtc/blob/v3.2.24/examples/play-from-disk/main.go#L88C39-L88C64
* (working) https://jsfiddle.net/8kup9mvn/
* (two tracks - js side) https://www.youtube.com/watch?v=8I2axE6j204
* webrtc discussion https://github.com/pion/webrtc/discussions/1955
* go webrtc example https://github.com/pion/webrtc/blob/master/examples/play-from-disk-renegotiation/main.go
* webrtc discussion https://stackoverflow.com/questions/66243915/how-to-get-multiple-streams-from-webrtc-peerconnection
@@ -14,6 +63,10 @@ refs:
* example https://blog.mi.hdm-stuttgart.de/index.php/2018/03/21/livestreaming-with-libav-tutorial-part-2/
* libav doc https://ffmpeg.org/doxygen/trunk/index.html
* generic av format https://github.com/rvs/ffmpeg/blob/master/libavformat/output-example.c
* mpegts example https://github.com/wakabayashik/mpegts-to-webrtc/blob/main/main.go
* network use https://github.com/asticode/go-astiav/issues/7
* srt live https://github.com/Haivision/srt/blob/master/docs/features/live-streaming.md
# Moving player to static

View File

@@ -1,10 +1,9 @@
version: '2.1'
services:
app:
build:
context: .
working_dir: "/app"
platform: "linux/amd64"
volumes:
- "./:/app/"
ports:
@@ -12,26 +11,57 @@ services:
- "8081:8081"
- "8081:8081/udp"
- "6060:6060"
depends_on:
- haivision_srt
- nginx_rtmp
links:
- haivision_srt
- nginx_rtmp
test:
dev:
build:
context: .
dockerfile: Dockerfile-dev
working_dir: "/app"
platform: "linux/amd64"
volumes:
- "./:/app/"
command: "go test ./..."
command: "bash"
ports:
- "8080:8080"
- "8081:8081"
- "8081:8081/udp"
- "6060:6060"
depends_on:
- haivision_srt
- nginx_rtmp
links:
- haivision_srt
- nginx_rtmp
lint:
build:
context: .
dockerfile: Dockerfile-dev
working_dir: "/app"
nginx_rtmp:
image: alfg/nginx-rtmp
ports:
- "1935:1935"
volumes:
- "./:/app/"
command: "golangci-lint run -v"
- ./nginx.conf:/etc/nginx/nginx.conf.template
depends_on:
- origin_rtmp
links:
- origin_rtmp
srt:
origin_rtmp: # simulating an RTMP flv (h264/aac) live transmission
image: jrottenberg/ffmpeg:4.4-alpine
entrypoint: sh
command: "/scripts/ffmpeg_rtmp.sh"
volumes:
- "./scripts:/scripts"
- "./fonts/0xProto:/usr/share/fonts"
environment:
- RTMP_HOST=nginx_rtmp
- RTMP_PORT=1935
haivision_srt:
build:
context: .
dockerfile: Dockerfile-srt-live
@@ -47,22 +77,38 @@ services:
ports:
- "40052:40052/udp"
depends_on:
- app
- origin_srt
links:
- app
- origin_srt
origin: # simulating an mpeg-ts upd origin live transmission
origin_srt: # simulating an (h264/aac) mpeg-ts upd origin live transmission
image: jrottenberg/ffmpeg:4.4-alpine
entrypoint: sh
command: "/scripts/ffmpeg_mpegts_udp.sh"
volumes:
- "./scripts:/scripts"
- "./fonts/0xProto:/usr/share/fonts"
environment:
- SRT_INPUT_HOST=srt
- SRT_INPUT_HOST=haivision_srt
- SRT_INPUT_PORT=1234
- PKT_SIZE=1316
depends_on:
- srt
links:
- srt
test:
build:
context: .
dockerfile: Dockerfile-dev
working_dir: "/app"
platform: "linux/amd64"
volumes:
- "./:/app/"
command: "go test -v ./..."
lint:
build:
context: .
dockerfile: Dockerfile-dev
working_dir: "/app"
platform: "linux/amd64"
volumes:
- "./:/app/"
command: "golangci-lint run -v"

Binary file not shown.

Binary file not shown.

Binary file not shown.

92
fonts/0xProto/LICENSE Normal file
View File

@@ -0,0 +1,92 @@
Copyright (c) 2023, 0xType Project Authors (https://github.com/0xType)
This Font Software is licensed under the SIL Open Font License, Version 1.1.
This license is copied below, and is also available with a FAQ at:
http://scripts.sil.org/OFL
-----------------------------------------------------------
SIL OPEN FONT LICENSE Version 1.1 - 26 February 2007
-----------------------------------------------------------
PREAMBLE
The goals of the Open Font License (OFL) are to stimulate worldwide
development of collaborative font projects, to support the font creation
efforts of academic and linguistic communities, and to provide a free and
open framework in which fonts may be shared and improved in partnership
with others.
The OFL allows the licensed fonts to be used, studied, modified and
redistributed freely as long as they are not sold by themselves. The
fonts, including any derivative works, can be bundled, embedded,
redistributed and/or sold with any software provided that any reserved
names are not used by derivative works. The fonts and derivatives,
however, cannot be released under any other type of license. The
requirement for fonts to remain under this license does not apply
to any document created using the fonts or their derivatives.
DEFINITIONS
"Font Software" refers to the set of files released by the Copyright
Holder(s) under this license and clearly marked as such. This may
include source files, build scripts and documentation.
"Reserved Font Name" refers to any names specified as such after the
copyright statement(s).
"Original Version" refers to the collection of Font Software components as
distributed by the Copyright Holder(s).
"Modified Version" refers to any derivative made by adding to, deleting,
or substituting -- in part or in whole -- any of the components of the
Original Version, by changing formats or by porting the Font Software to a
new environment.
"Author" refers to any designer, engineer, programmer, technical
writer or other person who contributed to the Font Software.
PERMISSION & CONDITIONS
Permission is hereby granted, free of charge, to any person obtaining
a copy of the Font Software, to use, study, copy, merge, embed, modify,
redistribute, and sell modified and unmodified copies of the Font
Software, subject to the following conditions:
1) Neither the Font Software nor any of its individual components,
in Original or Modified Versions, may be sold by itself.
2) Original or Modified Versions of the Font Software may be bundled,
redistributed and/or sold with any software, provided that each copy
contains the above copyright notice and this license. These can be
included either as stand-alone text files, human-readable headers or
in the appropriate machine-readable metadata fields within text or
binary files as long as those fields can be easily viewed by the user.
3) No Modified Version of the Font Software may use the Reserved Font
Name(s) unless explicit written permission is granted by the corresponding
Copyright Holder. This restriction only applies to the primary font name as
presented to the users.
4) The name(s) of the Copyright Holder(s) or the Author(s) of the Font
Software shall not be used to promote, endorse or advertise any
Modified Version, except to acknowledge the contribution(s) of the
Copyright Holder(s) and the Author(s) or with their explicit written
permission.
5) The Font Software, modified or unmodified, in part or in whole,
must be distributed entirely under this license, and must not be
distributed under any other license. The requirement for fonts to
remain under this license does not apply to any document created
using the Font Software.
TERMINATION
This license becomes null and void if any of the above conditions are
not met.
DISCLAIMER
THE FONT SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO ANY WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT
OF COPYRIGHT, PATENT, TRADEMARK, OR OTHER RIGHT. IN NO EVENT SHALL THE
COPYRIGHT HOLDER BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
INCLUDING ANY GENERAL, SPECIAL, INDIRECT, INCIDENTAL, OR CONSEQUENTIAL
DAMAGES, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF THE USE OR INABILITY TO USE THE FONT SOFTWARE OR FROM
OTHER DEALINGS IN THE FONT SOFTWARE.

48
fonts/0xProto/README.md Normal file
View File

@@ -0,0 +1,48 @@
# Nerd Fonts
This is an archived font from the Nerd Fonts release v3.2.1.
For more information see:
* https://github.com/ryanoasis/nerd-fonts/
* https://github.com/ryanoasis/nerd-fonts/releases/latest/
# 0xProto
An opinionated font for software engineers.
For more information have a look at the upstream website: https://github.com/0xType/0xProto
Version: 1.603
## Which font?
### TL;DR
* Pick your font family:
* If you are limited to monospaced fonts (because of your terminal, etc) then pick a font with `Nerd Font Mono` (or `NFM`).
* If you want to have bigger icons (usually around 1.5 normal letters wide) pick a font without `Mono` i.e. `Nerd Font` (or `NF`). Most terminals support this, but ymmv.
* If you work in a proportional context (GUI elements or edit a presentation etc) pick a font with `Nerd Font Propo` (or `NFP`).
### Ligatures
Ligatures are generally preserved in the patched fonts.
Nerd Fonts `v2.0.0` had no ligatures in the `Nerd Font Mono` fonts, this has been dropped with `v2.1.0`.
If you have a ligature-aware terminal and don't want ligatures you can (usually) disable them in the terminal settings.
### Explanation
Once you narrow down your font choice of family (`Droid Sans`, `Inconsolata`, etc) and style (`bold`, `italic`, etc) you have 2 main choices:
#### `Option 1: Download already patched font`
* For a stable version download a font package from the [release page](https://github.com/ryanoasis/nerd-fonts/releases)
* Direct links for [0xProto.zip](https://github.com/ryanoasis/nerd-fonts/releases/latest/download/0xProto.zip) or [0xProto.tar.xz](https://github.com/ryanoasis/nerd-fonts/releases/latest/download/0xProto.tar.xz)
#### `Option 2: Patch your own font`
* Patch your own variations with the various options provided by the font patcher (i.e. not include all symbols for smaller font size)
For more information see: [The FAQ](https://github.com/ryanoasis/nerd-fonts/wiki/FAQ-and-Troubleshooting#which-font)
[SIL-RFN]:http://scripts.sil.org/cms/scripts/page.php?item_id=OFL_web_fonts_and_RFNs#14cbfd4a

11
go.mod generated
View File

@@ -3,16 +3,18 @@ module github.com/flavioribeiro/donut
go 1.19
require (
github.com/asticode/go-astisrt v0.3.0
github.com/asticode/go-astits v1.11.0
github.com/asticode/go-astiav v0.14.2-0.20240514161420-d8844951c978
github.com/asticode/go-astikit v0.42.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/pion/webrtc/v3 v3.1.47
github.com/stretchr/testify v1.8.0
github.com/szatmary/gocaption v0.0.0-20220607192049-fdd59655f0c3
go.uber.org/fx v1.20.1
go.uber.org/zap v1.23.0
)
require (
github.com/asticode/go-astikit v0.36.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/pion/datachannel v1.5.2 // indirect
github.com/pion/dtls/v2 v2.1.5 // indirect
@@ -30,11 +32,12 @@ require (
github.com/pion/transport v0.13.1 // indirect
github.com/pion/turn/v2 v2.0.8 // indirect
github.com/pion/udp v0.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.2.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

19
go.sum generated
View File

@@ -1,10 +1,7 @@
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astikit v0.36.0 h1:WHSY88YT76D/XRbdp0lMLwfjyUGw8dygnbKKtbGNIG8=
github.com/asticode/go-astikit v0.36.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astisrt v0.3.0 h1:LpvqOc17qfMr2suLZPzMs9wYLozxXYu/PE9CA1tH88c=
github.com/asticode/go-astisrt v0.3.0/go.mod h1:tP5Dx+MXyaICUeF0gz4nwyav3RDI609e0en3QQkrxKE=
github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2ZWAng=
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/asticode/go-astiav v0.14.2-0.20240514161420-d8844951c978 h1:+xACJz51oNEvxrhrHsvGNn16n/vuLmjtvp93LS6onTQ=
github.com/asticode/go-astiav v0.14.2-0.20240514161420-d8844951c978/go.mod h1:K7D8UC6GeQt85FUxk2KVwYxHnotrxuEnp5evkkudc2s=
github.com/asticode/go-astikit v0.42.0 h1:pnir/2KLUSr0527Tv908iAH6EGYYrYta132vvjXsH5w=
github.com/asticode/go-astikit v0.42.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -30,8 +27,10 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
@@ -84,18 +83,18 @@ github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M
github.com/pion/webrtc/v3 v3.1.47 h1:2dFEKRI1rzFvehXDq43hK9OGGyTGJSusUi3j6QKHC5s=
github.com/pion/webrtc/v3 v3.1.47/go.mod h1:8U39MYZCLVV4sIBn01htASVNkWQN2zDa/rx5xisEXWs=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/szatmary/gocaption v0.0.0-20220607192049-fdd59655f0c3 h1:j8SVIV6YZreqjOPGjxM48tB4XgS8oUZdgy0cyN7YrBg=
github.com/szatmary/gocaption v0.0.0-20220607192049-fdd59655f0c3/go.mod h1:l9r7RYKHGLuHbXpKJhJgASvi8xT+Uqxnz9B26uVU73c=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -179,6 +178,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
@@ -188,3 +188,4 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,168 @@
package engine
import (
"fmt"
"strings"
"github.com/flavioribeiro/donut/internal/controllers/probers"
"github.com/flavioribeiro/donut/internal/controllers/streamers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"go.uber.org/fx"
)
type DonutEngine interface {
Appetizer() (entities.DonutAppetizer, error)
ServerIngredients() (*entities.StreamInfo, error)
ClientIngredients() (*entities.StreamInfo, error)
RecipeFor(server, client *entities.StreamInfo) (*entities.DonutRecipe, error)
Serve(p *entities.DonutParameters)
}
type DonutEngineParams struct {
fx.In
Streamers []streamers.DonutStreamer `group:"streamers"`
Probers []probers.DonutProber `group:"probers"`
Mapper *mapper.Mapper
}
type DonutEngineController struct {
p DonutEngineParams
}
func NewDonutEngineController(p DonutEngineParams) *DonutEngineController {
return &DonutEngineController{p}
}
func (c *DonutEngineController) EngineFor(req *entities.RequestParams) (DonutEngine, error) {
prober := c.selectProberFor(req)
if prober == nil {
return nil, fmt.Errorf("request %v: not fulfilled. error %w", req, entities.ErrMissingProber)
}
streamer := c.selectStreamerFor(req)
if streamer == nil {
return nil, fmt.Errorf("request %v: not fulfilled. error %w", req, entities.ErrMissingStreamer)
}
return &donutEngine{
prober: prober,
streamer: streamer,
mapper: c.p.Mapper,
req: req,
}, nil
}
// TODO: try to use generics
func (c *DonutEngineController) selectProberFor(req *entities.RequestParams) probers.DonutProber {
for _, p := range c.p.Probers {
if p.Match(req) {
return p
}
}
return nil
}
// TODO: try to use generics
func (c *DonutEngineController) selectStreamerFor(req *entities.RequestParams) streamers.DonutStreamer {
for _, p := range c.p.Streamers {
if p.Match(req) {
return p
}
}
return nil
}
type donutEngine struct {
prober probers.DonutProber
streamer streamers.DonutStreamer
mapper *mapper.Mapper
req *entities.RequestParams
}
func (d *donutEngine) ServerIngredients() (*entities.StreamInfo, error) {
appetizer, err := d.Appetizer()
if err != nil {
return nil, err
}
return d.prober.StreamInfo(appetizer)
}
func (d *donutEngine) ClientIngredients() (*entities.StreamInfo, error) {
return d.mapper.FromWebRTCSessionDescriptionToStreamInfo(d.req.Offer)
}
func (d *donutEngine) Serve(p *entities.DonutParameters) {
d.streamer.Stream(p)
}
func (d *donutEngine) RecipeFor(server, client *entities.StreamInfo) (*entities.DonutRecipe, error) {
// TODO: implement proper matching
//
// suggestions:
// if client.medias.contains(server.media)
// bypass, server.media
// else
// preferable = [vp8, opus]
// if union(preferable, client.medias)
// transcode, preferable
appetizer, err := d.Appetizer()
if err != nil {
return nil, err
}
r := &entities.DonutRecipe{
Input: appetizer,
Video: entities.DonutMediaTask{
// Action: entities.DonutTranscode,
Action: entities.DonutBypass,
Codec: entities.H264,
DonutBitStreamFilter: &entities.DonutH264AnnexB,
// CodecContextOptions: []entities.LibAVOptionsCodecContext{
// entities.SetBitRate(100_000),
// entities.SetBaselineProfile(),
// entities.SetGopSize(30),
// },
},
Audio: entities.DonutMediaTask{
Action: entities.DonutTranscode,
Codec: entities.Opus,
DonutStreamFilter: entities.AudioResamplerFilter(48000),
CodecContextOptions: []entities.LibAVOptionsCodecContext{
entities.SetSampleRate(48000),
entities.SetSampleFormat("fltp"),
},
},
}
return r, nil
}
func (d *donutEngine) Appetizer() (entities.DonutAppetizer, error) {
isRTMP := strings.Contains(strings.ToLower(d.req.StreamURL), "rtmp")
isSRT := strings.Contains(strings.ToLower(d.req.StreamURL), "srt")
if isRTMP {
return entities.DonutAppetizer{
URL: fmt.Sprintf("%s/%s", d.req.StreamURL, d.req.StreamID),
Options: map[entities.DonutInputOptionKey]string{
entities.DonutRTMPLive: "live",
},
Format: "flv",
}, nil
}
if isSRT {
return entities.DonutAppetizer{
URL: d.req.StreamURL,
Format: "mpegts", // TODO: check how to get format for srt
Options: map[entities.DonutInputOptionKey]string{
entities.DonutSRTStreamID: d.req.StreamID,
entities.DonutSRTTranstype: "live",
entities.DonutSRTsmoother: "live",
},
}, nil
}
return entities.DonutAppetizer{}, entities.ErrUnsupportedStreamURL
}

View File

@@ -0,0 +1,8 @@
package probers
import "github.com/flavioribeiro/donut/internal/entities"
type DonutProber interface {
StreamInfo(req entities.DonutAppetizer) (*entities.StreamInfo, error)
Match(req *entities.RequestParams) bool
}

View File

@@ -0,0 +1,112 @@
package probers
import (
"fmt"
"strings"
"github.com/asticode/go-astiav"
"github.com/asticode/go-astikit"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"go.uber.org/fx"
"go.uber.org/zap"
)
type LibAVFFmpeg struct {
c *entities.Config
l *zap.SugaredLogger
m *mapper.Mapper
}
type ResultLibAVFFmpeg struct {
fx.Out
LibAVFFmpegProber DonutProber `group:"probers"`
}
// NewLibAVFFmpeg creates a new LibAVFFmpeg DonutProber
func NewLibAVFFmpeg(
c *entities.Config,
l *zap.SugaredLogger,
m *mapper.Mapper,
) ResultLibAVFFmpeg {
return ResultLibAVFFmpeg{
LibAVFFmpegProber: &LibAVFFmpeg{
c: c,
l: l,
m: m,
},
}
}
// Match returns true when the request is for an LibAVFFmpeg prober
func (c *LibAVFFmpeg) Match(req *entities.RequestParams) bool {
isRTMP := strings.Contains(strings.ToLower(req.StreamURL), "rtmp")
isSRT := strings.Contains(strings.ToLower(req.StreamURL), "srt")
return isRTMP || isSRT
}
// StreamInfo connects to the SRT stream to discovery media properties.
func (c *LibAVFFmpeg) StreamInfo(req entities.DonutAppetizer) (*entities.StreamInfo, error) {
closer := astikit.NewCloser()
defer closer.Close()
var inputFormatContext *astiav.FormatContext
if inputFormatContext = astiav.AllocFormatContext(); inputFormatContext == nil {
return nil, entities.ErrFFmpegLibAVFormatContextIsNil
}
closer.Add(inputFormatContext.Free)
inputFormat, err := c.defineInputFormat(req.Format.String())
if err != nil {
return nil, err
}
inputOptions := c.defineInputOptions(req.Options, closer)
if err := inputFormatContext.OpenInput(req.URL, inputFormat, inputOptions); err != nil {
return nil, fmt.Errorf("error while inputFormatContext.OpenInput: (%s, %#v, %#v) %w", req.URL, inputFormat, inputOptions, err)
}
closer.Add(inputFormatContext.CloseInput)
if err := inputFormatContext.FindStreamInfo(nil); err != nil {
return nil, fmt.Errorf("error while inputFormatContext.FindStreamInfo %w", err)
}
streams := []entities.Stream{}
for _, is := range inputFormatContext.Streams() {
if is.CodecParameters().MediaType() != astiav.MediaTypeAudio &&
is.CodecParameters().MediaType() != astiav.MediaTypeVideo {
c.l.Info("skipping media type", is.CodecParameters().MediaType())
continue
}
streams = append(streams, c.m.FromLibAVStreamToEntityStream(is))
}
si := entities.StreamInfo{Streams: streams}
return &si, nil
}
// TODO: merge common behavior (streamer / prober)
func (c *LibAVFFmpeg) defineInputFormat(streamFormat string) (*astiav.InputFormat, error) {
var inputFormat *astiav.InputFormat
if streamFormat != "" {
inputFormat = astiav.FindInputFormat(streamFormat)
if inputFormat == nil {
return nil, fmt.Errorf("ffmpeg/libav: could not find %s input format", streamFormat)
}
}
return inputFormat, nil
}
func (c *LibAVFFmpeg) defineInputOptions(opts map[entities.DonutInputOptionKey]string, closer *astikit.Closer) *astiav.Dictionary {
var dic *astiav.Dictionary
if len(opts) > 0 {
dic = &astiav.Dictionary{}
closer.Add(dic.Free)
for k, v := range opts {
dic.Set(k.String(), v, 0)
}
}
return dic
}

View File

@@ -0,0 +1,97 @@
package probers_test
import (
"testing"
"github.com/flavioribeiro/donut/internal/controllers/probers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/teststreaming"
"github.com/flavioribeiro/donut/internal/web"
"github.com/stretchr/testify/assert"
"go.uber.org/fx"
"go.uber.org/fx/fxtest"
)
var p []probers.DonutProber
func selectProberFor(t *testing.T, req *entities.RequestParams) probers.DonutProber {
if p == nil {
fxtest.New(t,
web.Dependencies(false),
fx.Populate(
fx.Annotate(
&p,
fx.ParamTags(`group:"probers"`),
),
),
)
}
for _, c := range p {
if c.Match(req) {
return c
}
}
return nil
}
func TestSrtMpegTs_StreamInfo_264(t *testing.T) {
t.Parallel()
ffmpeg := teststreaming.FFMPEG_LIVE_SRT_MPEG_TS_H264_AAC
defer ffmpeg.Stop()
ffmpeg.Start()
req := &entities.RequestParams{
StreamURL: ffmpeg.Output().StreamURL,
StreamID: ffmpeg.Output().StreamID,
}
input := entities.DonutAppetizer{
URL: ffmpeg.Output().StreamURL,
Format: "mpegts",
Options: map[entities.DonutInputOptionKey]string{
entities.DonutSRTStreamID: ffmpeg.Output().StreamID,
entities.DonutSRTTranstype: "live",
entities.DonutSRTsmoother: "live",
},
}
prober := selectProberFor(t, req)
streamInfo, err := prober.StreamInfo(input)
assert.Nil(t, err)
assert.NotNil(t, streamInfo)
assert.ElementsMatch(t, ffmpeg.ExpectedStreams(), streamInfo.Streams)
}
func TestSrtMpegTs_StreamInfo_265(t *testing.T) {
t.Parallel()
ffmpeg := teststreaming.FFMPEG_LIVE_SRT_MPEG_TS_H265_AAC
defer ffmpeg.Stop()
ffmpeg.Start()
req := &entities.RequestParams{
StreamURL: ffmpeg.Output().StreamURL,
StreamID: ffmpeg.Output().StreamID,
}
input := entities.DonutAppetizer{
URL: ffmpeg.Output().StreamURL,
Format: "mpegts",
Options: map[entities.DonutInputOptionKey]string{
entities.DonutSRTStreamID: ffmpeg.Output().StreamID,
entities.DonutSRTTranstype: "live",
entities.DonutSRTsmoother: "live",
},
}
prober := selectProberFor(t, req)
streamInfo, err := prober.StreamInfo(input)
assert.Nil(t, err)
assert.NotNil(t, streamInfo)
assert.ElementsMatch(t, ffmpeg.ExpectedStreams(), streamInfo.Streams)
}

View File

@@ -1,91 +0,0 @@
package controllers
import (
"context"
astisrt "github.com/asticode/go-astisrt/pkg"
"github.com/flavioribeiro/donut/internal/entities"
"go.uber.org/fx"
"go.uber.org/zap"
)
type SRTController struct {
c *entities.Config
l *zap.SugaredLogger
}
func NewSRTController(c *entities.Config, l *zap.SugaredLogger, lc fx.Lifecycle) (*SRTController, error) {
// Handle logs
astisrt.SetLogLevel(astisrt.LogLevel(astisrt.LogLevelNotice))
astisrt.SetLogHandler(func(ll astisrt.LogLevel, file, area, msg string, line int) {
l.Infow("SRT",
"ll", ll,
"msg", msg,
)
})
// Startup srt
if err := astisrt.Startup(); err != nil {
l.Errorw("failed to start up srt",
"error", err,
)
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
// Clean up
if err := astisrt.CleanUp(); err != nil {
l.Errorw("failed to clean up srt",
"error", err,
)
return err
}
return nil
},
})
return &SRTController{
c: c,
l: l,
}, nil
}
func (c *SRTController) Connect(cancel context.CancelFunc, params entities.RequestParams) (*astisrt.Connection, error) {
c.l.Infow("trying to connect srt")
if err := params.Valid(); err != nil {
return nil, err
}
c.l.Infow("Connecting to SRT ",
"offer", params.String(),
)
conn, err := astisrt.Dial(astisrt.DialOptions{
ConnectionOptions: []astisrt.ConnectionOption{
astisrt.WithLatency(c.c.SRTConnectionLatencyMS),
astisrt.WithStreamid(params.SRTStreamID),
astisrt.WithCongestion("live"),
astisrt.WithTranstype(astisrt.Transtype(astisrt.TranstypeLive)),
},
OnDisconnect: func(conn *astisrt.Connection, err error) {
c.l.Infow("Canceling SRT",
"error", err,
)
cancel()
},
Host: params.SRTHost,
Port: params.SRTPort,
})
if err != nil {
c.l.Errorw("failed to connect srt",
"error", err,
)
return nil, err
}
c.l.Infow("Connected to SRT")
return conn, nil
}

View File

@@ -0,0 +1,8 @@
package streamers
import "github.com/flavioribeiro/donut/internal/entities"
type DonutStreamer interface {
Stream(p *entities.DonutParameters)
Match(req *entities.RequestParams) bool
}

View File

@@ -0,0 +1,764 @@
package streamers
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/asticode/go-astiav"
"github.com/asticode/go-astikit"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"go.uber.org/fx"
"go.uber.org/zap"
)
type LibAVFFmpegStreamer struct {
c *entities.Config
l *zap.SugaredLogger
m *mapper.Mapper
lastAudioFrameDTS float64
currentAudioFrameSize float64
}
type LibAVFFmpegStreamerParams struct {
fx.In
C *entities.Config
L *zap.SugaredLogger
M *mapper.Mapper
}
type ResultLibAVFFmpegStreamer struct {
fx.Out
LibAVFFmpegStreamer DonutStreamer `group:"streamers"`
}
func NewLibAVFFmpegStreamer(p LibAVFFmpegStreamerParams) ResultLibAVFFmpegStreamer {
return ResultLibAVFFmpegStreamer{
LibAVFFmpegStreamer: &LibAVFFmpegStreamer{
c: p.C,
l: p.L,
m: p.M,
},
}
}
func (c *LibAVFFmpegStreamer) Match(req *entities.RequestParams) bool {
isRTMP := strings.Contains(strings.ToLower(req.StreamURL), "rtmp")
isSRT := strings.Contains(strings.ToLower(req.StreamURL), "srt")
return isRTMP || isSRT
}
type streamContext struct {
// IN
inputStream *astiav.Stream
decCodec *astiav.Codec
decCodecContext *astiav.CodecContext
decFrame *astiav.Frame
// FILTER
filterGraph *astiav.FilterGraph
buffersinkContext *astiav.FilterContext
buffersrcContext *astiav.FilterContext
filterFrame *astiav.Frame
// OUT
encCodec *astiav.Codec
encCodecContext *astiav.CodecContext
encPkt *astiav.Packet
// Bit stream filter
bsfContext *astiav.BitStreamFilterContext
bsfPacket *astiav.Packet
}
type libAVParams struct {
inputFormatContext *astiav.FormatContext
streams map[int]*streamContext
}
func (c *LibAVFFmpegStreamer) Stream(donut *entities.DonutParameters) {
c.l.Infof("streaming has started for %#v", donut)
closer := astikit.NewCloser()
defer closer.Close()
p := &libAVParams{
streams: make(map[int]*streamContext),
}
// it's useful for debugging
// astiav.SetLogLevel(astiav.LogLevelDebug)
astiav.SetLogLevel(astiav.LogLevelInfo)
astiav.SetLogCallback(func(_ astiav.Classer, l astiav.LogLevel, fmt, msg string) {
c.l.Infof("ffmpeg %s: - %s", c.libAVLogToString(l), strings.TrimSpace(msg))
})
c.l.Infof("preparing input")
if err := c.prepareInput(p, closer, donut); err != nil {
c.onError(err, donut)
return
}
c.l.Infof("preparing output")
if err := c.prepareOutput(p, closer, donut); err != nil {
c.onError(err, donut)
return
}
c.l.Infof("preparing filters")
if err := c.prepareFilters(p, closer, donut); err != nil {
c.onError(err, donut)
return
}
c.l.Infof("preparing bit stream filters")
if err := c.prepareBitStreamFilters(p, closer, donut); err != nil {
c.onError(err, donut)
return
}
inPkt := astiav.AllocPacket()
closer.Add(inPkt.Free)
for {
select {
case <-donut.Ctx.Done():
if errors.Is(donut.Ctx.Err(), context.Canceled) {
c.l.Info("streaming has stopped due cancellation")
return
}
c.onError(donut.Ctx.Err(), donut)
return
default:
if err := p.inputFormatContext.ReadFrame(inPkt); err != nil {
if errors.Is(err, astiav.ErrEof) {
c.l.Info("streaming has ended")
return
}
c.onError(err, donut)
}
s, ok := p.streams[inPkt.StreamIndex()]
if !ok {
c.l.Warnf("skipping to process stream id=%d", inPkt.StreamIndex())
continue
}
if s.bsfContext != nil {
if err := c.applyBitStreamFilter(p, inPkt, s, donut); err != nil {
c.onError(err, donut)
return
}
} else {
if err := c.processPacket(p, inPkt, s, donut); err != nil {
c.onError(err, donut)
return
}
}
inPkt.Unref()
}
}
}
func (c *LibAVFFmpegStreamer) onError(err error, p *entities.DonutParameters) {
if p.OnError != nil {
p.OnError(err)
}
}
func (c *LibAVFFmpegStreamer) prepareInput(p *libAVParams, closer *astikit.Closer, donut *entities.DonutParameters) error {
if p.inputFormatContext = astiav.AllocFormatContext(); p.inputFormatContext == nil {
return errors.New("ffmpeg/libav: input format context is nil")
}
closer.Add(p.inputFormatContext.Free)
inputFormat, err := c.defineInputFormat(donut.Recipe.Input.Format.String())
if err != nil {
return err
}
inputOptions := c.defineInputOptions(donut, closer)
if err := p.inputFormatContext.OpenInput(donut.Recipe.Input.URL, inputFormat, inputOptions); err != nil {
return fmt.Errorf("ffmpeg/libav: opening input failed %w", err)
}
closer.Add(p.inputFormatContext.CloseInput)
if err := p.inputFormatContext.FindStreamInfo(nil); err != nil {
return fmt.Errorf("ffmpeg/libav: finding stream info failed %w", err)
}
for _, is := range p.inputFormatContext.Streams() {
if is.CodecParameters().MediaType() != astiav.MediaTypeAudio &&
is.CodecParameters().MediaType() != astiav.MediaTypeVideo {
c.l.Infof("skipping media type %s", is.CodecParameters().MediaType().String())
continue
}
s := &streamContext{inputStream: is}
if s.decCodec = astiav.FindDecoder(is.CodecParameters().CodecID()); s.decCodec == nil {
return errors.New("ffmpeg/libav: codec is missing")
}
if s.decCodecContext = astiav.AllocCodecContext(s.decCodec); s.decCodecContext == nil {
return errors.New("ffmpeg/libav: codec context is nil")
}
closer.Add(s.decCodecContext.Free)
if err := is.CodecParameters().ToCodecContext(s.decCodecContext); err != nil {
return fmt.Errorf("ffmpeg/libav: updating codec context failed %w", err)
}
//FFMPEG_NEW
s.decCodecContext.SetTimeBase(s.inputStream.TimeBase())
if is.CodecParameters().MediaType() == astiav.MediaTypeVideo {
s.decCodecContext.SetFramerate(p.inputFormatContext.GuessFrameRate(is, nil))
}
if err := s.decCodecContext.Open(s.decCodec, nil); err != nil {
return fmt.Errorf("ffmpeg/libav: opening codec context failed %w", err)
}
s.decFrame = astiav.AllocFrame()
closer.Add(s.decFrame.Free)
p.streams[is.Index()] = s
if donut.OnStream != nil {
stream := c.m.FromLibAVStreamToEntityStream(is)
err := donut.OnStream(&stream)
if err != nil {
return err
}
}
}
return nil
}
func (c *LibAVFFmpegStreamer) prepareOutput(p *libAVParams, closer *astikit.Closer, donut *entities.DonutParameters) error {
for _, is := range p.inputFormatContext.Streams() {
s, ok := p.streams[is.Index()]
if !ok {
c.l.Infof("skipping absent stream index = %d", is.Index())
continue
}
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
isVideoBypass := donut.Recipe.Video.Action == entities.DonutBypass
if isVideo && isVideoBypass {
c.l.Infof("bypass video for %+v", s.inputStream)
continue
}
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
isAudioBypass := donut.Recipe.Audio.Action == entities.DonutBypass
if isAudio && isAudioBypass {
c.l.Infof("bypass audio for %+v", s.inputStream)
continue
}
var codecID astiav.CodecID
if isAudio {
audioCodecID, err := c.m.FromStreamCodecToLibAVCodecID(donut.Recipe.Audio.Codec)
if err != nil {
return err
}
codecID = audioCodecID
}
if isVideo {
videoCodecID, err := c.m.FromStreamCodecToLibAVCodecID(donut.Recipe.Video.Codec)
if err != nil {
return err
}
codecID = videoCodecID
}
if s.encCodec = astiav.FindEncoder(codecID); s.encCodec == nil {
// TODO: migrate error to entity
return fmt.Errorf("cannot find a libav encoder for %+v", codecID)
}
if s.encCodecContext = astiav.AllocCodecContext(s.encCodec); s.encCodecContext == nil {
return errors.New("ffmpeg/libav: codec context is nil")
}
closer.Add(s.encCodecContext.Free)
if isAudio {
if v := s.encCodec.ChannelLayouts(); len(v) > 0 {
s.encCodecContext.SetChannelLayout(v[0])
} else {
s.encCodecContext.SetChannelLayout(s.decCodecContext.ChannelLayout())
}
s.encCodecContext.SetChannels(s.decCodecContext.Channels())
s.encCodecContext.SetSampleRate(s.decCodecContext.SampleRate())
if v := s.encCodec.SampleFormats(); len(v) > 0 {
s.encCodecContext.SetSampleFormat(v[0])
} else {
s.encCodecContext.SetSampleFormat(s.decCodecContext.SampleFormat())
}
s.encCodecContext.SetTimeBase(s.decCodecContext.TimeBase())
// overriding with user provide config
if len(donut.Recipe.Audio.CodecContextOptions) > 0 {
for _, opt := range donut.Recipe.Audio.CodecContextOptions {
opt(s.encCodecContext)
}
}
}
if isVideo {
if v := s.encCodec.PixelFormats(); len(v) > 0 {
s.encCodecContext.SetPixelFormat(v[0])
} else {
s.encCodecContext.SetPixelFormat(s.decCodecContext.PixelFormat())
}
s.encCodecContext.SetSampleAspectRatio(s.decCodecContext.SampleAspectRatio())
s.encCodecContext.SetTimeBase(s.decCodecContext.TimeBase())
s.encCodecContext.SetHeight(s.decCodecContext.Height())
s.encCodecContext.SetWidth(s.decCodecContext.Width())
// s.encCodecContext.SetFramerate(s.inputStream.AvgFrameRate())
// overriding with user provide config
if len(donut.Recipe.Video.CodecContextOptions) > 0 {
for _, opt := range donut.Recipe.Video.CodecContextOptions {
opt(s.encCodecContext)
}
}
}
if s.decCodecContext.Flags().Has(astiav.CodecContextFlagGlobalHeader) {
s.encCodecContext.SetFlags(s.encCodecContext.Flags().Add(astiav.CodecContextFlagGlobalHeader))
}
if err := s.encCodecContext.Open(s.encCodec, nil); err != nil {
return fmt.Errorf("opening encoder context failed: %w", err)
}
}
return nil
}
func (c *LibAVFFmpegStreamer) prepareFilters(p *libAVParams, closer *astikit.Closer, donut *entities.DonutParameters) error {
for _, s := range p.streams {
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
isVideoBypass := donut.Recipe.Video.Action == entities.DonutBypass
if isVideo && isVideoBypass {
c.l.Infof("bypass video for %+v", s.inputStream)
continue
}
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
isAudioBypass := donut.Recipe.Audio.Action == entities.DonutBypass
if isAudio && isAudioBypass {
c.l.Infof("bypass audio for %+v", s.inputStream)
continue
}
var args astiav.FilterArgs
var buffersrc, buffersink *astiav.Filter
var content string
var err error
if s.filterGraph = astiav.AllocFilterGraph(); s.filterGraph == nil {
return errors.New("main: graph is nil")
}
closer.Add(s.filterGraph.Free)
outputs := astiav.AllocFilterInOut()
if outputs == nil {
return errors.New("main: outputs is nil")
}
closer.Add(outputs.Free)
inputs := astiav.AllocFilterInOut()
if inputs == nil {
return errors.New("main: inputs is nil")
}
closer.Add(inputs.Free)
if isAudio {
args = astiav.FilterArgs{
"channel_layout": s.decCodecContext.ChannelLayout().String(),
"sample_fmt": s.decCodecContext.SampleFormat().Name(),
"sample_rate": strconv.Itoa(s.decCodecContext.SampleRate()),
"time_base": s.decCodecContext.TimeBase().String(),
}
buffersrc = astiav.FindFilterByName("abuffer")
buffersink = astiav.FindFilterByName("abuffersink")
if donut.Recipe.Audio.DonutStreamFilter != nil {
content = string(*donut.Recipe.Audio.DonutStreamFilter)
} else {
content = "anull" /* passthrough (dummy) filter for audio */
}
}
if isVideo {
args = astiav.FilterArgs{
"pix_fmt": strconv.Itoa(int(s.decCodecContext.PixelFormat())),
"pixel_aspect": s.decCodecContext.SampleAspectRatio().String(),
"time_base": s.decCodecContext.TimeBase().String(),
"video_size": strconv.Itoa(s.decCodecContext.Width()) + "x" + strconv.Itoa(s.decCodecContext.Height()),
}
buffersrc = astiav.FindFilterByName("buffer")
buffersink = astiav.FindFilterByName("buffersink")
if donut.Recipe.Video.DonutStreamFilter != nil {
content = string(*donut.Recipe.Video.DonutStreamFilter)
} else {
content = "null" /* passthrough (dummy) filter for video */
}
}
if buffersrc == nil {
return errors.New("main: buffersrc is nil")
}
if buffersink == nil {
return errors.New("main: buffersink is nil")
}
if s.buffersrcContext, err = s.filterGraph.NewFilterContext(buffersrc, "in", args); err != nil {
return fmt.Errorf("main: creating buffersrc context failed: %w", err)
}
if s.buffersinkContext, err = s.filterGraph.NewFilterContext(buffersink, "out", nil); err != nil {
return fmt.Errorf("main: creating buffersink context failed: %w", err)
}
outputs.SetName("in")
outputs.SetFilterContext(s.buffersrcContext)
outputs.SetPadIdx(0)
outputs.SetNext(nil)
inputs.SetName("out")
inputs.SetFilterContext(s.buffersinkContext)
inputs.SetPadIdx(0)
inputs.SetNext(nil)
if err = s.filterGraph.Parse(content, inputs, outputs); err != nil {
return fmt.Errorf("main: parsing filter failed: %w", err)
}
if err = s.filterGraph.Configure(); err != nil {
return fmt.Errorf("main: configuring filter failed: %w", err)
}
s.filterFrame = astiav.AllocFrame()
closer.Add(s.filterFrame.Free)
s.encPkt = astiav.AllocPacket()
closer.Add(s.encPkt.Free)
}
return nil
}
func (c *LibAVFFmpegStreamer) prepareBitStreamFilters(p *libAVParams, closer *astikit.Closer, donut *entities.DonutParameters) error {
for _, s := range p.streams {
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
var currentMedia *entities.DonutMediaTask
if isAudio {
currentMedia = &donut.Recipe.Audio
} else if isVideo {
currentMedia = &donut.Recipe.Video
} else {
c.l.Warnf("ignoring bit stream filter for media type %s", s.decCodecContext.MediaType().String())
continue
}
if currentMedia.DonutBitStreamFilter == nil {
c.l.Infof("no bit stream filter configured for %s", s.decCodecContext.String())
continue
}
bsf := astiav.FindBitStreamFilterByName(string(*currentMedia.DonutBitStreamFilter))
if bsf == nil {
return fmt.Errorf("can not find the filter %s", string(*currentMedia.DonutBitStreamFilter))
}
var err error
s.bsfContext, err = astiav.AllocBitStreamFilterContext(bsf)
if err != nil {
return fmt.Errorf("error while allocating bit stream context %w", err)
}
closer.Add(s.bsfContext.Free)
s.bsfContext.SetTimeBaseIn(s.inputStream.TimeBase())
if err := s.inputStream.CodecParameters().Copy(s.bsfContext.CodecParametersIn()); err != nil {
return fmt.Errorf("error while copying codec parameters %w", err)
}
if err := s.bsfContext.Initialize(); err != nil {
return fmt.Errorf("error while initiating %w", err)
}
s.bsfPacket = astiav.AllocPacket()
closer.Add(s.bsfPacket.Free)
}
return nil
}
func (c *LibAVFFmpegStreamer) processPacket(p *libAVParams, pkt *astiav.Packet, s *streamContext, donut *entities.DonutParameters) error {
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
var currentMedia *entities.DonutMediaTask
if isAudio {
currentMedia = &donut.Recipe.Audio
} else if isVideo {
currentMedia = &donut.Recipe.Video
} else {
c.l.Warnf("ignoring to stream for media type %s", s.decCodecContext.MediaType().String())
return nil
}
byPass := currentMedia.Action == entities.DonutBypass
if isVideo && byPass {
if donut.OnVideoFrame != nil {
pkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase())
if err := donut.OnVideoFrame(pkt.Data(), entities.MediaFrameContext{
PTS: int(pkt.Pts()),
DTS: int(pkt.Dts()),
Duration: c.defineVideoDuration(s, pkt),
}); err != nil {
return err
}
}
return nil
}
if isAudio && byPass {
if donut.OnAudioFrame != nil {
pkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase())
if err := donut.OnAudioFrame(pkt.Data(), entities.MediaFrameContext{
PTS: int(pkt.Pts()),
DTS: int(pkt.Dts()),
Duration: c.defineAudioDuration(s, pkt),
}); err != nil {
return err
}
}
return nil
}
// if isAudio {
// continue
// }
if err := s.decCodecContext.SendPacket(pkt); err != nil {
return err
}
for {
if err := s.decCodecContext.ReceiveFrame(s.decFrame); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
return err
}
if err := c.filterAndEncode(p, s.decFrame, s, donut); err != nil {
return err
}
}
return nil
}
func (c *LibAVFFmpegStreamer) applyBitStreamFilter(p *libAVParams, pkt *astiav.Packet, s *streamContext, donut *entities.DonutParameters) error {
if err := s.bsfContext.SendPacket(pkt); err != nil && !errors.Is(err, astiav.ErrEagain) {
return fmt.Errorf("sending bit stream packet failed: %w", err)
}
for {
if err := s.bsfContext.ReceivePacket(s.bsfPacket); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
break
}
return fmt.Errorf("receiving bit stream packet failed: %w", err)
}
c.processPacket(p, s.bsfPacket, s, donut)
s.bsfPacket.Unref()
}
return nil
}
func (c *LibAVFFmpegStreamer) filterAndEncode(p *libAVParams, f *astiav.Frame, s *streamContext, donut *entities.DonutParameters) (err error) {
if err = s.buffersrcContext.BuffersrcAddFrame(f, astiav.NewBuffersrcFlags(astiav.BuffersrcFlagKeepRef)); err != nil {
return fmt.Errorf("adding frame failed: %w", err)
}
for {
s.filterFrame.Unref()
if err = s.buffersinkContext.BuffersinkGetFrame(s.filterFrame, astiav.NewBuffersinkFlags()); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
err = nil
break
}
return fmt.Errorf("getting frame failed: %w", err)
}
// TODO: should we avoid setting the picture type for audio?
s.filterFrame.SetPictureType(astiav.PictureTypeNone)
if err = c.encodeFrame(p, s.filterFrame, s, donut); err != nil {
err = fmt.Errorf("main: encoding and writing frame failed: %w", err)
return
}
}
return nil
}
func (c *LibAVFFmpegStreamer) encodeFrame(p *libAVParams, f *astiav.Frame, s *streamContext, donut *entities.DonutParameters) (err error) {
s.encPkt.Unref()
// when converting from aac to opus using filters,
// the np samples are bigger than the frame size
// to fix the error "more samples than frame size"
if f != nil {
f.SetNbSamples(s.encCodecContext.FrameSize())
}
if err = s.encCodecContext.SendFrame(f); err != nil {
return fmt.Errorf("sending frame failed: %w", err)
}
for {
if err = s.encCodecContext.ReceivePacket(s.encPkt); err != nil {
if errors.Is(err, astiav.ErrEof) || errors.Is(err, astiav.ErrEagain) {
err = nil
break
}
return fmt.Errorf("receiving packet failed: %w", err)
}
// TODO: check if we need to swap
// pkt.RescaleTs(s.inputStream.TimeBase(), s.decCodecContext.TimeBase())
s.encPkt.RescaleTs(s.inputStream.TimeBase(), s.encCodecContext.TimeBase())
isVideo := s.decCodecContext.MediaType() == astiav.MediaTypeVideo
if isVideo {
if donut.OnVideoFrame != nil {
if err := donut.OnVideoFrame(s.encPkt.Data(), entities.MediaFrameContext{
PTS: int(s.encPkt.Pts()),
DTS: int(s.encPkt.Dts()),
Duration: c.defineVideoDuration(s, s.encPkt),
}); err != nil {
return err
}
}
}
isAudio := s.decCodecContext.MediaType() == astiav.MediaTypeAudio
if isAudio {
if donut.OnAudioFrame != nil {
if err := donut.OnAudioFrame(s.encPkt.Data(), entities.MediaFrameContext{
PTS: int(s.encPkt.Pts()),
DTS: int(s.encPkt.Dts()),
Duration: c.defineAudioDuration(s, s.encPkt),
}); err != nil {
return err
}
}
}
}
return nil
}
func (c *LibAVFFmpegStreamer) defineInputFormat(streamFormat string) (*astiav.InputFormat, error) {
var inputFormat *astiav.InputFormat
if streamFormat != "" {
inputFormat = astiav.FindInputFormat(streamFormat)
if inputFormat == nil {
return nil, fmt.Errorf("ffmpeg/libav: could not find %s input format", streamFormat)
}
}
return inputFormat, nil
}
func (c *LibAVFFmpegStreamer) defineInputOptions(p *entities.DonutParameters, closer *astikit.Closer) *astiav.Dictionary {
var dic *astiav.Dictionary
if len(p.Recipe.Input.Options) > 0 {
dic = &astiav.Dictionary{}
closer.Add(dic.Free)
for k, v := range p.Recipe.Input.Options {
dic.Set(k.String(), v, 0)
}
}
return dic
}
func (c *LibAVFFmpegStreamer) defineAudioDuration(s *streamContext, pkt *astiav.Packet) time.Duration {
audioDuration := time.Duration(0)
if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeAudio {
// Audio
//
// dur = 12.416666ms
// sample = 48000
// frameSize = 596 (it can be variable for opus)
// 1s = dur * (sample/frameSize)
// ref https://developer.apple.com/documentation/coreaudiotypes/audiostreambasicdescription/1423257-mframesperpacket
// TODO: properly handle wraparound / roll over
// or explore av frame_size https://ffmpeg.org/doxygen/trunk/structAVCodecContext.html#aec57f0d859a6df8b479cd93ca3a44a33
// and libAV pts roll over
if float64(pkt.Dts())-c.lastAudioFrameDTS > 0 {
c.currentAudioFrameSize = float64(pkt.Dts()) - c.lastAudioFrameDTS
}
c.lastAudioFrameDTS = float64(pkt.Dts())
sampleRate := float64(s.encCodecContext.SampleRate())
audioDuration = time.Duration((c.currentAudioFrameSize / sampleRate) * float64(time.Second))
}
return audioDuration
}
func (c *LibAVFFmpegStreamer) defineVideoDuration(s *streamContext, _ *astiav.Packet) time.Duration {
videoDuration := time.Duration(0)
if s.inputStream.CodecParameters().MediaType() == astiav.MediaTypeVideo {
// Video
//
// dur = 0,033333
// sample = 30
// frameSize = 1
// 1s = dur * (sample/frameSize)
// we're assuming fixed video frame rate
videoDuration = time.Duration((float64(1) / float64(s.inputStream.AvgFrameRate().Num())) * float64(time.Second))
}
return videoDuration
}
// TODO: move this either to a mapper or make a PR for astiav
func (*LibAVFFmpegStreamer) libAVLogToString(l astiav.LogLevel) string {
const _Ciconst_AV_LOG_DEBUG = 0x30
const _Ciconst_AV_LOG_ERROR = 0x10
const _Ciconst_AV_LOG_FATAL = 0x8
const _Ciconst_AV_LOG_INFO = 0x20
const _Ciconst_AV_LOG_PANIC = 0x0
const _Ciconst_AV_LOG_QUIET = -0x8
const _Ciconst_AV_LOG_VERBOSE = 0x28
const _Ciconst_AV_LOG_WARNING = 0x18
switch l {
case _Ciconst_AV_LOG_WARNING:
return "WARN"
case _Ciconst_AV_LOG_VERBOSE:
return "VERBOSE"
case _Ciconst_AV_LOG_QUIET:
return "QUIET"
case _Ciconst_AV_LOG_PANIC:
return "PANIC"
case _Ciconst_AV_LOG_INFO:
return "INFO"
case _Ciconst_AV_LOG_FATAL:
return "FATAL"
case _Ciconst_AV_LOG_DEBUG:
return "DEBUG"
case _Ciconst_AV_LOG_ERROR:
return "ERROR"
default:
return "UNKNOWN LEVEL"
}
}

View File

@@ -1,155 +0,0 @@
package controllers
import (
"encoding/json"
"fmt"
"io"
"time"
astisrt "github.com/asticode/go-astisrt/pkg"
"github.com/asticode/go-astits"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"go.uber.org/zap"
)
type StreamingController struct {
c *entities.Config
l *zap.SugaredLogger
}
func NewStreamingController(c *entities.Config, l *zap.SugaredLogger) *StreamingController {
return &StreamingController{
c: c,
l: l,
}
}
func (c *StreamingController) Stream(sp *entities.StreamParameters) {
r, w := io.Pipe()
defer r.Close()
defer w.Close()
defer sp.SRTConnection.Close()
defer sp.WebRTCConn.Close()
defer sp.Cancel()
// TODO: pick the proper transport? is it possible to get rtp instead?
go c.readFromSRTIntoWriterPipe(sp.SRTConnection, w)
// reading from reader pipe to the mpeg-ts demuxer
mpegTSDemuxer := astits.NewDemuxer(sp.Ctx, r)
eia608Reader := NewEIA608Reader()
h264PID := uint16(0)
c.l.Infow("streaming has started")
for {
select {
case <-sp.Ctx.Done():
c.l.Errorw("streaming has stopped")
return
default:
// fetching mpeg-ts data
// ref https://tsduck.io/download/docs/mpegts-introduction.pdf
mpegTSDemuxData, err := mpegTSDemuxer.NextData()
if err != nil {
c.l.Errorw("failed to demux mpeg-ts",
"error", err,
)
return
}
if mpegTSDemuxData.PMT != nil {
// writing mpeg-ts media codec info to the metadata webrtc channel
h264PID = c.captureMediaInfoAndSendToWebRTC(mpegTSDemuxData, sp.MetadataTrack, h264PID)
c.captureBitrateAndSendToWebRTC(mpegTSDemuxData, sp.MetadataTrack)
}
// writing mpeg-ts video/captions to webrtc channels
err = c.writeMpegtsToWebRTC(mpegTSDemuxData, h264PID, err, sp, eia608Reader)
if err != nil {
c.l.Errorw("failed to write an mpeg-ts to web rtc",
"error", err,
)
return
}
}
}
}
func (c *StreamingController) writeMpegtsToWebRTC(mpegTSDemuxData *astits.DemuxerData, h264PID uint16, err error, sp *entities.StreamParameters, eia608Reader *EIA608Reader) error {
if mpegTSDemuxData.PID == h264PID && mpegTSDemuxData.PES != nil {
if err = sp.VideoTrack.WriteSample(media.Sample{Data: mpegTSDemuxData.PES.Data, Duration: time.Second / 30}); err != nil {
return err
}
captions, err := eia608Reader.Parse(mpegTSDemuxData.PES)
if err != nil {
return err
}
if captions != "" {
captionsMsg, err := BuildCaptionsMessage(mpegTSDemuxData.PES.Header.OptionalHeader.PTS, captions)
if err != nil {
return err
}
sp.MetadataTrack.SendText(captionsMsg)
}
}
return nil
}
func (*StreamingController) captureBitrateAndSendToWebRTC(d *astits.DemuxerData, metadataTrack *webrtc.DataChannel) {
for _, d := range d.PMT.ProgramDescriptors {
if d.MaximumBitrate != nil {
bitrateInMbitsPerSecond := float32(d.MaximumBitrate.Bitrate) / float32(125000)
msg, _ := json.Marshal(entities.Message{
Type: entities.MessageTypeMetadata,
Message: fmt.Sprintf("Bitrate %.2fMbps", bitrateInMbitsPerSecond),
})
metadataTrack.SendText(string(msg))
}
}
}
func (*StreamingController) captureMediaInfoAndSendToWebRTC(d *astits.DemuxerData, metadataTrack *webrtc.DataChannel, h264PID uint16) uint16 {
for _, es := range d.PMT.ElementaryStreams {
msg, _ := json.Marshal(entities.Message{
Type: entities.MessageTypeMetadata,
Message: es.StreamType.String(),
})
metadataTrack.SendText(string(msg))
if es.StreamType == astits.StreamTypeH264Video {
h264PID = es.ElementaryPID
}
}
return h264PID
}
func (c *StreamingController) readFromSRTIntoWriterPipe(srtConnection *astisrt.Connection, w *io.PipeWriter) {
defer srtConnection.Close()
inboundMpegTsPacket := make([]byte, c.c.SRTReadBufferSizeBytes)
for {
n, err := srtConnection.Read(inboundMpegTsPacket)
if err != nil {
c.l.Errorw("str conn failed to write data to buffer",
"error", err,
)
break
}
if _, err := w.Write(inboundMpegTsPacket[:n]); err != nil {
c.l.Errorw("failed to write mpeg-ts into the pipe",
"error", err,
)
break
}
}
}

View File

@@ -1,23 +1,23 @@
package controllers
package streammiddlewares
import (
"encoding/json"
"github.com/asticode/go-astits"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/entities"
gocaption "github.com/szatmary/gocaption"
)
type EIA608Reader struct {
type eia608Reader struct {
frame gocaption.EIA608Frame
}
func NewEIA608Reader() (r *EIA608Reader) {
return &EIA608Reader{}
func newEIA608Reader() (r *eia608Reader) {
return &eia608Reader{}
}
func (r *EIA608Reader) Parse(PES *astits.PESData) (string, error) {
nalus, err := ParseNALUs(PES.Data)
func (r *eia608Reader) parse(data []byte) (string, error) {
nalus, err := controllers.ParseNALUs(data)
if err != nil {
return "", err
}
@@ -47,9 +47,10 @@ func (r *EIA608Reader) Parse(PES *astits.PESData) (string, error) {
return "", nil
}
func BuildCaptionsMessage(pts *astits.ClockReference, captions string) (string, error) {
// TODO: port to mappers
func (r *eia608Reader) buildCaptionsMessage(pts int64, captions string) (string, error) {
cue := entities.Cue{
StartTime: pts.Base,
StartTime: pts,
Text: captions,
Type: "captions",
}

View File

@@ -2,11 +2,13 @@ package controllers
import (
"context"
"encoding/json"
"net"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"go.uber.org/zap"
)
@@ -14,20 +16,64 @@ type WebRTCController struct {
c *entities.Config
l *zap.SugaredLogger
api *webrtc.API
m *mapper.Mapper
}
func NewWebRTCController(
c *entities.Config,
l *zap.SugaredLogger,
api *webrtc.API,
m *mapper.Mapper,
) *WebRTCController {
return &WebRTCController{
c: c,
l: l,
api: api,
m: m,
}
}
func (c *WebRTCController) Setup(cancel context.CancelFunc, donutRecipe *entities.DonutRecipe, params entities.RequestParams) (*entities.WebRTCSetupResponse, error) {
response := &entities.WebRTCSetupResponse{}
peer, err := c.CreatePeerConnection(cancel)
if err != nil {
return nil, err
}
response.Connection = peer
var videoTrack *webrtc.TrackLocalStaticSample
videoTrack, err = c.CreateTrack(peer, donutRecipe.Video.Codec, string(entities.VideoType), params.StreamID)
if err != nil {
return nil, err
}
response.Video = videoTrack
var audioTrack *webrtc.TrackLocalStaticSample
audioTrack, err = c.CreateTrack(peer, donutRecipe.Audio.Codec, string(entities.AudioType), params.StreamID)
if err != nil {
return nil, err
}
response.Audio = audioTrack
metadataSender, err := c.CreateDataChannel(peer, entities.MetadataChannelID)
if err != nil {
return nil, err
}
response.Data = metadataSender
if err = c.SetRemoteDescription(peer, params.Offer); err != nil {
return nil, err
}
localDescription, err := c.GatheringWebRTC(peer)
if err != nil {
return nil, err
}
response.LocalSDP = localDescription
return response, nil
}
func (c *WebRTCController) CreatePeerConnection(cancel context.CancelFunc) (*webrtc.PeerConnection, error) {
c.l.Infow("trying to set up web rtc conn")
@@ -69,8 +115,8 @@ func (c *WebRTCController) CreatePeerConnection(cancel context.CancelFunc) (*web
return peerConnection, nil
}
func (c *WebRTCController) CreateTrack(peer *webrtc.PeerConnection, track entities.Track, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) {
codecCapability := mapper.FromTrackToRTPCodecCapability(track)
func (c *WebRTCController) CreateTrack(peer *webrtc.PeerConnection, codec entities.Codec, id string, streamId string) (*webrtc.TrackLocalStaticSample, error) {
codecCapability := c.m.FromTrackToRTPCodecCapability(codec)
webRTCtrack, err := webrtc.NewTrackLocalStaticSample(codecCapability, id, streamId)
if err != nil {
return nil, err
@@ -99,7 +145,6 @@ func (c *WebRTCController) SetRemoteDescription(peer *webrtc.PeerConnection, des
}
func (c *WebRTCController) GatheringWebRTC(peer *webrtc.PeerConnection) (*webrtc.SessionDescription, error) {
c.l.Infow("Gathering WebRTC Candidates")
gatherComplete := webrtc.GatheringCompletePromise(peer)
answer, err := peer.CreateAnswer(nil)
@@ -115,6 +160,26 @@ func (c *WebRTCController) GatheringWebRTC(peer *webrtc.PeerConnection) (*webrtc
return peer.LocalDescription(), nil
}
func (c *WebRTCController) SendMediaSample(mediaTrack *webrtc.TrackLocalStaticSample, data []byte, mediaCtx entities.MediaFrameContext) error {
if err := mediaTrack.WriteSample(media.Sample{Data: data, Duration: mediaCtx.Duration}); err != nil {
return err
}
return nil
}
func (c *WebRTCController) SendMetadata(metaTrack *webrtc.DataChannel, st *entities.Stream) error {
msg := c.m.FromStreamToEntityMessage(*st)
msgBytes, err := json.Marshal(msg)
if err != nil {
return err
}
err = metaTrack.SendText(string(msgBytes))
if err != nil {
return err
}
return nil
}
func NewWebRTCSettingsEngine(c *entities.Config, tcpListener net.Listener, udpListener net.PacketConn) webrtc.SettingEngine {
settingEngine := webrtc.SettingEngine{}

View File

@@ -3,8 +3,10 @@ package entities
import (
"context"
"fmt"
"strings"
"time"
astisrt "github.com/asticode/go-astisrt/pkg"
"github.com/asticode/go-astiav"
"github.com/pion/webrtc/v3"
)
@@ -12,11 +14,18 @@ const (
MetadataChannelID string = "metadata"
)
type WebRTCSetupResponse struct {
Connection *webrtc.PeerConnection
Video *webrtc.TrackLocalStaticSample
Audio *webrtc.TrackLocalStaticSample
Data *webrtc.DataChannel
LocalSDP *webrtc.SessionDescription
}
type RequestParams struct {
SRTHost string
SRTPort uint16 `json:",string"`
SRTStreamID string
Offer webrtc.SessionDescription
StreamURL string
StreamID string
Offer webrtc.SessionDescription
}
func (p *RequestParams) Valid() error {
@@ -24,16 +33,18 @@ func (p *RequestParams) Valid() error {
return ErrMissingParamsOffer
}
if p.SRTHost == "" {
return ErrMissingSRTHost
if p.StreamID == "" {
return ErrMissingStreamID
}
if p.SRTPort == 0 {
return ErrMissingSRTPort
if p.StreamURL == "" {
return ErrMissingStreamURL
}
isRTMP := strings.Contains(strings.ToLower(p.StreamURL), "rtmp")
isSRT := strings.Contains(strings.ToLower(p.StreamURL), "srt")
if p.SRTStreamID == "" {
return ErrMissingSRTStreamID
if !(isRTMP || isSRT) {
return ErrUnsupportedStreamURL
}
return nil
@@ -43,7 +54,7 @@ func (p *RequestParams) String() string {
if p == nil {
return ""
}
return fmt.Sprintf("ParamsOffer %v:%v/%v", p.SRTHost, p.SRTPort, p.SRTStreamID)
return fmt.Sprintf("RequestParams {StreamURL: %s, StreamID: %s}", p.StreamURL, p.StreamID)
}
type MessageType string
@@ -57,14 +68,64 @@ type Message struct {
Message string
}
type TrackType string
type Codec string
type MediaType string
const (
H264 TrackType = "h264"
UnknownCodec Codec = "unknownCodec"
H264 Codec = "h264"
H265 Codec = "h265"
VP8 Codec = "vp8"
VP9 Codec = "vp9"
AV1 Codec = "av1"
AAC Codec = "aac"
Opus Codec = "opus"
)
type Track struct {
Type TrackType
const (
UnknownType MediaType = "unknownMediaType"
VideoType MediaType = "video"
AudioType MediaType = "audio"
)
type Stream struct {
Codec Codec
Type MediaType
Id uint16
Index uint16
}
type MediaFrameContext struct {
// DTS decoding timestamp
DTS int
// PTS presentation timestamp
PTS int
// Media frame duration
Duration time.Duration
}
type StreamInfo struct {
Streams []Stream
}
func (s *StreamInfo) VideoStreams() []Stream {
var result []Stream
for _, s := range s.Streams {
if s.Type == VideoType {
result = append(result, s)
}
}
return result
}
func (s *StreamInfo) AudioStreams() []Stream {
var result []Stream
for _, s := range s.Streams {
if s.Type == AudioType {
result = append(result, s)
}
}
return result
}
type Cue struct {
@@ -73,15 +134,143 @@ type Cue struct {
Text string
}
type StreamParameters struct {
WebRTCConn *webrtc.PeerConnection
Cancel context.CancelFunc
Ctx context.Context
SRTConnection *astisrt.Connection
VideoTrack *webrtc.TrackLocalStaticSample
MetadataTrack *webrtc.DataChannel
type DonutParameters struct {
Cancel context.CancelFunc
Ctx context.Context
Recipe DonutRecipe
OnClose func()
OnError func(err error)
OnStream func(st *Stream) error
OnVideoFrame func(data []byte, c MediaFrameContext) error
OnAudioFrame func(data []byte, c MediaFrameContext) error
}
type DonutMediaTaskAction string
var DonutTranscode DonutMediaTaskAction = "transcode"
var DonutBypass DonutMediaTaskAction = "bypass"
type DonutBitStreamFilter string
var DonutH264AnnexB DonutBitStreamFilter = "h264_mp4toannexb"
type DonutStreamFilter string
func AudioResamplerFilter(sampleRate int) *DonutStreamFilter {
filter := DonutStreamFilter(fmt.Sprintf("aresample=%d", sampleRate))
return &filter
}
// TODO: split entities per domain or files avoiding name collision.
// DonutMediaTask is a transformation template to apply over a media.
type DonutMediaTask struct {
// Action the action that needs to be performed
Action DonutMediaTaskAction
// Codec is the main codec, it might be used depending on the action.
Codec Codec
// CodecContextOptions is a list of options to be applied on codec context.
// If no value is provided ffmpeg will use defaults.
// For instance, if one does not provide bit rate, it'll fallback to 64000 bps (opus)
CodecContextOptions []LibAVOptionsCodecContext
// DonutBitStreamFilter is the bitstream filter
DonutBitStreamFilter *DonutBitStreamFilter
// DonutStreamFilter is a regular filter
DonutStreamFilter *DonutStreamFilter
}
type DonutInputOptionKey string
func (d DonutInputOptionKey) String() string {
return string(d)
}
var DonutSRTStreamID DonutInputOptionKey = "srt_streamid"
var DonutSRTsmoother DonutInputOptionKey = "smoother"
var DonutSRTTranstype DonutInputOptionKey = "transtype"
var DonutRTMPLive DonutInputOptionKey = "rtmp_live"
type DonutInputFormat string
func (d DonutInputFormat) String() string {
return string(d)
}
var DonutMpegTSFormat DonutInputFormat = "mpegts"
var DonutFLVFormat DonutInputFormat = "flv"
type DonutAppetizer struct {
URL string
Format DonutInputFormat
Options map[DonutInputOptionKey]string
}
type DonutRecipe struct {
Input DonutAppetizer
Video DonutMediaTask
Audio DonutMediaTask
}
type LibAVOptionsCodecContext func(c *astiav.CodecContext)
func SetSampleRate(sampleRate int) LibAVOptionsCodecContext {
return func(c *astiav.CodecContext) {
c.SetSampleRate(sampleRate)
}
}
func SetTimeBase(num, den int) LibAVOptionsCodecContext {
return func(c *astiav.CodecContext) {
c.SetTimeBase(astiav.NewRational(num, den))
}
}
func SetBitRate(bitRate int64) LibAVOptionsCodecContext {
return func(c *astiav.CodecContext) {
c.SetBitRate(bitRate)
}
}
func SetBaselineProfile() LibAVOptionsCodecContext {
return func(c *astiav.CodecContext) {
c.SetProfile(astiav.ProfileH264Baseline)
}
}
func SetGopSize(gopSize int) LibAVOptionsCodecContext {
return func(c *astiav.CodecContext) {
c.SetGopSize(gopSize)
}
}
// SetSampleFormat sets sample format,
// CAUTION it only contains partial list of fmt
// TODO: move it to mappers
func SetSampleFormat(fmt string) LibAVOptionsCodecContext {
var sf astiav.SampleFormat
if fmt == "fltp" {
sf = astiav.SampleFormatFltp
} else if fmt == "flt" {
sf = astiav.SampleFormatFlt
} else {
// DANGER: assuming a default value
sf = astiav.SampleFormatS16
}
return func(c *astiav.CodecContext) {
c.SetSampleFormat(sf)
}
}
// TODO: implement proper matching
// DonutTransformRecipe
// AudioTask: {Action: Transcode, From: AAC, To: Opus}
// VideoTask: {Action: Bypass, From: H264, To: H264}
type Config struct {
HTTPPort int32 `required:"true" default:"8080"`
HTTPHost string `required:"true" default:"0.0.0.0"`
@@ -92,8 +281,13 @@ type Config struct {
ICEReadBufferSize int `required:"true" default:"8"`
ICEExternalIPsDNAT []string `required:"true" default:"127.0.0.1"`
EnableICEMux bool `require:"true" default:"false"`
StunServers []string `required:"true" default:"stun:stun.l.google.com:19302"`
StunServers []string `required:"true" default:"stun:stun.l.google.com:19302,stun:stun1.l.google.com:19302,stun:stun2.l.google.com:19302,stun:stun4.l.google.com:19302"`
SRTConnectionLatencyMS int32 `required:"true" default:"300"`
SRTReadBufferSizeBytes int `required:"true" default:"1316"`
// MPEG-TS consists of single units of 188 bytes. Multiplying 188*7 we get 1316,
// which is the maximum product of 188 that is less than MTU 1500 (188*8=1504)
// ref https://github.com/Haivision/srt/blob/master/docs/features/live-streaming.md#transmitting-mpeg-ts-binary-protocol-over-srt
SRTReadBufferSizeBytes int `required:"true" default:"1316"`
ProbingSize int `required:"true" default:"120"`
}

View File

@@ -1,12 +1,34 @@
package entities
import "errors"
import (
"errors"
"fmt"
)
var ErrHTTPGetOnly = errors.New("you must use http GET verb")
var ErrHTTPPostOnly = errors.New("you must use http POST verb")
var ErrMissingParamsOffer = errors.New("ParamsOffer must not be nil")
var ErrMissingStreamURL = errors.New("stream URL must not be nil")
var ErrMissingStreamID = errors.New("stream ID must not be nil")
var ErrUnsupportedStreamURL = errors.New("unsupported stream")
var ErrMissingSRTHost = errors.New("SRTHost must not be nil")
var ErrMissingSRTPort = errors.New("SRTPort must be valid")
var ErrMissingSRTStreamID = errors.New("SRTStreamID must not be empty")
var ErrMissingWebRTCSetup = errors.New("WebRTCController.SetupPeerConnection must be called first")
var ErrMissingRemoteOffer = errors.New("nil offer, in order to connect one must pass a valid offer")
var ErrMissingRequestParams = errors.New("RequestParams must not be nil")
var ErrMissingProcess = errors.New("there is no process running")
var ErrMissingProber = errors.New("there is no prober")
var ErrMissingStreamer = errors.New("there is no streamer")
var ErrMissingCompatibleStreams = errors.New("there is no compatible streams")
// FFmpeg/LibAV
var ErrFFMpegLibAV = errors.New("ffmpeg/libav error")
var ErrFFmpegLibAVNotFound = fmt.Errorf("%w input not found", ErrFFMpegLibAV)
var ErrFFmpegLibAVFormatContextIsNil = fmt.Errorf("%w format context is nil", ErrFFMpegLibAV)
var ErrFFmpegLibAVFormatContextOpenInputFailed = fmt.Errorf("%w format context open input has failed", ErrFFMpegLibAV)
var ErrFFmpegLibAVFindStreamInfo = fmt.Errorf("%w could not find stream info", ErrFFMpegLibAV)

View File

@@ -1,16 +1,190 @@
package mapper
import (
"fmt"
"strings"
"github.com/asticode/go-astiav"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/pion/webrtc/v3"
"go.uber.org/zap"
)
func FromTrackToRTPCodecCapability(track entities.Track) webrtc.RTPCodecCapability {
// TODO: split mapper by subject (either by files alone or new modules)
type Mapper struct {
l *zap.SugaredLogger
}
func NewMapper(l *zap.SugaredLogger) *Mapper {
return &Mapper{l: l}
}
func (m *Mapper) FromTrackToRTPCodecCapability(codec entities.Codec) webrtc.RTPCodecCapability {
// TODO: enrich codec capability, check if it's necessary
response := webrtc.RTPCodecCapability{}
if track.Type == entities.H264 {
if codec == entities.H264 {
response.MimeType = webrtc.MimeTypeH264
} else if codec == entities.H265 {
response.MimeType = webrtc.MimeTypeH265
} else if codec == entities.Opus {
response.MimeType = webrtc.MimeTypeOpus
} else {
m.l.Info("[[[[TODO: mapper not implemented]]]] for ", codec)
}
return response
}
func (m *Mapper) FromWebRTCSessionDescriptionToStreamInfo(desc webrtc.SessionDescription) (*entities.StreamInfo, error) {
sdpDesc, err := desc.Unmarshal()
if err != nil {
return nil, err
}
result := &entities.StreamInfo{}
unique := map[entities.Codec]entities.Stream{}
for _, desc := range sdpDesc.MediaDescriptions {
// Currently defined media (MediaName.Media) are "audio","video", "text", "application", and "message"
// ref https://datatracker.ietf.org/doc/html/rfc4566#section-5.14
// ref https://aomediacodec.github.io/av1-rtp-spec/#73-examples
// ref https://webrtchacks.com/sdp-anatomy/
if desc.MediaName.Media != "video" && desc.MediaName.Media != "audio" {
m.l.Info("[[[[TODO: mapper not implemented]]]] for ", desc.MediaName.Media)
continue
}
var mediaType entities.MediaType
if desc.MediaName.Media == "video" {
mediaType = entities.VideoType
}
if desc.MediaName.Media == "audio" {
mediaType = entities.AudioType
}
for _, a := range desc.Attributes {
if strings.Contains(a.Key, "rtpmap") {
// Samples:
// Key:rtpmap Value: 98 VP9/90000
// Key:rtpmap Value: 102 H264/90000
// Key:rtpmap Value: 102 H264/90000
// Key:rtpmap Value: 47 AV1/90000
// Key:rtpmap Value: 111 opus/48000/2
if strings.Contains(a.Value, "H264") {
unique[entities.H264] = entities.Stream{
Codec: entities.H264,
Type: mediaType,
}
} else if strings.Contains(a.Value, "H265") {
unique[entities.H265] = entities.Stream{
Codec: entities.H265,
Type: mediaType,
}
} else if strings.Contains(a.Value, "VP8") {
unique[entities.VP8] = entities.Stream{
Codec: entities.VP8,
Type: mediaType,
}
} else if strings.Contains(a.Value, "VP9") {
unique[entities.VP9] = entities.Stream{
Codec: entities.VP9,
Type: mediaType,
}
} else if strings.Contains(a.Value, "AV1") {
unique[entities.AV1] = entities.Stream{
Codec: entities.AV1,
Type: mediaType,
}
} else if strings.Contains(a.Value, "opus") {
unique[entities.Opus] = entities.Stream{
Codec: entities.Opus,
Type: mediaType,
}
} else {
m.l.Info("[[[[TODO: mapper not implemented]]]] for ", a.Value)
}
}
}
for _, v := range unique {
result.Streams = append(result.Streams, v)
}
}
return result, nil
}
func (m *Mapper) FromStreamInfoToEntityMessages(si *entities.StreamInfo) []entities.Message {
var result []entities.Message
for _, s := range si.Streams {
result = append(result, m.FromStreamToEntityMessage(s))
}
return result
}
func (m *Mapper) FromStreamToEntityMessage(st entities.Stream) entities.Message {
return entities.Message{
Type: entities.MessageTypeMetadata,
Message: string(st.Codec),
}
}
func (m *Mapper) FromLibAVStreamToEntityStream(libavStream *astiav.Stream) entities.Stream {
st := entities.Stream{}
if libavStream.CodecParameters().MediaType() == astiav.MediaTypeAudio {
st.Type = entities.AudioType
} else if libavStream.CodecParameters().MediaType() == astiav.MediaTypeVideo {
st.Type = entities.VideoType
} else {
m.l.Info("[[[[TODO: mapper not implemented]]]] for ", libavStream.CodecParameters().MediaType())
st.Type = entities.UnknownType
}
// https://github.com/FFmpeg/FFmpeg/blob/master/libavcodec/codec_desc.c#L34
if libavStream.CodecParameters().CodecID().Name() == "h264" {
st.Codec = entities.H264
} else if libavStream.CodecParameters().CodecID().Name() == "h265" {
st.Codec = entities.H265
} else if libavStream.CodecParameters().CodecID().Name() == "hevc" {
st.Codec = entities.H265
} else if libavStream.CodecParameters().CodecID().Name() == "av1" {
st.Codec = entities.AV1
} else if libavStream.CodecParameters().CodecID().Name() == "aac" {
st.Codec = entities.AAC
} else if libavStream.CodecParameters().CodecID().Name() == "vp8" {
st.Codec = entities.VP8
} else if libavStream.CodecParameters().CodecID().Name() == "vp9" {
st.Codec = entities.VP9
} else if libavStream.CodecParameters().CodecID().Name() == "opus" {
st.Codec = entities.Opus
} else {
m.l.Info("[[[[TODO: mapper not implemented]]]] for ", libavStream.CodecParameters().CodecID().Name())
st.Codec = entities.UnknownCodec
}
st.Id = uint16(libavStream.ID())
st.Index = uint16(libavStream.Index())
return st
}
func (m *Mapper) FromStreamCodecToLibAVCodecID(codec entities.Codec) (astiav.CodecID, error) {
if codec == entities.H264 {
return astiav.CodecIDH264, nil
} else if codec == entities.H265 {
return astiav.CodecIDHevc, nil
} else if codec == entities.Opus {
return astiav.CodecIDOpus, nil
} else if codec == entities.VP8 {
return astiav.CodecIDVp8, nil
} else if codec == entities.VP9 {
return astiav.CodecIDVp9, nil
} else if codec == entities.AAC {
return astiav.CodecIDAac, nil
}
// TODO: port error to entities
return astiav.CodecIDH264, fmt.Errorf("cannot find a libav codec id for donut codec id %+v", codec)
}

View File

@@ -0,0 +1,83 @@
package teststreaming
import (
"log"
"os/exec"
"strings"
"time"
"github.com/flavioribeiro/donut/internal/entities"
)
const (
ffmpeg_startup = 5 * time.Second
)
type FFmpeg interface {
Start() error
Stop() error
ExpectedStreams() []entities.Stream
Output() entities.RequestParams
}
type testFFmpeg struct {
arguments string
expectedStreams []entities.Stream
cmdExec *exec.Cmd
output entities.RequestParams
}
func (t *testFFmpeg) Start() error {
t.cmdExec = exec.Command("ffmpeg", prepareFFmpegParameters(t.arguments)...)
// For debugging:
// t.cmdExec.Stdout = os.Stdout
// t.cmdExec.Stderr = os.Stderr
go func() {
if err := t.cmdExec.Run(); err != nil {
if strings.Contains(err.Error(), "signal: killed") {
return
}
log.Fatalln("XXXXXXXXXXXX error while running ffmpeg XXXXXXXXXXXX", err.Error())
return
}
}()
// TODO: check the output to determine whether the ffmpeg is ready to accept connections
time.Sleep(ffmpeg_startup)
return nil
}
func (t *testFFmpeg) Stop() error {
if t == nil || t.cmdExec == nil {
return entities.ErrMissingProcess
}
if err := t.cmdExec.Process.Kill(); err != nil {
return err
}
return nil
}
func (t *testFFmpeg) ExpectedStreams() []entities.Stream {
return t.expectedStreams
}
func (t *testFFmpeg) Output() entities.RequestParams {
return t.output
}
func prepareFFmpegParameters(cmd string) []string {
result := []string{}
for _, item := range strings.Split(cmd, " ") {
item = strings.ReplaceAll(item, "\\", "")
item = strings.ReplaceAll(item, "\n", "")
item = strings.ReplaceAll(item, "\t", "")
item = strings.ReplaceAll(item, " ", "")
if item != "" {
result = append(result, item)
}
}
return result
}

View File

@@ -0,0 +1,49 @@
package teststreaming
import (
"fmt"
"strconv"
"github.com/flavioribeiro/donut/internal/entities"
)
// For debugging:
// use <-loglevel verbose>
// remove <-nostats>
// DO NOT REMOVE THE EXTRA SPACES ON THE END OF THESE LINES
var ffmpeg_input = `
-hide_banner -loglevel error -nostats
-re -f lavfi -i testsrc2=size=512x288:rate=30,format=yuv420p
-f lavfi -i sine=frequency=1000:sample_rate=44100
`
// OUTPUT PORTS: the output port must be different for each ffmpeg case so it might run in parallel
var outputPort = 45678
var FFMPEG_LIVE_SRT_MPEG_TS_H264_AAC = testFFmpeg{
arguments: ffmpeg_input + `
-c:v libx264 -preset veryfast -tune zerolatency -profile:v baseline
-b:v 500k -bufsize 1000k -x264opts keyint=30:min-keyint=30:scenecut=-1
-c:a aac -b:a 96k -f mpegts srt://0.0.0.0:` + strconv.Itoa(outputPort+0) + `?mode=listener&smoother=live&transtype=live
`,
expectedStreams: []entities.Stream{
{Index: 0, Id: uint16(256), Codec: entities.H264, Type: entities.VideoType},
{Index: 1, Id: uint16(257), Codec: entities.AAC, Type: entities.AudioType},
},
output: entities.RequestParams{StreamURL: fmt.Sprintf("srt://127.0.0.1:%d", outputPort+0), StreamID: "stream-id"},
}
// ref https://x265.readthedocs.io/en/stable/cli.html#executable-options
var FFMPEG_LIVE_SRT_MPEG_TS_H265_AAC = testFFmpeg{
arguments: ffmpeg_input + `
-c:v libx265 -preset veryfast -profile:v main
-b:v 500k -bufsize 1000k -x265-params keyint=30:min-keyint=30:scenecut=0
-c:a aac -b:a 96k -f mpegts srt://0.0.0.0:` + strconv.Itoa(outputPort+1) + `?mode=listener&smoother=live&transtype=live
`,
expectedStreams: []entities.Stream{
{Index: 0, Id: uint16(256), Codec: entities.H265, Type: entities.VideoType},
{Index: 1, Id: uint16(257), Codec: entities.AAC, Type: entities.AudioType},
},
output: entities.RequestParams{StreamURL: fmt.Sprintf("srt://127.0.0.1:%d", outputPort+1), StreamID: "stream-id"},
}

View File

@@ -0,0 +1,67 @@
package web
import (
"log"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/controllers/engine"
"github.com/flavioribeiro/donut/internal/controllers/probers"
"github.com/flavioribeiro/donut/internal/controllers/streamers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"github.com/flavioribeiro/donut/internal/web/handlers"
"github.com/kelseyhightower/envconfig"
"go.uber.org/fx"
"go.uber.org/zap"
)
func Dependencies(enableICEMux bool) fx.Option {
var c entities.Config
err := envconfig.Process("donut", &c)
if err != nil {
log.Fatal(err.Error())
}
c.EnableICEMux = enableICEMux
return fx.Options(
// HTTP Server
fx.Provide(NewHTTPServer),
// HTTP router
fx.Provide(NewServeMux),
// HTTP handlers
fx.Provide(handlers.NewSignalingHandler),
fx.Provide(handlers.NewIndexHandler),
// ICE mux servers
fx.Provide(controllers.NewTCPICEServer),
fx.Provide(controllers.NewUDPICEServer),
// Controllers
fx.Provide(controllers.NewWebRTCController),
fx.Provide(controllers.NewWebRTCSettingsEngine),
fx.Provide(controllers.NewWebRTCMediaEngine),
fx.Provide(controllers.NewWebRTCAPI),
fx.Provide(streamers.NewLibAVFFmpegStreamer),
fx.Provide(probers.NewLibAVFFmpeg),
fx.Provide(engine.NewDonutEngineController),
// // Stream middlewares
// fx.Provide(streammiddlewares.NewStreamInfo),
// fx.Provide(streammiddlewares.NewEIA608),
// Mappers
fx.Provide(mapper.NewMapper),
// Logging, Config constructors
fx.Provide(func() *zap.SugaredLogger {
logger, _ := zap.NewProduction()
return logger.Sugar()
}),
fx.Provide(func() *entities.Config {
return &c
}),
)
}

View File

@@ -6,129 +6,127 @@ import (
"net/http"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/controllers/engine"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/mapper"
"go.uber.org/zap"
)
type SignalingHandler struct {
c *entities.Config
l *zap.SugaredLogger
webRTCController *controllers.WebRTCController
srtController *controllers.SRTController
streamingController *controllers.StreamingController
c *entities.Config
l *zap.SugaredLogger
webRTCController *controllers.WebRTCController
mapper *mapper.Mapper
donut *engine.DonutEngineController
}
func NewSignalingHandler(
c *entities.Config,
log *zap.SugaredLogger,
webRTCController *controllers.WebRTCController,
srtController *controllers.SRTController,
streamingController *controllers.StreamingController,
mapper *mapper.Mapper,
donut *engine.DonutEngineController,
) *SignalingHandler {
return &SignalingHandler{
c: c,
l: log,
webRTCController: webRTCController,
srtController: srtController,
streamingController: streamingController,
c: c,
l: log,
webRTCController: webRTCController,
mapper: mapper,
donut: donut,
}
}
func (h *SignalingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) error {
if r.Method != http.MethodPost {
h.l.Errorw("unexpected method")
return entities.ErrHTTPPostOnly
}
params := entities.RequestParams{}
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
h.l.Errorw("error while decoding request params json",
"error", err,
)
params, err := h.createAndValidateParams(r)
if err != nil {
return err
}
if err := params.Valid(); err != nil {
h.l.Errorw("invalid params",
"error", err,
)
h.l.Infof("RequestParams %s", params.String())
donutEngine, err := h.donut.EngineFor(&params)
if err != nil {
return err
}
h.l.Infof("DonutEngine %#v", donutEngine)
// server side media info
serverStreamInfo, err := donutEngine.ServerIngredients()
if err != nil {
return err
}
h.l.Infof("ServerIngredients %#v", serverStreamInfo)
// client side media support
clientStreamInfo, err := donutEngine.ClientIngredients()
if err != nil {
return err
}
h.l.Infof("ClientIngredients %#v", clientStreamInfo)
donutRecipe, err := donutEngine.RecipeFor(serverStreamInfo, clientStreamInfo)
if err != nil {
return err
}
h.l.Infof("DonutRecipe %#v", donutRecipe)
// We can't defer calling cancel here because it'll live alongside the stream.
ctx, cancel := context.WithCancel(context.Background())
peer, err := h.webRTCController.CreatePeerConnection(cancel)
webRTCResponse, err := h.webRTCController.Setup(cancel, donutRecipe, params)
if err != nil {
h.l.Errorw("error while setting up web rtc connection",
"error", err,
)
cancel()
return err
}
h.l.Infof("WebRTCResponse %#v", webRTCResponse)
// TODO: create tracks according with SRT available streams
// Create a video track
videoTrack, err := h.webRTCController.CreateTrack(
peer,
entities.Track{
Type: entities.H264,
}, "video", params.SRTStreamID,
)
if err != nil {
h.l.Errorw("error while creating a web rtc track",
"error", err,
)
return err
}
go donutEngine.Serve(&entities.DonutParameters{
Cancel: cancel,
Ctx: ctx,
metadataSender, err := h.webRTCController.CreateDataChannel(peer, entities.MetadataChannelID)
if err != nil {
h.l.Errorw("error while creating a web rtc data channel",
"error", err,
)
return err
}
Recipe: *donutRecipe,
if err = h.webRTCController.SetRemoteDescription(peer, params.Offer); err != nil {
h.l.Errorw("error while setting a remote web rtc description",
"error", err,
)
return err
}
localDescription, err := h.webRTCController.GatheringWebRTC(peer)
if err != nil {
h.l.Errorw("error while preparing a local web rtc description",
"error", err,
)
return err
}
srtConnection, err := h.srtController.Connect(cancel, params)
if err != nil {
h.l.Errorw("error while connecting to an srt server",
"error", err,
)
return err
}
go h.streamingController.Stream(&entities.StreamParameters{
Cancel: cancel,
Ctx: ctx,
WebRTCConn: peer,
SRTConnection: srtConnection,
VideoTrack: videoTrack,
MetadataTrack: metadataSender,
OnClose: func() {
cancel()
webRTCResponse.Connection.Close()
},
OnError: func(err error) {
h.l.Errorw("error while streaming", "error", err)
},
OnStream: func(st *entities.Stream) error {
return h.webRTCController.SendMetadata(webRTCResponse.Data, st)
},
OnVideoFrame: func(data []byte, c entities.MediaFrameContext) error {
return h.webRTCController.SendMediaSample(webRTCResponse.Video, data, c)
},
OnAudioFrame: func(data []byte, c entities.MediaFrameContext) error {
return h.webRTCController.SendMediaSample(webRTCResponse.Audio, data, c)
},
})
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
err = json.NewEncoder(w).Encode(*localDescription)
err = json.NewEncoder(w).Encode(*webRTCResponse.LocalSDP)
if err != nil {
h.l.Errorw("error while encoding a local web rtc description",
"error", err,
)
cancel()
return err
}
h.l.Infof("webRTCResponse %#v", webRTCResponse)
return nil
}
func (h *SignalingHandler) createAndValidateParams(r *http.Request) (entities.RequestParams, error) {
if r.Method != http.MethodPost {
return entities.RequestParams{}, entities.ErrHTTPPostOnly
}
params := entities.RequestParams{}
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
return entities.RequestParams{}, err
}
if err := params.Valid(); err != nil {
return entities.RequestParams{}, err
}
return params, nil
}

View File

@@ -22,7 +22,7 @@ func NewServeMux(
mux.Handle("/", index)
fs := http.FileServer(http.Dir("./static"))
mux.Handle("/demo/", http.StripPrefix("/demo/", fs))
mux.Handle("/demo/", setHTTPNoCaching(http.StripPrefix("/demo/", fs)))
mux.Handle("/doSignaling", setCors(errorHandler(l, signaling)))
@@ -54,3 +54,10 @@ func errorHandler(l *zap.SugaredLogger, next ErrorHTTPHandler) http.Handler {
}
})
}
func setHTTPNoCaching(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "no-store, no-cache, max-age=0, must-revalidate, proxy-revalidate")
next.ServeHTTP(w, r)
})
}

46
main.go
View File

@@ -5,17 +5,11 @@ package main
import (
"flag"
"log"
"net/http"
"github.com/flavioribeiro/donut/internal/controllers"
"github.com/flavioribeiro/donut/internal/entities"
"github.com/flavioribeiro/donut/internal/web"
"github.com/flavioribeiro/donut/internal/web/handlers"
"github.com/kelseyhightower/envconfig"
"go.uber.org/fx"
"go.uber.org/zap"
)
func main() {
@@ -23,46 +17,8 @@ func main() {
flag.BoolVar(&enableICEMux, "enable-ice-mux", false, "Enable ICE Mux on :8081")
flag.Parse()
var c entities.Config
err := envconfig.Process("donut", &c)
if err != nil {
log.Fatal(err.Error())
}
c.EnableICEMux = enableICEMux
fx.New(
// HTTP Server
fx.Provide(web.NewHTTPServer),
// HTTP router
fx.Provide(web.NewServeMux),
// HTTP handlers
fx.Provide(handlers.NewSignalingHandler),
fx.Provide(handlers.NewIndexHandler),
// ICE mux servers
fx.Provide(controllers.NewTCPICEServer),
fx.Provide(controllers.NewUDPICEServer),
// Controllers
fx.Provide(controllers.NewSRTController),
fx.Provide(controllers.NewStreamingController),
fx.Provide(controllers.NewWebRTCController),
fx.Provide(controllers.NewWebRTCSettingsEngine),
fx.Provide(controllers.NewWebRTCMediaEngine),
fx.Provide(controllers.NewWebRTCAPI),
// Logging, Config constructors
fx.Provide(func() *zap.SugaredLogger {
logger, _ := zap.NewProduction()
return logger.Sugar()
}),
fx.Provide(func() *entities.Config {
return &c
}),
web.Dependencies(enableICEMux),
// Forcing the lifecycle initiation with NewHTTPServer
fx.Invoke(func(*http.Server) {}),
).Run()

19
nginx.conf Normal file
View File

@@ -0,0 +1,19 @@
daemon off;
error_log /dev/stdout info;
events {
worker_connections 1024;
}
rtmp {
server {
listen 1935;
chunk_size 4000;
application live {
live on;
record off;
}
}
}

View File

@@ -1,7 +1,8 @@
ffmpeg -hide_banner -loglevel verbose \
-re -f lavfi -i testsrc2=size=1280x720:rate=30,format=yuv420p \
ffmpeg -hide_banner -loglevel info \
-re -f lavfi -i testsrc2=size=768x432:rate=30,format=yuv420p \
-f lavfi -i sine=frequency=1000:sample_rate=44100 \
-c:v libx264 -preset veryfast -tune zerolatency -profile:v baseline \
-vf "drawtext=text='SRT streaming':box=1:boxborderw=10:x=(w-text_w)/2:y=(h-text_h)/2:fontsize=64:fontcolor=black" \
-b:v 1000k -bufsize 2000k -x264opts keyint=30:min-keyint=30:scenecut=-1 \
-c:a aac -b:a 128k \
-f mpegts "udp://${SRT_INPUT_HOST}:${SRT_INPUT_PORT}?pkt_size=${PKT_SIZE}"

9
scripts/ffmpeg_rtmp.sh Executable file
View File

@@ -0,0 +1,9 @@
#!/bin/bash
ffmpeg -hide_banner -loglevel info \
-re -f lavfi -i testsrc2=size=768x432:rate=30,format=yuv420p \
-f lavfi -i sine=frequency=1000:sample_rate=44100 \
-c:v libx264 -preset veryfast -tune zerolatency -profile:v baseline \
-vf "drawtext=text='RTMP streaming':box=1:boxborderw=10:x=(w-text_w)/2:y=(h-text_h)/2:fontsize=64:fontcolor=black" \
-b:v 1000k -bufsize 2000k -x264opts keyint=30:min-keyint=30:scenecut=-1 \
-c:a aac -b:a 128k \
-f flv -rtmp_live live "rtmp://${RTMP_HOST}:${RTMP_PORT}/live/app"

View File

@@ -2,7 +2,70 @@
SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
SPDX-License-Identifier: MIT
*/
textarea {
width: 500px;
min-height: 75px;
* {
font-family: "Open Sans", sans-serif;
}
input {
width: 100%;
padding: 12px 20px;
margin: 8px 0;
box-sizing: border-box;
}
legend {
background-color: #000;
color: #fff;
padding: 3px 6px;
}
.hint {
color:darkgray;
}
button {
align-items: center;
background-color: #FFFFFF;
border: 1px solid rgba(0, 0, 0, 0.1);
border-radius: .25rem;
box-shadow: rgba(0, 0, 0, 0.02) 0 1px 3px 0;
box-sizing: border-box;
color: rgba(0, 0, 0, 0.85);
cursor: pointer;
display: inline-flex;
font-family: system-ui,-apple-system,system-ui,"Helvetica Neue",Helvetica,Arial,sans-serif;
font-size: 16px;
font-weight: 600;
justify-content: center;
line-height: 1.25;
margin: 0;
min-height: 3rem;
padding: calc(.875rem - 1px) calc(1.5rem - 1px);
position: relative;
text-decoration: none;
transition: all 250ms;
user-select: none;
-webkit-user-select: none;
touch-action: manipulation;
vertical-align: baseline;
width: auto;
}
button:hover,
button:focus {
border-color: rgba(0, 0, 0, 0.15);
box-shadow: rgba(0, 0, 0, 0.1) 0 4px 12px;
color: rgba(0, 0, 0, 0.65);
}
button {
transform: translateY(-1px);
}
button {
background-color: #F0F0F1;
border-color: rgba(0, 0, 0, 0.15);
box-shadow: rgba(0, 0, 0, 0.06) 0 2px 4px;
color: rgba(0, 0, 0, 0.65);
transform: translateY(0);
}

View File

@@ -2,15 +2,13 @@
window.metadataMessages = {}
window.startSession = () => {
let srtHost = document.getElementById('srt-host').value;
let srtPort = document.getElementById('srt-port').value;
let srtStreamId = document.getElementById('srt-stream-id').value;
let streamURL = document.getElementById('stream-url').value;
let streamID = document.getElementById('stream-id').value;
setupWebRTC((pc, offer) => {
let srtFullAddress = JSON.stringify({
"srtHost": srtHost,
"srtPort": srtPort,
"srtStreamId": srtStreamId,
"streamURL": streamURL,
"streamID": streamID,
offer
});
@@ -31,7 +29,12 @@ const setupWebRTC = (setRemoteSDPfn) => {
log("setting up web rtc");
const pc = new RTCPeerConnection({
iceServers: [{
urls: 'stun:stun.l.google.com:19302'
urls: [
'stun:stun.l.google.com:19302',
'stun:stun1.l.google.com:19302',
'stun:stun2.l.google.com:19302',
'stun:stun4.l.google.com:19302'
]
}]
});
@@ -47,6 +50,10 @@ const setupWebRTC = (setRemoteSDPfn) => {
// with auto play.
pc.ontrack = function (event) {
log("ontrack : " + event.track.kind + " label " + event.track.label);
// it only creates a video tag element
if (event.track.kind !== "video") {
return
}
const el = document.createElement(event.track.kind);
el.srcObject = event.streams[0];
@@ -131,8 +138,17 @@ const formattedNow = () => {
const log = (msg, level = "info") => {
const el = document.createElement("p")
if (level === "error") {
if (typeof(msg) !== "string") {
orig = msg
msg = "unknown log msg type " + typeof(msg)
msg = msg + " [" + orig + "]"
level = "error"
}
if (level === "error" || msg.includes("failed") || msg.includes("error")) {
el.style = "color: red;background-color: yellow;";
level = "error"
}
el.innerText = "[[" + level.toUpperCase().padEnd(5, ' ') + "]] " + formattedNow() + " : " + msg

View File

@@ -3,31 +3,48 @@
<head>
<title>donut</title>
<script src="demo.js"></script>
<style src="demo.css"></style>
<link rel="stylesheet" href="demo.css">
</link>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Open+Sans:ital,wght@0,300..800;1,300..800&display=swap"
rel="stylesheet">
</head>
<body>
<h1>SRT Config</h1>
<b> SRT Host </b>
<input type="text" id="srt-host" value="srt"> <br />
<fieldset>
<legend>Remote streaming</legend>
<p>
<label for="stream-url">URL: <span aria-label="required">*</span></label>
<input id="stream-url" type="text" name="stream-url" required value="srt://haivision_srt:40052" />
<label class="hint">rtmp://nginx_rtmp/live</label>
</p>
<p>
<label for="stream-id">ID: <span aria-label="required">*</span></label>
<input id="stream-id" type="text" name="stream-id" required value="stream-id" />
<label class="hint">app</label>
</p>
<p>
<button onclick="onConnect()"> Connect </button>
</p>
</fieldset>
<b> SRT Port </b>
<input type="text" id="srt-port" value="40052" /> <br />
<fieldset>
<legend>Video</legend>
<div id="remoteVideos"></div>
</fieldset>
<b> SRT Stream ID </b>
<input type="text" id="srt-stream-id" value="stream-id" /> <br />
<button onclick="onConnect()"> Connect </button>
<h1>Video</h1>
<div id="remoteVideos"></div>
<h1>Metadata</h1>
<div id="metadata"></div>
<h1>Logs</h1>
<div id="log"></div>
<fieldset>
<legend>Metadata</legend>
<div id="metadata"></div>
</fieldset>
<fieldset>
<legend>Logs</legend>
<div id="log"></div>
</fieldset>
</body>
<script>
@@ -40,26 +57,9 @@
}
docReady(function () {
const queryString = window.location.search;
const urlParams = new URLSearchParams(queryString);
window.onConnect = () => {
window.startSession();
}
if (urlParams.has('srtHost')) {
document.getElementById('srt-host').value = urlParams.get('srtHost');
}
if (urlParams.has('srtPort')) {
document.getElementById('srt-port').value = urlParams.get('srtPort');
}
if (urlParams.has('srtStreamId')) {
document.getElementById('srt-stream-id').value = urlParams.get('srtStreamId');
}
if (urlParams.get('autoplay') === "true") {
onConnect();
}
});
</script>