mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2025-09-27 20:52:08 +08:00
Compare commits
31 Commits
v0.1-beta.
...
v0.1-beta.
Author | SHA1 | Date | |
---|---|---|---|
![]() |
945b486fe0 | ||
![]() |
d72d7b089c | ||
![]() |
d339fbe712 | ||
![]() |
3aeb278c47 | ||
![]() |
c92c1fc3e9 | ||
![]() |
def57119f4 | ||
![]() |
b20275d2b5 | ||
![]() |
a11ca1da6e | ||
![]() |
0fb7132947 | ||
![]() |
0f9e3c97c5 | ||
![]() |
e049a17216 | ||
![]() |
217c8c2bf6 | ||
![]() |
9f0153e2a8 | ||
![]() |
b2eaf03914 | ||
![]() |
8b54444c89 | ||
![]() |
76b352d67f | ||
![]() |
e8edb65a31 | ||
![]() |
88a6208912 | ||
![]() |
14b6df68ce | ||
![]() |
77080663ee | ||
![]() |
d25d27a0ee | ||
![]() |
5460e194e8 | ||
![]() |
e4f565f343 | ||
![]() |
6b274f2a37 | ||
![]() |
f442aab176 | ||
![]() |
0e71bd4dcb | ||
![]() |
e3618d70c3 | ||
![]() |
99c4a3e34a | ||
![]() |
b78de349ab | ||
![]() |
b4990b1e90 | ||
![]() |
687bdadba6 |
60
README.md
60
README.md
@@ -1,12 +1,14 @@
|
||||
# go2rtc
|
||||
|
||||
Ultimate camera streaming application with support RTSP, WebRTC, FFmpeg, RTMP, etc.
|
||||
Ultimate camera streaming application with support RTSP, WebRTC, HomeKit, FFmpeg, RTMP, etc.
|
||||
|
||||

