Compare commits

...

48 Commits

Author SHA1 Message Date
Alexey Khit
d8158bc1e3 Update stream log message 2022-11-04 22:27:11 +03:00
Alexey Khit
f4f588d2c6 Add mutex to stream 2022-11-04 22:20:52 +03:00
Alexey Khit
e287b52808 Add check for RTSPtoWeb API 2022-11-04 22:12:00 +03:00
Alexey Khit
ff96257252 Add backchannel=0 option to readme 2022-11-04 21:49:36 +03:00
Alexey Khit
909f21b7e4 Update docs about TURN server 2022-11-04 21:44:12 +03:00
Alexey Khit
7d6a5b44f8 Add frame.jpeg api for MJPEG stream 2022-11-04 21:22:33 +03:00
Alexey Khit
278f7696b6 Make sink private for Track 2022-11-04 20:54:35 +03:00
Alexey Khit
3cbf2465ae Fix loopback producer 2022-11-04 17:52:26 +03:00
Alexey Khit
e9ea7a0b1f Add reconnection feature 2022-11-04 17:23:42 +03:00
Alexey Khit
0231fc3a90 Code refactoring 2022-11-04 17:16:42 +03:00
Alexey Khit
9ef2633840 Add 5 sec timeout to ffmpeg rtsp 2022-11-04 17:06:24 +03:00
Alexey Khit
5a8df3e90a Change RTSP dial timeout to 5 sec 2022-11-04 17:05:57 +03:00
Alexey Khit
a31cbec3eb Fix check RTSP transport prefix 2022-11-04 17:05:30 +03:00
Alexey Khit
54f547977e Add mutext to streams handlers 2022-11-04 17:04:47 +03:00
Alexey Khit
65d91e02bd Move NewLogger to function 2022-11-04 17:03:56 +03:00
Alexey Khit
7fc3f0f641 Ignore srtp init in stack list func 2022-11-04 10:07:50 +03:00
Alexey Khit
7725d5ed31 Rewrite get handlers code 2022-11-04 06:24:39 +03:00
Alexey Khit
6c1b9daa8b Update logs for RTSP packets (disabled) 2022-11-03 11:25:47 +03:00
Alexey Khit
6d432574bf Make main logger global 2022-11-03 10:26:26 +03:00
Alexey Khit
616f69c88b Cache public IP for 5 min 2022-11-02 12:48:36 +03:00
Alexey Khit
f72440712b Add timeout to GetPublicIP func 2022-11-02 12:47:26 +03:00
Alexey Khit
ceed146fb8 Add webrtc sync API 2022-11-02 12:46:39 +03:00
Alexey Khit
f17dadbbbf Rewrite keepalive and add timeouts for RTSP 2022-11-02 10:50:11 +03:00
Alexey Khit
3d4514eab9 Fix loopback for stream 2022-11-02 08:51:54 +03:00
Alexey Khit
2629dccb81 Rename HTTP-FLV 2022-10-31 20:05:28 +03:00
Alexey Khit
04f1aa2900 Fix trash in webrtc.html 2022-10-31 08:34:32 +03:00
Alexey Khit
0dacdea1c3 Add support RTMPT (flv over HTTP) 2022-10-30 17:17:42 +03:00
Alexey Khit
24082b1616 Fix backchannel reconnection issue 2022-10-29 11:33:01 +03:00
Alexey Khit
7964b1743b Fix RTSP processing for Amcrest IP4M-1051 2022-10-29 11:29:53 +03:00
Alexey Khit
49773a1ece Add mjpeg link to stream to main page 2022-10-21 12:01:00 +03:00
Alexey Khit
c97a48a73f Fix mjpeg for 2K cameras 2022-10-21 12:00:00 +03:00
Alexey Khit
e03231ebb4 fix ffmpeg transcoding for Reolink RLC-510A 2022-10-21 10:58:56 +03:00
Alexey Khit
649525a842 Merge remote-tracking branch 'origin/master' 2022-10-21 10:54:54 +03:00
Alex X
d411c1a25c Merge pull request #76 from NickM-27/name_api_stream
Add ability for API to set name of stream
2022-10-14 06:59:31 +03:00
Nicolas Mowen
2f0bcf4ae0 Add ability for API to set name of stream 2022-10-13 14:58:26 -06:00
Alexey Khit
831c504cab Fix memory usage for RTSP processing 2022-10-05 21:15:59 +03:00
Alexey Khit
12925a6bc5 Fix TP-Link Tapo TC70 support 2022-10-05 19:43:36 +03:00
Alexey Khit
e50e929150 Fix empty SPS for mp4 format 2022-10-05 15:35:30 +03:00
Alexey Khit
d0c87e0379 Support SEI NAL from ffmpeg transcoding 2022-10-05 15:35:04 +03:00
Alexey Khit
247b61790e Update EncodeAVC for empty NALs 2022-10-05 15:34:34 +03:00
Alexey Khit
2ec618334a Adds NALs types logger 2022-10-05 15:33:51 +03:00
Alexey Khit
6f9976c806 Rework RTSP and RTMP processing 2022-10-05 13:25:29 +03:00
Alexey Khit
17b3a4cf3a Code refactoring 2022-10-05 13:23:31 +03:00
Alexey Khit
ba30f46c02 Fix FmtpLine for RTMP 2022-10-05 10:50:00 +03:00
Alexey Khit
4134f2a89c Fix timestamp for RTMP 2022-10-05 10:48:37 +03:00
Alexey Khit
a81160bea1 Fix support Escam Q6 camera 2022-10-03 21:12:27 +03:00
Alexey Khit
80392acb78 Fix audio copy #46 2022-09-24 08:24:52 +03:00
Alexey Khit
5afac513b4 Adds codecs section to readme 2022-09-22 00:22:44 +03:00
33 changed files with 820 additions and 418 deletions

View File

