Compare commits

..

71 Commits

Author SHA1 Message Date
Alexey Khit
3acea1ed5a Update version to 1.0.1 2023-01-24 22:29:15 +03:00
Alexey Khit
3fb8d9af66 Disable release autobuild 2023-01-24 22:29:04 +03:00
Alexey Khit
9bbaf41d54 Second fix for Chinese buggy cameras 2023-01-24 21:38:58 +03:00
Alexey Khit
c43530fbd3 Fix mp4f consumer 2023-01-24 21:05:51 +03:00
Alexey Khit
15777a3d94 Fix Chinese buggy cameras 2023-01-24 21:05:35 +03:00
Alexey Khit
6e61ac6d2f Fix HTTP-FLV for Reolink cameras 2023-01-24 17:48:31 +03:00
Alexey Khit
6d7d5f53d8 Update websocket disconnect log message 2023-01-24 17:48:08 +03:00
Alexey Khit
d2bca8d461 Update processing HTTP-FLV without video or audio 2023-01-24 17:47:26 +03:00
Alexey Khit
94b089d1e3 Fix bug in URL for D-Link cameras 2023-01-23 21:14:52 +03:00
Alexey Khit
b3d16c9fcc Update TOC in readme 2023-01-23 15:37:06 +03:00
Alexey Khit
f0def68482 Update readme 2023-01-20 17:45:35 +03:00
Alexey Khit
9ddbb326b4 Update version to 1.0.0 2023-01-20 17:07:43 +03:00
Alexey Khit
a2e58d928e Fix timezone in logs 2023-01-20 13:45:01 +03:00
Alexey Khit
3c48fb8bea Simplify Dockerfile 2023-01-20 11:23:28 +03:00
Alexey Khit
4b0cbb5a73 Add support basic auth for API 2023-01-20 10:54:26 +03:00
Alexey Khit
e28b49ea86 Ignore errors for RTCP packets 2023-01-20 10:26:57 +03:00
Alexey Khit
5c17d8fcb6 Add support AAC audio for HTTP-FLV 2023-01-19 21:44:15 +03:00
Alexey Khit
e040fb591f Disable CGO for git releases 2023-01-18 15:07:42 +03:00
Alexey Khit
140014f2a6 Fix info for WS/MP4 2023-01-18 15:04:06 +03:00
Alexey Khit
23f72d111e Add Teardown handler for RTSP server (untested) 2023-01-18 12:21:54 +03:00
Alexey Khit
f9d5ab9d0a Fix RTSP server SDP for some clients 2023-01-18 11:45:39 +03:00
Alexey Khit
8628c48db8 Add no-cache for all GET API requests 2023-01-18 10:01:00 +03:00
Alexey Khit
6e49d51c33 Update GET config API when config file not set 2023-01-18 10:00:20 +03:00
Alexey Khit
6a61b5234e Fix HTTP-FLV support for Reolink cameras 2023-01-18 09:36:32 +03:00
Alexey Khit
7a0091777d Fix relative config path #171 2023-01-16 11:00:04 +03:00
Alexey Khit
d23d2a7eff Fix release binaries for mac 2023-01-16 00:40:02 +03:00
Alexey Khit
cecbe4166c Update version to 0.1-rc.9 2023-01-16 00:06:55 +03:00
Alexey Khit
dcb457235c Rewrite stream info API 2023-01-15 23:51:20 +03:00
Alexey Khit
bc4e032830 Update readme 2023-01-15 11:13:38 +03:00
Alexey Khit
8218cda149 Add version, config_path to web UI and fix RTSP link 2023-01-15 09:57:15 +03:00
Alexey Khit
d1e56feeb6 Update full path to config file 2023-01-15 09:55:32 +03:00
Alexey Khit
463d05dfd3 Update readme 2023-01-15 00:28:48 +03:00
Alexey Khit
a1a73f7b45 Rewrite WS+MP4 format to keyframes stream 2023-01-15 00:12:26 +03:00
Alexey Khit
39662e10af Fix errors in JS player 2023-01-15 00:11:31 +03:00
Alexey Khit
1c830d6e60 Code refactoring 2023-01-14 22:49:12 +03:00
Alex X
2039aa60b3 Merge pull request #170 from skrashevich/config-api-patch-method
PATH api/config method for merge configuration
2023-01-14 21:57:34 +03:00
Sergey Krashevich
b7016e798f Update config.go 2023-01-14 21:27:23 +03:00
Alexey Khit
0b291f5185 Support multiple configs and config in raw yaml form 2023-01-14 21:12:17 +03:00
Alexey Khit
395304654a Code refactoring 2023-01-14 19:15:13 +03:00
Alexey Khit
e472397705 Add general info API 2023-01-14 18:00:43 +03:00
Alexey Khit
7c1f48e0ad Support empty default environment value 2023-01-14 17:25:05 +03:00
Alexey Khit
f4346a104f Add support env variables in config file #143 2023-01-14 17:19:51 +03:00
Alexey Khit
030972b436 Auto build binaries on release #158 2023-01-14 14:14:23 +03:00
Alexey Khit
efddefa123 Add web config editor #153 2023-01-14 13:47:34 +03:00
Alexey Khit
3c1bdd0dab Fix WebRTC candidate type 2023-01-14 09:45:03 +03:00
Alexey Khit
7e7e15d7c8 Update readme 2023-01-14 09:22:22 +03:00
Alex X
a1a9f77535 Merge pull request #167 from felipecrs/master
Match docs with new webrtc udp fixed port
2023-01-14 09:10:46 +03:00
Alexey Khit
a06462729d Code refactoring 2023-01-14 09:04:54 +03:00
Alex X
331c5bbcad Merge pull request #166 from tsightler/udp-candidate-fix
Fix invalid tcpType for UDP candidate
2023-01-14 08:59:25 +03:00
Felipe Santos
58a76efc8a Match docs with new webrtc udp fixed port 2023-01-13 23:15:04 -03:00
tsightler
5e0f010885 Update helper.go 2023-01-13 18:18:39 -05:00
Alexey Khit
4ae733aa11 Update version to 0.1-rc.8 2023-01-13 22:39:24 +03:00
Alexey Khit
27d8b33b62 Fix concurrency in ivideon 2023-01-13 21:52:29 +03:00
Alexey Khit
ff8b0fbb9c Set default 8555 port for WebRTC (UDP+TCP) 2023-01-13 21:51:48 +03:00
Alexey Khit
c6ad7ac39f Add single UDP port for WebRTC Server 2023-01-13 21:51:48 +03:00
Alexey Khit
7a3adf17be Fix mp4f consumer (unused) 2023-01-13 21:51:24 +03:00
Alexey Khit
94f6c07b28 Fix mjpeg client network connection 2023-01-13 18:03:54 +03:00
Alexey Khit
7b326d4753 Fix simultaneous stream reconnect and start 2023-01-13 18:03:17 +03:00
Alexey Khit
5407a3bc4b Fix multiple requests from different consumers 2023-01-13 18:02:03 +03:00
Alexey Khit
6b24421722 Fix unblocking exec error 2023-01-13 18:01:01 +03:00
Alexey Khit
d12775a2d7 Fix unblocking exec waiter 2023-01-13 18:00:48 +03:00
Alexey Khit
6151593c08 Fix ws lock on write and close 2023-01-13 17:28:01 +03:00
Alexey Khit
dba0989c54 Fix empty streams json on stream lock 2023-01-13 13:37:36 +03:00
Alexey Khit
ba0c7d911d Fix ffmpeg link to same stream 2023-01-13 13:36:43 +03:00
Alexey Khit
09fefca712 Remove backchannel codec from add consumer error 2023-01-13 13:35:58 +03:00
Alexey Khit
b3f177e2ec Handle closed state for ws connection 2023-01-13 13:34:41 +03:00
Alexey Khit
228abb8fbe Change logs msg from WRN to DBG for fail on add consumer 2023-01-13 13:33:55 +03:00
Alexey Khit
eee70c07b7 Fix closer for ivideon source 2023-01-13 13:32:48 +03:00
Alexey Khit
d92b0f29af Fix states handle for RTSP 2023-01-13 13:32:09 +03:00
Alexey Khit
fca6c87b2c Fix RTSP tracks list in info json 2023-01-13 13:31:22 +03:00
Alexey Khit
0601091772 Fix closer for RTSP server #163 2023-01-13 13:30:41 +03:00
56 changed files with 1582 additions and 464 deletions

View File

@@ -1,4 +1,4 @@
name: ci name: docker
on: on:
workflow_dispatch: workflow_dispatch:
@@ -19,7 +19,7 @@ jobs:
id: meta id: meta
uses: docker/metadata-action@v4 uses: docker/metadata-action@v4
with: with:
images: alexxit/go2rtc images: ${{ github.repository }}
tags: | tags: |
type=ref,event=branch type=ref,event=branch
type=semver,pattern={{version}},enable=false type=semver,pattern={{version}},enable=false
@@ -29,7 +29,7 @@ jobs:
id: meta-hw id: meta-hw
uses: docker/metadata-action@v4 uses: docker/metadata-action@v4
with: with:
images: alexxit/go2rtc images: ${{ github.repository }}
flavor: | flavor: |
suffix=-hardware suffix=-hardware
latest=false latest=false

92
.github/workflows/release.yml vendored Normal file
View File