|
||||
|
||||
- zero-dependency and zero-config [small app](#go2rtc-binary) for all OS (Windows, macOS, Linux, ARM)
|
||||
- zero-delay for all supported protocols (lowest possible streaming latency)
|
||||
- streaming from `RTSP`, `RTMP`, `MJPEG`, `HLS`, `USB Cameras`, `files` and [other sources](#module-streams)
|
||||
- streaming to `RTSP` or `WebRTC` (any modern browser)
|
||||
- low CPU load for supported codecs
|
||||
- zero-delay for many supported protocols (lowest possible streaming latency)
|
||||
- streaming from [RTSP](#source-rtsp), [RTMP](#source-rtmp), [MJPEG](#source-ffmpeg), [HLS](#source-ffmpeg), [USB Cameras](#source-ffmpeg-device), [files](#source-ffmpeg) and [other sources](#module-streams)
|
||||
- streaming to [RTSP](#module-rtsp), [WebRTC](#module-webrtc) or [MSE](#module-mp4)
|
||||
- first project in the World with support streaming from [HomeKit Cameras](#source-homekit)
|
||||
- on the fly transcoding for unsupported codecs via [FFmpeg](#source-ffmpeg)
|
||||
- multi-source 2-way [codecs negotiation](#codecs-negotiation)
|
||||
- mixing tracks from different sources to single stream
|
||||
@@ -22,6 +24,7 @@ Ultimate camera streaming application with support RTSP, WebRTC, FFmpeg, RTMP, e
|
||||
- [rtsp-simple-server](https://github.com/aler9/rtsp-simple-server) idea from [@aler9](https://github.com/aler9)
|
||||
- [GStreamer](https://gstreamer.freedesktop.org/) framework pipeline idea
|
||||
- [MediaSoup](https://mediasoup.org/) framework routing idea
|
||||
- HomeKit Accessory Protocol from [@brutella](https://github.com/brutella/hap)
|
||||
|
||||
## Codecs negotiation
|
||||
|
||||
@@ -48,7 +51,7 @@ streams:
|
||||
|
||||
**go2rtc** automatically match codecs for you browser and all your stream sources. This called **multi-source 2-way codecs negotiation**. And this is one of the main features of this app.
|
||||
|
||||

|
||||

|
||||
|
||||
**PS.** You can select `PCMU` or `PCMA` codec in camera setting and don't use transcoding at all. Or you can select `AAC` codec for main stream and `PCMU` codec for second stream and add both RTSP to YAML config, this also will work fine.
|
||||
|
||||
@@ -100,7 +103,7 @@ Don't forget to fix the rights `chmod +x go2rtc_xxx_xxx` on Linux and Mac.
|
||||
|
||||
### go2rtc: Docker
|
||||
|
||||
Container [alexxit/go2rtc](https://hub.docker.com/r/alexxit/go2rtc) with support `amd64`, `386`, `arm64`, `arm`. This container same as [Home Assistant Add-on](#go2rtc-home-assistant-add-on), but can be used separately from the Home Assistant. Container has preinstalled [FFmpeg](#source-ffmpeg) and [Ngrok](#module-ngrok) applications.
|
||||
Container [alexxit/go2rtc](https://hub.docker.com/r/alexxit/go2rtc) with support `amd64`, `386`, `arm64`, `arm`. This container same as [Home Assistant Add-on](#go2rtc-home-assistant-add-on), but can be used separately from the Home Assistant. Container has preinstalled [FFmpeg](#source-ffmpeg), [Ngrok](#module-ngrok) and [Python](#source-echo).
|
||||
|
||||
```yaml
|
||||
services:
|
||||
@@ -144,6 +147,8 @@ Available source types:
|
||||
- [ffmpeg](#source-ffmpeg) - FFmpeg integration (`MJPEG`, `HLS`, `files` and source types)
|
||||
- [ffmpeg:device](#source-ffmpeg-device) - local USB Camera or Webcam
|
||||
- [exec](#source-exec) - advanced FFmpeg and GStreamer integration
|
||||
- [echo](#source-echo) - get stream link via bash or python
|
||||
- [homekit](#source-homekit) - streaming from HomeKit Camera
|
||||
- [hass](#source-hass) - Home Assistant integration
|
||||
|
||||
**PS.** You can use sources like `MJPEG`, `HLS` and others via FFmpeg integration.
|
||||
@@ -249,11 +254,41 @@ streams:
|
||||
stream1: exec:ffmpeg -hide_banner -re -stream_loop -1 -i /media/BigBuckBunny.mp4 -c copy -rtsp_transport tcp -f rtsp {output}
|
||||
```
|
||||
|
||||
#### Source: Echo
|
||||
|
||||
Some sources may have a dynamic link. And you will need to get it using a bash or python script. Your script should echo a link to the source. RTSP, FFmpeg or any of the [supported sources](#module-streams).
|
||||
|
||||
**Docker** and **Hass Add-on** users has preinstalled `python3`, `curl`, `jq`.
|
||||
|
||||
Check examples in [wiki](https://github.com/AlexxIT/go2rtc/wiki/Source-Echo-examples).
|
||||
|
||||
```yaml
|
||||
streams:
|
||||
apple_hls: echo:python3 hls.py https://developer.apple.com/streaming/examples/basic-stream-osx-ios5.html
|
||||
```
|
||||
|
||||
#### Source: HomeKit
|
||||
|
||||
**Important:**
|
||||
|
||||
- You can use HomeKit Cameras **without Apple devices** (iPhone, iPad, etc.), it's just a yet another protocol
|
||||
- HomeKit device can be paired with only one ecosystem. So, if you have paired it to an iPhone (Apple Home) - you can't pair it with Home Assistant or go2rtc. Or if you have paired it to go2rtc - you can't pair it with iPhone
|
||||
- HomeKit device should be in same network with working [mDNS](https://en.wikipedia.org/wiki/Multicast_DNS) between device and go2rtc
|
||||
|
||||
go2rtc support import paired HomeKit devices from [Home Assistant](#source-hass). So you can use HomeKit camera with Hass and go2rtc simultaneously. If you using Hass, I recommend pairing devices with it, it will give you more options.
|
||||
|
||||
You can pair device with go2rtc on the HomeKit page. If you can't see your devices - reload the page. Also try reboot your HomeKit device (power off). If you still can't see it - you have a problems with mDNS.
|
||||
|
||||
If you see a device but it does not have a pair button - it is paired to some ecosystem (Apple Home, Home Assistant, HomeBridge etc). You need to delete device from that ecosystem, and it will be available for pairing. If you cannot unpair device, you will have to reset it.
|
||||
|
||||
**This source is in active development!** Tested only with [Aqara Camera Hub G3](https://www.aqara.com/eu/product/camera-hub-g3) (both EU and CN versions).
|
||||
|
||||
#### Source: Hass
|
||||
|
||||
Support import camera links from [Home Assistant](https://www.home-assistant.io/) config files:
|
||||
|
||||
- support ONLY [Generic Camera](https://www.home-assistant.io/integrations/generic/), setup via GUI
|
||||
- support [Generic Camera](https://www.home-assistant.io/integrations/generic/), setup via GUI
|
||||
- support [HomeKit Camera](https://www.home-assistant.io/integrations/homekit_controller/)
|
||||
|
||||
```yaml
|
||||
hass:
|
||||
@@ -261,6 +296,7 @@ hass:
|
||||
|
||||
streams:
|
||||
generic_camera: hass:Camera1 # Settings > Integrations > Integration Name
|
||||
aqara_g3: hass:Camera-Hub-G3-AB12
|
||||
```
|
||||
|
||||
### Module: API
|
||||
@@ -440,6 +476,14 @@ In other cases you need to use IP-address of server with **go2rtc** application.
|
||||
|
||||
PS. Default Home Assistant lovelace cards don't support 2-way audio. You can use 2-way audio from [Add-on Web UI](https://my.home-assistant.io/redirect/supervisor_addon/?addon=a889bffc_go2rtc&repository_url=https%3A%2F%2Fgithub.com%2FAlexxIT%2Fhassio-addons). But you need use HTTPS to access the microphone. This is a browser restriction and cannot be avoided.
|
||||
|
||||
### Module: MP4
|
||||
|
||||
Provides several features:
|
||||
|
||||
1. MSE stream (fMP4 over WebSocket)
|
||||
2. Camera snapshots in MP4 format (single frame), can be sent to [Telegram](https://www.telegram.org/)
|
||||
3. Progressive MP4 stream - bad format for streaming because of high latency, doesn't work in Safari
|
||||
|
||||
### Module: Log
|
||||
|
||||
You can set different log levels for different modules.
|
||||
|
Before Width: | Height: | Size: 27 KiB After Width: | Height: | Size: 27 KiB |
BIN
assets/go2rtc.png
Normal file
BIN
assets/go2rtc.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 295 KiB |
@@ -1,23 +1,40 @@
|
||||
ARG BUILD_FROM
|
||||
FROM $BUILD_FROM
|
||||
|
||||
RUN apk add --no-cache git go ffmpeg
|
||||
FROM $BUILD_FROM as build
|
||||
|
||||
ARG BUILD_ARCH
|
||||
# 1. Build go2rtc
|
||||
RUN apk add --no-cache git go
|
||||
|
||||
RUN git clone https://github.com/AlexxIT/go2rtc \
|
||||
&& cd go2rtc \
|
||||
&& CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath -o /usr/local/bin
|
||||
&& CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath
|
||||
|
||||
# 2. Download ngrok
|
||||
ARG BUILD_ARCH
|
||||
|
||||
# https://github.com/home-assistant/docker-base/blob/master/alpine/Dockerfile
|
||||
RUN if [ "${BUILD_ARCH}" = "aarch64" ]; then BUILD_ARCH="arm64"; \
|
||||
elif [ "${BUILD_ARCH}" = "armv7" ]; then BUILD_ARCH="arm"; fi \
|
||||
&& cd go2rtc \
|
||||
&& curl $(curl -s "https://raw.githubusercontent.com/ngrok/docker-ngrok/main/releases.json" | jq -r ".${BUILD_ARCH}.url") -o ngrok.zip \
|
||||
&& unzip ngrok -d /usr/local/bin
|
||||
&& unzip ngrok
|
||||
|
||||
RUN rm -r /go2rtc
|
||||
|
||||
|
||||
# https://devopscube.com/reduce-docker-image-size/
|
||||
FROM $BUILD_FROM
|
||||
|
||||
# 3. Copy go2rtc and ngrok to release
|
||||
COPY --from=build /go2rtc/go2rtc /usr/local/bin
|
||||
COPY --from=build /go2rtc/ngrok /usr/local/bin
|
||||
|
||||
# 4. Install ffmpeg
|
||||
# apk base OK: 22 MiB in 40 packages
|
||||
# ffmpeg OK: 113 MiB in 110 packages
|
||||
# python3 OK: 161 MiB in 114 packages
|
||||
RUN apk add --no-cache ffmpeg python3
|
||||
|
||||
# 5. Copy run to release
|
||||
COPY run.sh /
|
||||
RUN chmod a+x /run.sh
|
||||
|
||||
|
@@ -2,13 +2,13 @@
|
||||
|
||||
set +e
|
||||
|
||||
# set cwd for go2rtc (for config file, Hass itegration, etc)
|
||||
# set cwd for go2rtc (for config file, Hass integration, etc)
|
||||
cd /config
|
||||
|
||||
# add the feature to override go2rtc binary from Hass config folder
|
||||
export PATH="/config:$PATH"
|
||||
|
||||
while true; do
|
||||
go2rtc
|
||||
sleep 5
|
||||
go2rtc
|
||||
sleep 5
|
||||
done
|
@@ -34,9 +34,8 @@ func Init() {
|
||||
log = app.GetLogger("api")
|
||||
|
||||
initStatic(cfg.Mod.StaticDir)
|
||||
initWS()
|
||||
|
||||
HandleFunc("/api/frame.mp4", frameHandler)
|
||||
HandleFunc("/api/frame.raw", frameHandler)
|
||||
HandleFunc("/api/streams", streamsHandler)
|
||||
HandleFunc("/api/ws", apiWS)
|
||||
|
||||
|
@@ -1,40 +0,0 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/pkg/keyframe"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func frameHandler(w http.ResponseWriter, r *http.Request) {
|
||||
src := r.URL.Query().Get("src")
|
||||
stream := streams.Get(src)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var ch = make(chan []byte)
|
||||
|
||||
cons := new(keyframe.Consumer)
|
||||
cons.IsMP4 = strings.HasSuffix(r.URL.Path, ".mp4")
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg.(type) {
|
||||
case []byte:
|
||||
ch <- msg.([]byte)
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Warn().Err(err).Msg("[api.frame] add consumer")
|
||||
return
|
||||
}
|
||||
|
||||
data := <-ch
|
||||
|
||||
stream.RemoveConsumer(cons)
|
||||
|
||||
if _, err := w.Write(data); err != nil {
|
||||
log.Error().Err(err).Msg("[api.frame] write")
|
||||
}
|
||||
}
|
@@ -4,16 +4,42 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/gorilla/websocket"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type WSHandler func(ctx *Context, msg *streamer.Message)
|
||||
|
||||
var apiWsUp = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 512000,
|
||||
func initWS() {
|
||||
wsUp = &websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 512000,
|
||||
}
|
||||
wsUp.CheckOrigin = func(r *http.Request) bool {
|
||||
origin := r.Header["Origin"]
|
||||
if len(origin) == 0 {
|
||||
return true
|
||||
}
|
||||
o, err := url.Parse(origin[0])
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if o.Host == r.Host {
|
||||
return true
|
||||
}
|
||||
log.Trace().Msgf("[api.ws] origin: %s, host: %s", o.Host, r.Host)
|
||||
// some users change Nginx external port using Docker port
|
||||
// so origin will be with a port and host without
|
||||
if i := strings.IndexByte(o.Host, ':'); i > 0 {
|
||||
return o.Host[:i] == r.Host
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
var wsUp *websocket.Upgrader
|
||||
|
||||
type WSHandler func(ctx *Context, msg *streamer.Message)
|
||||
|
||||
type Context struct {
|
||||
Conn *websocket.Conn
|
||||
Request *http.Request
|
||||
@@ -24,7 +50,7 @@ type Context struct {
|
||||
}
|
||||
|
||||
func (ctx *Context) Upgrade(w http.ResponseWriter, r *http.Request) (err error) {
|
||||
ctx.Conn, err = apiWsUp.Upgrade(w, r, nil)
|
||||
ctx.Conn, err = wsUp.Upgrade(w, r, nil)
|
||||
ctx.Request = r
|
||||
return
|
||||
}
|
||||
|
29
cmd/echo/echo.go
Normal file
29
cmd/echo/echo.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package echo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/AlexxIT/go2rtc/cmd/app"
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/pkg/shell"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"os/exec"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
log := app.GetLogger("echo")
|
||||
|
||||
streams.HandleFunc("echo", func(url string) (streamer.Producer, error) {
|
||||
args := shell.QuoteSplit(url[5:])
|
||||
|
||||
b, err := exec.Command(args[0], args[1:]...).Output()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b = bytes.TrimSpace(b)
|
||||
|
||||
log.Debug().Str("url", url).Msgf("[echo] %s", b)
|
||||
|
||||
return streams.GetProducer(string(b))
|
||||
})
|
||||
}
|
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/rtsp"
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
pkg "github.com/AlexxIT/go2rtc/pkg/rtsp"
|
||||
"github.com/AlexxIT/go2rtc/pkg/shell"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/rs/zerolog"
|
||||
"os"
|
||||
@@ -49,7 +50,7 @@ func Handle(url string) (streamer.Producer, error) {
|
||||
)
|
||||
|
||||
// remove `exec:`
|
||||
args := QuoteSplit(url[5:])
|
||||
args := shell.QuoteSplit(url[5:])
|
||||
cmd := exec.Command(args[0], args[1:]...)
|
||||
|
||||
if log.Trace().Enabled() {
|
||||
@@ -64,6 +65,8 @@ func Handle(url string) (streamer.Producer, error) {
|
||||
|
||||
log.Debug().Str("url", url).Msg("[exec] run")
|
||||
|
||||
ts := time.Now()
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Error().Err(err).Str("url", url).Msg("[exec]")
|
||||
return nil, err
|
||||
@@ -75,6 +78,7 @@ func Handle(url string) (streamer.Producer, error) {
|
||||
log.Error().Str("url", url).Msg("[exec] timeout")
|
||||
return nil, errors.New("timeout")
|
||||
case prod := <-ch:
|
||||
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run")
|
||||
return prod, nil
|
||||
}
|
||||
}
|
||||
@@ -83,39 +87,3 @@ func Handle(url string) (streamer.Producer, error) {
|
||||
|
||||
var log zerolog.Logger
|
||||
var waiters map[string]chan streamer.Producer
|
||||
|
||||
func QuoteSplit(s string) []string {
|
||||
var a []string
|
||||
|
||||
for len(s) > 0 {
|
||||
is := strings.IndexByte(s, ' ')
|
||||
if is >= 0 {
|
||||
// skip prefix and double spaces
|
||||
if is == 0 {
|
||||
// goto next symbol
|
||||
s = s[1:]
|
||||
continue
|
||||
}
|
||||
|
||||
// check if quote in word
|
||||
if i := strings.IndexByte(s[:is], '"'); i >= 0 {
|
||||
// search quote end
|
||||
if is = strings.Index(s, `" `); is > 0 {
|
||||
is += 1
|
||||
} else {
|
||||
is = -1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if is >= 0 {
|
||||
a = append(a, strings.ReplaceAll(s[:is], `"`, ""))
|
||||
s = s[is+1:]
|
||||
} else {
|
||||
//add last word
|
||||
a = append(a, s)
|
||||
break
|
||||
}
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
@@ -22,11 +22,11 @@ func Init() {
|
||||
|
||||
// inputs
|
||||
"file": "-re -stream_loop -1 -i {input}",
|
||||
"http": "-i {input}",
|
||||
"http": "-fflags nobuffer -flags low_delay -i {input}",
|
||||
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -i {input}",
|
||||
|
||||
// output
|
||||
"out": "-rtsp_transport tcp -f rtsp {output}",
|
||||
"output": "-rtsp_transport tcp -f rtsp {output}",
|
||||
|
||||
// `-g 30` - group of picture, GOP, keyframe interval
|
||||
// `-preset superfast` - we can't use ultrafast because it doesn't support `-profile main -level 4.1`
|
||||
@@ -55,9 +55,16 @@ func Init() {
|
||||
s = s[7:] // remove `ffmpeg:`
|
||||
|
||||
var query url.Values
|
||||
var queryVideo, queryAudio bool
|
||||
if i := strings.IndexByte(s, '#'); i > 0 {
|
||||
query = parseQuery(s[i+1:])
|
||||
queryVideo = query["video"] != nil
|
||||
queryAudio = query["audio"] != nil
|
||||
s = s[:i]
|
||||
} else {
|
||||
// by default query both video and audio
|
||||
queryVideo = true
|
||||
queryAudio = true
|
||||
}
|
||||
|
||||
var input string
|
||||
@@ -66,7 +73,18 @@ func Init() {
|
||||
case "http", "https":
|
||||
input = strings.Replace(tpl["http"], "{input}", s, 1)
|
||||
case "rtsp", "rtsps":
|
||||
input = strings.Replace(tpl["rtsp"], "{input}", s, 1)
|
||||
// https://ffmpeg.org/ffmpeg-protocols.html#rtsp
|
||||
// skip unnecessary input tracks
|
||||
switch {
|
||||
case queryVideo && queryAudio:
|
||||
input = "-allowed_media_types video+audio "
|
||||
case queryVideo:
|
||||
input = "-allowed_media_types video "
|
||||
case queryAudio:
|
||||
input = "-allowed_media_types audio "
|
||||
}
|
||||
|
||||
input += strings.Replace(tpl["rtsp"], "{input}", s, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,17 +126,17 @@ func Init() {
|
||||
}
|
||||
}
|
||||
|
||||
if query["video"] == nil {
|
||||
s += " -vn"
|
||||
}
|
||||
if query["audio"] == nil {
|
||||
switch {
|
||||
case queryVideo && !queryAudio:
|
||||
s += " -an"
|
||||
case queryAudio && !queryVideo:
|
||||
s += " -vn"
|
||||
}
|
||||
} else {
|
||||
s += " -c copy"
|
||||
}
|
||||
|
||||
s += " " + tpl["out"]
|
||||
s += " " + tpl["output"]
|
||||
|
||||
return exec.Handle(s)
|
||||
})
|
||||
|
@@ -6,12 +6,12 @@ import (
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/cmd/api"
|
||||
"github.com/AlexxIT/go2rtc/cmd/app"
|
||||
"github.com/AlexxIT/go2rtc/cmd/rtsp"
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/cmd/webrtc"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/rs/zerolog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
@@ -26,7 +26,7 @@ func Init() {
|
||||
|
||||
app.LoadConfig(&conf)
|
||||
|
||||
log = app.GetLogger("api")
|
||||
log = app.GetLogger("hass")
|
||||
|
||||
// support https://www.home-assistant.io/integrations/rtsp_to_webrtc/
|
||||
api.HandleFunc("/static", func(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -78,7 +78,8 @@ func Init() {
|
||||
continue
|
||||
}
|
||||
|
||||
streams.Get("hass:" + entrie.Title)
|
||||
log.Info().Str("url", "hass:" + entrie.Title).Msg("[hass] load stream")
|
||||
//streams.Get("hass:" + entrie.Title)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,7 +91,13 @@ func handler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
url := r.FormValue("url")
|
||||
src := r.FormValue("url")
|
||||
src, err := url.QueryUnescape(src)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[api.hass] query unescape")
|
||||
return
|
||||
}
|
||||
|
||||
str := r.FormValue("sdp64")
|
||||
|
||||
offer, err := base64.StdEncoding.DecodeString(str)
|
||||
@@ -99,16 +106,20 @@ func handler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: fixme
|
||||
if strings.HasPrefix(url, "rtsp://") {
|
||||
port := ":" + rtsp.Port + "/"
|
||||
i := strings.Index(url, port)
|
||||
if i > 0 {
|
||||
url = url[i+len(port):]
|
||||
// check if stream links to our rtsp server
|
||||
if strings.HasPrefix(src, "rtsp://") {
|
||||
i := strings.IndexByte(src[7:], '/')
|
||||
if i > 0 && streams.Has(src[8+i:]) {
|
||||
src = src[8+i:]
|
||||
}
|
||||
}
|
||||
|
||||
stream := streams.Get(url)
|
||||
stream := streams.Get(src)
|
||||
if stream == nil {
|
||||
log.Error().Str("url", src).Msg("[api.hass] unsupported source")
|
||||
return
|
||||
}
|
||||
|
||||
str, err = webrtc.ExchangeSDP(stream, string(offer), r.UserAgent())
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[api.hass] exchange SDP")
|
||||
|
19
cmd/ivideon/ivideon.go
Normal file
19
cmd/ivideon/ivideon.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package ivideon
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/pkg/ivideon"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
streams.HandleFunc("ivideon", func(url string) (streamer.Producer, error) {
|
||||
id := strings.Replace(url[8:], "/", ":", 1)
|
||||
prod := ivideon.NewClient(id)
|
||||
if err := prod.Dial(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return prod, nil
|
||||
})
|
||||
}
|
138
cmd/mp4/mp4.go
Normal file
138
cmd/mp4/mp4.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/api"
|
||||
"github.com/AlexxIT/go2rtc/cmd/app"
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/pkg/mp4"
|
||||
"github.com/rs/zerolog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
log = app.GetLogger("mp4")
|
||||
|
||||
api.HandleWS(MsgTypeMSE, handlerWS)
|
||||
|
||||
api.HandleFunc("/api/frame.mp4", handlerKeyframe)
|
||||
api.HandleFunc("/api/stream.mp4", handlerMP4)
|
||||
}
|
||||
|
||||
var log zerolog.Logger
|
||||
|
||||
func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
|
||||
if isChromeFirst(w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
src := r.URL.Query().Get("src")
|
||||
stream := streams.Get(src)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
exit := make(chan []byte)
|
||||
|
||||
cons := &mp4.Consumer{}
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg := msg.(type) {
|
||||
case []byte:
|
||||
exit <- msg
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] add consumer")
|
||||
return
|
||||
}
|
||||
|
||||
defer stream.RemoveConsumer(cons)
|
||||
|
||||
w.Header().Set("Content-Type", cons.MimeType())
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] init")
|
||||
return
|
||||
}
|
||||
data = append(data, <-exit...)
|
||||
|
||||
// Apple Safari won't show frame without length
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
|
||||
|
||||
if _, err := w.Write(data); err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] add consumer")
|
||||
}
|
||||
}
|
||||
|
||||
func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
if isChromeFirst(w, r) || isSafari(w, r) {
|
||||
return
|
||||
}
|
||||
|
||||
log.Trace().Msgf("[api.mp4] %+v", r)
|
||||
|
||||
src := r.URL.Query().Get("src")
|
||||
stream := streams.Get(src)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
exit := make(chan struct{})
|
||||
|
||||
cons := &mp4.Consumer{}
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg := msg.(type) {
|
||||
case []byte:
|
||||
if _, err := w.Write(msg); err != nil {
|
||||
exit <- struct{}{}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] add consumer")
|
||||
return
|
||||
}
|
||||
|
||||
defer stream.RemoveConsumer(cons)
|
||||
|
||||
w.Header().Set("Content-Type", cons.MimeType())
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] init")
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = w.Write(data); err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] write")
|
||||
return
|
||||
}
|
||||
|
||||
<-exit
|
||||
|
||||
log.Trace().Msg("[api.mp4] close")
|
||||
}
|
||||
|
||||
func isChromeFirst(w http.ResponseWriter, r *http.Request) bool {
|
||||
// Chrome 105 does two requests: without Range and with `Range: bytes=0-`
|
||||
if strings.Contains(r.UserAgent(), " Chrome/") {
|
||||
if r.Header.Values("Range") == nil {
|
||||
w.Header().Set("Content-Type", "video/mp4")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isSafari(w http.ResponseWriter, r *http.Request) bool {
|
||||
if r.Header.Get("Range") == "bytes=0-1" {
|
||||
handlerKeyframe(w, r)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
@@ -1,35 +1,34 @@
|
||||
package mse
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/api"
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/pkg/mse"
|
||||
"github.com/AlexxIT/go2rtc/pkg/mp4"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
api.HandleWS("mse", handler)
|
||||
}
|
||||
const MsgTypeMSE = "mse" // fMP4
|
||||
|
||||
func handler(ctx *api.Context, msg *streamer.Message) {
|
||||
func handlerWS(ctx *api.Context, msg *streamer.Message) {
|
||||
src := ctx.Request.URL.Query().Get("src")
|
||||
stream := streams.Get(src)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
cons := new(mse.Consumer)
|
||||
cons := &mp4.Consumer{}
|
||||
cons.UserAgent = ctx.Request.UserAgent()
|
||||
cons.RemoteAddr = ctx.Request.RemoteAddr
|
||||
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg.(type) {
|
||||
case *streamer.Message, []byte:
|
||||
ctx.Write(msg)
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Warn().Err(err).Msg("[api.mse] Add consumer")
|
||||
log.Warn().Err(err).Msg("[api.mse] add consumer")
|
||||
ctx.Error(err)
|
||||
return
|
||||
}
|
||||
@@ -38,5 +37,16 @@ func handler(ctx *api.Context, msg *streamer.Message) {
|
||||
stream.RemoveConsumer(cons)
|
||||
})
|
||||
|
||||
cons.Init()
|
||||
ctx.Write(&streamer.Message{
|
||||
Type: MsgTypeMSE, Value: cons.MimeType(),
|
||||
})
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("[api.mse] init")
|
||||
ctx.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Write(data)
|
||||
}
|
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
"github.com/rs/zerolog"
|
||||
"net"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@@ -47,6 +48,15 @@ var OnProducer func(conn streamer.Producer) bool // TODO: maybe rewrite...
|
||||
var log zerolog.Logger
|
||||
|
||||
func rtspHandler(url string) (streamer.Producer, error) {
|
||||
backchannel := true
|
||||
|
||||
if i := strings.IndexByte(url, '#'); i > 0 {
|
||||
if url[i+1:] == "backchannel=0" {
|
||||
backchannel = false
|
||||
}
|
||||
url = url[:i]
|
||||
}
|
||||
|
||||
conn, err := rtsp.NewClient(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -67,8 +77,12 @@ func rtspHandler(url string) (streamer.Producer, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn.Backchannel = true
|
||||
conn.Backchannel = backchannel
|
||||
if err = conn.Describe(); err != nil {
|
||||
if !backchannel {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// second try without backchannel, we need to reconnect
|
||||
if err = conn.Dial(); err != nil {
|
||||
return nil, err
|
||||
|
@@ -79,7 +79,11 @@ func (p *Producer) start() {
|
||||
log.Debug().Str("url", p.url).Msg("[streams] start producer")
|
||||
|
||||
p.state = stateStart
|
||||
go p.element.Start()
|
||||
go func() {
|
||||
if err := p.element.Start(); err != nil {
|
||||
log.Warn().Err(err).Str("url", p.url).Msg("[streams] start")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Producer) stop() {
|
||||
|
@@ -24,19 +24,24 @@ func Init() {
|
||||
}
|
||||
}
|
||||
|
||||
func Get(name string) *Stream {
|
||||
if stream, ok := streams[name]; ok {
|
||||
func Get(src string) *Stream {
|
||||
if stream, ok := streams[src]; ok {
|
||||
return stream
|
||||
|
||||
}
|
||||
|
||||
if HasProducer(name) {
|
||||
log.Info().Str("url", name).Msg("[streams] create new stream")
|
||||
stream := NewStream(name)
|
||||
streams[name] = stream
|
||||
return stream
|
||||
if !HasProducer(src) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
log.Info().Str("url", src).Msg("[streams] create new stream")
|
||||
stream := NewStream(src)
|
||||
streams[src] = stream
|
||||
return stream
|
||||
}
|
||||
|
||||
func Has(src string) bool {
|
||||
return streams[src] != nil
|
||||
}
|
||||
|
||||
func New(name string, source interface{}) {
|
||||
|
@@ -69,7 +69,7 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().Str("src", src).Msg("[webrtc] new consumer")
|
||||
log.Debug().Str("url", src).Msg("[webrtc] new consumer")
|
||||
|
||||
var err error
|
||||
|
||||
|
10
main.go
10
main.go
@@ -4,11 +4,13 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/api"
|
||||
"github.com/AlexxIT/go2rtc/cmd/app"
|
||||
"github.com/AlexxIT/go2rtc/cmd/debug"
|
||||
"github.com/AlexxIT/go2rtc/cmd/echo"
|
||||
"github.com/AlexxIT/go2rtc/cmd/exec"
|
||||
"github.com/AlexxIT/go2rtc/cmd/ffmpeg"
|
||||
"github.com/AlexxIT/go2rtc/cmd/hass"
|
||||
"github.com/AlexxIT/go2rtc/cmd/homekit"
|
||||
"github.com/AlexxIT/go2rtc/cmd/mse"
|
||||
"github.com/AlexxIT/go2rtc/cmd/ivideon"
|
||||
"github.com/AlexxIT/go2rtc/cmd/mp4"
|
||||
"github.com/AlexxIT/go2rtc/cmd/ngrok"
|
||||
"github.com/AlexxIT/go2rtc/cmd/rtmp"
|
||||
"github.com/AlexxIT/go2rtc/cmd/rtsp"
|
||||
@@ -24,6 +26,8 @@ func main() {
|
||||
app.Init() // init config and logs
|
||||
streams.Init() // load streams list
|
||||
|
||||
echo.Init()
|
||||
|
||||
rtsp.Init() // add support RTSP client and RTSP server
|
||||
rtmp.Init() // add support RTMP client
|
||||
exec.Init() // add support exec scheme (depends on RTSP server)
|
||||
@@ -33,11 +37,13 @@ func main() {
|
||||
api.Init() // init HTTP API server
|
||||
|
||||
webrtc.Init()
|
||||
mse.Init()
|
||||
mp4.Init()
|
||||
|
||||
srtp.Init()
|
||||
homekit.Init()
|
||||
|
||||
ivideon.Init()
|
||||
|
||||
ngrok.Init()
|
||||
debug.Init()
|
||||
|
||||
|
5
pkg/README.md
Normal file
5
pkg/README.md
Normal file
@@ -0,0 +1,5 @@
|
||||
## Useful links
|
||||
|
||||
- https://www.wowza.com/blog/streaming-protocols
|
||||
- https://vimeo.com/blog/post/rtmp-stream/
|
||||
- https://sanjeev-pandey.medium.com/understanding-the-mpeg-4-moov-atom-pseudo-streaming-in-mp4-93935e1b9e9a
|
@@ -58,3 +58,21 @@ func RepairAVC(track *streamer.Track) streamer.WrapperFunc {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func SplitAVC(data []byte) [][]byte {
|
||||
var nals [][]byte
|
||||
for {
|
||||
// get AVC length
|
||||
size := int(binary.BigEndian.Uint32(data))
|
||||
|
||||
// check if multiple items in one packet
|
||||
if size+4 < len(data) {
|
||||
nals = append(nals, data[:size+4])
|
||||
data = data[size+4:]
|
||||
} else {
|
||||
nals = append(nals, data)
|
||||
break
|
||||
}
|
||||
}
|
||||
return nals
|
||||
}
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
const (
|
||||
NALUTypePFrame = 1
|
||||
NALUTypeIFrame = 5
|
||||
NALUTypeSEI = 6
|
||||
NALUTypeSPS = 7
|
||||
NALUTypePPS = 8
|
||||
)
|
||||
@@ -17,6 +18,17 @@ func NALUType(b []byte) byte {
|
||||
return b[4] & 0x1F
|
||||
}
|
||||
|
||||
func IsKeyframe(b []byte) bool {
|
||||
return NALUType(b) == NALUTypeIFrame
|
||||
}
|
||||
|
||||
func GetProfileLevelID(fmtp string) string {
|
||||
if fmtp == "" {
|
||||
return ""
|
||||
}
|
||||
return streamer.Between(fmtp, "profile-level-id=", ";")
|
||||
}
|
||||
|
||||
func GetParameterSet(fmtp string) (sps, pps []byte) {
|
||||
if fmtp == "" {
|
||||
return
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package h264
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/rtp/codecs"
|
||||
@@ -19,62 +20,84 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
//println(packet.SequenceNumber, packet.Payload[0]&0x1F, packet.Payload[0], packet.Payload[1], packet.Marker, packet.Timestamp)
|
||||
//nalUnitType := packet.Payload[0] & 0x1F
|
||||
//fmt.Printf(
|
||||
// "[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d\n",
|
||||
// track.Codec.Name, nalUnitType, len(packet.Payload), packet.Timestamp,
|
||||
// packet.PayloadType, packet.SSRC,
|
||||
//)
|
||||
|
||||
data, err := depack.Unmarshal(packet.Payload)
|
||||
if len(data) == 0 || err != nil {
|
||||
// NALu packets can be split in different ways:
|
||||
// - single type 7 and type 8 packets
|
||||
// - join type 7 and type 8 packet (type 24)
|
||||
// - split type 5 on multiple 28 packets
|
||||
// - split type 5 on multiple separate 28 packets
|
||||
units, err := depack.Unmarshal(packet.Payload)
|
||||
if len(units) == 0 || err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
naluType := NALUType(data)
|
||||
//println(naluType, len(data))
|
||||
for len(units) > 0 {
|
||||
i := int(binary.BigEndian.Uint32(units)) + 4
|
||||
unit := units[:i] // NAL Unit with AVC header
|
||||
units = units[i:]
|
||||
|
||||
switch naluType {
|
||||
case NALUTypeSPS:
|
||||
//println("new SPS")
|
||||
sps = data
|
||||
return nil
|
||||
case NALUTypePPS:
|
||||
//println("new PPS")
|
||||
pps = data
|
||||
return nil
|
||||
}
|
||||
unitType := NALUType(unit)
|
||||
//fmt.Printf("[H264] type: %2d, size: %6d\n", unitType, i)
|
||||
switch unitType {
|
||||
case NALUTypeSPS:
|
||||
//println("new SPS")
|
||||
sps = unit
|
||||
continue
|
||||
case NALUTypePPS:
|
||||
//println("new PPS")
|
||||
pps = unit
|
||||
continue
|
||||
case NALUTypeSEI:
|
||||
// some unnecessary text information
|
||||
continue
|
||||
}
|
||||
|
||||
// ffmpeg with `-tune zerolatency` enable option `-x264opts sliced-threads=1`
|
||||
// and every NALU will be sliced to multiple NALUs
|
||||
if !packet.Marker {
|
||||
buffer = append(buffer, data...)
|
||||
return nil
|
||||
}
|
||||
// ffmpeg with `-tune zerolatency` enable option `-x264opts sliced-threads=1`
|
||||
// and every NALU will be sliced to multiple NALUs
|
||||
if !packet.Marker {
|
||||
buffer = append(buffer, unit...)
|
||||
continue
|
||||
}
|
||||
|
||||
if buffer != nil {
|
||||
buffer = append(buffer, data...)
|
||||
data = buffer
|
||||
buffer = nil
|
||||
}
|
||||
if buffer != nil {
|
||||
buffer = append(buffer, unit...)
|
||||
unit = buffer
|
||||
buffer = nil
|
||||
}
|
||||
|
||||
var clone rtp.Packet
|
||||
var clone rtp.Packet
|
||||
|
||||
if naluType == NALUTypeIFrame {
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = sps
|
||||
if err = push(&clone); err != nil {
|
||||
return err
|
||||
if unitType == NALUTypeIFrame {
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = sps
|
||||
if err = push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = pps
|
||||
if err = push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = pps
|
||||
clone.Payload = unit
|
||||
if err = push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = data
|
||||
return push(&clone)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
286
pkg/ivideon/client.go
Normal file
286
pkg/ivideon/client.go
Normal file
@@ -0,0 +1,286 @@
|
||||
package ivideon
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/fmp4/fmp4io"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/pion/rtp"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
streamer.Element
|
||||
|
||||
ID string
|
||||
|
||||
conn *websocket.Conn
|
||||
medias []*streamer.Media
|
||||
tracks map[byte]*streamer.Track
|
||||
|
||||
closed bool
|
||||
|
||||
msg *message
|
||||
t0 time.Time
|
||||
|
||||
buffer chan []byte
|
||||
}
|
||||
|
||||
func NewClient(id string) *Client {
|
||||
return &Client{ID: id}
|
||||
}
|
||||
|
||||
func (c *Client) Dial() (err error) {
|
||||
resp, err := http.Get(
|
||||
"https://openapi-alpha.ivideon.com/cameras/" + c.ID +
|
||||
"/live_stream?op=GET&access_token=public&q=2&" +
|
||||
"video_codecs=h264&format=ws-fmp4",
|
||||
)
|
||||
|
||||
data, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var v liveResponse
|
||||
if err = json.Unmarshal(data, &v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !v.Success {
|
||||
return fmt.Errorf("wrong response: %s", data)
|
||||
}
|
||||
|
||||
c.conn, _, err = websocket.DefaultDialer.Dial(v.Result.URL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.getTracks(); err != nil {
|
||||
_ = c.conn.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) Handle() error {
|
||||
c.buffer = make(chan []byte, 5)
|
||||
// add delay to the stream for smooth playing (not a best solution)
|
||||
c.t0 = time.Now().Add(time.Second)
|
||||
|
||||
// processing stream in separate thread for lower delay between packets
|
||||
go c.worker()
|
||||
|
||||
_, data, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
track := c.tracks[c.msg.Track]
|
||||
if track != nil {
|
||||
c.buffer <- data
|
||||
}
|
||||
|
||||
// we have one unprocessed msg after getTracks
|
||||
for {
|
||||
_, data, err = c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var msg message
|
||||
if err = json.Unmarshal(data, &msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch msg.Type {
|
||||
case "stream-init":
|
||||
continue
|
||||
|
||||
case "fragment":
|
||||
_, data, err = c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
track = c.tracks[msg.Track]
|
||||
if track != nil {
|
||||
c.buffer <- data
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("wrong message type: %s", data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Close() error {
|
||||
if c.conn == nil {
|
||||
return nil
|
||||
}
|
||||
close(c.buffer)
|
||||
c.closed = true
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *Client) getTracks() error {
|
||||
c.tracks = map[byte]*streamer.Track{}
|
||||
|
||||
for {
|
||||
_, data, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var msg message
|
||||
if err = json.Unmarshal(data, &msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch msg.Type {
|
||||
case "stream-init":
|
||||
s := msg.CodecString
|
||||
i := strings.IndexByte(s, '.')
|
||||
if i > 0 {
|
||||
s = s[:i]
|
||||
}
|
||||
|
||||
switch s {
|
||||
case "avc1": // avc1.4d0029
|
||||
// skip multiple identical init
|
||||
if c.tracks[msg.TrackID] != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
codec := streamer.NewCodec(streamer.CodecH264)
|
||||
codec.FmtpLine = "profile-level-id=" + msg.CodecString[i+1:]
|
||||
codec.PayloadType = h264.PayloadTypeAVC
|
||||
|
||||
i = bytes.Index(msg.Data, []byte("avcC")) - 4
|
||||
if i < 0 {
|
||||
return fmt.Errorf("wrong AVC: %s", msg.Data)
|
||||
}
|
||||
|
||||
avccLen := binary.BigEndian.Uint32(msg.Data[i:])
|
||||
data = msg.Data[i+8 : i+int(avccLen)]
|
||||
|
||||
record := h264parser.AVCDecoderConfRecord{}
|
||||
if _, err = record.Unmarshal(data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
codec.FmtpLine += ";sprop-parameter-sets=" +
|
||||
base64.StdEncoding.EncodeToString(record.SPS[0]) + "," +
|
||||
base64.StdEncoding.EncodeToString(record.PPS[0])
|
||||
|
||||
media := &streamer.Media{
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionSendonly,
|
||||
Codecs: []*streamer.Codec{codec},
|
||||
}
|
||||
c.medias = append(c.medias, media)
|
||||
|
||||
track := &streamer.Track{
|
||||
Direction: streamer.DirectionSendonly,
|
||||
Codec: codec,
|
||||
}
|
||||
c.tracks[msg.TrackID] = track
|
||||
|
||||
case "mp4a": // mp4a.40.2
|
||||
}
|
||||
|
||||
case "fragment":
|
||||
c.msg = &msg
|
||||
return nil
|
||||
|
||||
default:
|
||||
return fmt.Errorf("wrong message type: %s", data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) worker() {
|
||||
var track *streamer.Track
|
||||
for _, track = range c.tracks {
|
||||
break
|
||||
}
|
||||
|
||||
for data := range c.buffer {
|
||||
moof := &fmp4io.MovieFrag{}
|
||||
if _, err := moof.Unmarshal(data, 0); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
moofLen := binary.BigEndian.Uint32(data)
|
||||
_ = moofLen
|
||||
|
||||
mdat := moof.Unknowns[0]
|
||||
if mdat.Tag() != fmp4io.MDAT {
|
||||
continue
|
||||
}
|
||||
i, _ := mdat.Pos() // offset, size
|
||||
data = data[i+8:]
|
||||
|
||||
traf := moof.Tracks[0]
|
||||
ts := uint32(traf.DecodeTime.Time)
|
||||
|
||||
//println("!!!", (time.Duration(ts) * time.Millisecond).String(), time.Since(c.t0).String())
|
||||
|
||||
for _, entry := range traf.Run.Entries {
|
||||
// synchronize framerate for WebRTC and MSE
|
||||
d := time.Duration(ts)*time.Millisecond - time.Since(c.t0)
|
||||
if d < 0 {
|
||||
d = time.Duration(entry.Duration) * time.Millisecond / 2
|
||||
}
|
||||
time.Sleep(d)
|
||||
|
||||
// can be SPS, PPS and IFrame in one packet
|
||||
for _, payload := range h264.SplitAVC(data[:entry.Size]) {
|
||||
packet := &rtp.Packet{
|
||||
// ivideon clockrate=1000, RTP clockrate=90000
|
||||
Header: rtp.Header{Timestamp: ts * 90},
|
||||
Payload: payload,
|
||||
}
|
||||
_ = track.WriteRTP(packet)
|
||||
}
|
||||
|
||||
data = data[entry.Size:]
|
||||
ts += entry.Duration
|
||||
}
|
||||
|
||||
if len(data) != 0 {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type liveResponse struct {
|
||||
Result struct {
|
||||
URL string `json:"url"`
|
||||
} `json:"result"`
|
||||
Success bool `json:"success"`
|
||||
}
|
||||
|
||||
type message struct {
|
||||
Type string `json:"type"`
|
||||
|
||||
CodecString string `json:"codec_string"`
|
||||
Data []byte `json:"data"`
|
||||
TrackID byte `json:"track_id"`
|
||||
|
||||
Track byte `json:"track"`
|
||||
StartTime float32 `json:"start_time"`
|
||||
Duration float32 `json:"duration"`
|
||||
IsKey bool `json:"is_key"`
|
||||
DataOffset uint32 `json:"data_offset"`
|
||||
}
|
31
pkg/ivideon/streamer.go
Normal file
31
pkg/ivideon/streamer.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package ivideon
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
)
|
||||
|
||||
func (c *Client) GetMedias() []*streamer.Media {
|
||||
return c.medias
|
||||
}
|
||||
|
||||
func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
|
||||
for _, track := range c.tracks {
|
||||
if track.Codec == codec {
|
||||
return track
|
||||
}
|
||||
}
|
||||
panic(fmt.Sprintf("wrong media/codec: %+v %+v", media, codec))
|
||||
}
|
||||
|
||||
func (c *Client) Start() error {
|
||||
err := c.Handle()
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) Stop() error {
|
||||
return c.Close()
|
||||
}
|
@@ -1,72 +0,0 @@
|
||||
package keyframe
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/mp4"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
var annexB = []byte{0, 0, 0, 1}
|
||||
|
||||
type Consumer struct {
|
||||
streamer.Element
|
||||
IsMP4 bool
|
||||
}
|
||||
|
||||
func (k *Consumer) GetMedias() []*streamer.Media {
|
||||
// support keyframe extraction only for one coded...
|
||||
codec := streamer.NewCodec(streamer.CodecH264)
|
||||
return []*streamer.Media{
|
||||
{
|
||||
Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{codec},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (k *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
// sps and pps without AVC headers
|
||||
sps, pps := h264.GetParameterSet(track.Codec.FmtpLine)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
// TODO: remove it, unnecessary
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
panic("wrong packet type")
|
||||
}
|
||||
|
||||
switch h264.NALUType(packet.Payload) {
|
||||
case h264.NALUTypeSPS:
|
||||
sps = packet.Payload[4:] // remove AVC header
|
||||
case h264.NALUTypePPS:
|
||||
pps = packet.Payload[4:] // remove AVC header
|
||||
case h264.NALUTypeIFrame:
|
||||
if sps == nil || pps == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var data []byte
|
||||
|
||||
if k.IsMP4 {
|
||||
data = mp4.MarshalMP4(sps, pps, packet.Payload)
|
||||
} else {
|
||||
data = append(data, annexB...)
|
||||
data = append(data, sps...)
|
||||
data = append(data, annexB...)
|
||||
data = append(data, pps...)
|
||||
data = append(data, annexB...)
|
||||
data = append(data, packet.Payload[4:]...)
|
||||
}
|
||||
|
||||
k.Fire(data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(track.Codec) {
|
||||
wrapper := h264.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
94
pkg/mp4/const.go
Normal file
94
pkg/mp4/const.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/deepch/vdk/format/mp4/mp4io"
|
||||
"time"
|
||||
)
|
||||
|
||||
var matrix = [9]int32{0x10000, 0, 0, 0, 0x10000, 0, 0, 0, 0x40000000}
|
||||
var time0 = time.Date(1904, time.January, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
func FTYP() []byte {
|
||||
b := make([]byte, 0x18)
|
||||
binary.BigEndian.PutUint32(b, 0x18)
|
||||
copy(b[0x04:], "ftyp")
|
||||
copy(b[0x08:], "iso5")
|
||||
copy(b[0x10:], "iso5")
|
||||
copy(b[0x14:], "avc1")
|
||||
return b
|
||||
}
|
||||
|
||||
func MOOV() *mp4io.Movie {
|
||||
return &mp4io.Movie{
|
||||
Header: &mp4io.MovieHeader{
|
||||
PreferredRate: 1,
|
||||
PreferredVolume: 1,
|
||||
Matrix: matrix,
|
||||
NextTrackId: -1,
|
||||
Duration: 0,
|
||||
TimeScale: 1000,
|
||||
CreateTime: time0,
|
||||
ModifyTime: time0,
|
||||
PreviewTime: time0,
|
||||
PreviewDuration: time0,
|
||||
PosterTime: time0,
|
||||
SelectionTime: time0,
|
||||
SelectionDuration: time0,
|
||||
CurrentTime: time0,
|
||||
},
|
||||
MovieExtend: &mp4io.MovieExtend{
|
||||
Tracks: []*mp4io.TrackExtend{
|
||||
{
|
||||
TrackId: 1,
|
||||
DefaultSampleDescIdx: 1,
|
||||
DefaultSampleDuration: 40,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TRAK() *mp4io.Track {
|
||||
return &mp4io.Track{
|
||||
// trak > tkhd
|
||||
Header: &mp4io.TrackHeader{
|
||||
TrackId: int32(1), // change me
|
||||
Flags: 0x0007, // 7 ENABLED IN-MOVIE IN-PREVIEW
|
||||
Duration: 0, // OK
|
||||
Matrix: matrix,
|
||||
CreateTime: time0,
|
||||
ModifyTime: time0,
|
||||
},
|
||||
// trak > mdia
|
||||
Media: &mp4io.Media{
|
||||
// trak > mdia > mdhd
|
||||
Header: &mp4io.MediaHeader{
|
||||
TimeScale: 1000,
|
||||
Duration: 0,
|
||||
Language: 0x55C4,
|
||||
CreateTime: time0,
|
||||
ModifyTime: time0,
|
||||
},
|
||||
// trak > mdia > minf
|
||||
Info: &mp4io.MediaInfo{
|
||||
// trak > mdia > minf > dinf
|
||||
Data: &mp4io.DataInfo{
|
||||
Refer: &mp4io.DataRefer{
|
||||
Url: &mp4io.DataReferUrl{
|
||||
Flags: 0x000001, // self reference
|
||||
},
|
||||
},
|
||||
},
|
||||
// trak > mdia > minf > stbl
|
||||
Sample: &mp4io.SampleTable{
|
||||
SampleDesc: &mp4io.SampleDesc{},
|
||||
TimeToSample: &mp4io.TimeToSample{},
|
||||
SampleToChunk: &mp4io.SampleToChunk{},
|
||||
SampleSize: &mp4io.SampleSize{},
|
||||
ChunkOffset: &mp4io.ChunkOffset{},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
107
pkg/mp4/consumer.go
Normal file
107
pkg/mp4/consumer.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
streamer.Element
|
||||
|
||||
UserAgent string
|
||||
RemoteAddr string
|
||||
|
||||
muxer *Muxer
|
||||
codecs []*streamer.Codec
|
||||
start bool
|
||||
|
||||
send int
|
||||
}
|
||||
|
||||
func (c *Consumer) GetMedias() []*streamer.Media {
|
||||
return []*streamer.Media{
|
||||
{
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecH264, ClockRate: 90000},
|
||||
},
|
||||
},
|
||||
//{
|
||||
// Kind: streamer.KindAudio,
|
||||
// Direction: streamer.DirectionRecvonly,
|
||||
// Codecs: []*streamer.Codec{
|
||||
// {Name: streamer.CodecAAC, ClockRate: 16000},
|
||||
// },
|
||||
//},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
codec := track.Codec
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
c.codecs = append(c.codecs, track.Codec)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch h264.NALUType(packet.Payload) {
|
||||
case h264.NALUTypeIFrame:
|
||||
c.start = true
|
||||
case h264.NALUTypePFrame:
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(packet)
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(codec) {
|
||||
wrapper := h264.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
||||
|
||||
fmt.Printf("[rtmp] unsupported codec: %+v\n", track.Codec)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Consumer) MimeType() string {
|
||||
return c.muxer.MimeType(c.codecs)
|
||||
}
|
||||
|
||||
func (c *Consumer) Init() ([]byte, error) {
|
||||
if c.muxer == nil {
|
||||
c.muxer = &Muxer{}
|
||||
}
|
||||
return c.muxer.GetInit(c.codecs)
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
func (c *Consumer) MarshalJSON() ([]byte, error) {
|
||||
v := map[string]interface{}{
|
||||
"type": "MP4 server consumer",
|
||||
"send": c.send,
|
||||
"remote_addr": c.RemoteAddr,
|
||||
"user_agent": c.UserAgent,
|
||||
}
|
||||
|
||||
return json.Marshal(v)
|
||||
}
|
@@ -1,47 +0,0 @@
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
type MemoryWriter struct {
|
||||
buf []byte
|
||||
pos int
|
||||
}
|
||||
|
||||
func (m *MemoryWriter) Write(p []byte) (n int, err error) {
|
||||
minCap := m.pos + len(p)
|
||||
if minCap > cap(m.buf) { // Make sure buf has enough capacity:
|
||||
buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra
|
||||
copy(buf2, m.buf)
|
||||
m.buf = buf2
|
||||
}
|
||||
if minCap > len(m.buf) {
|
||||
m.buf = m.buf[:minCap]
|
||||
}
|
||||
copy(m.buf[m.pos:], p)
|
||||
m.pos += len(p)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (m *MemoryWriter) Seek(offset int64, whence int) (int64, error) {
|
||||
newPos, offs := 0, int(offset)
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
newPos = offs
|
||||
case io.SeekCurrent:
|
||||
newPos = m.pos + offs
|
||||
case io.SeekEnd:
|
||||
newPos = len(m.buf) + offs
|
||||
}
|
||||
if newPos < 0 {
|
||||
return 0, errors.New("negative result pos")
|
||||
}
|
||||
m.pos = newPos
|
||||
return int64(newPos), nil
|
||||
}
|
||||
|
||||
func (m *MemoryWriter) Bytes() []byte {
|
||||
return m.buf
|
||||
}
|
184
pkg/mp4/muxer.go
184
pkg/mp4/muxer.go
@@ -1,37 +1,161 @@
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"github.com/deepch/vdk/av"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/mp4"
|
||||
"time"
|
||||
"github.com/deepch/vdk/format/fmp4/fmp4io"
|
||||
"github.com/deepch/vdk/format/mp4/mp4io"
|
||||
"github.com/deepch/vdk/format/mp4f/mp4fio"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
func MarshalMP4(sps, pps, frame []byte) []byte {
|
||||
writer := &MemoryWriter{}
|
||||
muxer := mp4.NewMuxer(writer)
|
||||
|
||||
stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = muxer.WriteHeader([]av.CodecData{stream}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
pkt := av.Packet{
|
||||
CompositionTime: time.Millisecond,
|
||||
IsKeyFrame: true,
|
||||
Duration: time.Second,
|
||||
Data: frame,
|
||||
}
|
||||
if err = muxer.WritePacket(pkt); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err = muxer.WriteTrailer(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return writer.buf
|
||||
type Muxer struct {
|
||||
fragIndex uint32
|
||||
dts uint64
|
||||
pts uint32
|
||||
data []byte
|
||||
total int
|
||||
}
|
||||
|
||||
func (m *Muxer) MimeType(codecs []*streamer.Codec) string {
|
||||
s := `video/mp4; codecs="`
|
||||
|
||||
for _, codec := range codecs {
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
s += "avc1." + h264.GetProfileLevelID(codec.FmtpLine)
|
||||
}
|
||||
}
|
||||
|
||||
return s + `"`
|
||||
}
|
||||
|
||||
func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
moov := MOOV()
|
||||
|
||||
for _, codec := range codecs {
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
sps, pps := h264.GetParameterSet(codec.FmtpLine)
|
||||
if sps == nil {
|
||||
return nil, fmt.Errorf("empty SPS: %#v", codec)
|
||||
}
|
||||
|
||||
// TODO: remove
|
||||
codecData, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
width := codecData.Width()
|
||||
height := codecData.Height()
|
||||
|
||||
trak := TRAK()
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak.Header.TrackWidth = float64(width)
|
||||
trak.Header.TrackHeight = float64(height)
|
||||
|
||||
trak.Media.Info.Video = &mp4io.VideoMediaInfo{
|
||||
Flags: 0x000001,
|
||||
}
|
||||
trak.Media.Info.Sample.SampleDesc.AVC1Desc = &mp4io.AVC1Desc{
|
||||
DataRefIdx: 1,
|
||||
HorizontalResolution: 72,
|
||||
VorizontalResolution: 72,
|
||||
Width: int16(width),
|
||||
Height: int16(height),
|
||||
FrameCount: 1,
|
||||
Depth: 24,
|
||||
ColorTableId: -1,
|
||||
Conf: &mp4io.AVC1Conf{
|
||||
Data: codecData.AVCDecoderConfRecordBytes(),
|
||||
},
|
||||
}
|
||||
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
|
||||
moov.Tracks = append(moov.Tracks, trak)
|
||||
}
|
||||
}
|
||||
|
||||
data := make([]byte, moov.Len())
|
||||
moov.Marshal(data)
|
||||
|
||||
return append(FTYP(), data...), nil
|
||||
}
|
||||
|
||||
func (m *Muxer) Rewind() {
|
||||
m.dts = 0
|
||||
m.pts = 0
|
||||
}
|
||||
|
||||
func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
trackID := uint8(1)
|
||||
|
||||
run := &mp4fio.TrackFragRun{
|
||||
Flags: 0x000b05,
|
||||
FirstSampleFlags: uint32(fmp4io.SampleNoDependencies),
|
||||
DataOffset: 0,
|
||||
Entries: []mp4io.TrackFragRunEntry{},
|
||||
}
|
||||
|
||||
moof := &mp4fio.MovieFrag{
|
||||
Header: &mp4fio.MovieFragHeader{
|
||||
Seqnum: m.fragIndex + 1,
|
||||
},
|
||||
Tracks: []*mp4fio.TrackFrag{
|
||||
{
|
||||
Header: &mp4fio.TrackFragHeader{
|
||||
Data: []byte{0x00, 0x02, 0x00, 0x20, 0x00, 0x00, 0x00, trackID, 0x01, 0x01, 0x00, 0x00},
|
||||
},
|
||||
DecodeTime: &mp4fio.TrackFragDecodeTime{
|
||||
Version: 1,
|
||||
Flags: 0,
|
||||
Time: m.dts,
|
||||
},
|
||||
Run: run,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
entry := mp4io.TrackFragRunEntry{
|
||||
//Duration: 90000,
|
||||
Size: uint32(len(packet.Payload)),
|
||||
}
|
||||
|
||||
newTime := packet.Timestamp
|
||||
if m.pts > 0 {
|
||||
//m.dts += uint64(newTime - m.pts)
|
||||
entry.Duration = newTime - m.pts
|
||||
m.dts += uint64(entry.Duration)
|
||||
}
|
||||
m.pts = newTime
|
||||
|
||||
// important before moof.Len()
|
||||
run.Entries = append(run.Entries, entry)
|
||||
|
||||
moofLen := moof.Len()
|
||||
mdatLen := 8 + len(packet.Payload)
|
||||
|
||||
// important after moof.Len()
|
||||
run.DataOffset = uint32(moofLen + 8)
|
||||
|
||||
buf := make([]byte, moofLen+mdatLen)
|
||||
moof.Marshal(buf)
|
||||
|
||||
binary.BigEndian.PutUint32(buf[moofLen:], uint32(mdatLen))
|
||||
copy(buf[moofLen+4:], "mdat")
|
||||
copy(buf[moofLen+8:], packet.Payload)
|
||||
|
||||
m.fragIndex++
|
||||
|
||||
m.total += moofLen + mdatLen
|
||||
|
||||
return buf
|
||||
}
|
||||
|
@@ -1,131 +0,0 @@
|
||||
package mse
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/mp4f"
|
||||
"github.com/pion/rtp"
|
||||
"time"
|
||||
)
|
||||
|
||||
const MsgTypeMSE = "mse"
|
||||
|
||||
type Consumer struct {
|
||||
streamer.Element
|
||||
|
||||
UserAgent string
|
||||
RemoteAddr string
|
||||
|
||||
muxer *mp4f.Muxer
|
||||
streams []av.CodecData
|
||||
start bool
|
||||
|
||||
send int
|
||||
}
|
||||
|
||||
func (c *Consumer) GetMedias() []*streamer.Media {
|
||||
return []*streamer.Media{
|
||||
{
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecH264, ClockRate: 90000},
|
||||
},
|
||||
}, {
|
||||
Kind: streamer.KindAudio,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecAAC, ClockRate: 16000},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
codec := track.Codec
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
idx := int8(len(c.streams))
|
||||
|
||||
sps, pps := h264.GetParameterSet(codec.FmtpLine)
|
||||
stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
c.streams = append(c.streams, stream)
|
||||
|
||||
pkt := av.Packet{Idx: idx, CompositionTime: time.Millisecond}
|
||||
|
||||
ts2time := time.Second / time.Duration(codec.ClockRate)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch h264.NALUType(packet.Payload) {
|
||||
case h264.NALUTypeIFrame:
|
||||
c.start = true
|
||||
pkt.IsKeyFrame = true
|
||||
case h264.NALUTypePFrame:
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
pkt.Data = packet.Payload
|
||||
newTime := time.Duration(packet.Timestamp) * ts2time
|
||||
if pkt.Time > 0 {
|
||||
pkt.Duration = newTime - pkt.Time
|
||||
}
|
||||
pkt.Time = newTime
|
||||
|
||||
for _, buf := range c.muxer.WritePacketV5(pkt) {
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(codec) {
|
||||
wrapper := h264.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
||||
|
||||
panic("unsupported codec")
|
||||
}
|
||||
|
||||
func (c *Consumer) Init() {
|
||||
c.muxer = mp4f.NewMuxer(nil)
|
||||
if err := c.muxer.WriteHeader(c.streams); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
codecs, buf := c.muxer.GetInit(c.streams)
|
||||
c.Fire(&streamer.Message{Type: MsgTypeMSE, Value: codecs})
|
||||
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
func (c *Consumer) MarshalJSON() ([]byte, error) {
|
||||
v := map[string]interface{}{
|
||||
"type": "MSE server consumer",
|
||||
"send": c.send,
|
||||
"remote_addr": c.RemoteAddr,
|
||||
"user_agent": c.UserAgent,
|
||||
}
|
||||
|
||||
return json.Marshal(v)
|
||||
}
|
@@ -2,7 +2,6 @@ package rtmp
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
@@ -134,7 +133,7 @@ func (c *Client) Handle() (err error) {
|
||||
|
||||
var payloads [][]byte
|
||||
if track.Codec.Name == streamer.CodecH264 {
|
||||
payloads = splitAVC(pkt.Data)
|
||||
payloads = h264.SplitAVC(pkt.Data)
|
||||
} else {
|
||||
payloads = [][]byte{pkt.Data}
|
||||
}
|
||||
@@ -156,21 +155,3 @@ func (c *Client) Close() error {
|
||||
c.closed = true
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func splitAVC(data []byte) [][]byte {
|
||||
var nals [][]byte
|
||||
for {
|
||||
// get AVC length
|
||||
size := int(binary.BigEndian.Uint32(data))
|
||||
|
||||
// check if multiple items in one packet
|
||||
if size+4 < len(data) {
|
||||
nals = append(nals, data[:size+4])
|
||||
data = data[size+4:]
|
||||
} else {
|
||||
nals = append(nals, data)
|
||||
break
|
||||
}
|
||||
}
|
||||
return nals
|
||||
}
|
||||
|
@@ -61,10 +61,10 @@ type Conn struct {
|
||||
|
||||
auth *tcp.Auth
|
||||
conn net.Conn
|
||||
mode Mode
|
||||
reader *bufio.Reader
|
||||
sequence int
|
||||
|
||||
mode Mode
|
||||
uri string
|
||||
|
||||
tracks []*streamer.Track
|
||||
channels map[byte]*streamer.Track
|
||||
@@ -76,24 +76,10 @@ type Conn struct {
|
||||
}
|
||||
|
||||
func NewClient(uri string) (*Conn, error) {
|
||||
var err error
|
||||
|
||||
c := new(Conn)
|
||||
c.URL, err = url.Parse(uri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if strings.IndexByte(c.URL.Host, ':') < 0 {
|
||||
c.URL.Host += ":554"
|
||||
}
|
||||
|
||||
// remove UserInfo from URL
|
||||
c.auth = tcp.NewAuth(c.URL.User)
|
||||
c.mode = ModeClientProducer
|
||||
c.URL.User = nil
|
||||
|
||||
return c, nil
|
||||
c.uri = uri
|
||||
return c, c.parseURI()
|
||||
}
|
||||
|
||||
func NewServer(conn net.Conn) *Conn {
|
||||
@@ -104,12 +90,29 @@ func NewServer(conn net.Conn) *Conn {
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Conn) parseURI() (err error) {
|
||||
c.URL, err = url.Parse(c.uri)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if strings.IndexByte(c.URL.Host, ':') < 0 {
|
||||
c.URL.Host += ":554"
|
||||
}
|
||||
|
||||
// remove UserInfo from URL
|
||||
c.auth = tcp.NewAuth(c.URL.User)
|
||||
c.URL.User = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) Dial() (err error) {
|
||||
//if c.state != StateClientInit {
|
||||
// panic("wrong state")
|
||||
//}
|
||||
if c.conn != nil && c.auth != nil {
|
||||
c.auth.Reset()
|
||||
if c.conn != nil {
|
||||
_ = c.parseURI()
|
||||
}
|
||||
|
||||
c.conn, err = net.DialTimeout(
|
||||
@@ -331,11 +334,18 @@ func (c *Conn) SetupMedia(
|
||||
return nil, fmt.Errorf("wrong media: %v", media)
|
||||
}
|
||||
|
||||
trackURL, err := url.Parse(media.Control)
|
||||
rawURL := media.Control
|
||||
if !strings.Contains(rawURL, "://") {
|
||||
rawURL = c.URL.String()
|
||||
if !strings.HasSuffix(rawURL, "/") {
|
||||
rawURL += "/"
|
||||
}
|
||||
rawURL += media.Control
|
||||
}
|
||||
trackURL, err := url.Parse(rawURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
trackURL = c.URL.ResolveReference(trackURL)
|
||||
|
||||
req := &tcp.Request{
|
||||
Method: MethodSetup,
|
||||
@@ -352,7 +362,21 @@ func (c *Conn) SetupMedia(
|
||||
var res *tcp.Response
|
||||
res, err = c.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Dahua VTO2111D fail on this step because of backchannel
|
||||
if c.Backchannel {
|
||||
if err = c.Dial(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Backchannel = false
|
||||
if err = c.Describe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err = c.Do(req)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if c.Session == "" {
|
||||
|
41
pkg/shell/shell.go
Normal file
41
pkg/shell/shell.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package shell
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
func QuoteSplit(s string) []string {
|
||||
var a []string
|
||||
|
||||
for len(s) > 0 {
|
||||
is := strings.IndexByte(s, ' ')
|
||||
if is >= 0 {
|
||||
// skip prefix and double spaces
|
||||
if is == 0 {
|
||||
// goto next symbol
|
||||
s = s[1:]
|
||||
continue
|
||||
}
|
||||
|
||||
// check if quote in word
|
||||
if i := strings.IndexByte(s[:is], '"'); i >= 0 {
|
||||
// search quote end
|
||||
if is = strings.Index(s, `" `); is > 0 {
|
||||
is += 1
|
||||
} else {
|
||||
is = -1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if is >= 0 {
|
||||
a = append(a, strings.ReplaceAll(s[:is], `"`, ""))
|
||||
s = s[is+1:]
|
||||
} else {
|
||||
//add last word
|
||||
a = append(a, s)
|
||||
break
|
||||
}
|
||||
}
|
||||
return a
|
||||
}
|
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/pion/sdp/v3"
|
||||
"strconv"
|
||||
"strings"
|
||||
"unicode"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -30,13 +31,14 @@ const (
|
||||
CodecAAC = "MPEG4-GENERIC"
|
||||
CodecOpus = "OPUS" // payloadType: 111
|
||||
CodecG722 = "G722"
|
||||
CodecMPA = "MPA" // payload: 14
|
||||
)
|
||||
|
||||
func GetKind(name string) string {
|
||||
switch name {
|
||||
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1:
|
||||
return KindVideo
|
||||
case CodecPCMU, CodecPCMA, CodecAAC, CodecOpus, CodecG722:
|
||||
case CodecPCMU, CodecPCMA, CodecAAC, CodecOpus, CodecG722, CodecMPA:
|
||||
return KindAudio
|
||||
}
|
||||
return ""
|
||||
@@ -153,8 +155,8 @@ func (c *Codec) Clone() *Codec {
|
||||
|
||||
func (c *Codec) Match(codec *Codec) bool {
|
||||
return c.Name == codec.Name &&
|
||||
c.ClockRate == codec.ClockRate &&
|
||||
c.Channels == codec.Channels
|
||||
(c.ClockRate == codec.ClockRate || codec.ClockRate == 0) &&
|
||||
(c.Channels == codec.Channels || codec.Channels == 0)
|
||||
}
|
||||
|
||||
func UnmarshalSDP(rawSDP []byte) ([]*Media, error) {
|
||||
@@ -241,7 +243,8 @@ func UnmarshalCodec(md *sdp.MediaDescription, payloadType string) *Codec {
|
||||
ss := strings.Split(attr.Value[i+1:], "/")
|
||||
|
||||
c.Name = strings.ToUpper(ss[0])
|
||||
c.ClockRate = uint32(atoi(ss[1]))
|
||||
// fix tailing space: `a=rtpmap:96 H264/90000 `
|
||||
c.ClockRate = uint32(atoi(strings.TrimRightFunc(ss[1], unicode.IsSpace)))
|
||||
|
||||
if len(ss) == 3 && ss[2] == "2" {
|
||||
c.Channels = 2
|
||||
@@ -256,11 +259,14 @@ func UnmarshalCodec(md *sdp.MediaDescription, payloadType string) *Codec {
|
||||
if c.Name == "" {
|
||||
switch payloadType {
|
||||
case "0":
|
||||
c.Name = "PCMU"
|
||||
c.Name = CodecPCMU
|
||||
c.ClockRate = 8000
|
||||
case "8":
|
||||
c.Name = "PCMA"
|
||||
c.Name = CodecPCMA
|
||||
c.ClockRate = 8000
|
||||
case "14":
|
||||
c.Name = CodecMPA
|
||||
c.ClockRate = 44100
|
||||
default:
|
||||
c.Name = payloadType
|
||||
}
|
||||
|
@@ -80,12 +80,6 @@ func (a *Auth) Write(req *Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Auth) Reset() {
|
||||
if a.Method == AuthDigest {
|
||||
a.Method = AuthUnknown
|
||||
}
|
||||
}
|
||||
|
||||
func Between(s, sub1, sub2 string) string {
|
||||
i := strings.Index(s, sub1)
|
||||
if i < 0 {
|
||||
|
@@ -51,4 +51,5 @@ pc.ontrack = ev => {
|
||||
## Useful links
|
||||
|
||||
- https://www.webrtc-experiment.com/DetectRTC/
|
||||
- https://divtable.com/table-styler/
|
||||
- https://divtable.com/table-styler/
|
||||
- https://www.chromium.org/audio-video/
|
||||
|
@@ -66,7 +66,9 @@
|
||||
const links = [
|
||||
'<a href="webrtc.html?src={name}">webrtc</a>',
|
||||
'<a href="mse.html?src={name}">mse</a>',
|
||||
'<a href="api/frame.mp4?src={name}">frame.mp4</a>',
|
||||
// '<a href="video.html?src={name}">video</a>',
|
||||
'<a href="api/stream.mp4?src={name}">mp4</a>',
|
||||
'<a href="api/frame.mp4?src={name}">frame</a>',
|
||||
'<a href="api/streams?src={name}">info</a>',
|
||||
];
|
||||
|
||||
|
@@ -60,9 +60,7 @@
|
||||
console.debug("ws.onmessage", data);
|
||||
|
||||
if (data.type === "mse") {
|
||||
sourceBuffer = mediaSource.addSourceBuffer(
|
||||
`video/mp4; codecs="${data.value}"`
|
||||
);
|
||||
sourceBuffer = mediaSource.addSourceBuffer(data.value);
|
||||
// important: segments supports TrackFragDecodeTime
|
||||
// sequence supports only TrackFragRunEntry Duration
|
||||
sourceBuffer.mode = "segments";
|
||||
|
53
www/video.html
Normal file
53
www/video.html
Normal file
@@ -0,0 +1,53 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||||
<title>go2rtc - WebRTC</title>
|
||||
<style>
|
||||
body {
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
html, body {
|
||||
height: 100%;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
#video {
|
||||
/* video "container" size */
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
background: black;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<video id="video" autoplay controls playsinline muted></video>
|
||||
<!--<video id="video" preload="auto" controls playsinline muted></video>-->
|
||||
<script>
|
||||
const baseUrl = location.origin + location.pathname.substr(
|
||||
0, location.pathname.lastIndexOf("/")
|
||||
);
|
||||
const video = document.getElementById('video');
|
||||
|
||||
video.oncanplay = ev => console.log(ev.type, ev);
|
||||
video.onplaying = ev => console.log(ev.type, ev);
|
||||
video.onwaiting = ev => console.log(ev.type, ev);
|
||||
video.onseeking = ev => console.log(ev.type, ev);
|
||||
video.onloadeddata = ev => console.log(ev.type, ev);
|
||||
video.oncanplaythrough = ev => console.log(ev.type, ev);
|
||||
// video.ondurationchange = ev => console.log(ev.type, ev);
|
||||
// video.ontimeupdate = ev => console.log(ev.type, ev);
|
||||
video.onplay = ev => console.log(ev.type, ev);
|
||||
video.onpause = ev => console.log(ev.type, ev);
|
||||
video.onsuspended = ev => console.log(ev.type, ev);
|
||||
video.onemptied = ev => console.log(ev.type, ev);
|
||||
video.onstalled = ev => console.log(ev.type, ev);
|
||||
|
||||
console.log("start");
|
||||
|
||||
video.src = baseUrl + "/api/stream.mp4" + location.search;
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
Reference in New Issue
Block a user