@@ -172,6 +172,8 @@ streams:
- rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1 - rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1
``` ```
**PS.** For disable bachannel just add `#backchannel=0` to end of RTSP link.
#### Source: RTMP #### Source: RTMP
You can get stream from RTMP server, for example [Frigate](https://docs.frigate.video/configuration/rtmp). Support ONLY `H264` video codec without audio. You can get stream from RTMP server, for example [Frigate](https://docs.frigate.video/configuration/rtmp). Support ONLY `H264` video codec without audio.
@@ -385,13 +387,13 @@ ngrok:
command: ... command: ...
``` ```
**Own TCP-tunnel** **Hard tech way 1. Own TCP-tunnel**
If you have personal VPS, you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config. If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config.
**Using TURN-server** **Hard tech way 2. Using TURN-server**
TODO... If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can install TURN server (e.g. [coturn](https://github.com/coturn/coturn), config [example](https://github.com/AlexxIT/WebRTC/wiki/Coturn-Example)).
```yaml ```yaml
webrtc: webrtc:
@@ -547,6 +549,30 @@ If you need Web interface protection without Home Assistant Add-on - you need to
PS. Additionally WebRTC opens a lot of random UDP ports for transmit encrypted media. They work without problems on the local network. And sometimes work for external access, even if you haven't opened ports on your router. But for stable external WebRTC access, you need to configure the TCP port. PS. Additionally WebRTC opens a lot of random UDP ports for transmit encrypted media. They work without problems on the local network. And sometimes work for external access, even if you haven't opened ports on your router. But for stable external WebRTC access, you need to configure the TCP port.
## Codecs madness
`AVC/H.264` codec can be played almost anywhere. But `HEVC/H.265` has a lot of limitations in supporting with different devices and browsers. It's all about patents and money, you can't do anything about it.
Device | WebRTC | MSE | MP4
-------|--------|-----|----
*latency* | best | medium | bad
Desktop Chrome | H264 | H264, H265* | H264, H265*
Desktop Safari | H264, H265* | H264 | no
Desktop Edge | H264 | H264, H265* | H264, H265*
Desktop Firefox | H264 | H264 | H264
Desktop Opera | no | H264 | H264
iPhone Safari | H264, H265* | no | no
iPad Safari | H264, H265* | H264 | no
Android Chrome | H264 | H264 | H264
masOS Hass App | no | no | no
- WebRTC audio codecs: `PCMU/8000`, `PCMA/8000`, `OPUS/48000/2`
- MSE/MP4 audio codecs: not supported yet (should be: `AAC`)
- Chrome H265: [read this](https://github.com/StaZhu/enable-chromium-hevc-hardware-decoding)
- Edge H265: [read this](https://www.reddit.com/r/MicrosoftEdge/comments/v9iw8k/enable_hevc_support_in_edge/)
- Desktop Safari H265: Menu > Develop > Experimental > WebRTC H265
- iOS Safari H265: Settings > Safari > Advanced > Experimental > WebRTC H265
## FAQ ## FAQ
**Q. What's the difference between go2rtc, WebRTC Camera and RTSPtoWebRTC?** **Q. What's the difference between go2rtc, WebRTC Camera and RTSPtoWebRTC?**

View File

@@ -85,10 +85,15 @@ var wsHandlers = make(map[string]WSHandler)
func streamsHandler(w http.ResponseWriter, r *http.Request) { func streamsHandler(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src") src := r.URL.Query().Get("src")
name := r.URL.Query().Get("name")
if name == "" {
name = src
}
switch r.Method { switch r.Method {
case "PUT": case "PUT":
streams.New(src, src) streams.New(name, src)
return return
case "DELETE": case "DELETE":
streams.Delete(src) streams.Delete(src)

View File

@@ -3,6 +3,7 @@ package app
import ( import (
"flag" "flag"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"io" "io"
"os" "os"
@@ -30,10 +31,18 @@ func Init() {
} }
} }
log.Logger = NewLogger(cfg.Mod["format"], cfg.Mod["level"])
modules = cfg.Mod
path, _ := os.Getwd()
log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH).
Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]")
}
func NewLogger(format string, level string) zerolog.Logger {
var writer io.Writer = os.Stdout var writer io.Writer = os.Stdout
// styles
format := cfg.Mod["format"]
if format != "json" { if format != "json" {
writer = zerolog.ConsoleWriter{ writer = zerolog.ConsoleWriter{
Out: writer, TimeFormat: "15:04:05.000", Out: writer, TimeFormat: "15:04:05.000",
@@ -43,18 +52,12 @@ func Init() {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
lvl, err := zerolog.ParseLevel(cfg.Mod["level"]) lvl, err := zerolog.ParseLevel(level)
if err != nil || lvl == zerolog.NoLevel { if err != nil || lvl == zerolog.NoLevel {
lvl = zerolog.InfoLevel lvl = zerolog.InfoLevel
} }
log = zerolog.New(writer).With().Timestamp().Logger().Level(lvl) return zerolog.New(writer).With().Timestamp().Logger().Level(lvl)
modules = cfg.Mod
path, _ := os.Getwd()
log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH).
Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]")
} }
func LoadConfig(v interface{}) { func LoadConfig(v interface{}) {
@@ -68,15 +71,13 @@ func LoadConfig(v interface{}) {
func GetLogger(module string) zerolog.Logger { func GetLogger(module string) zerolog.Logger {
if s, ok := modules[module]; ok { if s, ok := modules[module]; ok {
lvl, err := zerolog.ParseLevel(s) lvl, err := zerolog.ParseLevel(s)
if err != nil { if err == nil {
log.Warn().Err(err).Msg("[log]")
return log
}
return log.Level(lvl) return log.Level(lvl)
} }
log.Warn().Err(err).Caller().Send()
}
return log return log.Logger
} }
// internal // internal
@@ -84,8 +85,5 @@ func GetLogger(module string) zerolog.Logger {
// data - config content // data - config content
var data []byte var data []byte
// log - main logger
var log zerolog.Logger
// modules log levels // modules log levels
var modules map[string]string var modules map[string]string

View File

@@ -21,6 +21,7 @@ var stackSkip = [][]byte{
[]byte("created by net/http.(*Server).Serve"), // TODO: why two? []byte("created by net/http.(*Server).Serve"), // TODO: why two?
[]byte("created by github.com/AlexxIT/go2rtc/cmd/rtsp.Init"), []byte("created by github.com/AlexxIT/go2rtc/cmd/rtsp.Init"),
[]byte("created by github.com/AlexxIT/go2rtc/cmd/srtp.Init"),
// webrtc/api.go // webrtc/api.go
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"), []byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),

View File

@@ -14,6 +14,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
"sync"
"time" "time"
) )
@@ -23,22 +24,22 @@ func Init() {
return return
} }
rtsp.OnProducer = func(prod streamer.Producer) bool { rtsp.HandleFunc(func(conn *pkg.Conn) bool {
if conn := prod.(*pkg.Conn); conn != nil { waitersMu.Lock()
if waiter := waiters[conn.URL.Path]; waiter != nil { waiter := waiters[conn.URL.Path]
waiter <- prod waitersMu.Unlock()
return true
} if waiter == nil {
}
return false return false
} }
waiter <- conn
return true
})
streams.HandleFunc("exec", Handle) streams.HandleFunc("exec", Handle)
log = app.GetLogger("exec") log = app.GetLogger("exec")
// TODO: add sync.Mutex
waiters = map[string]chan streamer.Producer{}
} }
func Handle(url string) (streamer.Producer, error) { func Handle(url string) (streamer.Producer, error) {
@@ -60,8 +61,15 @@ func Handle(url string) (streamer.Producer, error) {
ch := make(chan streamer.Producer) ch := make(chan streamer.Producer)
waitersMu.Lock()
waiters[path] = ch waiters[path] = ch
defer delete(waiters, path) waitersMu.Unlock()
defer func() {
waitersMu.Lock()
delete(waiters, path)
waitersMu.Unlock()
}()
log.Debug().Str("url", url).Msg("[exec] run") log.Debug().Str("url", url).Msg("[exec] run")
@@ -86,4 +94,5 @@ func Handle(url string) (streamer.Producer, error) {
// internal // internal
var log zerolog.Logger var log zerolog.Logger
var waiters map[string]chan streamer.Producer var waiters = map[string]chan streamer.Producer{}
var waitersMu sync.Mutex

View File

@@ -23,7 +23,7 @@ func Init() {
// inputs // inputs
"file": "-re -stream_loop -1 -i {input}", "file": "-re -stream_loop -1 -i {input}",
"http": "-fflags nobuffer -flags low_delay -i {input}", "http": "-fflags nobuffer -flags low_delay -i {input}",
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -i {input}", "rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -timeout 5000000 -i {input}",
// output // output
"output": "-rtsp_transport tcp -f rtsp {output}", "output": "-rtsp_transport tcp -f rtsp {output}",
@@ -121,7 +121,7 @@ func Init() {
for _, audio := range query["audio"] { for _, audio := range query["audio"] {
if audio == "copy" { if audio == "copy" {
s += " -codec:v copy" s += " -codec:a copy"
} else { } else {
s += " " + tpl[audio] s += " " + tpl[audio]
} }

View File

@@ -55,6 +55,10 @@ func initAPI() {
// /stream/{id}/channel/0/webrtc // /stream/{id}/channel/0/webrtc
default: default:
i := strings.IndexByte(r.RequestURI[8:], '/') i := strings.IndexByte(r.RequestURI[8:], '/')
if i <= 0 {
log.Warn().Msgf("wrong request: %s", r.RequestURI)
return
}
name := r.RequestURI[8 : 8+i] name := r.RequestURI[8 : 8+i]
stream := streams.Get(name) stream := streams.Get(name)

View File

@@ -10,12 +10,47 @@ import (
) )
func Init() { func Init() {
api.HandleFunc("api/stream.mjpeg", handler) api.HandleFunc("api/frame.jpeg", handlerKeyframe)
api.HandleFunc("api/stream.mjpeg", handlerStream)
}
func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src")
stream := streams.GetOrNew(src)
if stream == nil {
return
}
exit := make(chan []byte)
cons := &mjpeg.Consumer{}
cons.Listen(func(msg interface{}) {
switch msg := msg.(type) {
case []byte:
exit <- msg
}
})
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
return
}
data := <-exit
stream.RemoveConsumer(cons)
w.Header().Set("Content-Type", "image/jpeg")
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
if _, err := w.Write(data); err != nil {
log.Error().Err(err).Caller().Send()
}
} }
const header = "--frame\r\nContent-Type: image/jpeg\r\nContent-Length: " const header = "--frame\r\nContent-Type: image/jpeg\r\nContent-Length: "
func handler(w http.ResponseWriter, r *http.Request) { func handlerStream(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src") src := r.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {

View File

@@ -8,6 +8,8 @@ import (
func Init() { func Init() {
streams.HandleFunc("rtmp", handle) streams.HandleFunc("rtmp", handle)
streams.HandleFunc("http", handle)
streams.HandleFunc("https", handle)
} }
func handle(url string) (streamer.Producer, error) { func handle(url string) (streamer.Producer, error) {

View File

@@ -32,20 +32,43 @@ func Init() {
// RTSP server support // RTSP server support
address := conf.Mod.Listen address := conf.Mod.Listen
if address != "" { if address == "" {
return
}
ln, err := net.Listen("tcp", address)
if err != nil {
log.Error().Err(err).Msg("[rtsp] listen")
return
}
_, Port, _ = net.SplitHostPort(address) _, Port, _ = net.SplitHostPort(address)
go worker(address) log.Info().Str("addr", address).Msg("[rtsp] listen")
go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
} }
go tcpHandler(conn)
}
}()
}
type Handler func(conn *rtsp.Conn) bool
func HandleFunc(handler Handler) {
handlers = append(handlers, handler)
} }
var Port string var Port string
var OnProducer func(conn streamer.Producer) bool // TODO: maybe rewrite...
// internal // internal
var log zerolog.Logger var log zerolog.Logger
var handlers []Handler
func rtspHandler(url string) (streamer.Producer, error) { func rtspHandler(url string) (streamer.Producer, error) {
backchannel := true backchannel := true
@@ -84,10 +107,10 @@ func rtspHandler(url string) (streamer.Producer, error) {
} }
// second try without backchannel, we need to reconnect // second try without backchannel, we need to reconnect
conn.Backchannel = false
if err = conn.Dial(); err != nil { if err = conn.Dial(); err != nil {
return nil, err return nil, err
} }
conn.Backchannel = false
if err = conn.Describe(); err != nil { if err = conn.Describe(); err != nil {
return nil, err return nil, err
} }
@@ -96,24 +119,13 @@ func rtspHandler(url string) (streamer.Producer, error) {
return conn, nil return conn, nil
} }
func worker(address string) { func tcpHandler(c net.Conn) {
srv, err := tcp.NewServer(address)
if err != nil {
log.Error().Err(err).Msg("[rtsp] listen")
return
}
log.Info().Str("addr", address).Msg("[rtsp] listen")
srv.Listen(func(msg interface{}) {
switch msg.(type) {
case net.Conn:
var name string var name string
var onDisconnect func() var closer func()
trace := log.Trace().Enabled() trace := log.Trace().Enabled()
conn := rtsp.NewServer(msg.(net.Conn)) conn := rtsp.NewServer(c)
conn.Listen(func(msg interface{}) { conn.Listen(func(msg interface{}) {
if trace { if trace {
switch msg := msg.(type) { switch msg := msg.(type) {
@@ -128,43 +140,37 @@ func worker(address string) {
case rtsp.MethodDescribe: case rtsp.MethodDescribe:
name = conn.URL.Path[1:] name = conn.URL.Path[1:]
log.Debug().Str("stream", name).Msg("[rtsp] new consumer") stream := streams.Get(name)
stream := streams.Get(name) // TODO: rewrite
if stream == nil { if stream == nil {
return return
} }
log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
initMedias(conn) initMedias(conn)
if err = stream.AddConsumer(conn); err != nil { if err := stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Str("stream", name).Msg("[rtsp]") log.Warn().Err(err).Str("stream", name).Msg("[rtsp]")
return return
} }
onDisconnect = func() { closer = func() {
stream.RemoveConsumer(conn) stream.RemoveConsumer(conn)
} }
case rtsp.MethodAnnounce: case rtsp.MethodAnnounce:
if OnProducer != nil {
if OnProducer(conn) {
return
}
}
name = conn.URL.Path[1:] name = conn.URL.Path[1:]
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
stream := streams.Get(name) stream := streams.Get(name)
if stream == nil { if stream == nil {
return return
} }
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
stream.AddProducer(conn) stream.AddProducer(conn)
onDisconnect = func() { closer = func() {
stream.RemoveProducer(conn) stream.RemoveProducer(conn)
} }
@@ -173,24 +179,29 @@ func worker(address string) {
} }
}) })
if err = conn.Accept(); err != nil { if err := conn.Accept(); err != nil {
log.Warn().Err(err).Msg("[rtsp] accept") log.Warn().Err(err).Caller().Send()
_ = conn.Close()
return return
} }
if err = conn.Handle(); err != nil { for _, handler := range handlers {
//log.Warn().Err(err).Msg("[rtsp] handle server") if handler(conn) {
return
}
} }
if onDisconnect != nil { if closer != nil {
onDisconnect() if err := conn.Handle(); err != nil {
log.Debug().Err(err).Caller().Send()
} }
closer()
log.Debug().Str("stream", name).Msg("[rtsp] disconnect") log.Debug().Str("stream", name).Msg("[rtsp] disconnect")
} }
})
srv.Serve() _ = conn.Close()
} }
func initMedias(conn *rtsp.Conn) { func initMedias(conn *rtsp.Conn) {

View File

@@ -4,30 +4,36 @@ import (
"fmt" "fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strings" "strings"
"sync"
) )
type Handler func(url string) (streamer.Producer, error) type Handler func(url string) (streamer.Producer, error)
var handlers map[string]Handler var handlers = map[string]Handler{}
var handlersMu sync.Mutex
func HandleFunc(scheme string, handler Handler) { func HandleFunc(scheme string, handler Handler) {
if handlers == nil { handlersMu.Lock()
handlers = make(map[string]Handler)
}
handlers[scheme] = handler handlers[scheme] = handler
handlersMu.Unlock()
}
func getHandler(url string) Handler {
i := strings.IndexByte(url, ':')
if i <= 0 { // TODO: i < 4 ?
return nil
}
handlersMu.Lock()
defer handlersMu.Unlock()
return handlers[url[:i]]
} }
func HasProducer(url string) bool { func HasProducer(url string) bool {
i := strings.IndexByte(url, ':') return getHandler(url) != nil
if i <= 0 { // TODO: i < 4 ?
return false
}
return handlers[url[:i]] != nil
} }
func GetProducer(url string) (streamer.Producer, error) { func GetProducer(url string) (streamer.Producer, error) {
i := strings.IndexByte(url, ':') handler := getHandler(url)
handler := handlers[url[:i]]
if handler == nil { if handler == nil {
return nil, fmt.Errorf("unsupported scheme: %s", url) return nil, fmt.Errorf("unsupported scheme: %s", url)
} }

View File

@@ -4,6 +4,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strings" "strings"
"sync" "sync"
"time"
) )
type state byte type state byte
@@ -25,7 +26,8 @@ type Producer struct {
tracks []*streamer.Track tracks []*streamer.Track
state state state state
mx sync.Mutex mu sync.Mutex
restart *time.Timer
} }
func (p *Producer) SetSource(s string) { func (p *Producer) SetSource(s string) {
@@ -36,16 +38,16 @@ func (p *Producer) SetSource(s string) {
} }
func (p *Producer) GetMedias() []*streamer.Media { func (p *Producer) GetMedias() []*streamer.Media {
p.mx.Lock() p.mu.Lock()
defer p.mx.Unlock() defer p.mu.Unlock()
if p.state == stateNone { if p.state == stateNone {
log.Debug().Str("url", p.url).Msg("[streams] probe producer") log.Debug().Msgf("[streams] probe producer url=%s", p.url)
var err error var err error
p.element, err = GetProducer(p.url) p.element, err = GetProducer(p.url)
if err != nil || p.element == nil { if err != nil || p.element == nil {
log.Error().Err(err).Str("url", p.url).Msg("[streams] probe producer") log.Error().Err(err).Caller().Send()
return nil return nil
} }
@@ -56,14 +58,17 @@ func (p *Producer) GetMedias() []*streamer.Media {
} }
func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
p.mx.Lock() p.mu.Lock()
defer p.mx.Unlock() defer p.mu.Unlock()
if p.state == stateMedias { if p.state == stateNone {
p.state = stateTracks return nil
} }
track := p.element.GetTrack(media, codec) track := p.element.GetTrack(media, codec)
if track == nil {
return nil
}
for _, t := range p.tracks { for _, t := range p.tracks {
if track == t { if track == t {
@@ -71,6 +76,10 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea
} }
} }
if p.state == stateMedias {
p.state = stateTracks
}
p.tracks = append(p.tracks, track) p.tracks = append(p.tracks, track)
return track return track
@@ -79,36 +88,89 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea
// internals // internals
func (p *Producer) start() { func (p *Producer) start() {
p.mx.Lock() p.mu.Lock()
defer p.mx.Unlock() defer p.mu.Unlock()
if p.state != stateTracks { if p.state != stateTracks {
return return
} }
log.Debug().Str("url", p.url).Msg("[streams] start producer") log.Debug().Msgf("[streams] start producer url=%s", p.url)
p.state = stateStart p.state = stateStart
go func() { go func() {
// safe read element while mu locked
if err := p.element.Start(); err != nil { if err := p.element.Start(); err != nil {
log.Warn().Err(err).Str("url", p.url).Msg("[streams] start") log.Warn().Err(err).Caller().Send()
} }
p.reconnect()
}()
}
func (p *Producer) reconnect() {
p.mu.Lock()
defer p.mu.Unlock()
if p.state != stateStart {
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
return
}
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
var err error
p.element, err = GetProducer(p.url)
if err != nil || p.element == nil {
log.Debug().Err(err).Caller().Send()
// TODO: dynamic timeout
p.restart = time.AfterFunc(30*time.Second, p.reconnect)
return
}
medias := p.element.GetMedias()
// convert all old producer tracks to new tracks
for i, oldTrack := range p.tracks {
// match new element medias with old track codec
for _, media := range medias {
codec := media.MatchCodec(oldTrack.Codec)
if codec == nil {
continue
}
// move sink from old track to new track
newTrack := p.element.GetTrack(media, codec)
newTrack.GetSink(oldTrack)
p.tracks[i] = newTrack
break
}
}
go func() {
if err = p.element.Start(); err != nil {
log.Debug().Err(err).Caller().Send()
}
p.reconnect()
}() }()
} }
func (p *Producer) stop() { func (p *Producer) stop() {
p.mx.Lock() p.mu.Lock()
log.Debug().Str("url", p.url).Msg("[streams] stop producer") log.Debug().Msgf("[streams] stop producer url=%s", p.url)
if p.element != nil { if p.element != nil {
_ = p.element.Stop() _ = p.element.Stop()
p.element = nil p.element = nil
} else {
log.Warn().Str("url", p.url).Msg("[streams] stop empty producer")
} }
p.tracks = nil if p.restart != nil {
p.state = stateNone p.restart.Stop()
p.restart = nil
}
p.mx.Unlock() p.state = stateNone
p.tracks = nil
p.mu.Unlock()
} }

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"sync"
) )
type Consumer struct { type Consumer struct {
@@ -14,6 +15,7 @@ type Consumer struct {
type Stream struct { type Stream struct {
producers []*Producer producers []*Producer
consumers []*Consumer consumers []*Consumer
mu sync.Mutex
} }
func NewStream(source interface{}) *Stream { func NewStream(source interface{}) *Stream {
@@ -51,18 +53,19 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
ic := len(s.consumers) ic := len(s.consumers)
consumer := &Consumer{element: cons} consumer := &Consumer{element: cons}
var producers []*Producer // matched producers for consumer
// Step 1. Get consumer medias // Step 1. Get consumer medias
for icc, consMedia := range cons.GetMedias() { for icc, consMedia := range cons.GetMedias() {
log.Trace().Stringer("media", consMedia). log.Trace().Stringer("media", consMedia).
Msgf("[streams] consumer:%d:%d candidate", ic, icc) Msgf("[streams] consumer=%d candidate=%d", ic, icc)
producers: producers:
for ip, prod := range s.producers { for ip, prod := range s.producers {
// Step 2. Get producer medias (not tracks yet) // Step 2. Get producer medias (not tracks yet)
for ipc, prodMedia := range prod.GetMedias() { for ipc, prodMedia := range prod.GetMedias() {
log.Trace().Stringer("media", prodMedia). log.Trace().Stringer("media", prodMedia).
Msgf("[streams] producer:%d:%d candidate", ip, ipc) Msgf("[streams] producer=%d candidate=%d", ip, ipc)
// Step 3. Match consumer/producer codecs list // Step 3. Match consumer/producer codecs list
prodCodec := prodMedia.MatchMedia(consMedia) prodCodec := prodMedia.MatchMedia(consMedia)
@@ -81,20 +84,23 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
consTrack := consumer.element.AddTrack(consMedia, prodTrack) consTrack := consumer.element.AddTrack(consMedia, prodTrack)
consumer.tracks = append(consumer.tracks, consTrack) consumer.tracks = append(consumer.tracks, consTrack)
producers = append(producers, prod)
break producers break producers
} }
} }
} }
} }
// can't match tracks for consumer if len(producers) == 0 {
if len(consumer.tracks) == 0 {
return errors.New("couldn't find the matching tracks") return errors.New("couldn't find the matching tracks")
} }
s.mu.Lock()
s.consumers = append(s.consumers, consumer) s.consumers = append(s.consumers, consumer)
s.mu.Unlock()
for _, prod := range s.producers { // there may be duplicates, but that's not a problem
for _, prod := range producers {
prod.start() prod.start()
} }
@@ -102,6 +108,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
} }
func (s *Stream) RemoveConsumer(cons streamer.Consumer) { func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
s.mu.Lock()
for i, consumer := range s.consumers { for i, consumer := range s.consumers {
if consumer == nil { if consumer == nil {
log.Warn().Msgf("empty consumer: %+v\n", s) log.Warn().Msgf("empty consumer: %+v\n", s)
@@ -127,7 +134,7 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
var sink bool var sink bool
for _, track := range producer.tracks { for _, track := range producer.tracks {
if len(track.Sink) > 0 { if track.HasSink() {
sink = true sink = true
} }
} }
@@ -135,38 +142,44 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
producer.stop() producer.stop()
} }
} }
s.mu.Unlock()
} }
func (s *Stream) AddProducer(prod streamer.Producer) { func (s *Stream) AddProducer(prod streamer.Producer) {
producer := &Producer{element: prod, state: stateTracks} producer := &Producer{element: prod, state: stateTracks}
s.mu.Lock()
s.producers = append(s.producers, producer) s.producers = append(s.producers, producer)
s.mu.Unlock()
} }
func (s *Stream) RemoveProducer(prod streamer.Producer) { func (s *Stream) RemoveProducer(prod streamer.Producer) {
s.mu.Lock()
for i, producer := range s.producers { for i, producer := range s.producers {
if producer.element == prod { if producer.element == prod {
s.removeProducer(i) s.removeProducer(i)
break break
} }
} }
s.mu.Unlock()
} }
func (s *Stream) Active() bool { //func (s *Stream) Active() bool {
if len(s.consumers) > 0 { // if len(s.consumers) > 0 {
return true // return true
} // }
//
for _, prod := range s.producers { // for _, prod := range s.producers {
if prod.element != nil { // if prod.element != nil {
return true // return true
} // }
} // }
//
return false // return false
} //}
func (s *Stream) MarshalJSON() ([]byte, error) { func (s *Stream) MarshalJSON() ([]byte, error) {
var v []interface{} var v []interface{}
s.mu.Lock()
for _, prod := range s.producers { for _, prod := range s.producers {
if prod.element != nil { if prod.element != nil {
v = append(v, prod.element) v = append(v, prod.element)
@@ -176,6 +189,7 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
// cons.element always not nil // cons.element always not nil
v = append(v, cons.element) v = append(v, cons.element)
} }
s.mu.Unlock()
if len(v) == 0 { if len(v) == 0 {
v = nil v = nil
} }

View File

@@ -8,7 +8,9 @@ import (
"github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/AlexxIT/go2rtc/pkg/webrtc"
pion "github.com/pion/webrtc/v3" pion "github.com/pion/webrtc/v3"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"io/ioutil"
"net" "net"
"net/http"
) )
func Init() { func Init() {
@@ -55,6 +57,8 @@ func Init() {
api.HandleWS(webrtc.MsgTypeOffer, offerHandler) api.HandleWS(webrtc.MsgTypeOffer, offerHandler)
api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler) api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler)
api.HandleFunc("api/webrtc", syncHandler)
} }
var Port string var Port string
@@ -137,6 +141,32 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
ctx.Consumer = conn ctx.Consumer = conn
} }
func syncHandler(w http.ResponseWriter, r *http.Request) {
url := r.URL.Query().Get("src")
stream := streams.Get(url)
if stream == nil {
return
}
// get offer
offer, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Caller().Send()
return
}
answer, err := ExchangeSDP(stream, string(offer), r.UserAgent())
if err != nil {
log.Error().Err(err).Caller().Send()
return
}
// send SDP to client
if _, err = w.Write([]byte(answer)); err != nil {
log.Error().Err(err).Caller().Send()
}
}
func ExchangeSDP( func ExchangeSDP(
stream *streams.Stream, offer string, userAgent string, stream *streams.Stream, offer string, userAgent string,
) (answer string, err error) { ) (answer string, err error) {

View File

@@ -1,3 +1,11 @@
# H264
Access Unit (AU) can contain one or multiple NAL Unit:
1. [SEI,] SPS, PPS, IFrame, [IFrame...]
2. BFrame, [BFrame...]
3. IFrame, [IFrame...]
## RTP H264 ## RTP H264
Camera | NALu Camera | NALu

View File

@@ -12,49 +12,38 @@ func IsAVC(codec *streamer.Codec) bool {
return codec.PayloadType == PayloadTypeAVC return codec.PayloadType == PayloadTypeAVC
} }
func EncodeAVC(raw []byte) (avc []byte) { func EncodeAVC(nals ...[]byte) (avc []byte) {
avc = make([]byte, len(raw)+4) var i, n int
binary.BigEndian.PutUint32(avc, uint32(len(raw)))
copy(avc[4:], raw) for _, nal := range nals {
if i = len(nal); i > 0 {
n += 4 + i
}
}
avc = make([]byte, n)
n = 0
for _, nal := range nals {
if i = len(nal); i > 0 {
binary.BigEndian.PutUint32(avc[n:], uint32(i))
n += 4 + copy(avc[n+4:], nal)
}
}
return return
} }
func RepairAVC(track *streamer.Track) streamer.WrapperFunc { func RepairAVC(track *streamer.Track) streamer.WrapperFunc {
sps, pps := GetParameterSet(track.Codec.FmtpLine) sps, pps := GetParameterSet(track.Codec.FmtpLine)
sps = EncodeAVC(sps) ps := EncodeAVC(sps, pps)
pps = EncodeAVC(pps)
return func(push streamer.WriterFunc) streamer.WriterFunc { return func(push streamer.WriterFunc) streamer.WriterFunc {
return func(packet *rtp.Packet) (err error) { return func(packet *rtp.Packet) (err error) {
naluType := NALUType(packet.Payload) if NALUType(packet.Payload) == NALUTypeIFrame {
switch naluType { packet.Payload = Join(ps, packet.Payload)
case NALUTypeSPS:
sps = packet.Payload
return
case NALUTypePPS:
pps = packet.Payload
return
} }
return push(packet)
var clone rtp.Packet
if naluType == NALUTypeIFrame {
clone = *packet
clone.Payload = sps
if err = push(&clone); err != nil {
return
}
clone = *packet
clone.Payload = pps
if err = push(&clone); err != nil {
return
}
}
clone = *packet
clone.Payload = packet.Payload
return push(&clone)
} }
} }
} }
@@ -63,12 +52,12 @@ func SplitAVC(data []byte) [][]byte {
var nals [][]byte var nals [][]byte
for { for {
// get AVC length // get AVC length
size := int(binary.BigEndian.Uint32(data)) size := int(binary.BigEndian.Uint32(data)) + 4
// check if multiple items in one packet // check if multiple items in one packet
if size+4 < len(data) { if size < len(data) {
nals = append(nals, data[:size+4]) nals = append(nals, data[:size])
data = data[size+4:] data = data[size:]
} else { } else {
nals = append(nals, data) nals = append(nals, data)
break break
@@ -76,3 +65,18 @@ func SplitAVC(data []byte) [][]byte {
} }
return nals return nals
} }
func Types(data []byte) []byte {
var types []byte
for {
types = append(types, NALUType(data))
size := 4 + int(binary.BigEndian.Uint32(data))
if size < len(data) {
data = data[size:]
} else {
break
}
}
return types
}

View File

@@ -2,24 +2,49 @@ package h264
import ( import (
"encoding/base64" "encoding/base64"
"encoding/binary"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strings" "strings"
) )
const ( const (
NALUTypePFrame = 1 NALUTypePFrame = 1 // Coded slice of a non-IDR picture
NALUTypeIFrame = 5 NALUTypeIFrame = 5 // Coded slice of an IDR picture
NALUTypeSEI = 6 NALUTypeSEI = 6 // Supplemental enhancement information (SEI)
NALUTypeSPS = 7 NALUTypeSPS = 7 // Sequence parameter set
NALUTypePPS = 8 NALUTypePPS = 8 // Picture parameter set
NALUTypeAUD = 9 // Access unit delimiter
) )
func NALUType(b []byte) byte { func NALUType(b []byte) byte {
return b[4] & 0x1F return b[4] & 0x1F
} }
// IsKeyframe - check if any NALU in one AU is Keyframe
func IsKeyframe(b []byte) bool { func IsKeyframe(b []byte) bool {
return NALUType(b) == NALUTypeIFrame for {
switch NALUType(b) {
case NALUTypePFrame:
return false
case NALUTypeIFrame:
return true
}
size := int(binary.BigEndian.Uint32(b)) + 4
if size < len(b) {
b = b[size:]
continue
} else {
return false
}
}
}
func Join(ps, iframe []byte) []byte {
b := make([]byte, len(ps)+len(iframe))
i := copy(b, ps)
copy(b[i:], iframe)
return b
} }
func GetProfileLevelID(fmtp string) string { func GetProfileLevelID(fmtp string) string {

View File

@@ -13,100 +13,72 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
depack := &codecs.H264Packet{IsAVC: true} depack := &codecs.H264Packet{IsAVC: true}
sps, pps := GetParameterSet(track.Codec.FmtpLine) sps, pps := GetParameterSet(track.Codec.FmtpLine)
sps = EncodeAVC(sps) ps := EncodeAVC(sps, pps)
pps = EncodeAVC(pps)
var buffer []byte buf := make([]byte, 0, 512*1024) // 512K
return func(push streamer.WriterFunc) streamer.WriterFunc { return func(push streamer.WriterFunc) streamer.WriterFunc {
return func(packet *rtp.Packet) error { return func(packet *rtp.Packet) error {
//nalUnitType := packet.Payload[0] & 0x1F //log.Printf("[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, %v", track.Codec.Name, packet.Payload[0]&0x1F, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker)
//fmt.Printf(
// "[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d\n",
// track.Codec.Name, nalUnitType, len(packet.Payload), packet.Timestamp,
// packet.PayloadType, packet.SSRC, packet.SequenceNumber,
//)
data, err := depack.Unmarshal(packet.Payload) payload, err := depack.Unmarshal(packet.Payload)
if len(data) == 0 || err != nil { if len(payload) == 0 || err != nil {
return nil return nil
} }
for { // Fix TP-Link Tapo TC70: sends SPS and PPS with packet.Marker = true
unitType := NALUType(data) if packet.Marker {
//fmt.Printf("[H264] nalu: %2d, size: %6d\n", unitType, len(data)) switch NALUType(payload) {
case NALUTypeSPS, NALUTypePPS:
buf = append(buf, payload...)
return nil
}
}
// multiple 5 and 1 in one payload is OK if len(buf) == 0 {
if unitType != NALUTypeIFrame && unitType != NALUTypePFrame { // Amcrest IP4M-1051: 9, 7, 8, 6, 28...
i := int(binary.BigEndian.Uint32(data)) + 4 // Amcrest IP4M-1051: 9, 6, 1
if i < len(data) { switch NALUType(payload) {
data0 := data[:i] // NAL Unit with AVC header case NALUTypeIFrame:
data = data[i:] // fix IFrame without SPS,PPS
switch unitType { buf = append(buf, ps...)
case NALUTypeSPS: case NALUTypeSEI, NALUTypeAUD:
sps = data0 // fix ffmpeg with transcoding first frame
continue i := int(4 + binary.BigEndian.Uint32(payload))
case NALUTypePPS:
pps = data0 // check if only one NAL (fix ffmpeg transcoding for Reolink RLC-510A)
continue if i == len(payload) {
case NALUTypeSEI: return nil
// some unnecessary text information }
continue
payload = payload[i:]
if NALUType(payload) == NALUTypeIFrame {
buf = append(buf, ps...)
} }
} }
} }
switch unitType { // collect all NALs for Access Unit
case NALUTypeSPS:
sps = data
return nil
case NALUTypePPS:
pps = data
return nil
case NALUTypeSEI:
// some unnecessary text information
return nil
}
// ffmpeg with `-tune zerolatency` enable option `-x264opts sliced-threads=1`
// and every NALU will be sliced to multiple NALUs
if !packet.Marker { if !packet.Marker {
buffer = append(buffer, data...) buf = append(buf, payload...)
return nil return nil
} }
if buffer != nil { if len(buf) > 0 {
buffer = append(buffer, data...) payload = append(buf, payload...)
data = buffer buf = buf[:0]
buffer = nil
} }
var clone rtp.Packet //log.Printf("[AVC] %v, len: %d", Types(payload), len(payload))
if unitType == NALUTypeIFrame { clone := *packet
clone = *packet
clone.Version = RTPPacketVersionAVC clone.Version = RTPPacketVersionAVC
clone.Payload = sps clone.Payload = payload
if err = push(&clone); err != nil {
return err
}
clone = *packet
clone.Version = RTPPacketVersionAVC
clone.Payload = pps
if err = push(&clone); err != nil {
return err
}
}
clone = *packet
clone.Version = RTPPacketVersionAVC
clone.Payload = data
return push(&clone) return push(&clone)
} }
} }
} }
}
func RTPPay(mtu uint16) streamer.WrapperFunc { func RTPPay(mtu uint16) streamer.WrapperFunc {
payloader := &Payloader{IsAVC: true} payloader := &Payloader{IsAVC: true}
@@ -117,11 +89,12 @@ func RTPPay(mtu uint16) streamer.WrapperFunc {
return func(packet *rtp.Packet) error { return func(packet *rtp.Packet) error {
if packet.Version == RTPPacketVersionAVC { if packet.Version == RTPPacketVersionAVC {
payloads := payloader.Payload(mtu, packet.Payload) payloads := payloader.Payload(mtu, packet.Payload)
last := len(payloads) - 1
for i, payload := range payloads { for i, payload := range payloads {
clone := rtp.Packet{ clone := rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: 2, Version: 2,
Marker: i == len(payloads)-1, Marker: i == last,
//PayloadType: packet.PayloadType, //PayloadType: packet.PayloadType,
SequenceNumber: sequencer.NextSequenceNumber(), SequenceNumber: sequencer.NextSequenceNumber(),
Timestamp: packet.Timestamp, Timestamp: packet.Timestamp,

3
pkg/httpflv/README.md Normal file
View File

@@ -0,0 +1,3 @@
## Useful links
- https://medium.com/@nate510/don-t-use-go-s-default-http-client-4804cb19f779

100
pkg/httpflv/httpflv.go Normal file
View File

@@ -0,0 +1,100 @@
package httpflv
import (
"bufio"
"errors"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/flv/flvio"
"github.com/deepch/vdk/utils/bits/pio"
"io"
"net/http"
)
func Dial(uri string) (*Conn, error) {
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
c := Conn{
conn: res.Body,
reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize),
buf: make([]byte, 256),
}
if _, err = io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil {
return nil, err
}
flags, n, err := flvio.ParseFileHeader(c.buf)
if err != nil {
return nil, err
}
if flags&flvio.FILE_HAS_VIDEO == 0 {
return nil, errors.New("not supported")
}
if _, err = c.reader.Discard(n); err != nil {
return nil, err
}
return &c, nil
}
type Conn struct {
conn io.ReadCloser
reader *bufio.Reader
buf []byte
}
func (c *Conn) Streams() ([]av.CodecData, error) {
for {
tag, _, err := flvio.ReadTag(c.reader, c.buf)
if err != nil {
return nil, err
}
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AAC_SEQHDR {
continue
}
stream, err := h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data)
if err != nil {
return nil, err
}
return []av.CodecData{stream}, nil
}
}
func (c *Conn) ReadPacket() (av.Packet, error) {
for {
tag, ts, err := flvio.ReadTag(c.reader, c.buf)
if err != nil {
return av.Packet{}, err
}
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AVC_NALU {
continue
}
return av.Packet{
Idx: 0,
Data: tag.Data,
CompositionTime: flvio.TsToTime(tag.CompositionTime),
IsKeyFrame: tag.FrameType == flvio.FRAME_KEY,
Time: flvio.TsToTime(ts),
}, nil
}
}
func (c *Conn) Close() (err error) {
return c.conn.Close()
}

View File

@@ -245,14 +245,12 @@ func (c *Client) worker() {
time.Sleep(d) time.Sleep(d)
// can be SPS, PPS and IFrame in one packet // can be SPS, PPS and IFrame in one packet
for _, payload := range h264.SplitAVC(data[:entry.Size]) {
packet := &rtp.Packet{ packet := &rtp.Packet{
// ivideon clockrate=1000, RTP clockrate=90000 // ivideon clockrate=1000, RTP clockrate=90000
Header: rtp.Header{Timestamp: ts * 90}, Header: rtp.Header{Timestamp: ts * 90},
Payload: payload, Payload: data[:entry.Size],
} }
_ = track.WriteRTP(packet) _ = track.WriteRTP(packet)
}
data = data[entry.Size:] data = data[entry.Size:]
ts += entry.Duration ts += entry.Duration

View File

@@ -61,8 +61,16 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
lqt, cqt = MakeTables(q) lqt, cqt = MakeTables(q)
} }
// https://www.rfc-editor.org/rfc/rfc2435#section-3.1.5
// The maximum width is 2040 pixels.
w := uint16(packet.Payload[6]) << 3 w := uint16(packet.Payload[6]) << 3
h := uint16(packet.Payload[7]) << 3 h := uint16(packet.Payload[7]) << 3
// fix 2560x1920 and 2560x1440
if w == 512 && (h == 1920 || h == 1440) {
w = 2560
}
//fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h) //fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h)
header = MakeHeaders(t, w, h, lqt, cqt) header = MakeHeaders(t, w, h, lqt, cqt)
} }

View File

@@ -53,16 +53,17 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return nil return nil
} }
switch h264.NALUType(packet.Payload) { if c.muxer == nil {
case h264.NALUTypeIFrame:
c.start = true
case h264.NALUTypePFrame:
if !c.start {
return nil return nil
} }
default:
if !c.start {
if h264.IsKeyframe(packet.Payload) {
c.start = true
} else {
return nil return nil
} }
}
buf := c.muxer.Marshal(packet) buf := c.muxer.Marshal(packet)
c.send += len(buf) c.send += len(buf)
@@ -71,10 +72,13 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return nil return nil
} }
if !h264.IsAVC(codec) { var wrapper streamer.WrapperFunc
wrapper := h264.RTPDepay(track) if h264.IsAVC(codec) {
push = wrapper(push) wrapper = h264.RepairAVC(track)
} else {
wrapper = h264.RTPDepay(track)
} }
push = wrapper(push)
return track.Bind(push) return track.Bind(push)

View File

@@ -46,10 +46,11 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
case streamer.CodecH264: case streamer.CodecH264:
sps, pps := h264.GetParameterSet(codec.FmtpLine) sps, pps := h264.GetParameterSet(codec.FmtpLine)
if sps == nil { if sps == nil {
return nil, fmt.Errorf("empty SPS: %#v", codec) // some dummy SPS and PPS not a problem
sps = []byte{0x67, 0x42, 0x00, 0x0a, 0xf8, 0x41, 0xa2}
pps = []byte{0x68, 0xce, 0x38, 0x80}
} }
// TODO: remove
codecData, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps) codecData, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -5,15 +5,24 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/httpflv"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/deepch/vdk/av" "github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/rtmp" "github.com/deepch/vdk/format/rtmp"
"github.com/pion/rtp" "github.com/pion/rtp"
"strings"
"time" "time"
) )
// Conn for RTMP and RTMPT (flv over HTTP)
type Conn interface {
Streams() (streams []av.CodecData, err error)
ReadPacket() (pkt av.Packet, err error)
Close() (err error)
}
type Client struct { type Client struct {
streamer.Element streamer.Element
@@ -22,7 +31,7 @@ type Client struct {
medias []*streamer.Media medias []*streamer.Media
tracks []*streamer.Track tracks []*streamer.Track
conn *rtmp.Conn conn Conn
closed bool closed bool
receive int receive int
@@ -33,7 +42,12 @@ func NewClient(uri string) *Client {
} }
func (c *Client) Dial() (err error) { func (c *Client) Dial() (err error) {
if strings.HasPrefix(c.URI, "http") {
c.conn, err = httpflv.Dial(c.URI)
} else {
c.conn, err = rtmp.Dial(c.URI) c.conn, err = rtmp.Dial(c.URI)
}
if err != nil { if err != nil {
return return
} }
@@ -47,10 +61,14 @@ func (c *Client) Dial() (err error) {
for _, stream := range streams { for _, stream := range streams {
switch stream.Type() { switch stream.Type() {
case av.H264: case av.H264:
cd := stream.(h264parser.CodecData) info := stream.(h264parser.CodecData).RecordInfo
fmtp := "sprop-parameter-sets=" +
base64.StdEncoding.EncodeToString(cd.RecordInfo.SPS[0]) + "," + fmtp := fmt.Sprintf(
base64.StdEncoding.EncodeToString(cd.RecordInfo.PPS[0]) "profile-level-id=%02X%02X%02X;sprop-parameter-sets=%s,%s",
info.AVCProfileIndication, info.ProfileCompatibility, info.AVCLevelIndication,
base64.StdEncoding.EncodeToString(info.SPS[0]),
base64.StdEncoding.EncodeToString(info.PPS[0]),
)
codec := &streamer.Codec{ codec := &streamer.Codec{
Name: streamer.CodecH264, Name: streamer.CodecH264,
@@ -129,24 +147,16 @@ func (c *Client) Handle() (err error) {
track := c.tracks[int(pkt.Idx)] track := c.tracks[int(pkt.Idx)]
timestamp := uint32(pkt.Time / time.Duration(track.Codec.ClockRate)) // convert seconds to RTP timestamp
timestamp := uint32(pkt.Time * time.Duration(track.Codec.ClockRate) / time.Second)
var payloads [][]byte
if track.Codec.Name == streamer.CodecH264 {
payloads = h264.SplitAVC(pkt.Data)
} else {
payloads = [][]byte{pkt.Data}
}
for _, payload := range payloads {
packet := &rtp.Packet{ packet := &rtp.Packet{
Header: rtp.Header{Timestamp: timestamp}, Header: rtp.Header{Timestamp: timestamp},
Payload: payload, Payload: pkt.Data,
} }
_ = track.WriteRTP(packet) _ = track.WriteRTP(packet)
} }
} }
}
func (c *Client) Close() error { func (c *Client) Close() error {
if c.conn == nil { if c.conn == nil {

View File

@@ -32,7 +32,7 @@ func (c *Client) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{ v := map[string]interface{}{
streamer.JSONReceive: c.receive, streamer.JSONReceive: c.receive,
streamer.JSONType: "RTMP client producer", streamer.JSONType: "RTMP client producer",
streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(), //streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(),
"url": c.URI, "url": c.URI,
} }
for i, media := range c.medias { for i, media := range c.medias {

View File

@@ -43,8 +43,6 @@ const (
ModeServerConsumer ModeServerConsumer
) )
const KeepAlive = time.Second * 25
type Conn struct { type Conn struct {
streamer.Element streamer.Element
@@ -60,6 +58,7 @@ type Conn struct {
// internal // internal
auth *tcp.Auth auth *tcp.Auth
closed bool
conn net.Conn conn net.Conn
mode Mode mode Mode
reader *bufio.Reader reader *bufio.Reader
@@ -115,9 +114,7 @@ func (c *Conn) Dial() (err error) {
_ = c.parseURI() _ = c.parseURI()
} }
c.conn, err = net.DialTimeout( c.conn, err = net.DialTimeout("tcp", c.URL.Host, time.Second*5)
"tcp", c.URL.Host, 10*time.Second,
)
if err != nil { if err != nil {
return return
} }
@@ -362,22 +359,26 @@ func (c *Conn) SetupMedia(
var res *tcp.Response var res *tcp.Response
res, err = c.Do(req) res, err = c.Do(req)
if err != nil { if err != nil {
// Dahua VTO2111D fail on this step because of backchannel // some Dahua/Amcrest cameras fail here because two simultaneous
// backchannel connections
if c.Backchannel { if c.Backchannel {
if err = c.Dial(); err != nil {
return nil, err
}
c.Backchannel = false c.Backchannel = false
if err = c.Describe(); err != nil { if err := c.Dial(); err != nil {
return nil, err return nil, err
} }
res, err = c.Do(req) if err := c.Describe(); err != nil {
return nil, err
} }
if err != nil { for _, newMedia := range c.Medias {
return nil, err if newMedia.Control == media.Control {
return c.SetupMedia(newMedia, newMedia.Codecs[0])
} }
} }
}
return nil, err
}
if c.Session == "" { if c.Session == "" {
// Session: 216525287999;timeout=60 // Session: 216525287999;timeout=60
@@ -392,13 +393,17 @@ func (c *Conn) SetupMedia(
// we send our `interleaved`, but camera can answer with another // we send our `interleaved`, but camera can answer with another
// Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7 // Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7
// Transport: RTP/AVP/TCP;unicast;destination=192.168.1.123;source=192.168.10.12;interleaved=0 // Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0
// Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1 // Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1
s := res.Header.Get("Transport") s := res.Header.Get("Transport")
// TODO: rewrite // TODO: rewrite
if !strings.HasPrefix(s, "RTP/AVP/TCP;") { if !strings.HasPrefix(s, "RTP/AVP/TCP;") {
// Escam Q6 has a bug:
// Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1
if !strings.Contains(s, ";interleaved=") {
return nil, fmt.Errorf("wrong transport: %s", s) return nil, fmt.Errorf("wrong transport: %s", s)
} }
}
i := strings.Index(s, "interleaved=") i := strings.Index(s, "interleaved=")
if i < 0 { if i < 0 {
@@ -451,24 +456,19 @@ func (c *Conn) Teardown() (err error) {
} }
func (c *Conn) Close() error { func (c *Conn) Close() error {
if c.conn == nil { if c.closed {
return nil return nil
} }
if err := c.Teardown(); err != nil { if err := c.Teardown(); err != nil {
return err return err
} }
conn := c.conn c.closed = true
c.conn = nil return c.conn.Close()
return conn.Close()
} }
const transport = "RTP/AVP/TCP;unicast;interleaved=" const transport = "RTP/AVP/TCP;unicast;interleaved="
func (c *Conn) Accept() error { func (c *Conn) Accept() error {
//if c.state != StateServerInit {
// panic("wrong state")
//}
for { for {
req, err := tcp.ReadRequest(c.reader) req, err := tcp.ReadRequest(c.reader)
if err != nil { if err != nil {
@@ -571,7 +571,7 @@ func (c *Conn) Accept() error {
Request: req, Request: req,
} }
if tr[:len(transport)] == transport { if strings.HasPrefix(tr, transport) {
c.Session = "1" // TODO: fixme c.Session = "1" // TODO: fixme
res.Header.Set("Transport", tr[:len(transport)+3]) res.Header.Set("Transport", tr[:len(transport)+3])
} else { } else {
@@ -594,16 +594,44 @@ func (c *Conn) Accept() error {
func (c *Conn) Handle() (err error) { func (c *Conn) Handle() (err error) {
defer func() { defer func() {
if c.conn == nil { if c.closed {
err = nil err = nil
} else {
// may have gotten here because of the deadline
// so close the connection to stop keepalive
_ = c.conn.Close()
} }
//c.Fire(streamer.StateNull)
}() }()
//c.Fire(streamer.StatePlaying) var timeout time.Duration
ts := time.Now().Add(KeepAlive)
switch c.mode {
case ModeClientProducer:
// polling frames from remote RTSP Server (ex Camera)
timeout = time.Second * 5
go c.keepalive()
case ModeServerProducer:
// polling frames from remote RTSP Client (ex FFmpeg)
timeout = time.Second * 15
case ModeServerConsumer:
// pushing frames to remote RTSP Client (ex VLC)
timeout = time.Second * 60
default:
return fmt.Errorf("wrong RTSP conn mode: %d", c.mode)
}
for { for {
if c.closed {
return
}
if err = c.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return
}
// we can read: // we can read:
// 1. RTP interleaved: `$` + 1B channel number + 2B size // 1. RTP interleaved: `$` + 1B channel number + 2B size
// 2. RTSP response: RTSP/1.0 200 OK // 2. RTSP response: RTSP/1.0 200 OK
@@ -681,16 +709,19 @@ func (c *Conn) Handle() (err error) {
c.Fire(msg) c.Fire(msg)
} }
// keep-alive
now := time.Now()
if now.After(ts) {
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
// don't need to wait respose on this request
if err = c.Request(req); err != nil {
return err
} }
ts = now.Add(KeepAlive) }
func (c *Conn) keepalive() {
// TODO: rewrite to RTCP
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
for {
time.Sleep(time.Second * 25)
if c.closed {
return
}
if err := c.Request(req); err != nil {
return
} }
} }
} }
@@ -708,20 +739,16 @@ func (c *Conn) bindTrack(
track *streamer.Track, channel uint8, payloadType uint8, track *streamer.Track, channel uint8, payloadType uint8,
) *streamer.Track { ) *streamer.Track {
push := func(packet *rtp.Packet) error { push := func(packet *rtp.Packet) error {
if c.conn == nil { if c.closed {
return nil return nil
} }
packet.Header.PayloadType = payloadType packet.Header.PayloadType = payloadType
//packet.Header.PayloadType = 100
//packet.Header.PayloadType = 8
//packet.Header.PayloadType = 106
size := packet.MarshalSize() size := packet.MarshalSize()
data := make([]byte, 4+size) data := make([]byte, 4+size)
data[0] = '$' data[0] = '$'
data[1] = channel data[1] = channel
//data[1] = 10
binary.BigEndian.PutUint16(data[2:], uint16(size)) binary.BigEndian.PutUint16(data[2:], uint16(size))
if _, err := packet.MarshalTo(data[4:]); err != nil { if _, err := packet.MarshalTo(data[4:]); err != nil {

View File

@@ -2,6 +2,7 @@ package rtsp
import ( import (
"encoding/json" "encoding/json"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strconv" "strconv"
) )
@@ -27,13 +28,16 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
} }
func (c *Conn) Start() error { func (c *Conn) Start() error {
if c.mode == ModeServerProducer { switch c.mode {
return nil case ModeClientProducer:
}
if err := c.Play(); err != nil { if err := c.Play(); err != nil {
return err return err
} }
case ModeServerProducer:
default:
return fmt.Errorf("start wrong mode: %d", c.mode)
}
return c.Handle() return c.Handle()
} }

View File

@@ -75,13 +75,13 @@ func (m *Media) AV() bool {
return m.Kind == KindVideo || m.Kind == KindAudio return m.Kind == KindVideo || m.Kind == KindAudio
} }
func (m *Media) MatchCodec(codec *Codec) bool { func (m *Media) MatchCodec(codec *Codec) *Codec {
for _, c := range m.Codecs { for _, c := range m.Codecs {
if c.Match(codec) { if c.Match(codec) {
return true return c
} }
} }
return false return nil
} }
func (m *Media) MatchMedia(media *Media) *Codec { func (m *Media) MatchMedia(media *Media) *Codec {

View File

@@ -12,44 +12,54 @@ type WrapperFunc func(push WriterFunc) WriterFunc
type Track struct { type Track struct {
Codec *Codec Codec *Codec
Direction string Direction string
Sink map[*Track]WriterFunc sink map[*Track]WriterFunc
mx sync.Mutex sinkMu sync.Mutex
} }
func (t *Track) String() string { func (t *Track) String() string {
s := t.Codec.String() s := t.Codec.String()
s += fmt.Sprintf(", sinks=%d", len(t.Sink)) s += fmt.Sprintf(", sinks=%d", len(t.sink))
return s return s
} }
func (t *Track) WriteRTP(p *rtp.Packet) error { func (t *Track) WriteRTP(p *rtp.Packet) error {
t.mx.Lock() t.sinkMu.Lock()
for _, f := range t.Sink { for _, f := range t.sink {
_ = f(p) _ = f(p)
} }
t.mx.Unlock() t.sinkMu.Unlock()
return nil return nil
} }
func (t *Track) Bind(w WriterFunc) *Track { func (t *Track) Bind(w WriterFunc) *Track {
t.mx.Lock() t.sinkMu.Lock()
if t.Sink == nil { if t.sink == nil {
t.Sink = map[*Track]WriterFunc{} t.sink = map[*Track]WriterFunc{}
} }
clone := &Track{ clone := &Track{
Codec: t.Codec, Direction: t.Direction, Sink: t.Sink, Codec: t.Codec, Direction: t.Direction, sink: t.sink,
} }
t.Sink[clone] = w t.sink[clone] = w
t.mx.Unlock() t.sinkMu.Unlock()
return clone return clone
} }
func (t *Track) Unbind() { func (t *Track) Unbind() {
t.mx.Lock() t.sinkMu.Lock()
delete(t.Sink, t) delete(t.sink, t)
t.mx.Unlock() t.sinkMu.Unlock()
}
func (t *Track) GetSink(from *Track) {
t.sink = from.sink
}
func (t *Track) HasSink() bool {
t.sinkMu.Lock()
defer t.sinkMu.Unlock()
return len(t.sink) > 0
} }

View File

@@ -9,6 +9,7 @@ import (
"net" "net"
"strconv" "strconv"
"strings" "strings"
"time"
) )
func NewCandidate(address string) (string, error) { func NewCandidate(address string) (string, error) {
@@ -38,7 +39,7 @@ func NewCandidate(address string) (string, error) {
func LookupIP(address string) (string, error) { func LookupIP(address string) (string, error) {
if strings.HasPrefix(address, "stun:") { if strings.HasPrefix(address, "stun:") {
ip, err := GetPublicIP() ip, err := GetCachedPublicIP()
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -63,11 +64,20 @@ func LookupIP(address string) (string, error) {
// GetPublicIP example from https://github.com/pion/stun // GetPublicIP example from https://github.com/pion/stun
func GetPublicIP() (net.IP, error) { func GetPublicIP() (net.IP, error) {
c, err := stun.Dial("udp", "stun.l.google.com:19302") conn, err := net.Dial("udp", "stun.l.google.com:19302")
if err != nil { if err != nil {
return nil, err return nil, err
} }
c, err := stun.NewClient(conn)
if err != nil {
return nil, err
}
if err = conn.SetDeadline(time.Now().Add(time.Second * 3)); err != nil {
return nil, err
}
var res stun.Event var res stun.Event
message := stun.MustBuild(stun.TransactionID, stun.BindingRequest) message := stun.MustBuild(stun.TransactionID, stun.BindingRequest)
@@ -90,6 +100,24 @@ func GetPublicIP() (net.IP, error) {
return xorAddr.IP, nil return xorAddr.IP, nil
} }
var cachedIP net.IP
var cachedTS time.Time
func GetCachedPublicIP() (net.IP, error) {
now := time.Now()
if now.After(cachedTS) {
newIP, err := GetPublicIP()
if err == nil {
cachedIP = newIP
cachedTS = now.Add(time.Minute * 5)
} else if cachedIP == nil {
return nil, err
}
}
return cachedIP, nil
}
func IsIP(host string) bool { func IsIP(host string) bool {
for _, i := range host { for _, i := range host {
if i >= 'A' { if i >= 'A' {

View File

@@ -70,6 +70,7 @@
'<a href="api/stream.mp4?src={name}">mp4</a>', '<a href="api/stream.mp4?src={name}">mp4</a>',
'<a href="api/frame.mp4?src={name}">frame</a>', '<a href="api/frame.mp4?src={name}">frame</a>',
`<a href="rtsp://${location.hostname}:8554/{name}">rtsp</a>`, `<a href="rtsp://${location.hostname}:8554/{name}">rtsp</a>`,
'<a href="api/stream.mjpeg?src={name}">mjpeg</a>',
'<a href="api/streams?src={name}">info</a>', '<a href="api/streams?src={name}">info</a>',
]; ];

View File

@@ -51,11 +51,6 @@
pc.addIceCandidate({candidate: msg.value, sdpMid: ''}); pc.addIceCandidate({candidate: msg.value, sdpMid: ''});
} else if (msg.type === 'webrtc/answer') { } else if (msg.type === 'webrtc/answer') {
pc.setRemoteDescription({type: 'answer', sdp: msg.value}); pc.setRemoteDescription({type: 'answer', sdp: msg.value});
pc.getTransceivers().forEach(t => {
if (t.receiver.track.kind === 'audio') {
t.currentDirection
}
})
} }
} }