@@ -0,0 +1,92 @@
name: release
on:
workflow_dispatch:
# push:
# tags:
# - 'v*'
jobs:
build-and-release:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Generate changelog
run: |
echo -e "$(git log $(git describe --tags --abbrev=0)..HEAD --oneline | awk '{print "- "$0}')" > CHANGELOG.md
- name: Build Go binaries
run: |
#!/bin/bash
esport CGO_ENABLED=0
mkdir artifacts
export GOOS=windows
export GOARCH=amd64
export FILENAME=artifacts/go2rtc_win64.zip
go build -ldflags "-s -w" -trimpath && 7z a -mx9 -sdel "$FILENAME" go2rtc.exe
export GOOS=windows
export GOARCH=386
export FILENAME=artifacts/go2rtc_win32.zip
go build -ldflags "-s -w" -trimpath && 7z a -mx9 -sdel "$FILENAME" go2rtc.exe
export GOOS=windows
export GOARCH=arm64
export FILENAME=artifacts/go2rtc_win_arm64.zip
go build -ldflags "-s -w" -trimpath && 7z a -mx9 -sdel "$FILENAME" go2rtc.exe
export GOOS=linux
export GOARCH=amd64
export FILENAME=artifacts/go2rtc_linux_amd64
go build -ldflags "-s -w" -trimpath -o "$FILENAME"
export GOOS=linux
export GOARCH=386
export FILENAME=artifacts/go2rtc_linux_i386
go build -ldflags "-s -w" -trimpath -o "$FILENAME"
export GOOS=linux
export GOARCH=arm64
export FILENAME=artifacts/go2rtc_linux_arm64
go build -ldflags "-s -w" -trimpath -o "$FILENAME"
export GOOS=linux
export GOARCH=arm
export GOARM=7
export FILENAME=artifacts/go2rtc_linux_arm
go build -ldflags "-s -w" -trimpath -o "$FILENAME"
export GOOS=linux
export GOARCH=mipsle
export FILENAME=artifacts/go2rtc_linux_mipsel
go build -ldflags "-s -w" -trimpath -o "$FILENAME"
export GOOS=darwin
export GOARCH=amd64
export FILENAME=artifacts/go2rtc_mac_amd64.zip
go build -ldflags "-s -w" -trimpath && 7z a -mx9 -sdel "$FILENAME" go2rtc
export GOOS=darwin
export GOARCH=arm64
export FILENAME=artifacts/go2rtc_mac_arm64.zip
go build -ldflags "-s -w" -trimpath && 7z a -mx9 -sdel "$FILENAME" go2rtc
parallel --jobs $(nproc) "upx {}" ::: artifacts/go2rtc_linux_*
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: ${{ failure() }}
- name: Set env
run: echo "RELEASE_VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
- name: Create GitHub release
uses: softprops/action-gh-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
files: artifacts/*
generate_release_notes: true
name: Release ${{ env.RELEASE_VERSION }}
body_path: CHANGELOG.md
draft: false
prerelease: false

View File

@@ -33,13 +33,12 @@ FROM scratch AS rootfs
COPY --from=build /build/go2rtc /usr/local/bin/ COPY --from=build /build/go2rtc /usr/local/bin/
COPY --from=ngrok /bin/ngrok /usr/local/bin/ COPY --from=ngrok /bin/ngrok /usr/local/bin/
COPY ./build/docker/run.sh /
# 3. Final image # 3. Final image
FROM base FROM base
# Install ffmpeg, bash (for run.sh), tini (for signal handling), # Install ffmpeg, tini (for signal handling),
# and other common tools for the echo source. # and other common tools for the echo source.
RUN apk add --no-cache tini ffmpeg bash curl jq RUN apk add --no-cache tini ffmpeg bash curl jq
@@ -55,8 +54,8 @@ RUN if [ "${TARGETARCH}" = "amd64" ]; then apk add --no-cache libva-intel-driver
COPY --from=rootfs / / COPY --from=rootfs / /
RUN chmod a+x /run.sh && mkdir -p /config
ENTRYPOINT ["/sbin/tini", "--"] ENTRYPOINT ["/sbin/tini", "--"]
VOLUME /config
WORKDIR /config
CMD ["/run.sh"] CMD ["go2rtc", "-config", "/config/go2rtc.yaml"]

128
README.md
View File

@@ -9,7 +9,7 @@ Ultimate camera streaming application with support RTSP, WebRTC, HomeKit, FFmpeg
- streaming from [RTSP](#source-rtsp), [RTMP](#source-rtmp), [HTTP](#source-http) (FLV/MJPEG/JPEG), [FFmpeg](#source-ffmpeg), [USB Cameras](#source-ffmpeg-device) and [other sources](#module-streams) - streaming from [RTSP](#source-rtsp), [RTMP](#source-rtmp), [HTTP](#source-http) (FLV/MJPEG/JPEG), [FFmpeg](#source-ffmpeg), [USB Cameras](#source-ffmpeg-device) and [other sources](#module-streams)
- streaming to [RTSP](#module-rtsp), [WebRTC](#module-webrtc), [MSE/MP4](#module-mp4) or [MJPEG](#module-mjpeg) - streaming to [RTSP](#module-rtsp), [WebRTC](#module-webrtc), [MSE/MP4](#module-mp4) or [MJPEG](#module-mjpeg)
- first project in the World with support streaming from [HomeKit Cameras](#source-homekit) - first project in the World with support streaming from [HomeKit Cameras](#source-homekit)
- first project in the World with support H265 for WebRTC in browser ([read more](https://github.com/AlexxIT/Blog/issues/5)) - first project in the World with support H265 for WebRTC in browser (Safari only, [read more](https://github.com/AlexxIT/Blog/issues/5))
- on the fly transcoding for unsupported codecs via [FFmpeg](#source-ffmpeg) - on the fly transcoding for unsupported codecs via [FFmpeg](#source-ffmpeg)
- multi-source 2-way [codecs negotiation](#codecs-negotiation) - multi-source 2-way [codecs negotiation](#codecs-negotiation)
- mixing tracks from different sources to single stream - mixing tracks from different sources to single stream
@@ -27,6 +27,40 @@ Ultimate camera streaming application with support RTSP, WebRTC, HomeKit, FFmpeg
- [MediaSoup](https://mediasoup.org/) framework routing idea - [MediaSoup](https://mediasoup.org/) framework routing idea
- HomeKit Accessory Protocol from [@brutella](https://github.com/brutella/hap) - HomeKit Accessory Protocol from [@brutella](https://github.com/brutella/hap)
---
* [Fast start](#fast-start)
* [go2rtc: Binary](#go2rtc-binary)
* [go2rtc: Home Assistant Add-on](#go2rtc-home-assistant-add-on)
* [go2rtc: Docker](#go2rtc-docker)
* [Configuration](#configuration)
* [Module: Streams](#module-streams)
* [Source: RTSP](#source-rtsp)
* [Source: RTMP](#source-rtmp)
* [Source: HTTP](#source-http)
* [Source: FFmpeg](#source-ffmpeg)
* [Source: FFmpeg Device](#source-ffmpeg-device)
* [Source: Exec](#source-exec)
* [Source: Echo](#source-echo)
* [Source: HomeKit](#source-homekit)
* [Source: Ivideon](#source-ivideon)
* [Source: Hass](#source-hass)
* [Module: API](#module-api)
* [Module: RTSP](#module-rtsp)
* [Module: WebRTC](#module-webrtc)
* [Module: Ngrok](#module-ngrok)
* [Module: Hass](#module-hass)
* [From go2rtc to Hass](#from-go2rtc-to-hass)
* [From Hass to go2rtc](#from-hass-to-go2rtc)
* [Module: MP4](#module-mp4)
* [Module: MJPEG](#module-mjpeg)
* [Module: Log](#module-log)
* [Security](#security)
* [Codecs madness](#codecs-madness)
* [Codecs negotiation](#codecs-negotiation)
* [TIPS](#tips)
* [FAQ](#faq)
## Fast start ## Fast start
1. Download [binary](#go2rtc-binary) or use [Docker](#go2rtc-docker) or [Home Assistant Add-on](#go2rtc-home-assistant-add-on) 1. Download [binary](#go2rtc-binary) or use [Docker](#go2rtc-docker) or [Home Assistant Add-on](#go2rtc-home-assistant-add-on)
@@ -36,8 +70,6 @@ Ultimate camera streaming application with support RTSP, WebRTC, HomeKit, FFmpeg
- add your [streams](#module-streams) to [config](#configuration) file - add your [streams](#module-streams) to [config](#configuration) file
- setup [external access](#module-webrtc) to webrtc - setup [external access](#module-webrtc) to webrtc
- setup [external access](#module-ngrok) to web interface
- install [ffmpeg](#source-ffmpeg) for transcoding
**Developers:** **Developers:**
@@ -50,7 +82,6 @@ Download binary for your OS from [latest release](https://github.com/AlexxIT/go2
- `go2rtc_win64.zip` - Windows 64-bit - `go2rtc_win64.zip` - Windows 64-bit
- `go2rtc_win32.zip` - Windows 32-bit - `go2rtc_win32.zip` - Windows 32-bit
- `go2rtc_win_arm64.zip` - Windows ARM 64-bit
- `go2rtc_linux_amd64` - Linux 64-bit - `go2rtc_linux_amd64` - Linux 64-bit
- `go2rtc_linux_i386` - Linux 32-bit - `go2rtc_linux_i386` - Linux 32-bit
- `go2rtc_linux_arm64` - Linux ARM 64-bit (ex. Raspberry 64-bit OS) - `go2rtc_linux_arm64` - Linux ARM 64-bit (ex. Raspberry 64-bit OS)
@@ -72,27 +103,17 @@ Don't forget to fix the rights `chmod +x go2rtc_xxx_xxx` on Linux and Mac.
### go2rtc: Docker ### go2rtc: Docker
Container [alexxit/go2rtc](https://hub.docker.com/r/alexxit/go2rtc) with support `amd64`, `386`, `arm64`, `arm`. This container same as [Home Assistant Add-on](#go2rtc-home-assistant-add-on), but can be used separately from the Home Assistant. Container has preinstalled [FFmpeg](#source-ffmpeg), [Ngrok](#module-ngrok) and [Python](#source-echo). Container [alexxit/go2rtc](https://hub.docker.com/r/alexxit/go2rtc) with support `amd64`, `386`, `arm64`, `arm`. This container is the same as [Home Assistant Add-on](#go2rtc-home-assistant-add-on), but can be used separately from Home Assistant. Container has preinstalled [FFmpeg](#source-ffmpeg), [Ngrok](#module-ngrok) and [Python](#source-echo).
```yaml
services:
go2rtc:
image: alexxit/go2rtc
network_mode: host
restart: always
volumes:
- "~/go2rtc.yaml:/config/go2rtc.yaml"
```
## Configuration ## Configuration
Create file `go2rtc.yaml` next to the app. - by default go2rtc will search `go2rtc.yaml` in the current work dirrectory
- `api` server will start on default **1984 port** (TCP)
- `rtsp` server will start on default **8554 port** (TCP)
- `webrtc` will use port **8555** (TCP/UDP) for connections
- `ffmpeg` will use default transcoding options
- by default, you need to config only your `streams` links Configuration options and a complete list of settings can be found in [the wiki](https://github.com/AlexxIT/go2rtc/wiki/Configuration).
- `api` server will start on default **1984 port**
- `rtsp` server will start on default **8554 port**
- `webrtc` will use random UDP port for each connection
- `ffmpeg` will use default transcoding options (you may install it [manually](https://ffmpeg.org/))
Available modules: Available modules:
@@ -216,7 +237,7 @@ But you can override them via YAML config. You can also add your own formats to
```yaml ```yaml
ffmpeg: ffmpeg:
bin: ffmpeg # path to ffmpeg binary bin: ffmpeg # path to ffmpeg binary
h264: "-codec:v libx264 -g:v 30 -preset:v superfast -tune:v zerolatency -profile:v main -level:v 4.1" h264: "-codec:v libx264 -g:v 30 -preset:v superfast -tune:v zerolatency -profile:v main -level:v 4.1"
mycodec: "-any args that support ffmpeg..." mycodec: "-any args that support ffmpeg..."
``` ```
@@ -224,8 +245,11 @@ ffmpeg:
- You can use `video` and `audio` params multiple times (ex. `#video=copy#audio=copy#audio=pcmu`) - You can use `video` and `audio` params multiple times (ex. `#video=copy#audio=copy#audio=pcmu`)
- You can use go2rtc stream name as ffmpeg input (ex. `ffmpeg:camera1#video=h264`) - You can use go2rtc stream name as ffmpeg input (ex. `ffmpeg:camera1#video=h264`)
- You can use `rotate` params with `90`, `180`, `270` or `-90` values, important with transcoding (ex. `#video=h264#rotate=90`) - You can use `rotate` params with `90`, `180`, `270` or `-90` values, important with transcoding (ex. `#video=h264#rotate=90`)
- You can use `width` and/or `height` params, important with transcoding (ex. `#video=h264#width=1280`)
- You can use `raw` param for any additional FFmpeg arguments (ex. `#raw=-vf transpose=1`). - You can use `raw` param for any additional FFmpeg arguments (ex. `#raw=-vf transpose=1`).
Read more about encoding [hardware acceleration](https://github.com/AlexxIT/go2rtc/wiki/Hardware-acceleration).
#### Source: FFmpeg Device #### Source: FFmpeg Device
You can get video from any USB-camera or Webcam as RTSP or WebRTC stream. This is part of FFmpeg integration. You can get video from any USB-camera or Webcam as RTSP or WebRTC stream. This is part of FFmpeg integration.
@@ -366,18 +390,19 @@ go2rtc has simple HTML page (`stream.html`) with support params in URL:
```yaml ```yaml
api: api:
listen: ":1984" # default ":1984", HTTP API port ("" - disabled) listen: ":1984" # default ":1984", HTTP API port ("" - disabled)
username: "admin" # default "", Basic auth for WebUI
password: "pass" # default "", Basic auth for WebUI
base_path: "/rtc" # default "", API prefix for serve on suburl (/api => /rtc/api) base_path: "/rtc" # default "", API prefix for serve on suburl (/api => /rtc/api)
static_dir: "www" # default "", folder for static files (custom web interface) static_dir: "www" # default "", folder for static files (custom web interface)
origin: "*" # default "", allow CORS requests (only * supported) origin: "*" # default "", allow CORS requests (only * supported)
``` ```
**PS. go2rtc** doesn't provide HTTPS or password protection. Use [Nginx](https://nginx.org/) or [Ngrok](#module-ngrok) or [Home Assistant Add-on](#go2rtc-home-assistant-add-on) for this tasks. **PS:**
**PS2.** You can access microphone (for 2-way audio) only with HTTPS ([read more](https://stackoverflow.com/questions/52759992/how-to-access-camera-and-microphone-in-chrome-without-https)). - go2rtc doesn't provide HTTPS. Use [Nginx](https://nginx.org/) or [Ngrok](#module-ngrok) or [Home Assistant Add-on](#go2rtc-home-assistant-add-on) for this tasks
- you can access microphone (for 2-way audio) only with HTTPS ([read more](https://stackoverflow.com/questions/52759992/how-to-access-camera-and-microphone-in-chrome-without-https))
**PS3.** MJPEG over WebSocket plays better than native MJPEG because Chrome [bug](https://bugs.chromium.org/p/chromium/issues/detail?id=527446). - MJPEG over WebSocket plays better than native MJPEG because Chrome [bug](https://bugs.chromium.org/p/chromium/issues/detail?id=527446)
- MP4 over WebSocket was created only for Apple iOS because it doesn't support MSE and native MP4
**PS4.** MP4 over WebSocket was created only for Apple iOS because it doesn't support MSE and native MP4.
### Module: RTSP ### Module: RTSP
@@ -392,54 +417,52 @@ Password protection always disabled for localhost calls (ex. FFmpeg or Hass on s
```yaml ```yaml
rtsp: rtsp:
listen: ":8554" # RTSP Server TCP port, default - 8554 listen: ":8554" # RTSP Server TCP port, default - 8554
username: admin # optional, default - disabled username: "admin" # optional, default - disabled
password: pass # optional, default - disabled password: "pass" # optional, default - disabled
``` ```
### Module: WebRTC ### Module: WebRTC
WebRTC usually works without problems in the local network. But external access may require additional settings. It depends on what type of Internet do you have. WebRTC usually works without problems in the local network. But external access may require additional settings. It depends on what type of Internet do you have.
- by default, WebRTC use two random UDP ports for each connection (video and audio) - by default, WebRTC uses both TCP and UDP on port 8555 for connections
- you can enable one additional TCP port for all connections and use it for external access - you can use this port for external access
- you can change the port in YAML config:
```yaml
webrtc:
listen: ":8555" # address of your local server and port (TCP/UDP)
```
**Static public IP** **Static public IP**
- add some TCP port to YAML config (ex. 8555) - forward the port 8555 on your router (you can use same 8555 port or any other as external port)
- forward this port on your router (you can use same 8555 port or any other)
- add your external IP-address and external port to YAML config - add your external IP-address and external port to YAML config
```yaml ```yaml
webrtc: webrtc:
listen: ":8555" # address of your local server (TCP)
candidates: candidates:
- 216.58.210.174:8555 # if you have static public IP-address - 216.58.210.174:8555 # if you have static public IP-address
``` ```
**Dynamic public IP** **Dynamic public IP**
- add some TCP port to YAML config (ex. 8555) - forward the port 8555 on your router (you can use same 8555 port or any other as the external port)
- forward this port on your router (you can use same 8555 port or any other)
- add `stun` word and external port to YAML config - add `stun` word and external port to YAML config
- go2rtc automatically detects your external address with STUN-server - go2rtc automatically detects your external address with STUN-server
```yaml ```yaml
webrtc: webrtc:
listen: ":8555" # address of your local server (TCP)
candidates: candidates:
- stun:8555 # if you have dynamic public IP-address - stun:8555 # if you have dynamic public IP-address
``` ```
**Private IP** **Private IP**
- add some TCP port to YAML config (ex. 8555)
- setup integration with [Ngrok service](#module-ngrok) - setup integration with [Ngrok service](#module-ngrok)
```yaml ```yaml
webrtc:
listen: ":8555" # address of your local server (TCP)
ngrok: ngrok:
command: ... command: ...
``` ```
@@ -550,8 +573,13 @@ PS. Default Home Assistant lovelace cards don't support 2-way audio. You can use
Provides several features: Provides several features:
1. MSE stream (fMP4 over WebSocket) 1. MSE stream (fMP4 over WebSocket)
2. Camera snapshots in MP4 format (single frame), can be sent to [Telegram](https://www.telegram.org/) 2. Camera snapshots in MP4 format (single frame), can be sent to [Telegram](https://github.com/AlexxIT/go2rtc/wiki/Snapshot-to-Telegram)
3. MP4 "file stream" - bad format for streaming because of high latency, doesn't work in Safari 3. MP4 "file stream" - bad format for streaming because of high start delay, doesn't work in Safari
API examples:
- MP4 stream: `http://192.168.1.123:1984/api/stream.mp4?src=camera1`
- MP4 snapshot: `http://192.168.1.123:1984/api/frame.mp4?src=camera1`
### Module: MJPEG ### Module: MJPEG
@@ -595,7 +623,7 @@ log:
## Security ## Security
By default `go2rtc` start Web interface on port `1984` and RTSP on port `8554`. Both ports are accessible from your local network. So anyone on your local network can watch video from your cameras without authorization. The same rule applies to the Home Assistant Add-on. By default `go2rtc` starts the Web interface on port `1984` and RTSP on port `8554`, as well as use port `8555` for WebRTC connections. The three ports are accessible from your local network. So anyone on your local network can watch video from your cameras without authorization. The same rule applies to the Home Assistant Add-on.
This is not a problem if you trust your local network as much as I do. But you can change this behaviour with a `go2rtc.yaml` config: This is not a problem if you trust your local network as much as I do. But you can change this behaviour with a `go2rtc.yaml` config:
@@ -607,7 +635,7 @@ rtsp:
listen: "127.0.0.1:8554" # localhost listen: "127.0.0.1:8554" # localhost
webrtc: webrtc:
listen: ":8555" # external TCP port listen: ":8555" # external TCP/UDP port
``` ```
- local access to RTSP is not a problem for [FFmpeg](#source-ffmpeg) integration, because it runs locally on your server - local access to RTSP is not a problem for [FFmpeg](#source-ffmpeg) integration, because it runs locally on your server
@@ -617,7 +645,7 @@ webrtc:
If you need Web interface protection without Home Assistant Add-on - you need to use reverse proxy, like [Nginx](https://nginx.org/), [Caddy](https://caddyserver.com/), [Ngrok](https://ngrok.com/), etc. If you need Web interface protection without Home Assistant Add-on - you need to use reverse proxy, like [Nginx](https://nginx.org/), [Caddy](https://caddyserver.com/), [Ngrok](https://ngrok.com/), etc.
PS. Additionally WebRTC opens a lot of random UDP ports for transmit encrypted media. They work without problems on the local network. And sometimes work for external access, even if you haven't opened ports on your router. But for stable external WebRTC access, you need to configure the TCP port. PS. Additionally WebRTC will try to use the 8555 UDP port for transmit encrypted media. It works without problems on the local network. And sometimes also works for external access, even if you haven't opened this port on your router ([read more](https://en.wikipedia.org/wiki/UDP_hole_punching)). But for stable external WebRTC access, you need to open the 8555 port on your router for both TCP and UDP.
## Codecs madness ## Codecs madness
@@ -687,6 +715,10 @@ streams:
- `ffplay -fflags nobuffer -flags low_delay "rtsp://192.168.1.123:8554/camera1"` - `ffplay -fflags nobuffer -flags low_delay "rtsp://192.168.1.123:8554/camera1"`
- VLC > Preferences > Input / Codecs > Default Caching Level: Lowest Latency - VLC > Preferences > Input / Codecs > Default Caching Level: Lowest Latency
**Snapshots to Telegram**
[read more](https://github.com/AlexxIT/go2rtc/wiki/Snapshot-to-Telegram)
## FAQ ## FAQ
**Q. What's the difference between go2rtc, WebRTC Camera and RTSPtoWebRTC?** **Q. What's the difference between go2rtc, WebRTC Camera and RTSPtoWebRTC?**

View File

@@ -3,16 +3,21 @@ package api
import ( import (
"encoding/json" "encoding/json"
"github.com/AlexxIT/go2rtc/cmd/app" "github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"net" "net"
"net/http" "net/http"
"os"
"strconv"
"strings"
"sync"
) )
func Init() { func Init() {
var cfg struct { var cfg struct {
Mod struct { Mod struct {
Listen string `yaml:"listen"` Listen string `yaml:"listen"`
Username string `yaml:"username"`
Password string `yaml:"password"`
BasePath string `yaml:"base_path"` BasePath string `yaml:"base_path"`
StaticDir string `yaml:"static_dir"` StaticDir string `yaml:"static_dir"`
Origin string `yaml:"origin"` Origin string `yaml:"origin"`
@@ -35,7 +40,9 @@ func Init() {
initStatic(cfg.Mod.StaticDir) initStatic(cfg.Mod.StaticDir)
initWS(cfg.Mod.Origin) initWS(cfg.Mod.Origin)
HandleFunc("api/streams", streamsHandler) HandleFunc("api", apiHandler)
HandleFunc("api/config", configHandler)
HandleFunc("api/exit", exitHandler)
HandleFunc("api/ws", apiWS) HandleFunc("api/ws", apiWS)
// ensure we can listen without errors // ensure we can listen without errors
@@ -48,14 +55,18 @@ func Init() {
log.Info().Str("addr", cfg.Mod.Listen).Msg("[api] listen") log.Info().Str("addr", cfg.Mod.Listen).Msg("[api] listen")
s := http.Server{} s := http.Server{}
s.Handler = http.DefaultServeMux s.Handler = http.DefaultServeMux // 4th
if log.Trace().Enabled() {
s.Handler = middlewareLog(s.Handler)
}
if cfg.Mod.Origin == "*" { if cfg.Mod.Origin == "*" {
s.Handler = middlewareCORS(s.Handler) s.Handler = middlewareCORS(s.Handler) // 3rd
}
if cfg.Mod.Username != "" {
s.Handler = middlewareAuth(cfg.Mod.Username, cfg.Mod.Password, s.Handler) // 2nd
}
if log.Trace().Enabled() {
s.Handler = middlewareLog(s.Handler) // 1st
} }
go func() { go func() {
@@ -83,7 +94,22 @@ var log zerolog.Logger
func middlewareLog(next http.Handler) http.Handler { func middlewareLog(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Trace().Msgf("[api] %s %s", r.Method, r.URL) log.Trace().Msgf("[api] %s %s %s", r.Method, r.URL, r.RemoteAddr)
next.ServeHTTP(w, r)
})
}
func middlewareAuth(username, password string, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.RemoteAddr, "127.") && !strings.HasPrefix(r.RemoteAddr, "[::1]") {
user, pass, ok := r.BasicAuth()
if !ok || user != username || pass != password {
w.Header().Set("Www-Authenticate", `Basic realm="go2rtc"`)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
}
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
}) })
} }
@@ -96,31 +122,25 @@ func middlewareCORS(next http.Handler) http.Handler {
}) })
} }
func streamsHandler(w http.ResponseWriter, r *http.Request) { var mu sync.Mutex
src := r.URL.Query().Get("src")
name := r.URL.Query().Get("name")
if name == "" { func apiHandler(w http.ResponseWriter, r *http.Request) {
name = src mu.Lock()
app.Info["host"] = r.Host
mu.Unlock()
if err := json.NewEncoder(w).Encode(app.Info); err != nil {
log.Warn().Err(err).Caller().Send()
} }
}
switch r.Method {
case "PUT": func exitHandler(w http.ResponseWriter, r *http.Request) {
streams.New(name, src) if r.Method != "POST" {
return http.Error(w, "", http.StatusBadRequest)
case "DELETE": return
streams.Delete(src) }
return
} s := r.URL.Query().Get("code")
code, _ := strconv.Atoi(s)
var v interface{} os.Exit(code)
if src != "" {
v = streams.Get(src)
} else {
v = streams.All()
}
e := json.NewEncoder(w)
e.SetIndent("", " ")
_ = e.Encode(v)
} }

102
cmd/api/config.go Normal file
View File

@@ -0,0 +1,102 @@
package api
import (
"github.com/AlexxIT/go2rtc/cmd/app"
"gopkg.in/yaml.v3"
"io"
"net/http"
"os"
)
func configHandler(w http.ResponseWriter, r *http.Request) {
if app.ConfigPath == "" {
http.Error(w, "", http.StatusGone)
return
}
switch r.Method {
case "GET":
data, err := os.ReadFile(app.ConfigPath)
if err != nil {
http.Error(w, "", http.StatusNotFound)
return
}
if _, err = w.Write(data); err != nil {
log.Warn().Err(err).Caller().Send()
}
case "POST", "PATCH":
data, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if r.Method == "PATCH" {
// no need to validate after merge
data, err = mergeYAML(app.ConfigPath, data)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
} else {
// validate config
var tmp struct{}
if err = yaml.Unmarshal(data, &tmp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
if err = os.WriteFile(app.ConfigPath, data, 0644); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func mergeYAML(file1 string, yaml2 []byte) ([]byte, error) {
// Read the contents of the first YAML file
data1, err := os.ReadFile(file1)
if err != nil {
return nil, err
}
// Unmarshal the first YAML file into a map
var config1 map[string]interface{}
if err = yaml.Unmarshal(data1, &config1); err != nil {
return nil, err
}
// Unmarshal the second YAML document into a map
var config2 map[string]interface{}
if err = yaml.Unmarshal(yaml2, &config2); err != nil {
return nil, err
}
// Merge the two maps
config1 = merge(config1, config2)
// Marshal the merged map into YAML
return yaml.Marshal(&config1)
}
func merge(dst, src map[string]interface{}) map[string]interface{} {
for k, v := range src {
if vv, ok := dst[k]; ok {
switch vv := vv.(type) {
case map[string]interface{}:
v := v.(map[string]interface{})
dst[k] = merge(vv, v)
case []interface{}:
v := v.([]interface{})
dst[k] = v
default:
dst[k] = v
}
} else {
dst[k] = v
}
}
return dst
}

View File

@@ -6,6 +6,7 @@ import (
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
"time"
) )
// Message - struct for data exchange in Web API // Message - struct for data exchange in Web API
@@ -68,6 +69,8 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
tr := &Transport{Request: r} tr := &Transport{Request: r}
tr.OnWrite(func(msg interface{}) { tr.OnWrite(func(msg interface{}) {
_ = ws.SetWriteDeadline(time.Now().Add(time.Second * 5))
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
_ = ws.WriteMessage(websocket.BinaryMessage, data) _ = ws.WriteMessage(websocket.BinaryMessage, data)
} else { } else {
@@ -78,7 +81,9 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
for { for {
msg := new(Message) msg := new(Message)
if err = ws.ReadJSON(msg); err != nil { if err = ws.ReadJSON(msg); err != nil {
log.Trace().Err(err).Caller().Send() if !websocket.IsCloseError(err, websocket.CloseNoStatusReceived) {
log.Trace().Err(err).Caller().Send()
}
_ = ws.Close() _ = ws.Close()
break break
} }
@@ -101,7 +106,9 @@ type Transport struct {
Request *http.Request Request *http.Request
Consumer interface{} // TODO: rewrite Consumer interface{} // TODO: rewrite
mx sync.Mutex closed bool
mx sync.Mutex
wrmx sync.Mutex
onChange func() onChange func()
onWrite func(msg interface{}) onWrite func(msg interface{})
@@ -118,21 +125,32 @@ func (t *Transport) OnWrite(f func(msg interface{})) {
} }
func (t *Transport) Write(msg interface{}) { func (t *Transport) Write(msg interface{}) {
t.mx.Lock() t.wrmx.Lock()
t.onWrite(msg) t.onWrite(msg)
t.mx.Unlock() t.wrmx.Unlock()
} }
func (t *Transport) Close() { func (t *Transport) Close() {
t.mx.Lock()
for _, f := range t.onClose { for _, f := range t.onClose {
f() f()
} }
t.closed = true
t.mx.Unlock()
} }
func (t *Transport) OnChange(f func()) { func (t *Transport) OnChange(f func()) {
t.mx.Lock()
t.onChange = f t.onChange = f
t.mx.Unlock()
} }
func (t *Transport) OnClose(f func()) { func (t *Transport) OnClose(f func()) {
t.onClose = append(t.onClose, f) t.mx.Lock()
if t.closed {
f()
} else {
t.onClose = append(t.onClose, f)
}
t.mx.Unlock()
} }

View File

@@ -2,46 +2,76 @@ package app
import ( import (
"flag" "flag"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"io" "io"
"os" "os"
"path/filepath"
"runtime" "runtime"
"strings"
"time"
) )
var Version = "0.1-rc.7" var Version = "1.0.1"
var UserAgent = "go2rtc/" + Version var UserAgent = "go2rtc/" + Version
func Init() { var ConfigPath string
config := flag.String( var Info = map[string]interface{}{
"config", "version": Version,
"go2rtc.yaml", }
"Path to go2rtc configuration file",
)
func Init() {
var confs Config
flag.Var(&confs, "config", "go2rtc config (path to file or raw text), support multiple")
flag.Parse() flag.Parse()
data, _ = os.ReadFile(*config) if confs == nil {
confs = []string{"go2rtc.yaml"}
}
for _, conf := range confs {
if conf[0] != '{' {
// config as file
if ConfigPath == "" {
ConfigPath = conf
}
data, _ := os.ReadFile(conf)
if data == nil {
continue
}
data = []byte(shell.ReplaceEnvVars(string(data)))
configs = append(configs, data)
} else {
// config as raw YAML
configs = append(configs, []byte(conf))
}
}
if ConfigPath != "" {
if !filepath.IsAbs(ConfigPath) {
if cwd, err := os.Getwd(); err == nil {
ConfigPath = filepath.Join(cwd, ConfigPath)
}
}
Info["config_path"] = ConfigPath
}
var cfg struct { var cfg struct {
Mod map[string]string `yaml:"log"` Mod map[string]string `yaml:"log"`
} }
if data != nil { LoadConfig(&cfg)
if err := yaml.Unmarshal(data, &cfg); err != nil {
println("ERROR: " + err.Error())
}
}
log.Logger = NewLogger(cfg.Mod["format"], cfg.Mod["level"]) log.Logger = NewLogger(cfg.Mod["format"], cfg.Mod["level"])
modules = cfg.Mod modules = cfg.Mod
log.Info().Msgf("go2rtc version %s %s/%s", Version, runtime.GOOS, runtime.GOARCH) log.Info().Msgf("go2rtc version %s %s/%s", Version, runtime.GOOS, runtime.GOARCH)
path, _ := os.Getwd()
log.Debug().Str("cwd", path).Send()
} }
func NewLogger(format string, level string) zerolog.Logger { func NewLogger(format string, level string) zerolog.Logger {
@@ -54,7 +84,7 @@ func NewLogger(format string, level string) zerolog.Logger {
} }
} }
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs zerolog.TimeFieldFormat = time.RFC3339Nano
lvl, err := zerolog.ParseLevel(level) lvl, err := zerolog.ParseLevel(level)
if err != nil || lvl == zerolog.NoLevel { if err != nil || lvl == zerolog.NoLevel {
@@ -65,7 +95,7 @@ func NewLogger(format string, level string) zerolog.Logger {
} }
func LoadConfig(v interface{}) { func LoadConfig(v interface{}) {
if data != nil { for _, data := range configs {
if err := yaml.Unmarshal(data, v); err != nil { if err := yaml.Unmarshal(data, v); err != nil {
log.Warn().Err(err).Msg("[app] read config") log.Warn().Err(err).Msg("[app] read config")
} }
@@ -86,8 +116,18 @@ func GetLogger(module string) zerolog.Logger {
// internal // internal
// data - config content type Config []string
var data []byte
func (c *Config) String() string {
return strings.Join(*c, " ")
}
func (c *Config) Set(value string) error {
*c = append(*c, value)
return nil
}
var configs [][]byte
// modules log levels // modules log levels
var modules map[string]string var modules map[string]string

View File

@@ -4,24 +4,14 @@ import (
"github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"net/http"
"os"
"strconv"
) )
func Init() { func Init() {
api.HandleFunc("api/stack", stackHandler) api.HandleFunc("api/stack", stackHandler)
api.HandleFunc("api/exit", exitHandler)
streams.HandleFunc("null", nullHandler) streams.HandleFunc("null", nullHandler)
} }
func exitHandler(_ http.ResponseWriter, r *http.Request) {
s := r.URL.Query().Get("code")
code, _ := strconv.Atoi(s)
os.Exit(code)
}
func nullHandler(string) (streamer.Producer, error) { func nullHandler(string) (streamer.Producer, error) {
return nil, nil return nil, nil
} }

View File

@@ -25,6 +25,7 @@ var stackSkip = [][]byte{
// webrtc/api.go // webrtc/api.go
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"), []byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),
[]byte("created by github.com/pion/ice/v2.NewUDPMuxDefault"),
} }
func stackHandler(w http.ResponseWriter, r *http.Request) { func stackHandler(w http.ResponseWriter, r *http.Request) {

View File

@@ -34,8 +34,13 @@ func Init() {
return false return false
} }
waiter <- conn // unblocking write to channel
return true select {
case waiter <- conn:
return true
default:
return false
}
}) })
streams.HandleFunc("exec", Handle) streams.HandleFunc("exec", Handle)
@@ -86,7 +91,13 @@ func Handle(url string) (streamer.Producer, error) {
chErr := make(chan error) chErr := make(chan error)
go func() { go func() {
chErr <- cmd.Wait() err := cmd.Wait()
// unblocking write to channel
select {
case chErr <- err:
default:
log.Trace().Str("url", url).Msg("[exec] close")
}
}() }()
select { select {

View File

@@ -27,7 +27,10 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
exit := make(chan []byte) exit := make(chan []byte)
cons := &mjpeg.Consumer{} cons := &mjpeg.Consumer{
RemoteAddr: r.RemoteAddr,
UserAgent: r.UserAgent(),
}
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
switch msg := msg.(type) { switch msg := msg.(type) {
case []byte: case []byte:
@@ -68,7 +71,10 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
flusher := w.(http.Flusher) flusher := w.(http.Flusher)
cons := &mjpeg.Consumer{} cons := &mjpeg.Consumer{
RemoteAddr: r.RemoteAddr,
UserAgent: r.UserAgent(),
}
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
switch msg := msg.(type) { switch msg := msg.(type) {
case []byte: case []byte:
@@ -109,7 +115,10 @@ func handlerWS(tr *api.Transport, _ *api.Message) error {
return errors.New(api.StreamNotFound) return errors.New(api.StreamNotFound)
} }
cons := &mjpeg.Consumer{} cons := &mjpeg.Consumer{
RemoteAddr: tr.Request.RemoteAddr,
UserAgent: tr.Request.UserAgent(),
}
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
tr.Write(data) tr.Write(data)

View File

@@ -80,7 +80,10 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
exit := make(chan error) exit := make(chan error)
cons := &mp4.Consumer{} cons := &mp4.Consumer{
RemoteAddr: r.RemoteAddr,
UserAgent: r.UserAgent(),
}
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
if _, err := w.Write(data); err != nil && exit != nil { if _, err := w.Write(data); err != nil && exit != nil {

View File

@@ -18,7 +18,10 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
return errors.New(api.StreamNotFound) return errors.New(api.StreamNotFound)
} }
cons := &mp4.Consumer{} cons := &mp4.Consumer{
RemoteAddr: tr.Request.RemoteAddr,
UserAgent: tr.Request.UserAgent(),
}
cons.UserAgent = tr.Request.UserAgent() cons.UserAgent = tr.Request.UserAgent()
cons.RemoteAddr = tr.Request.RemoteAddr cons.RemoteAddr = tr.Request.RemoteAddr
@@ -38,7 +41,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
}) })
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Warn().Err(err).Caller().Send() log.Debug().Err(err).Msg("[mp4] add consumer")
return err return err
} }
@@ -68,7 +71,11 @@ func handlerWSMP4(tr *api.Transport, msg *api.Message) error {
return errors.New(api.StreamNotFound) return errors.New(api.StreamNotFound)
} }
cons := &mp4.Segment{} cons := &mp4.Segment{
RemoteAddr: tr.Request.RemoteAddr,
UserAgent: tr.Request.UserAgent(),
OnlyKeyframe: true,
}
if codecs, ok := msg.Value.(string); ok { if codecs, ok := msg.Value.(string); ok {
log.Trace().Str("codecs", codecs).Msgf("[mp4] new WS/MP4 consumer") log.Trace().Str("codecs", codecs).Msgf("[mp4] new WS/MP4 consumer")

View File

@@ -14,9 +14,9 @@ import (
func Init() { func Init() {
var conf struct { var conf struct {
Mod struct { Mod struct {
Listen string `yaml:"listen"` Listen string `yaml:"listen" json:"listen"`
Username string `yaml:"username"` Username string `yaml:"username" json:"-"`
Password string `yaml:"password"` Password string `yaml:"password" json:"-"`
} `yaml:"rtsp"` } `yaml:"rtsp"`
} }
@@ -24,6 +24,7 @@ func Init() {
conf.Mod.Listen = ":8554" conf.Mod.Listen = ":8554"
app.LoadConfig(&conf) app.LoadConfig(&conf)
app.Info["rtsp"] = conf.Mod
log = app.GetLogger("rtsp") log = app.GetLogger("rtsp")
@@ -161,6 +162,8 @@ func tcpHandler(conn *rtsp.Conn) {
log.Debug().Str("stream", name).Msg("[rtsp] new consumer") log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
conn.SessionName = app.UserAgent
initMedias(conn) initMedias(conn)
if err := stream.AddConsumer(conn); err != nil { if err := stream.AddConsumer(conn); err != nil {
@@ -200,6 +203,9 @@ func tcpHandler(conn *rtsp.Conn) {
if err := conn.Accept(); err != nil { if err := conn.Accept(); err != nil {
log.Warn().Err(err).Caller().Send() log.Warn().Err(err).Caller().Send()
if closer != nil {
closer()
}
_ = conn.Close() _ = conn.Close()
return return
} }
@@ -212,7 +218,7 @@ func tcpHandler(conn *rtsp.Conn) {
if closer != nil { if closer != nil {
if err := conn.Handle(); err != nil { if err := conn.Handle(); err != nil {
log.Debug().Err(err).Caller().Send() log.Debug().Msgf("[rtsp] handle=%s", err)
} }
closer() closer()

15
cmd/streams/consumer.go Normal file
View File

@@ -0,0 +1,15 @@
package streams
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/streamer"
)
type Consumer struct {
element streamer.Consumer
tracks []*streamer.Track
}
func (c *Consumer) MarshalJSON() ([]byte, error) {
return json.Marshal(c.element)
}

View File

@@ -1,6 +1,7 @@
package streams package streams
import ( import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strings" "strings"
"sync" "sync"
@@ -27,9 +28,9 @@ type Producer struct {
lastErr error lastErr error
tracks []*streamer.Track tracks []*streamer.Track
state state state state
mu sync.Mutex mu sync.Mutex
restart *time.Timer workerID int
} }
func (p *Producer) SetSource(s string) { func (p *Producer) SetSource(s string) {
@@ -91,6 +92,15 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea
return track return track
} }
func (p *Producer) MarshalJSON() ([]byte, error) {
if p.element != nil {
return json.Marshal(p.element)
}
info := streamer.Info{URL: p.url}
return json.Marshal(info)
}
// internals // internals
func (p *Producer) start() { func (p *Producer) start() {
@@ -104,20 +114,32 @@ func (p *Producer) start() {
log.Debug().Msgf("[streams] start producer url=%s", p.url) log.Debug().Msgf("[streams] start producer url=%s", p.url)
p.state = stateStart p.state = stateStart
go func() { p.workerID++
// safe read element while mu locked
if err := p.element.Start(); err != nil { go p.worker(p.element, p.workerID)
log.Warn().Err(err).Str("url", p.url).Caller().Send()
}
p.reconnect()
}()
} }
func (p *Producer) reconnect() { func (p *Producer) worker(element streamer.Producer, workerID int) {
if err := element.Start(); err != nil {
p.mu.Lock()
closed := p.workerID != workerID
p.mu.Unlock()
if closed {
return
}
log.Warn().Err(err).Str("url", p.url).Caller().Send()
}
p.reconnect(workerID)
}
func (p *Producer) reconnect(workerID int) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.state != stateStart { if p.workerID != workerID {
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url) log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
return return
} }
@@ -126,9 +148,11 @@ func (p *Producer) reconnect() {
p.element, p.lastErr = GetProducer(p.url) p.element, p.lastErr = GetProducer(p.url)
if p.lastErr != nil || p.element == nil { if p.lastErr != nil || p.element == nil {
log.Debug().Err(p.lastErr).Caller().Send() log.Debug().Msgf("[streams] producer=%s", p.lastErr)
// TODO: dynamic timeout // TODO: dynamic timeout
p.restart = time.AfterFunc(30*time.Second, p.reconnect) time.AfterFunc(30*time.Second, func() {
p.reconnect(workerID)
})
return return
} }
@@ -152,12 +176,7 @@ func (p *Producer) reconnect() {
} }
} }
go func() { go p.worker(p.element, workerID)
if err := p.element.Start(); err != nil {
log.Debug().Err(err).Caller().Send()
}
p.reconnect()
}()
} }
func (p *Producer) stop() { func (p *Producer) stop() {
@@ -171,6 +190,8 @@ func (p *Producer) stop() {
case stateNone: case stateNone:
log.Debug().Msgf("[streams] can't stop none producer") log.Debug().Msgf("[streams] can't stop none producer")
return return
case stateStart:
p.workerID++
} }
log.Debug().Msgf("[streams] stop producer url=%s", p.url) log.Debug().Msgf("[streams] stop producer url=%s", p.url)
@@ -179,10 +200,6 @@ func (p *Producer) stop() {
_ = p.element.Stop() _ = p.element.Stop()
p.element = nil p.element = nil
} }
if p.restart != nil {
p.restart.Stop()
p.restart = nil
}
p.state = stateNone p.state = stateNone
p.tracks = nil p.tracks = nil

View File

@@ -7,18 +7,14 @@ import (
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strings" "strings"
"sync" "sync"
"sync/atomic"
) )
type Consumer struct {
element streamer.Consumer
tracks []*streamer.Track
}
type Stream struct { type Stream struct {
producers []*Producer producers []*Producer
consumers []*Consumer consumers []*Consumer
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup requests int32
} }
func NewStream(source interface{}) *Stream { func NewStream(source interface{}) *Stream {
@@ -53,6 +49,9 @@ func (s *Stream) SetSource(source string) {
} }
func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
// support for multiple simultaneous requests from different consumers
atomic.AddInt32(&s.requests, 1)
ic := len(s.consumers) ic := len(s.consumers)
consumer := &Consumer{element: cons} consumer := &Consumer{element: cons}
@@ -60,9 +59,6 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
var codecs string var codecs string
// support for multiple simultaneous requests from different consumers
s.wg.Add(1)
// Step 1. Get consumer medias // Step 1. Get consumer medias
for icc, consMedia := range cons.GetMedias() { for icc, consMedia := range cons.GetMedias() {
log.Trace().Stringer("media", consMedia). log.Trace().Stringer("media", consMedia).
@@ -86,7 +82,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
// Step 4. Get producer track // Step 4. Get producer track
prodTrack := prod.GetTrack(prodMedia, prodCodec) prodTrack := prod.GetTrack(prodMedia, prodCodec)
if prodTrack == nil { if prodTrack == nil {
log.Warn().Str("url", prod.url).Msg("[stream] can't get track") log.Warn().Str("url", prod.url).Msg("[streams] can't get track")
continue continue
} }
@@ -101,12 +97,11 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
} }
} }
s.wg.Done() if atomic.AddInt32(&s.requests, -1) == 0 {
s.wg.Wait() s.stopProducers()
}
if len(producers) == 0 { if len(producers) == 0 {
s.stopProducers()
if len(codecs) > 0 { if len(codecs) > 0 {
return errors.New("codecs not match: " + codecs) return errors.New("codecs not match: " + codecs)
} }
@@ -197,22 +192,21 @@ producers:
//} //}
func (s *Stream) MarshalJSON() ([]byte, error) { func (s *Stream) MarshalJSON() ([]byte, error) {
var v []interface{} if !s.mu.TryLock() {
s.mu.Lock() log.Warn().Msgf("[streams] json locked")
for _, prod := range s.producers { return json.Marshal(nil)
if prod.element != nil {
v = append(v, prod.element)
}
} }
for _, cons := range s.consumers {
// cons.element always not nil var info struct {
v = append(v, cons.element) Producers []*Producer `json:"producers"`
Consumers []*Consumer `json:"consumers"`
} }
info.Producers = s.producers
info.Consumers = s.consumers
s.mu.Unlock() s.mu.Unlock()
if len(v) == 0 {
v = nil return json.Marshal(info)
}
return json.Marshal(v)
} }
func (s *Stream) removeConsumer(i int) { func (s *Stream) removeConsumer(i int) {
@@ -242,6 +236,10 @@ func (s *Stream) removeProducer(i int) {
} }
func collectCodecs(media *streamer.Media, codecs *string) { func collectCodecs(media *streamer.Media, codecs *string) {
if media.Direction == streamer.DirectionRecvonly {
return
}
for _, codec := range media.Codecs { for _, codec := range media.Codecs {
name := codec.Name name := codec.Name
if name == streamer.CodecAAC { if name == streamer.CodecAAC {

View File

@@ -1,9 +1,12 @@
package streams package streams
import ( import (
"encoding/json"
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app" "github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/app/store" "github.com/AlexxIT/go2rtc/cmd/app/store"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"net/http"
) )
func Init() { func Init() {
@@ -22,6 +25,8 @@ func Init() {
for name, item := range store.GetDict("streams") { for name, item := range store.GetDict("streams") {
streams[name] = NewStream(item) streams[name] = NewStream(item)
} }
api.HandleFunc("api/streams", streamsHandler)
} }
func Get(name string) *Stream { func Get(name string) *Stream {
@@ -48,19 +53,29 @@ func GetOrNew(src string) *Stream {
return New(src, src) return New(src, src)
} }
func Delete(name string) { func streamsHandler(w http.ResponseWriter, r *http.Request) {
delete(streams, name) src := r.URL.Query().Get("src")
}
func All() map[string]interface{} { switch r.Method {
all := map[string]interface{}{} case "PUT":
for name, stream := range streams { name := r.URL.Query().Get("name")
all[name] = stream if name == "" {
//if stream.Active() { name = src
// all[name] = stream }
//} New(name, src)
return
case "DELETE":
delete(streams, src)
return
}
if src != "" {
e := json.NewEncoder(w)
e.SetIndent("", " ")
_ = e.Encode(streams[src])
} else {
_ = json.NewEncoder(w).Encode(streams)
} }
return all
} }
var log zerolog.Logger var log zerolog.Logger

View File

@@ -7,6 +7,7 @@ import (
) )
var candidates []string var candidates []string
var networks = []string{"udp", "tcp"}
func AddCandidate(address string) { func AddCandidate(address string) {
candidates = append(candidates, address) candidates = append(candidates, address)
@@ -20,15 +21,17 @@ func asyncCandidates(tr *api.Transport) {
continue continue
} }
cand, err := webrtc.NewCandidate(address) for _, network := range networks {
if err != nil { cand, err := webrtc.NewCandidate(network, address)
log.Warn().Err(err).Caller().Send() if err != nil {
continue log.Warn().Err(err).Caller().Send()
continue
}
log.Trace().Str("candidate", cand).Msg("[webrtc] config")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: cand})
} }
log.Trace().Str("candidate", cand).Msg("[webrtc] config")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: cand})
} }
} }
@@ -57,13 +60,15 @@ func syncCanditates(answer string) (string, error) {
continue continue
} }
cand, err := webrtc.NewCandidate(address) for _, network := range networks {
if err != nil { cand, err := webrtc.NewCandidate(network, address)
log.Warn().Err(err).Msg("[webrtc] candidate") if err != nil {
continue log.Warn().Err(err).Msg("[webrtc] candidate")
} continue
}
md.WithPropertyAttribute(cand) md.WithPropertyAttribute(cand)
}
} }
if end { if end {

View File

@@ -22,6 +22,7 @@ func Init() {
} `yaml:"webrtc"` } `yaml:"webrtc"`
} }
cfg.Mod.Listen = ":8555"
cfg.Mod.IceServers = []pion.ICEServer{ cfg.Mod.IceServers = []pion.ICEServer{
{URLs: []string{"stun:stun.l.google.com:19302"}}, {URLs: []string{"stun:stun.l.google.com:19302"}},
} }
@@ -112,7 +113,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
// 2. AddConsumer, so we get new tracks // 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil { if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Caller().Send() log.Debug().Err(err).Msg("[webrtc] add consumer")
_ = conn.Conn.Close() _ = conn.Conn.Close()
return err return err
} }

View File

@@ -1,6 +1,7 @@
package h264 package h264
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -27,7 +28,8 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
} }
// Fix TP-Link Tapo TC70: sends SPS and PPS with packet.Marker = true // Fix TP-Link Tapo TC70: sends SPS and PPS with packet.Marker = true
if packet.Marker { // Reolink Duo 2: sends SPS with Marker and PPS without
if packet.Marker && len(payload) < 128 {
switch NALUType(payload) { switch NALUType(payload) {
case NALUTypeSPS, NALUTypePPS: case NALUTypeSPS, NALUTypePPS:
buf = append(buf, payload...) buf = append(buf, payload...)
@@ -68,9 +70,30 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
if len(buf) > 0 { if len(buf) > 0 {
payload = append(buf, payload...) payload = append(buf, payload...)
buf = buf[:0] buf = buf[:0]
} else {
// some Chinese buggy cameras has single packet with SPS+PPS+IFrame separated by 00 00 00 01
// https://github.com/AlexxIT/WebRTC/issues/391
// https://github.com/AlexxIT/WebRTC/issues/392
for i := 0; i < len(payload); {
if i+4 >= len(payload) {
break
}
size := bytes.Index(payload[i+4:], []byte{0, 0, 0, 1})
if size < 0 {
if i == 0 {
break
}
size = len(payload) - (i + 4)
}
binary.BigEndian.PutUint32(payload[i:], uint32(size))
i += size + 4
}
} }
//log.Printf("[AVC] %v, len: %d", Types(payload), len(payload)) //log.Printf("[AVC] %v, len: %d, ts: %10d, seq: %d", Types(payload), len(payload), packet.Timestamp, packet.SequenceNumber)
clone := *packet clone := *packet
clone.Version = RTPPacketVersionAVC clone.Version = RTPPacketVersionAVC

View File

@@ -1,6 +1,7 @@
package homekit package homekit
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/AlexxIT/go2rtc/pkg/hap" "github.com/AlexxIT/go2rtc/pkg/hap"
@@ -11,6 +12,7 @@ import (
"github.com/brutella/hap/rtp" "github.com/brutella/hap/rtp"
"net" "net"
"net/url" "net/url"
"sync/atomic"
) )
type Client struct { type Client struct {
@@ -263,3 +265,19 @@ func (c *Client) getMedias() []*streamer.Media {
return medias return medias
} }
func (c *Client) MarshalJSON() ([]byte, error) {
var recv uint32
for _, session := range c.sessions {
recv += atomic.LoadUint32(&session.Recv)
}
info := &streamer.Info{
Type: "HomeKit source",
URL: c.conn.URL(),
Medias: c.medias,
Tracks: c.tracks,
Recv: recv,
}
return json.Marshal(info)
}

165
pkg/httpflv/amf0.go Normal file
View File

@@ -0,0 +1,165 @@
package httpflv
import (
"encoding/binary"
"errors"
"math"
)
const (
TypeNumber byte = iota
TypeBoolean
TypeString
TypeObject
TypeEcmaArray = 8
TypeObjectEnd = 9
)
var Err = errors.New("amf0 read error")
// AMF0 spec: http://download.macromedia.com/pub/labs/amf/amf0_spec_121207.pdf
type AMF0 struct {
buf []byte
pos int
}
func NewReader(b []byte) *AMF0 {
return &AMF0{buf: b}
}
func (a *AMF0) ReadMetaData() map[string]interface{} {
if b, _ := a.ReadByte(); b != TypeString {
return nil
}
if s, _ := a.ReadString(); s != "onMetaData" {
return nil
}
b, _ := a.ReadByte()
switch b {
case TypeObject:
v, _ := a.ReadObject()
return v
case TypeEcmaArray:
v, _ := a.ReadEcmaArray()
return v
}
return nil
}
func (a *AMF0) ReadMap() (map[interface{}]interface{}, error) {
dict := make(map[interface{}]interface{})
for a.pos < len(a.buf) {
k, err := a.ReadItem()
if err != nil {
return nil, err
}
v, err := a.ReadItem()
if err != nil {
return nil, err
}
dict[k] = v
}
return dict, nil
}
func (a *AMF0) ReadItem() (interface{}, error) {
dataType, err := a.ReadByte()
if err != nil {
return nil, err
}
switch dataType {
case TypeNumber:
return a.ReadNumber()
case TypeBoolean:
v, err := a.ReadByte()
return v != 0, err
case TypeString:
return a.ReadString()
case TypeObject:
return a.ReadObject()
case TypeObjectEnd:
return nil, nil
}
return nil, Err
}
func (a *AMF0) ReadByte() (byte, error) {
if a.pos >= len(a.buf) {
return 0, Err
}
v := a.buf[a.pos]
a.pos++
return v, nil
}
func (a *AMF0) ReadNumber() (float64, error) {
if a.pos+8 >= len(a.buf) {
return 0, Err
}
v := binary.BigEndian.Uint64(a.buf[a.pos : a.pos+8])
a.pos += 8
return math.Float64frombits(v), nil
}
func (a *AMF0) ReadString() (string, error) {
if a.pos+2 >= len(a.buf) {
return "", Err
}
size := int(binary.BigEndian.Uint16(a.buf[a.pos:]))
a.pos += 2
if a.pos+size >= len(a.buf) {
return "", Err
}
s := string(a.buf[a.pos : a.pos+size])
a.pos += size
return s, nil
}
func (a *AMF0) ReadObject() (map[string]interface{}, error) {
obj := make(map[string]interface{})
for {
k, err := a.ReadString()
if err != nil {
return nil, err
}
v, err := a.ReadItem()
if err != nil {
return nil, err
}
if k == "" {
break
}
obj[k] = v
}
return obj, nil
}
func (a *AMF0) ReadEcmaArray() (map[string]interface{}, error) {
if a.pos+4 >= len(a.buf) {
return nil, Err
}
a.pos += 4 // skip size
return a.ReadObject()
}

97
pkg/httpflv/flvio.go Normal file
View File

@@ -0,0 +1,97 @@
package httpflv
import (
"fmt"
"github.com/deepch/vdk/format/flv/flvio"
"github.com/deepch/vdk/utils/bits/pio"
"io"
)
// TODO: rewrite all of this someday
func ReadTag(r io.Reader, b []byte) (tag flvio.Tag, ts int32, err error) {
if _, err = io.ReadFull(r, b[:flvio.TagHeaderLength]); err != nil {
return
}
var datalen int
if tag, ts, datalen, err = flvio.ParseTagHeader(b); err != nil {
return
}
data := make([]byte, datalen)
if _, err = io.ReadFull(r, data); err != nil {
return
}
n, err := ParseHeader(&tag, data)
if err != nil {
return
}
tag.Data = data[n:]
if _, err = io.ReadFull(r, b[:4]); err != nil {
return
}
return
}
func ParseHeader(self *flvio.Tag, b []byte) (n int, err error) {
switch self.Type {
case flvio.TAG_AUDIO:
return audioParseHeader(self, b)
case flvio.TAG_VIDEO:
return videoParseHeader(self, b)
}
return
}
func audioParseHeader(tag *flvio.Tag, b []byte) (n int, err error) {
if len(b) < n+1 {
err = fmt.Errorf("audiodata: parse invalid")
return
}
flags := b[n]
n++
tag.SoundFormat = flags >> 4
tag.SoundRate = (flags >> 2) & 0x3
tag.SoundSize = (flags >> 1) & 0x1
tag.SoundType = flags & 0x1
switch tag.SoundFormat {
case flvio.SOUND_AAC:
if len(b) < n+1 {
err = fmt.Errorf("audiodata: parse invalid")
return
}
tag.AACPacketType = b[n]
n++
}
return
}
func videoParseHeader(tag *flvio.Tag, b []byte) (n int, err error) {
if len(b) < n+1 {
err = fmt.Errorf("videodata: parse invalid")
return
}
flags := b[n]
tag.FrameType = flags >> 4
tag.CodecID = flags & 0xf
n++
if len(b) < n+4 {
err = fmt.Errorf("videodata: parse invalid")
return
}
tag.AVCPacketType = b[n]
n++
tag.CompositionTime = pio.I24BE(b[n:])
n += 3
return
}

View File

@@ -2,8 +2,9 @@ package httpflv
import ( import (
"bufio" "bufio"
"errors" "bytes"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/flv/flvio" "github.com/deepch/vdk/format/flv/flvio"
"github.com/deepch/vdk/utils/bits/pio" "github.com/deepch/vdk/utils/bits/pio"
@@ -41,8 +42,12 @@ func Accept(res *http.Response) (*Conn, error) {
return nil, err return nil, err
} }
if flags&flvio.FILE_HAS_VIDEO == 0 { if flags&flvio.FILE_HAS_VIDEO != 0 {
return nil, errors.New("not supported") c.videoIdx = -1
}
if flags&flvio.FILE_HAS_AUDIO != 0 {
c.audioIdx = -1
} }
if _, err = c.reader.Discard(n); err != nil { if _, err = c.reader.Discard(n); err != nil {
@@ -56,49 +61,154 @@ type Conn struct {
conn io.ReadCloser conn io.ReadCloser
reader *bufio.Reader reader *bufio.Reader
buf []byte buf []byte
videoIdx int8
audioIdx int8
} }
func (c *Conn) Streams() ([]av.CodecData, error) { func (c *Conn) Streams() ([]av.CodecData, error) {
for { var video, audio av.CodecData
// Normal software sends:
// 1. Video/audio flag in header
// 2. MetaData as first tag (with video/audio codec info)
// 3. Video/audio headers in 2nd and 3rd tag
// Reolink camera sends:
// 1. Empty video/audio flag
// 2. MedaData without stereo key for AAC
// 3. Audio header after Video keyframe tag
waitVideo := c.videoIdx != 0
waitAudio := c.audioIdx != 0
for i := 0; i < 20; i++ {
tag, _, err := flvio.ReadTag(c.reader, c.buf) tag, _, err := flvio.ReadTag(c.reader, c.buf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AAC_SEQHDR { //log.Printf("[FLV] type=%d avc=%d aac=%d video=%t audio=%t", tag.Type, tag.AVCPacketType, tag.AACPacketType, video != nil, audio != nil)
continue
switch tag.Type {
case flvio.TAG_SCRIPTDATA:
if meta := NewReader(tag.Data).ReadMetaData(); meta != nil {
waitVideo = meta["videocodecid"] != nil
// don't wait audio tag because parse all info from MetaData
waitAudio = false
audio = parseAudioConfig(meta)
} else {
waitVideo = bytes.Contains(tag.Data, []byte("videocodecid"))
waitAudio = bytes.Contains(tag.Data, []byte("audiocodecid"))
}
case flvio.TAG_VIDEO:
if tag.AVCPacketType == flvio.AVC_SEQHDR {
video, _ = h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data)
}
waitVideo = false
case flvio.TAG_AUDIO:
if tag.SoundFormat == flvio.SOUND_AAC && tag.AACPacketType == flvio.AAC_SEQHDR {
audio, _ = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(tag.Data)
}
waitAudio = false
} }
stream, err := h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data) if !waitVideo && !waitAudio {
if err != nil { break
return nil, err
} }
return []av.CodecData{stream}, nil
} }
if video != nil && audio != nil {
c.videoIdx = 0
c.audioIdx = 1
return []av.CodecData{video, audio}, nil
} else if video != nil {
c.videoIdx = 0
c.audioIdx = -1
return []av.CodecData{video}, nil
} else if audio != nil {
c.videoIdx = -1
c.audioIdx = 0
return []av.CodecData{audio}, nil
}
return nil, nil
} }
func (c *Conn) ReadPacket() (av.Packet, error) { func (c *Conn) ReadPacket() (av.Packet, error) {
for { for {
tag, ts, err := flvio.ReadTag(c.reader, c.buf) tag, ts, err := ReadTag(c.reader, c.buf)
if err != nil { if err != nil {
return av.Packet{}, err return av.Packet{}, err
} }
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AVC_NALU { switch tag.Type {
continue case flvio.TAG_VIDEO:
} if c.videoIdx < 0 || tag.AVCPacketType != flvio.AVC_NALU {
continue
}
return av.Packet{ //log.Printf("[FLV] %v, len: %d, ts: %10d", h264.Types(tag.Data), len(tag.Data), flvio.TsToTime(ts))
Idx: 0,
Data: tag.Data, return av.Packet{
CompositionTime: flvio.TsToTime(tag.CompositionTime), Idx: c.videoIdx,
IsKeyFrame: tag.FrameType == flvio.FRAME_KEY, Data: tag.Data,
Time: flvio.TsToTime(ts), CompositionTime: flvio.TsToTime(tag.CompositionTime),
}, nil IsKeyFrame: tag.FrameType == flvio.FRAME_KEY,
Time: flvio.TsToTime(ts),
}, nil
case flvio.TAG_AUDIO:
if c.audioIdx < 0 || tag.SoundFormat != flvio.SOUND_AAC || tag.AACPacketType != flvio.AAC_RAW {
continue
}
return av.Packet{Idx: c.audioIdx, Data: tag.Data, Time: flvio.TsToTime(ts)}, nil
}
} }
} }
func (c *Conn) Close() (err error) { func (c *Conn) Close() (err error) {
return c.conn.Close() return c.conn.Close()
} }
func parseAudioConfig(meta map[string]interface{}) av.CodecData {
if meta["audiocodecid"] != float64(10) {
return nil
}
config := aacparser.MPEG4AudioConfig{
ObjectType: aacparser.AOT_AAC_LC,
}
switch v := meta["audiosamplerate"].(type) {
case float64:
config.SampleRate = int(v)
default:
return nil
}
switch meta["stereo"] {
case true:
config.ChannelConfig = 2
config.ChannelLayout = av.CH_STEREO
default:
// Reolink doesn't have this setting
config.ChannelConfig = 1
config.ChannelLayout = av.CH_MONO
}
buf := &bytes.Buffer{}
if err := aacparser.WriteMPEG4AudioConfig(buf, config); err != nil {
return nil
}
return aacparser.CodecData{
Config: config,
ConfigBytes: buf.Bytes(),
}
}

View File

@@ -14,9 +14,19 @@ import (
"io" "io"
"net/http" "net/http"
"strings" "strings"
"sync"
"sync/atomic"
"time" "time"
) )
type State byte
const (
StateNone State = iota
StateConn
StateHandle
)
type Client struct { type Client struct {
streamer.Element streamer.Element
@@ -26,12 +36,14 @@ type Client struct {
medias []*streamer.Media medias []*streamer.Media
tracks map[byte]*streamer.Track tracks map[byte]*streamer.Track
closed bool
msg *message msg *message
t0 time.Time t0 time.Time
buffer chan []byte buffer chan []byte
state State
mu sync.Mutex
recv uint32
} }
func NewClient(id string) *Client { func NewClient(id string) *Client {
@@ -69,16 +81,26 @@ func (c *Client) Dial() (err error) {
return err return err
} }
c.state = StateConn
return nil return nil
} }
func (c *Client) Handle() error { func (c *Client) Handle() error {
c.buffer = make(chan []byte, 5)
// add delay to the stream for smooth playing (not a best solution) // add delay to the stream for smooth playing (not a best solution)
c.t0 = time.Now().Add(time.Second) c.t0 = time.Now().Add(time.Second)
// processing stream in separate thread for lower delay between packets c.mu.Lock()
go c.worker()
if c.state == StateConn {
c.buffer = make(chan []byte, 5)
c.state = StateHandle
// processing stream in separate thread for lower delay between packets
go c.worker(c.buffer)
}
c.mu.Unlock()
_, data, err := c.conn.ReadMessage() _, data, err := c.conn.ReadMessage()
if err != nil { if err != nil {
@@ -87,7 +109,12 @@ func (c *Client) Handle() error {
track := c.tracks[c.msg.Track] track := c.tracks[c.msg.Track]
if track != nil { if track != nil {
c.buffer <- data c.mu.Lock()
if c.state == StateHandle {
c.buffer <- data
atomic.AddUint32(&c.recv, uint32(len(data)))
}
c.mu.Unlock()
} }
// we have one unprocessed msg after getTracks // we have one unprocessed msg after getTracks
@@ -114,7 +141,12 @@ func (c *Client) Handle() error {
track = c.tracks[msg.Track] track = c.tracks[msg.Track]
if track != nil { if track != nil {
c.buffer <- data c.mu.Lock()
if c.state == StateHandle {
c.buffer <- data
atomic.AddUint32(&c.recv, uint32(len(data)))
}
c.mu.Unlock()
} }
default: default:
@@ -124,13 +156,19 @@ func (c *Client) Handle() error {
} }
func (c *Client) Close() error { func (c *Client) Close() error {
if c.conn == nil { c.mu.Lock()
defer c.mu.Unlock()
switch c.state {
case StateNone:
return nil return nil
} case StateConn:
if c.buffer != nil { case StateHandle:
close(c.buffer) close(c.buffer)
} }
c.closed = true
c.state = StateNone
return c.conn.Close() return c.conn.Close()
} }
@@ -210,13 +248,13 @@ func (c *Client) getTracks() error {
} }
} }
func (c *Client) worker() { func (c *Client) worker(buffer chan []byte) {
var track *streamer.Track var track *streamer.Track
for _, track = range c.tracks { for _, track = range c.tracks {
break break
} }
for data := range c.buffer { for data := range buffer {
moof := &fmp4io.MovieFrag{} moof := &fmp4io.MovieFrag{}
if _, err := moof.Unmarshal(data, 0); err != nil { if _, err := moof.Unmarshal(data, 0); err != nil {
continue continue

View File

@@ -1,8 +1,10 @@
package ivideon package ivideon
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"sync/atomic"
) )
func (c *Client) GetMedias() []*streamer.Media { func (c *Client) GetMedias() []*streamer.Media {
@@ -20,7 +22,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streame
func (c *Client) Start() error { func (c *Client) Start() error {
err := c.Handle() err := c.Handle()
if c.closed { if c.buffer == nil {
return nil return nil
} }
return err return err
@@ -29,3 +31,19 @@ func (c *Client) Start() error {
func (c *Client) Stop() error { func (c *Client) Stop() error {
return c.Close() return c.Close()
} }
func (c *Client) MarshalJSON() ([]byte, error) {
var tracks []*streamer.Track
for _, track := range c.tracks {
tracks = append(tracks, track)
}
info := &streamer.Info{
Type: "Ivideon source",
URL: c.ID,
Medias: c.medias,
Tracks: tracks,
Recv: atomic.LoadUint32(&c.recv),
}
return json.Marshal(info)
}

View File

@@ -2,6 +2,7 @@ package mjpeg
import ( import (
"bufio" "bufio"
"encoding/json"
"errors" "errors"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/AlexxIT/go2rtc/pkg/tcp"
@@ -11,6 +12,7 @@ import (
"net/textproto" "net/textproto"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
) )
@@ -24,6 +26,7 @@ type Client struct {
res *http.Response res *http.Response
track *streamer.Track track *streamer.Track
recv uint32
} }
func NewClient(res *http.Response) *Client { func NewClient(res *http.Response) *Client {
@@ -64,10 +67,23 @@ func (c *Client) Start() error {
} }
func (c *Client) Stop() error { func (c *Client) Stop() error {
// important for close reader/writer gorutines
_ = c.res.Body.Close()
c.closed = true c.closed = true
return nil return nil
} }
func (c *Client) MarshalJSON() ([]byte, error) {
info := &streamer.Info{
Type: "MJPEG source",
URL: c.res.Request.URL.String(),
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
Recv: atomic.LoadUint32(&c.recv),
}
return json.Marshal(info)
}
func (c *Client) startJPEG() error { func (c *Client) startJPEG() error {
buf, err := io.ReadAll(c.res.Body) buf, err := io.ReadAll(c.res.Body)
if err != nil { if err != nil {
@@ -77,6 +93,8 @@ func (c *Client) startJPEG() error {
packet := &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf} packet := &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf}
_ = c.track.WriteRTP(packet) _ = c.track.WriteRTP(packet)
atomic.AddUint32(&c.recv, uint32(len(buf)))
req := c.res.Request req := c.res.Request
for !c.closed { for !c.closed {
@@ -96,6 +114,8 @@ func (c *Client) startJPEG() error {
packet = &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf} packet = &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf}
_ = c.track.WriteRTP(packet) _ = c.track.WriteRTP(packet)
atomic.AddUint32(&c.recv, uint32(len(buf)))
} }
return nil return nil
@@ -139,6 +159,8 @@ func (c *Client) startMJPEG(boundary string) error {
packet := &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf} packet := &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf}
_ = c.track.WriteRTP(packet) _ = c.track.WriteRTP(packet)
atomic.AddUint32(&c.recv, uint32(len(buf)))
if _, err = r.Discard(2); err != nil { if _, err = r.Discard(2); err != nil {
return err return err
} }

View File

@@ -1,8 +1,10 @@
package mjpeg package mjpeg
import ( import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp" "github.com/pion/rtp"
"sync/atomic"
) )
type Consumer struct { type Consumer struct {
@@ -14,7 +16,7 @@ type Consumer struct {
codecs []*streamer.Codec codecs []*streamer.Codec
start bool start bool
send int send uint32
} }
func (c *Consumer) GetMedias() []*streamer.Media { func (c *Consumer) GetMedias() []*streamer.Media {
@@ -28,6 +30,7 @@ func (c *Consumer) GetMedias() []*streamer.Media {
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
push := func(packet *rtp.Packet) error { push := func(packet *rtp.Packet) error {
c.Fire(packet.Payload) c.Fire(packet.Payload)
atomic.AddUint32(&c.send, uint32(len(packet.Payload)))
return nil return nil
} }
@@ -38,3 +41,13 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return track.Bind(push) return track.Bind(push)
} }
func (c *Consumer) MarshalJSON() ([]byte, error) {
info := &streamer.Info{
Type: "MJPEG client",
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
Send: atomic.LoadUint32(&c.send),
}
return json.Marshal(info)
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/h265"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp" "github.com/pion/rtp"
"sync/atomic"
) )
type Consumer struct { type Consumer struct {
@@ -20,7 +21,7 @@ type Consumer struct {
codecs []*streamer.Codec codecs []*streamer.Codec
wait byte wait byte
send int send uint32
} }
const ( const (
@@ -76,7 +77,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
} }
buf := c.muxer.Marshal(trackID, packet) buf := c.muxer.Marshal(trackID, packet)
c.send += len(buf) atomic.AddUint32(&c.send, uint32(len(buf)))
c.Fire(buf) c.Fire(buf)
return nil return nil
@@ -108,7 +109,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
} }
buf := c.muxer.Marshal(trackID, packet) buf := c.muxer.Marshal(trackID, packet)
c.send += len(buf) atomic.AddUint32(&c.send, uint32(len(buf)))
c.Fire(buf) c.Fire(buf)
return nil return nil
@@ -128,7 +129,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
} }
buf := c.muxer.Marshal(trackID, packet) buf := c.muxer.Marshal(trackID, packet)
c.send += len(buf) atomic.AddUint32(&c.send, uint32(len(buf)))
c.Fire(buf) c.Fire(buf)
return nil return nil
@@ -163,12 +164,11 @@ func (c *Consumer) Start() {
// //
func (c *Consumer) MarshalJSON() ([]byte, error) { func (c *Consumer) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{ info := &streamer.Info{
"type": "MP4 server consumer", Type: "MP4 client",
"send": c.send, RemoteAddr: c.RemoteAddr,
"remote_addr": c.RemoteAddr, UserAgent: c.UserAgent,
"user_agent": c.UserAgent, Send: atomic.LoadUint32(&c.send),
} }
return json.Marshal(info)
return json.Marshal(v)
} }

View File

@@ -1,18 +1,25 @@
package mp4 package mp4
import ( import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/h265"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp" "github.com/pion/rtp"
"sync/atomic"
) )
type Segment struct { type Segment struct {
streamer.Element streamer.Element
Medias []*streamer.Media Medias []*streamer.Media
UserAgent string
RemoteAddr string
MimeType string MimeType string
OnlyKeyframe bool OnlyKeyframe bool
send uint32
} }
func (c *Segment) GetMedias() []*streamer.Media { func (c *Segment) GetMedias() []*streamer.Media {
@@ -56,6 +63,7 @@ func (c *Segment) AddTrack(media *streamer.Media, track *streamer.Track) *stream
} }
buf := muxer.Marshal(0, packet) buf := muxer.Marshal(0, packet)
atomic.AddUint32(&c.send, uint32(len(buf)))
c.Fire(append(init, buf...)) c.Fire(append(init, buf...))
return nil return nil
@@ -73,6 +81,7 @@ func (c *Segment) AddTrack(media *streamer.Media, track *streamer.Track) *stream
buf = append(buf, b...) buf = append(buf, b...)
} }
atomic.AddUint32(&c.send, uint32(len(buf)))
c.Fire(buf) c.Fire(buf)
buf = buf[:0] buf = buf[:0]
@@ -106,6 +115,7 @@ func (c *Segment) AddTrack(media *streamer.Media, track *streamer.Track) *stream
} }
buf := muxer.Marshal(0, packet) buf := muxer.Marshal(0, packet)
atomic.AddUint32(&c.send, uint32(len(buf)))
c.Fire(append(init, buf...)) c.Fire(append(init, buf...))
return nil return nil
@@ -121,3 +131,13 @@ func (c *Segment) AddTrack(media *streamer.Media, track *streamer.Track) *stream
panic("unsupported codec") panic("unsupported codec")
} }
func (c *Segment) MarshalJSON() ([]byte, error) {
info := &streamer.Info{
Type: "WS/MP4 client",
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
Send: atomic.LoadUint32(&c.send),
}
return json.Marshal(info)
}

View File

@@ -1,7 +1,6 @@
package mp4f package mp4f
import ( import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
@@ -15,6 +14,7 @@ import (
type Consumer struct { type Consumer struct {
streamer.Element streamer.Element
Medias []*streamer.Media
UserAgent string UserAgent string
RemoteAddr string RemoteAddr string
@@ -27,6 +27,10 @@ type Consumer struct {
} }
func (c *Consumer) GetMedias() []*streamer.Media { func (c *Consumer) GetMedias() []*streamer.Media {
if c.Medias != nil {
return c.Medias
}
return []*streamer.Media{ return []*streamer.Media{
{ {
Kind: streamer.KindVideo, Kind: streamer.KindVideo,
@@ -89,7 +93,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return nil return nil
} }
if !codec.IsRAW() { if codec.IsRTP() {
wrapper := h264.RTPDepay(track) wrapper := h264.RTPDepay(track)
push = wrapper(push) push = wrapper(push)
} }
@@ -149,16 +153,3 @@ func (c *Consumer) Init() ([]byte, error) {
func (c *Consumer) Start() { func (c *Consumer) Start() {
c.start = true c.start = true
} }
//
func (c *Consumer) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{
"type": "MSE server consumer",
"send": c.send,
"remote_addr": c.RemoteAddr,
"user_agent": c.UserAgent,
}
return json.Marshal(v)
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/deepch/vdk/format/rtmp" "github.com/deepch/vdk/format/rtmp"
"github.com/pion/rtp" "github.com/pion/rtp"
"net/http" "net/http"
"sync/atomic"
"time" "time"
) )
@@ -33,7 +34,7 @@ type Client struct {
conn Conn conn Conn
closed bool closed bool
receive int recv uint32
} }
func NewClient(uri string) *Client { func NewClient(uri string) *Client {
@@ -138,7 +139,7 @@ func (c *Client) Handle() (err error) {
return return
} }
c.receive += len(pkt.Data) atomic.AddUint32(&c.recv, uint32(len(pkt.Data)))
track := c.tracks[int(pkt.Idx)] track := c.tracks[int(pkt.Idx)]

View File

@@ -4,7 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strconv" "sync/atomic"
) )
func (c *Client) GetMedias() []*streamer.Media { func (c *Client) GetMedias() []*streamer.Media {
@@ -29,19 +29,12 @@ func (c *Client) Stop() error {
} }
func (c *Client) MarshalJSON() ([]byte, error) { func (c *Client) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{ info := &streamer.Info{
streamer.JSONReceive: c.receive, Type: "RTMP source",
streamer.JSONType: "RTMP client producer", URL: c.URI,
//streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(), Medias: c.medias,
"url": c.URI, Tracks: c.tracks,
Recv: atomic.LoadUint32(&c.recv),
} }
for i, media := range c.medias { return json.Marshal(info)
k := "media:" + strconv.Itoa(i)
v[k] = media.String()
}
for i, track := range c.tracks {
k := "track:" + strconv.Itoa(i)
v[k] = track.String()
}
return json.Marshal(v)
} }

View File

@@ -2,7 +2,6 @@ package rtsp
import ( import (
"bufio" "bufio"
"bytes"
"crypto/tls" "crypto/tls"
"encoding/binary" "encoding/binary"
"errors" "errors"
@@ -48,6 +47,22 @@ const (
type State byte type State byte
func (s State) String() string {
switch s {
case StateNone:
return "NONE"
case StateConn:
return "CONN"
case StateSetup:
return "SETUP"
case StatePlay:
return "PLAY"
case StateHandle:
return "HANDLE"
}
return strconv.Itoa(int(s))
}
const ( const (
StateNone State = iota StateNone State = iota
StateConn StateConn
@@ -62,6 +77,7 @@ type Conn struct {
// public // public
Backchannel bool Backchannel bool
SessionName string
Medias []*streamer.Media Medias []*streamer.Media
Session string Session string
@@ -264,7 +280,7 @@ func (c *Conn) Options() error {
} }
if val := res.Header.Get("Content-Base"); val != "" { if val := res.Header.Get("Content-Base"); val != "" {
c.URL, err = url.Parse(val) c.URL, err = urlParse(val)
if err != nil { if err != nil {
return err return err
} }
@@ -294,7 +310,7 @@ func (c *Conn) Describe() error {
} }
if val := res.Header.Get("Content-Base"); val != "" { if val := res.Header.Get("Content-Base"); val != "" {
c.URL, err = url.Parse(val) c.URL, err = urlParse(val)
if err != nil { if err != nil {
return err return err
} }
@@ -346,6 +362,10 @@ func (c *Conn) SetupMedia(
c.stateMu.Lock() c.stateMu.Lock()
defer c.stateMu.Unlock() defer c.stateMu.Unlock()
if c.state != StateConn && c.state != StateSetup {
return nil, fmt.Errorf("RTSP SETUP from wrong state: %s", c.state)
}
ch := c.GetChannel(media) ch := c.GetChannel(media)
if ch < 0 { if ch < 0 {
return nil, fmt.Errorf("wrong media: %v", media) return nil, fmt.Errorf("wrong media: %v", media)
@@ -359,7 +379,7 @@ func (c *Conn) SetupMedia(
} }
rawURL += media.Control rawURL += media.Control
} }
trackURL, err := url.Parse(rawURL) trackURL, err := urlParse(rawURL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -598,7 +618,7 @@ func (c *Conn) Accept() error {
medias = append(medias, media) medias = append(medias, media)
} }
res.Body, err = streamer.MarshalSDP(medias) res.Body, err = streamer.MarshalSDP(c.SessionName, medias)
if err != nil { if err != nil {
return err return err
} }
@@ -634,6 +654,12 @@ func (c *Conn) Accept() error {
} }
return err return err
case MethodTeardown:
res := &tcp.Response{Request: req}
_ = c.Response(res)
c.state = StateNone
return c.conn.Close()
default: default:
return fmt.Errorf("unsupported method: %s", req.Method) return fmt.Errorf("unsupported method: %s", req.Method)
} }
@@ -648,15 +674,17 @@ func (c *Conn) Handle() (err error) {
case StatePlay: case StatePlay:
c.state = StateHandle c.state = StateHandle
default: default:
err = fmt.Errorf("RTSP Handle from wrong state: %d", c.state) err = fmt.Errorf("RTSP HANDLE from wrong state: %s", c.state)
c.state = StateNone c.state = StateNone
_ = c.conn.Close() _ = c.conn.Close()
} }
ok := c.state == StateHandle
c.stateMu.Unlock() c.stateMu.Unlock()
if c.state != StateHandle { if !ok {
return return
} }
@@ -770,12 +798,12 @@ func (c *Conn) Handle() (err error) {
msg := &RTCP{Channel: channelID} msg := &RTCP{Channel: channelID}
if err = msg.Header.Unmarshal(buf); err != nil { if err = msg.Header.Unmarshal(buf); err != nil {
return continue
} }
msg.Packets, err = rtcp.Unmarshal(buf) msg.Packets, err = rtcp.Unmarshal(buf)
if err != nil { if err != nil {
return continue
} }
c.Fire(msg) c.Fire(msg)
@@ -853,42 +881,3 @@ func (c *Conn) bindTrack(
return track.Bind(push) return track.Bind(push)
} }
type RTCP struct {
Channel byte
Header rtcp.Header
Packets []rtcp.Packet
}
const sdpHeader = `v=0
o=- 0 0 IN IP4 0.0.0.0
s=-
t=0 0`
func UnmarshalSDP(rawSDP []byte) ([]*streamer.Media, error) {
medias, err := streamer.UnmarshalSDP(rawSDP)
if err != nil {
// fix SDP header for some cameras
i := bytes.Index(rawSDP, []byte("\nm="))
if i > 0 {
rawSDP = append([]byte(sdpHeader), rawSDP[i:]...)
medias, err = streamer.UnmarshalSDP(rawSDP)
}
if err != nil {
return nil, err
}
}
// fix bug in ONVIF spec
// https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec-v241.pdf
for _, media := range medias {
switch media.Direction {
case streamer.DirectionRecvonly, "":
media.Direction = streamer.DirectionSendonly
case streamer.DirectionSendonly:
media.Direction = streamer.DirectionRecvonly
}
}
return medias, nil
}

63
pkg/rtsp/helpers.go Normal file
View File

@@ -0,0 +1,63 @@
package rtsp
import (
"bytes"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtcp"
"net/url"
"strings"
)
type RTCP struct {
Channel byte
Header rtcp.Header
Packets []rtcp.Packet
}
const sdpHeader = `v=0
o=- 0 0 IN IP4 0.0.0.0
s=-
t=0 0`
func UnmarshalSDP(rawSDP []byte) ([]*streamer.Media, error) {
medias, err := streamer.UnmarshalSDP(rawSDP)
if err != nil {
// fix SDP header for some cameras
i := bytes.Index(rawSDP, []byte("\nm="))
if i > 0 {
rawSDP = append([]byte(sdpHeader), rawSDP[i:]...)
medias, err = streamer.UnmarshalSDP(rawSDP)
}
if err != nil {
return nil, err
}
}
// fix bug in ONVIF spec
// https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec-v241.pdf
for _, media := range medias {
switch media.Direction {
case streamer.DirectionRecvonly, "":
media.Direction = streamer.DirectionSendonly
case streamer.DirectionSendonly:
media.Direction = streamer.DirectionRecvonly
}
}
return medias, nil
}
// urlParse fix bug in URL from D-Link camera:
// Content-Base: rtsp://::ffff:192.168.1.123/onvif/profile.1/
func urlParse(rawURL string) (*url.URL, error) {
u, err := url.Parse(rawURL)
if err != nil && strings.HasSuffix(err.Error(), "after host") {
if i1 := strings.Index(rawURL, "://"); i1 > 0 {
if i2 := strings.IndexByte(rawURL[i1+3:], '/'); i2 > 0 {
return urlParse(rawURL[:i1+3+i2] + ":" + rawURL[i1+3+i2:])
}
}
}
return u, err
}

12
pkg/rtsp/rtsp_test.go Normal file
View File

@@ -0,0 +1,12 @@
package rtsp
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestURLParse(t *testing.T) {
base := "rtsp://::ffff:192.168.1.123/onvif/profile.1/"
_, err := urlParse(base)
assert.Empty(t, err)
}

View File

@@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strconv"
) )
// Element Producer // Element Producer
@@ -88,40 +87,30 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.
// //
func (c *Conn) MarshalJSON() ([]byte, error) { func (c *Conn) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{ info := &streamer.Info{
streamer.JSONReceive: c.receive, UserAgent: c.UserAgent,
streamer.JSONSend: c.send, Medias: c.Medias,
Tracks: c.tracks,
Recv: uint32(c.receive),
Send: uint32(c.send),
} }
switch c.mode { switch c.mode {
case ModeUnknown: case ModeUnknown:
v[streamer.JSONType] = "RTSP unknown" info.Type = "RTSP unknown"
case ModeClientProducer: case ModeClientProducer, ModeServerProducer:
v[streamer.JSONType] = "RTSP client producer" info.Type = "RTSP source"
case ModeServerProducer:
v[streamer.JSONType] = "RTSP server producer"
case ModeServerConsumer: case ModeServerConsumer:
v[streamer.JSONType] = "RTSP server consumer" info.Type = "RTSP client"
} }
//if c.URI != "" {
// v["uri"] = c.URI
//}
if c.URL != nil { if c.URL != nil {
v["url"] = c.URL.String() info.URL = c.URL.String()
} }
if c.conn != nil { if c.conn != nil {
v[streamer.JSONRemoteAddr] = c.conn.RemoteAddr().String() info.RemoteAddr = c.conn.RemoteAddr().String()
}
if c.UserAgent != "" {
v[streamer.JSONUserAgent] = c.UserAgent
}
for i, media := range c.Medias {
k := "media:" + strconv.Itoa(i)
v[k] = media.String()
}
for i, track := range c.tracks {
k := "track:" + strconv.Itoa(int(i>>1))
v[k] = track.String()
} }
//for i, track := range c.tracks { //for i, track := range c.tracks {
// k := "track:" + strconv.Itoa(i+1) // k := "track:" + strconv.Itoa(i+1)
// if track.MimeType() == streamer.MimeTypeH264 { // if track.MimeType() == streamer.MimeTypeH264 {
@@ -130,5 +119,6 @@ func (c *Conn) MarshalJSON() ([]byte, error) {
// v[k] = track.MimeType() // v[k] = track.MimeType()
// } // }
//} //}
return json.Marshal(v)
return json.Marshal(info)
} }

32
pkg/shell/env.go Normal file
View File

@@ -0,0 +1,32 @@
package shell
import (
"os"
"regexp"
"strings"
)
func ReplaceEnvVars(text string) string {
re := regexp.MustCompile(`\${([^}{]+)}`)
return re.ReplaceAllStringFunc(text, func(match string) string {
key := match[2 : len(match)-1]
var def string
var dok bool
if i := strings.IndexByte(key, ':'); i > 0 {
key, def = key[:i], key[i+1:]
dok = true
}
if value, vok := os.LookupEnv(key); vok {
return value
}
if dok {
return def
}
return match
})
}

View File

@@ -3,6 +3,7 @@ package srtp
import ( import (
"encoding/binary" "encoding/binary"
"net" "net"
"sync/atomic"
) )
// Server using same UDP port for SRTP and for SRTCP as the iPhone does // Server using same UDP port for SRTP and for SRTCP as the iPhone does
@@ -55,6 +56,8 @@ func (s *Server) Serve(conn net.PacketConn) error {
} }
} }
atomic.AddUint32(&session.Recv, uint32(n))
if err = session.HandleRTP(buf[:n]); err != nil { if err = session.HandleRTP(buf[:n]); err != nil {
return err return err
} }

View File

@@ -17,6 +17,7 @@ type Session struct {
Write func(b []byte) (int, error) Write func(b []byte) (int, error)
Track *streamer.Track Track *streamer.Track
Recv uint32
lastSequence uint32 lastSequence uint32
lastTimestamp uint32 lastTimestamp uint32

View File

@@ -4,13 +4,16 @@ import (
"strings" "strings"
) )
const ( type Info struct {
JSONType = "type" Type string `json:"type,omitempty"`
JSONRemoteAddr = "remote_addr" URL string `json:"url,omitempty"`
JSONUserAgent = "user_agent" RemoteAddr string `json:"remote_addr,omitempty"`
JSONReceive = "receive" UserAgent string `json:"user_agent,omitempty"`
JSONSend = "send" Medias []*Media `json:"medias,omitempty"`
) Tracks []*Track `json:"tracks,omitempty"`
Recv uint32 `json:"recv,omitempty"`
Send uint32 `json:"send,omitempty"`
}
func Between(s, sub1, sub2 string) string { func Between(s, sub1, sub2 string) string {
i := strings.Index(s, sub1) i := strings.Index(s, sub1)

View File

@@ -1,6 +1,7 @@
package streamer package streamer
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/pion/sdp/v3" "github.com/pion/sdp/v3"
"strconv" "strconv"
@@ -70,6 +71,10 @@ func (m *Media) String() string {
return s return s
} }
func (m *Media) MarshalJSON() ([]byte, error) {
return json.Marshal(m.String())
}
func (m *Media) Clone() *Media { func (m *Media) Clone() *Media {
clone := *m clone := *m
return &clone return &clone
@@ -178,8 +183,22 @@ func UnmarshalSDP(rawSDP []byte) ([]*Media, error) {
return medias, nil return medias, nil
} }
func MarshalSDP(medias []*Media) ([]byte, error) { func MarshalSDP(name string, medias []*Media) ([]byte, error) {
sd := &sdp.SessionDescription{} sd := &sdp.SessionDescription{
Origin: sdp.Origin{
Username: "-", SessionID: 1, SessionVersion: 1,
NetworkType: "IN", AddressType: "IP4", UnicastAddress: "0.0.0.0",
},
SessionName: sdp.SessionName(name),
ConnectionInformation: &sdp.ConnectionInformation{
NetworkType: "IN", AddressType: "IP4", Address: &sdp.Address{
Address: "0.0.0.0",
},
},
TimeDescriptions: []sdp.TimeDescription{
{Timing: sdp.Timing{}},
},
}
payloadType := uint8(96) payloadType := uint8(96)

View File

@@ -0,0 +1,23 @@
package streamer
import (
"github.com/pion/sdp/v3"
"github.com/stretchr/testify/assert"
"testing"
)
func TestSDP(t *testing.T) {
medias := []*Media{{
Kind: KindAudio, Direction: DirectionSendonly,
Codecs: []*Codec{
{Name: CodecPCMU, ClockRate: 8000},
},
}}
data, err := MarshalSDP("go2rtc/1.0.0", medias)
assert.Empty(t, err)
sd := &sdp.SessionDescription{}
err = sd.Unmarshal(data)
assert.Empty(t, err)
}

View File

@@ -1,6 +1,7 @@
package streamer package streamer
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/pion/rtp" "github.com/pion/rtp"
"sync" "sync"
@@ -22,12 +23,19 @@ func NewTrack(codec *Codec, direction string) *Track {
func (t *Track) String() string { func (t *Track) String() string {
s := t.Codec.String() s := t.Codec.String()
t.sinkMu.RLock() if t.sinkMu.TryRLock() {
s += fmt.Sprintf(", sinks=%d", len(t.sink)) s += fmt.Sprintf(", sinks=%d", len(t.sink))
t.sinkMu.RUnlock() t.sinkMu.RUnlock()
} else {
s += fmt.Sprintf(", sinks=?")
}
return s return s
} }
func (t *Track) MarshalJSON() ([]byte, error) {
return json.Marshal(t.String())
}
func (t *Track) WriteRTP(p *rtp.Packet) error { func (t *Track) WriteRTP(p *rtp.Packet) error {
t.sinkMu.RLock() t.sinkMu.RLock()
for _, f := range t.sink { for _, f := range t.sink {

View File

@@ -35,13 +35,17 @@ func NewAPI(address string) (*webrtc.API, error) {
s.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled) s.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
if address != "" { if address != "" {
ln, err := net.Listen("tcp", address) s.SetNetworkTypes([]webrtc.NetworkType{
if err == nil { webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6,
s.SetNetworkTypes([]webrtc.NetworkType{ webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6,
webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6, })
webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6,
})
if ln, err := net.ListenPacket("udp", address); err == nil {
udpMux := webrtc.NewICEUDPMux(nil, ln)
s.SetICEUDPMux(udpMux)
}
if ln, err := net.Listen("tcp", address); err == nil {
tcpMux := webrtc.NewICETCPMux(nil, ln, 8) tcpMux := webrtc.NewICETCPMux(nil, ln, 8)
s.SetICETCPMux(tcpMux) s.SetICETCPMux(tcpMux)
} }

View File

@@ -113,20 +113,12 @@ func (c *Conn) AddCandidate(candidate string) {
} }
func (c *Conn) MarshalJSON() ([]byte, error) { func (c *Conn) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{ info := &streamer.Info{
streamer.JSONType: "WebRTC server consumer", Type: "WebRTC client",
streamer.JSONRemoteAddr: c.remote(), RemoteAddr: c.remote(),
UserAgent: c.UserAgent,
Recv: uint32(c.receive),
Send: uint32(c.send),
} }
return json.Marshal(info)
if c.receive > 0 {
v[streamer.JSONReceive] = c.receive
}
if c.send > 0 {
v[streamer.JSONSend] = c.send
}
if c.UserAgent != "" {
v[streamer.JSONUserAgent] = c.UserAgent
}
return json.Marshal(v)
} }

View File

@@ -13,7 +13,7 @@ import (
"time" "time"
) )
func NewCandidate(address string) (string, error) { func NewCandidate(network, address string) (string, error) {
i := strings.LastIndexByte(address, ':') i := strings.LastIndexByte(address, ':')
if i < 0 { if i < 0 {
return "", errors.New("wrong candidate: " + address) return "", errors.New("wrong candidate: " + address)
@@ -25,13 +25,18 @@ func NewCandidate(address string) (string, error) {
return "", err return "", err
} }
cand, err := ice.NewCandidateHost(&ice.CandidateHostConfig{ config := &ice.CandidateHostConfig{
Network: "tcp", Network: network,
Address: host, Address: host,
Port: i, Port: i,
Component: ice.ComponentRTP, Component: ice.ComponentRTP,
TCPType: ice.TCPTypePassive, }
})
if network == "tcp" {
config.TCPType = ice.TCPTypePassive
}
cand, err := ice.NewCandidateHost(config)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@@ -58,7 +58,7 @@
0, location.pathname.lastIndexOf("/") 0, location.pathname.lastIndexOf("/")
); );
fetch(`${baseUrl}/api/devices`) fetch(`${baseUrl}/api/devices`, {cache: 'no-cache'})
.then(r => r.json()) .then(r => r.json())
.then(data => { .then(data => {
document.querySelector("body > table > tbody").innerHTML = document.querySelector("body > table > tbody").innerHTML =

69
www/editor.html Normal file
View File

@@ -0,0 +1,69 @@
<!DOCTYPE html>
<html>
<head>
<title>File Editor</title>
<meta name="viewport" content="width=device-width, user-scalable=yes, initial-scale=1, maximum-scale=1">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<script src="https://cdnjs.cloudflare.com/ajax/libs/ace/1.14.0/ace.min.js"></script>
<style>
body {
font-family: Arial, Helvetica, sans-serif;
}
body {
margin: 0;
padding: 0;
display: flex;
flex-direction: column;
}
html, body, #config {
width: 100%;
height: 100%;
}
</style>
</head>
<body>
<script src="main.js"></script>
<div>
<button id="save">Save & Restart</button>
</div>
<br>
<div id="config"></div>
<script>
ace.config.set('basePath', 'https://cdnjs.cloudflare.com/ajax/libs/ace/1.14.0/');
const editor = ace.edit("config", {
mode: "ace/mode/yaml",
});
document.getElementById('save').addEventListener('click', () => {
fetch('api/config', {
method: 'POST', body: editor.getValue()
}).then(r => {
if (r.ok) {
alert('OK');
fetch('api/exit', {method: 'POST'});
} else {
r.text().then(alert);
}
});
});
window.addEventListener('load', () => {
fetch('api/config', {cache: 'no-cache'}).then(r => {
if (r.status === 410) {
alert('Config file is not set');
} else if (r.status === 404) {
editor.setValue(''); // config file not exist
} else if (r.ok) {
r.text().then(data => {
editor.setValue(data);
});
} else {
alert(`Unknown error: ${r.statusText} (${r.status})`);
}
});
})
</script>
</body>
</html>

View File

@@ -65,7 +65,7 @@
0, location.pathname.lastIndexOf("/") 0, location.pathname.lastIndexOf("/")
); );
fetch(`${baseUrl}/api/homekit`) fetch(`${baseUrl}/api/homekit`, {cache: 'no-cache'})
.then(r => r.json()) .then(r => r.json())
.then(data => { .then(data => {
document.querySelector("body > table > tbody").innerHTML = document.querySelector("body > table > tbody").innerHTML =

View File

@@ -10,6 +10,7 @@
<style> <style>
body { body {
font-family: Arial, Helvetica, sans-serif; font-family: Arial, Helvetica, sans-serif;
background-color: white;
} }
table { table {
@@ -61,6 +62,7 @@
</head> </head>
<body> <body>
<script src="main.js"></script> <script src="main.js"></script>
<div class="info"></div>
<div class="header"> <div class="header">
<input id="src" type="text" placeholder="url"> <input id="src" type="text" placeholder="url">
<a id="add" href="#">add</a> <a id="add" href="#">add</a>
@@ -89,7 +91,6 @@
'<a href="webrtc.html?src={name}">2-way-aud</a>', '<a href="webrtc.html?src={name}">2-way-aud</a>',
'<a href="api/stream.mp4?src={name}">mp4</a>', '<a href="api/stream.mp4?src={name}">mp4</a>',
'<a href="api/stream.mjpeg?src={name}">mjpeg</a>', '<a href="api/stream.mjpeg?src={name}">mjpeg</a>',
`<a href="rtsp://${location.hostname}:8554/{name}">rtsp</a>`,
'<a href="api/streams?src={name}">info</a>', '<a href="api/streams?src={name}">info</a>',
'<a href="#" data-name="{name}">delete</a>', '<a href="#" data-name="{name}">delete</a>',
]; ];
@@ -132,11 +133,11 @@
function reload() { function reload() {
const url = new URL("api/streams", location.href); const url = new URL("api/streams", location.href);
fetch(url).then(r => r.json()).then(data => { fetch(url, {cache: 'no-cache'}).then(r => r.json()).then(data => {
tbody.innerHTML = ""; tbody.innerHTML = "";
for (const [name, value] of Object.entries(data)) { for (const [name, value] of Object.entries(data)) {
const online = value ? value.length : 0; const online = value && value.consumers ? value.consumers.length : 0;
const links = templates.map(link => { const links = templates.map(link => {
return link.replace("{name}", encodeURIComponent(name)); return link.replace("{name}", encodeURIComponent(name));
}).join(" "); }).join(" ");
@@ -151,7 +152,21 @@
}); });
} }
reload(); const url = new URL("api", location.href);
fetch(url, {cache: 'no-cache'}).then(r => r.json()).then(data => {
const info = document.querySelector(".info");
info.innerText = `Version: ${data.version}, Config: ${data.config_path}`;
try {
const host = data.host.match(/^[^:]+/)[0];
const port = data.rtsp.listen.match(/[0-9]+$/)[0];
templates.splice(4, 0, `<a href="rtsp://${host}:${port}/{name}">rtsp</a>`);
} catch (e) {
templates.splice(4, 0, `<a href="rtsp://${location.host}:8554/{name}">rtsp</a>`);
}
reload();
});
</script> </script>
</body> </body>
</html> </html>

View File

@@ -47,6 +47,7 @@ nav li {
<li><a href="index.html">Streams</a></li> <li><a href="index.html">Streams</a></li>
<li><a href="devices.html">Devices</a></li> <li><a href="devices.html">Devices</a></li>
<li><a href="homekit.html">HomeKit</a></li> <li><a href="homekit.html">HomeKit</a></li>
<li><a href="editor.html">Config</a></li>
</ul> </ul>
</nav> </nav>
` + document.body.innerHTML; ` + document.body.innerHTML;

View File

@@ -228,16 +228,12 @@ export class VideoRTC extends HTMLElement {
this.video.playsInline = true; this.video.playsInline = true;
this.video.preload = "auto"; this.video.preload = "auto";
this.appendChild(this.video);
// important for second video for mode MP4
this.style.display = "block";
this.style.position = "relative";
this.video.style.display = "block"; // fix bottom margin 4px this.video.style.display = "block"; // fix bottom margin 4px
this.video.style.width = "100%"; this.video.style.width = "100%";
this.video.style.height = "100%" this.video.style.height = "100%"
this.appendChild(this.video);
if (this.background) return; if (this.background) return;
if ("hidden" in document && this.visibilityCheck) { if ("hidden" in document && this.visibilityCheck) {
@@ -392,21 +388,23 @@ export class VideoRTC extends HTMLElement {
sb.mode = "segments"; // segments or sequence sb.mode = "segments"; // segments or sequence
sb.addEventListener("updateend", () => { sb.addEventListener("updateend", () => {
if (sb.updating) return; if (sb.updating) return;
if (bufLen > 0) {
try { try {
sb.appendBuffer(buf.slice(0, bufLen)); if (bufLen > 0) {
} catch (e) { const data = buf.slice(0, bufLen);
// console.debug(e); bufLen = 0;
sb.appendBuffer(data);
} else if (sb.buffered && sb.buffered.length) {
const end = sb.buffered.end(sb.buffered.length - 1) - 15;
const start = sb.buffered.start(0);
if (end > start) {
sb.remove(start, end);
ms.setLiveSeekableRange(end, end + 15);
}
// console.debug("VideoRTC.buffered", start, end);
} }
bufLen = 0; } catch (e) {
} else if (sb.buffered && sb.buffered.length) { // console.debug(e);
const end = sb.buffered.end(sb.buffered.length - 1) - 5;
const start = sb.buffered.start(0);
if (end > start) {
sb.remove(start, end);
ms.setLiveSeekableRange(end, end + 5);
}
// console.debug("VideoRTC.buffered", start, end);
} }
}); });
@@ -504,6 +502,8 @@ export class VideoRTC extends HTMLElement {
* @param ev {Event} * @param ev {Event}
*/ */
onpcvideo(ev) { onpcvideo(ev) {
if (!this.pc) return;
/** @type {HTMLVideoElement} */ /** @type {HTMLVideoElement} */
const video2 = ev.target; const video2 = ev.target;
const state = this.pc.connectionState; const state = this.pc.connectionState;
@@ -543,46 +543,42 @@ export class VideoRTC extends HTMLElement {
onmjpeg() { onmjpeg() {
this.ondata = data => { this.ondata = data => {
this.video.controls = false;
this.video.poster = "data:image/jpeg;base64," + VideoRTC.btoa(data); this.video.poster = "data:image/jpeg;base64," + VideoRTC.btoa(data);
}; };
this.send({type: "mjpeg"}); this.send({type: "mjpeg"});
this.video.controls = false;
} }
onmp4() { onmp4() {
/** @type {HTMLVideoElement} */ /** @type {HTMLCanvasElement} **/
let video2; const canvas = document.createElement("canvas");
/** @type {CanvasRenderingContext2D} */
let context;
this.ondata = data => { /** @type {HTMLVideoElement} */
// first video with default position (set container size) const video2 = document.createElement("video");
// second video with position=absolute and top=0px video2.autoplay = true;
if (video2) { video2.muted = true;
this.removeChild(this.video);
this.video.src = ""; video2.addEventListener("loadeddata", ev => {
this.video = video2; if (!context) {
video2.style.position = ""; canvas.width = video2.videoWidth;
video2.style.top = ""; canvas.height = video2.videoHeight;
context = canvas.getContext('2d');
} }
video2 = this.video.cloneNode(); context.drawImage(video2, 0, 0, canvas.width, canvas.height);
video2.style.position = "absolute";
video2.style.top = "0px";
this.appendChild(video2);
video2.src = "data:video/mp4;base64," + VideoRTC.btoa(data); this.video.controls = false;
video2.play().catch(() => console.log); this.video.poster = canvas.toDataURL("image/jpeg");
};
this.ws.addEventListener("close", () => {
if (!video2) return;
this.removeChild(video2);
video2.src = "";
}); });
this.ondata = data => {
video2.src = "data:video/mp4;base64," + VideoRTC.btoa(data);
};
this.send({type: "mp4", value: this.codecs("mp4")}); this.send({type: "mp4", value: this.codecs("mp4")});
this.video.controls = false;
} }
static btoa(buffer) { static btoa(buffer) {

View File

@@ -22,6 +22,9 @@ class VideoStream extends VideoRTC {
this.innerHTML = ` this.innerHTML = `
<style> <style>
video-stream {
position: relative;
}
.info { .info {
position: absolute; position: absolute;
top: 0; top: 0;