Compare commits

...

37 Commits

Author SHA1 Message Date
Alexey Khit
d8158bc1e3 Update stream log message 2022-11-04 22:27:11 +03:00
Alexey Khit
f4f588d2c6 Add mutex to stream 2022-11-04 22:20:52 +03:00
Alexey Khit
e287b52808 Add check for RTSPtoWeb API 2022-11-04 22:12:00 +03:00
Alexey Khit
ff96257252 Add backchannel=0 option to readme 2022-11-04 21:49:36 +03:00
Alexey Khit
909f21b7e4 Update docs about TURN server 2022-11-04 21:44:12 +03:00
Alexey Khit
7d6a5b44f8 Add frame.jpeg api for MJPEG stream 2022-11-04 21:22:33 +03:00
Alexey Khit
278f7696b6 Make sink private for Track 2022-11-04 20:54:35 +03:00
Alexey Khit
3cbf2465ae Fix loopback producer 2022-11-04 17:52:26 +03:00
Alexey Khit
e9ea7a0b1f Add reconnection feature 2022-11-04 17:23:42 +03:00
Alexey Khit
0231fc3a90 Code refactoring 2022-11-04 17:16:42 +03:00
Alexey Khit
9ef2633840 Add 5 sec timeout to ffmpeg rtsp 2022-11-04 17:06:24 +03:00
Alexey Khit
5a8df3e90a Change RTSP dial timeout to 5 sec 2022-11-04 17:05:57 +03:00
Alexey Khit
a31cbec3eb Fix check RTSP transport prefix 2022-11-04 17:05:30 +03:00
Alexey Khit
54f547977e Add mutext to streams handlers 2022-11-04 17:04:47 +03:00
Alexey Khit
65d91e02bd Move NewLogger to function 2022-11-04 17:03:56 +03:00
Alexey Khit
7fc3f0f641 Ignore srtp init in stack list func 2022-11-04 10:07:50 +03:00
Alexey Khit
7725d5ed31 Rewrite get handlers code 2022-11-04 06:24:39 +03:00
Alexey Khit
6c1b9daa8b Update logs for RTSP packets (disabled) 2022-11-03 11:25:47 +03:00
Alexey Khit
6d432574bf Make main logger global 2022-11-03 10:26:26 +03:00
Alexey Khit
616f69c88b Cache public IP for 5 min 2022-11-02 12:48:36 +03:00
Alexey Khit
f72440712b Add timeout to GetPublicIP func 2022-11-02 12:47:26 +03:00
Alexey Khit
ceed146fb8 Add webrtc sync API 2022-11-02 12:46:39 +03:00
Alexey Khit
f17dadbbbf Rewrite keepalive and add timeouts for RTSP 2022-11-02 10:50:11 +03:00
Alexey Khit
3d4514eab9 Fix loopback for stream 2022-11-02 08:51:54 +03:00
Alexey Khit
2629dccb81 Rename HTTP-FLV 2022-10-31 20:05:28 +03:00
Alexey Khit
04f1aa2900 Fix trash in webrtc.html 2022-10-31 08:34:32 +03:00
Alexey Khit
0dacdea1c3 Add support RTMPT (flv over HTTP) 2022-10-30 17:17:42 +03:00
Alexey Khit
24082b1616 Fix backchannel reconnection issue 2022-10-29 11:33:01 +03:00
Alexey Khit
7964b1743b Fix RTSP processing for Amcrest IP4M-1051 2022-10-29 11:29:53 +03:00
Alexey Khit
49773a1ece Add mjpeg link to stream to main page 2022-10-21 12:01:00 +03:00
Alexey Khit
c97a48a73f Fix mjpeg for 2K cameras 2022-10-21 12:00:00 +03:00
Alexey Khit
e03231ebb4 fix ffmpeg transcoding for Reolink RLC-510A 2022-10-21 10:58:56 +03:00
Alexey Khit
649525a842 Merge remote-tracking branch 'origin/master' 2022-10-21 10:54:54 +03:00
Alex X
d411c1a25c Merge pull request #76 from NickM-27/name_api_stream
Add ability for API to set name of stream
2022-10-14 06:59:31 +03:00
Nicolas Mowen
2f0bcf4ae0 Add ability for API to set name of stream 2022-10-13 14:58:26 -06:00
Alexey Khit
831c504cab Fix memory usage for RTSP processing 2022-10-05 21:15:59 +03:00
Alexey Khit
12925a6bc5 Fix TP-Link Tapo TC70 support 2022-10-05 19:43:36 +03:00
28 changed files with 665 additions and 284 deletions

View File

@@ -172,6 +172,8 @@ streams:
- rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1
```
**PS.** For disable bachannel just add `#backchannel=0` to end of RTSP link.
#### Source: RTMP
You can get stream from RTMP server, for example [Frigate](https://docs.frigate.video/configuration/rtmp). Support ONLY `H264` video codec without audio.
@@ -385,13 +387,13 @@ ngrok:
command: ...
```
**Own TCP-tunnel**
**Hard tech way 1. Own TCP-tunnel**
If you have personal VPS, you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config.
If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config.
**Using TURN-server**
**Hard tech way 2. Using TURN-server**
TODO...
If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can install TURN server (e.g. [coturn](https://github.com/coturn/coturn), config [example](https://github.com/AlexxIT/WebRTC/wiki/Coturn-Example)).
```yaml
webrtc:

View File

@@ -85,10 +85,15 @@ var wsHandlers = make(map[string]WSHandler)
func streamsHandler(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src")
name := r.URL.Query().Get("name")
if name == "" {
name = src
}
switch r.Method {
case "PUT":
streams.New(src, src)
streams.New(name, src)
return
case "DELETE":
streams.Delete(src)

View File

@@ -3,6 +3,7 @@ package app
import (
"flag"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v3"
"io"
"os"
@@ -30,10 +31,18 @@ func Init() {
}
}
log.Logger = NewLogger(cfg.Mod["format"], cfg.Mod["level"])
modules = cfg.Mod
path, _ := os.Getwd()
log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH).
Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]")
}
func NewLogger(format string, level string) zerolog.Logger {
var writer io.Writer = os.Stdout
// styles
format := cfg.Mod["format"]
if format != "json" {
writer = zerolog.ConsoleWriter{
Out: writer, TimeFormat: "15:04:05.000",
@@ -43,18 +52,12 @@ func Init() {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
lvl, err := zerolog.ParseLevel(cfg.Mod["level"])
lvl, err := zerolog.ParseLevel(level)
if err != nil || lvl == zerolog.NoLevel {
lvl = zerolog.InfoLevel
}
log = zerolog.New(writer).With().Timestamp().Logger().Level(lvl)
modules = cfg.Mod
path, _ := os.Getwd()
log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH).
Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]")
return zerolog.New(writer).With().Timestamp().Logger().Level(lvl)
}
func LoadConfig(v interface{}) {
@@ -68,15 +71,13 @@ func LoadConfig(v interface{}) {
func GetLogger(module string) zerolog.Logger {
if s, ok := modules[module]; ok {
lvl, err := zerolog.ParseLevel(s)
if err != nil {
log.Warn().Err(err).Msg("[log]")
return log
if err == nil {
return log.Level(lvl)
}
return log.Level(lvl)
log.Warn().Err(err).Caller().Send()
}
return log
return log.Logger
}
// internal
@@ -84,8 +85,5 @@ func GetLogger(module string) zerolog.Logger {
// data - config content
var data []byte
// log - main logger
var log zerolog.Logger
// modules log levels
var modules map[string]string

View File

@@ -21,6 +21,7 @@ var stackSkip = [][]byte{
[]byte("created by net/http.(*Server).Serve"), // TODO: why two?
[]byte("created by github.com/AlexxIT/go2rtc/cmd/rtsp.Init"),
[]byte("created by github.com/AlexxIT/go2rtc/cmd/srtp.Init"),
// webrtc/api.go
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),

View File

@@ -14,6 +14,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
"time"
)
@@ -23,22 +24,22 @@ func Init() {
return
}
rtsp.OnProducer = func(prod streamer.Producer) bool {
if conn := prod.(*pkg.Conn); conn != nil {
if waiter := waiters[conn.URL.Path]; waiter != nil {
waiter <- prod
return true
}
rtsp.HandleFunc(func(conn *pkg.Conn) bool {
waitersMu.Lock()
waiter := waiters[conn.URL.Path]
waitersMu.Unlock()
if waiter == nil {
return false
}
return false
}
waiter <- conn
return true
})
streams.HandleFunc("exec", Handle)
log = app.GetLogger("exec")
// TODO: add sync.Mutex
waiters = map[string]chan streamer.Producer{}
}
func Handle(url string) (streamer.Producer, error) {
@@ -60,8 +61,15 @@ func Handle(url string) (streamer.Producer, error) {
ch := make(chan streamer.Producer)
waitersMu.Lock()
waiters[path] = ch
defer delete(waiters, path)
waitersMu.Unlock()
defer func() {
waitersMu.Lock()
delete(waiters, path)
waitersMu.Unlock()
}()
log.Debug().Str("url", url).Msg("[exec] run")
@@ -86,4 +94,5 @@ func Handle(url string) (streamer.Producer, error) {
// internal
var log zerolog.Logger
var waiters map[string]chan streamer.Producer
var waiters = map[string]chan streamer.Producer{}
var waitersMu sync.Mutex

View File

@@ -23,7 +23,7 @@ func Init() {
// inputs
"file": "-re -stream_loop -1 -i {input}",
"http": "-fflags nobuffer -flags low_delay -i {input}",
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -i {input}",
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -timeout 5000000 -i {input}",
// output
"output": "-rtsp_transport tcp -f rtsp {output}",

View File

@@ -55,6 +55,10 @@ func initAPI() {
// /stream/{id}/channel/0/webrtc
default:
i := strings.IndexByte(r.RequestURI[8:], '/')
if i <= 0 {
log.Warn().Msgf("wrong request: %s", r.RequestURI)
return
}
name := r.RequestURI[8 : 8+i]
stream := streams.Get(name)

View File

@@ -10,12 +10,47 @@ import (
)
func Init() {
api.HandleFunc("api/stream.mjpeg", handler)
api.HandleFunc("api/frame.jpeg", handlerKeyframe)
api.HandleFunc("api/stream.mjpeg", handlerStream)
}
func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src")
stream := streams.GetOrNew(src)
if stream == nil {
return
}
exit := make(chan []byte)
cons := &mjpeg.Consumer{}
cons.Listen(func(msg interface{}) {
switch msg := msg.(type) {
case []byte:
exit <- msg
}
})
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
return
}
data := <-exit
stream.RemoveConsumer(cons)
w.Header().Set("Content-Type", "image/jpeg")
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
if _, err := w.Write(data); err != nil {
log.Error().Err(err).Caller().Send()
}
}
const header = "--frame\r\nContent-Type: image/jpeg\r\nContent-Length: "
func handler(w http.ResponseWriter, r *http.Request) {
func handlerStream(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src")
stream := streams.GetOrNew(src)
if stream == nil {

View File

@@ -8,6 +8,8 @@ import (
func Init() {
streams.HandleFunc("rtmp", handle)
streams.HandleFunc("http", handle)
streams.HandleFunc("https", handle)
}
func handle(url string) (streamer.Producer, error) {

View File

@@ -32,20 +32,43 @@ func Init() {
// RTSP server support
address := conf.Mod.Listen
if address != "" {
_, Port, _ = net.SplitHostPort(address)
go worker(address)
if address == "" {
return
}
ln, err := net.Listen("tcp", address)
if err != nil {
log.Error().Err(err).Msg("[rtsp] listen")
return
}
_, Port, _ = net.SplitHostPort(address)
log.Info().Str("addr", address).Msg("[rtsp] listen")
go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
go tcpHandler(conn)
}
}()
}
type Handler func(conn *rtsp.Conn) bool
func HandleFunc(handler Handler) {
handlers = append(handlers, handler)
}
var Port string
var OnProducer func(conn streamer.Producer) bool // TODO: maybe rewrite...
// internal
var log zerolog.Logger
var handlers []Handler
func rtspHandler(url string) (streamer.Producer, error) {
backchannel := true
@@ -84,10 +107,10 @@ func rtspHandler(url string) (streamer.Producer, error) {
}
// second try without backchannel, we need to reconnect
conn.Backchannel = false
if err = conn.Dial(); err != nil {
return nil, err
}
conn.Backchannel = false
if err = conn.Describe(); err != nil {
return nil, err
}
@@ -96,101 +119,89 @@ func rtspHandler(url string) (streamer.Producer, error) {
return conn, nil
}
func worker(address string) {
srv, err := tcp.NewServer(address)
if err != nil {
log.Error().Err(err).Msg("[rtsp] listen")
return
}
func tcpHandler(c net.Conn) {
var name string
var closer func()
log.Info().Str("addr", address).Msg("[rtsp] listen")
trace := log.Trace().Enabled()
srv.Listen(func(msg interface{}) {
switch msg.(type) {
case net.Conn:
var name string
var onDisconnect func()
conn := rtsp.NewServer(c)
conn.Listen(func(msg interface{}) {
if trace {
switch msg := msg.(type) {
case *tcp.Request:
log.Trace().Msgf("[rtsp] server request:\n%s", msg)
case *tcp.Response:
log.Trace().Msgf("[rtsp] server response:\n%s", msg)
}
}
trace := log.Trace().Enabled()
switch msg {
case rtsp.MethodDescribe:
name = conn.URL.Path[1:]
conn := rtsp.NewServer(msg.(net.Conn))
conn.Listen(func(msg interface{}) {
if trace {
switch msg := msg.(type) {
case *tcp.Request:
log.Trace().Msgf("[rtsp] server request:\n%s", msg)
case *tcp.Response:
log.Trace().Msgf("[rtsp] server response:\n%s", msg)
}
}
switch msg {
case rtsp.MethodDescribe:
name = conn.URL.Path[1:]
log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
stream := streams.Get(name) // TODO: rewrite
if stream == nil {
return
}
initMedias(conn)
if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Str("stream", name).Msg("[rtsp]")
return
}
onDisconnect = func() {
stream.RemoveConsumer(conn)
}
case rtsp.MethodAnnounce:
if OnProducer != nil {
if OnProducer(conn) {
return
}
}
name = conn.URL.Path[1:]
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
stream := streams.Get(name)
if stream == nil {
return
}
stream.AddProducer(conn)
onDisconnect = func() {
stream.RemoveProducer(conn)
}
case streamer.StatePlaying:
log.Debug().Str("stream", name).Msg("[rtsp] start")
}
})
if err = conn.Accept(); err != nil {
log.Warn().Err(err).Msg("[rtsp] accept")
stream := streams.Get(name)
if stream == nil {
return
}
if err = conn.Handle(); err != nil {
//log.Warn().Err(err).Msg("[rtsp] handle server")
log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
initMedias(conn)
if err := stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Str("stream", name).Msg("[rtsp]")
return
}
if onDisconnect != nil {
onDisconnect()
closer = func() {
stream.RemoveConsumer(conn)
}
log.Debug().Str("stream", name).Msg("[rtsp] disconnect")
case rtsp.MethodAnnounce:
name = conn.URL.Path[1:]
stream := streams.Get(name)
if stream == nil {
return
}
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
stream.AddProducer(conn)
closer = func() {
stream.RemoveProducer(conn)
}
case streamer.StatePlaying:
log.Debug().Str("stream", name).Msg("[rtsp] start")
}
})
srv.Serve()
if err := conn.Accept(); err != nil {
log.Warn().Err(err).Caller().Send()
_ = conn.Close()
return
}
for _, handler := range handlers {
if handler(conn) {
return
}
}
if closer != nil {
if err := conn.Handle(); err != nil {
log.Debug().Err(err).Caller().Send()
}
closer()
log.Debug().Str("stream", name).Msg("[rtsp] disconnect")
}
_ = conn.Close()
}
func initMedias(conn *rtsp.Conn) {

View File

@@ -4,30 +4,36 @@ import (
"fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"strings"
"sync"
)
type Handler func(url string) (streamer.Producer, error)
var handlers map[string]Handler
var handlers = map[string]Handler{}
var handlersMu sync.Mutex
func HandleFunc(scheme string, handler Handler) {
if handlers == nil {
handlers = make(map[string]Handler)
}
handlersMu.Lock()
handlers[scheme] = handler
handlersMu.Unlock()
}
func getHandler(url string) Handler {
i := strings.IndexByte(url, ':')
if i <= 0 { // TODO: i < 4 ?
return nil
}
handlersMu.Lock()
defer handlersMu.Unlock()
return handlers[url[:i]]
}
func HasProducer(url string) bool {
i := strings.IndexByte(url, ':')
if i <= 0 { // TODO: i < 4 ?
return false
}
return handlers[url[:i]] != nil
return getHandler(url) != nil
}
func GetProducer(url string) (streamer.Producer, error) {
i := strings.IndexByte(url, ':')
handler := handlers[url[:i]]
handler := getHandler(url)
if handler == nil {
return nil, fmt.Errorf("unsupported scheme: %s", url)
}

View File

@@ -4,6 +4,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"strings"
"sync"
"time"
)
type state byte
@@ -24,8 +25,9 @@ type Producer struct {
element streamer.Producer
tracks []*streamer.Track
state state
mx sync.Mutex
state state
mu sync.Mutex
restart *time.Timer
}
func (p *Producer) SetSource(s string) {
@@ -36,16 +38,16 @@ func (p *Producer) SetSource(s string) {
}
func (p *Producer) GetMedias() []*streamer.Media {
p.mx.Lock()
defer p.mx.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateNone {
log.Debug().Str("url", p.url).Msg("[streams] probe producer")
log.Debug().Msgf("[streams] probe producer url=%s", p.url)
var err error
p.element, err = GetProducer(p.url)
if err != nil || p.element == nil {
log.Error().Err(err).Str("url", p.url).Msg("[streams] probe producer")
log.Error().Err(err).Caller().Send()
return nil
}
@@ -56,14 +58,17 @@ func (p *Producer) GetMedias() []*streamer.Media {
}
func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
p.mx.Lock()
defer p.mx.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateMedias {
p.state = stateTracks
if p.state == stateNone {
return nil
}
track := p.element.GetTrack(media, codec)
if track == nil {
return nil
}
for _, t := range p.tracks {
if track == t {
@@ -71,6 +76,10 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea
}
}
if p.state == stateMedias {
p.state = stateTracks
}
p.tracks = append(p.tracks, track)
return track
@@ -79,36 +88,89 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea
// internals
func (p *Producer) start() {
p.mx.Lock()
defer p.mx.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
if p.state != stateTracks {
return
}
log.Debug().Str("url", p.url).Msg("[streams] start producer")
log.Debug().Msgf("[streams] start producer url=%s", p.url)
p.state = stateStart
go func() {
// safe read element while mu locked
if err := p.element.Start(); err != nil {
log.Warn().Err(err).Str("url", p.url).Msg("[streams] start")
log.Warn().Err(err).Caller().Send()
}
p.reconnect()
}()
}
func (p *Producer) reconnect() {
p.mu.Lock()
defer p.mu.Unlock()
if p.state != stateStart {
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
return
}
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
var err error
p.element, err = GetProducer(p.url)
if err != nil || p.element == nil {
log.Debug().Err(err).Caller().Send()
// TODO: dynamic timeout
p.restart = time.AfterFunc(30*time.Second, p.reconnect)
return
}
medias := p.element.GetMedias()
// convert all old producer tracks to new tracks
for i, oldTrack := range p.tracks {
// match new element medias with old track codec
for _, media := range medias {
codec := media.MatchCodec(oldTrack.Codec)
if codec == nil {
continue
}
// move sink from old track to new track
newTrack := p.element.GetTrack(media, codec)
newTrack.GetSink(oldTrack)
p.tracks[i] = newTrack
break
}
}
go func() {
if err = p.element.Start(); err != nil {
log.Debug().Err(err).Caller().Send()
}
p.reconnect()
}()
}
func (p *Producer) stop() {
p.mx.Lock()
p.mu.Lock()
log.Debug().Str("url", p.url).Msg("[streams] stop producer")
log.Debug().Msgf("[streams] stop producer url=%s", p.url)
if p.element != nil {
_ = p.element.Stop()
p.element = nil
} else {
log.Warn().Str("url", p.url).Msg("[streams] stop empty producer")
}
p.tracks = nil
p.state = stateNone
if p.restart != nil {
p.restart.Stop()
p.restart = nil
}
p.mx.Unlock()
p.state = stateNone
p.tracks = nil
p.mu.Unlock()
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"sync"
)
type Consumer struct {
@@ -14,6 +15,7 @@ type Consumer struct {
type Stream struct {
producers []*Producer
consumers []*Consumer
mu sync.Mutex
}
func NewStream(source interface{}) *Stream {
@@ -51,18 +53,19 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
ic := len(s.consumers)
consumer := &Consumer{element: cons}
var producers []*Producer // matched producers for consumer
// Step 1. Get consumer medias
for icc, consMedia := range cons.GetMedias() {
log.Trace().Stringer("media", consMedia).
Msgf("[streams] consumer:%d:%d candidate", ic, icc)
Msgf("[streams] consumer=%d candidate=%d", ic, icc)
producers:
for ip, prod := range s.producers {
// Step 2. Get producer medias (not tracks yet)
for ipc, prodMedia := range prod.GetMedias() {
log.Trace().Stringer("media", prodMedia).
Msgf("[streams] producer:%d:%d candidate", ip, ipc)
Msgf("[streams] producer=%d candidate=%d", ip, ipc)
// Step 3. Match consumer/producer codecs list
prodCodec := prodMedia.MatchMedia(consMedia)
@@ -81,20 +84,23 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
consTrack := consumer.element.AddTrack(consMedia, prodTrack)
consumer.tracks = append(consumer.tracks, consTrack)
producers = append(producers, prod)
break producers
}
}
}
}
// can't match tracks for consumer
if len(consumer.tracks) == 0 {
if len(producers) == 0 {
return errors.New("couldn't find the matching tracks")
}
s.mu.Lock()
s.consumers = append(s.consumers, consumer)
s.mu.Unlock()
for _, prod := range s.producers {
// there may be duplicates, but that's not a problem
for _, prod := range producers {
prod.start()
}
@@ -102,6 +108,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
}
func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
s.mu.Lock()
for i, consumer := range s.consumers {
if consumer == nil {
log.Warn().Msgf("empty consumer: %+v\n", s)
@@ -127,7 +134,7 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
var sink bool
for _, track := range producer.tracks {
if len(track.Sink) > 0 {
if track.HasSink() {
sink = true
}
}
@@ -135,38 +142,44 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
producer.stop()
}
}
s.mu.Unlock()
}
func (s *Stream) AddProducer(prod streamer.Producer) {
producer := &Producer{element: prod, state: stateTracks}
s.mu.Lock()
s.producers = append(s.producers, producer)
s.mu.Unlock()
}
func (s *Stream) RemoveProducer(prod streamer.Producer) {
s.mu.Lock()
for i, producer := range s.producers {
if producer.element == prod {
s.removeProducer(i)
break
}
}
s.mu.Unlock()
}
func (s *Stream) Active() bool {
if len(s.consumers) > 0 {
return true
}
for _, prod := range s.producers {
if prod.element != nil {
return true
}
}
return false
}
//func (s *Stream) Active() bool {
// if len(s.consumers) > 0 {
// return true
// }
//
// for _, prod := range s.producers {
// if prod.element != nil {
// return true
// }
// }
//
// return false
//}
func (s *Stream) MarshalJSON() ([]byte, error) {
var v []interface{}
s.mu.Lock()
for _, prod := range s.producers {
if prod.element != nil {
v = append(v, prod.element)
@@ -176,6 +189,7 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
// cons.element always not nil
v = append(v, cons.element)
}
s.mu.Unlock()
if len(v) == 0 {
v = nil
}

View File

@@ -8,7 +8,9 @@ import (
"github.com/AlexxIT/go2rtc/pkg/webrtc"
pion "github.com/pion/webrtc/v3"
"github.com/rs/zerolog"
"io/ioutil"
"net"
"net/http"
)
func Init() {
@@ -55,6 +57,8 @@ func Init() {
api.HandleWS(webrtc.MsgTypeOffer, offerHandler)
api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler)
api.HandleFunc("api/webrtc", syncHandler)
}
var Port string
@@ -137,6 +141,32 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
ctx.Consumer = conn
}
func syncHandler(w http.ResponseWriter, r *http.Request) {
url := r.URL.Query().Get("src")
stream := streams.Get(url)
if stream == nil {
return
}
// get offer
offer, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Caller().Send()
return
}
answer, err := ExchangeSDP(stream, string(offer), r.UserAgent())
if err != nil {
log.Error().Err(err).Caller().Send()
return
}
// send SDP to client
if _, err = w.Write([]byte(answer)); err != nil {
log.Error().Err(err).Caller().Send()
}
}
func ExchangeSDP(
stream *streams.Stream, offer string, userAgent string,
) (answer string, err error) {

View File

@@ -8,11 +8,12 @@ import (
)
const (
NALUTypePFrame = 1
NALUTypeIFrame = 5
NALUTypeSEI = 6
NALUTypeSPS = 7
NALUTypePPS = 8
NALUTypePFrame = 1 // Coded slice of a non-IDR picture
NALUTypeIFrame = 5 // Coded slice of an IDR picture
NALUTypeSEI = 6 // Supplemental enhancement information (SEI)
NALUTypeSPS = 7 // Sequence parameter set
NALUTypePPS = 8 // Picture parameter set
NALUTypeAUD = 9 // Access unit delimiter
)
func NALUType(b []byte) byte {

View File

@@ -15,47 +15,62 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
sps, pps := GetParameterSet(track.Codec.FmtpLine)
ps := EncodeAVC(sps, pps)
var buffer []byte
buf := make([]byte, 0, 512*1024) // 512K
return func(push streamer.WriterFunc) streamer.WriterFunc {
return func(packet *rtp.Packet) error {
//nalUnitType := packet.Payload[0] & 0x1F
//fmt.Printf(
// "[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d\n",
// track.Codec.Name, nalUnitType, len(packet.Payload), packet.Timestamp,
// packet.PayloadType, packet.SSRC, packet.SequenceNumber,
//)
//log.Printf("[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, %v", track.Codec.Name, packet.Payload[0]&0x1F, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker)
payload, err := depack.Unmarshal(packet.Payload)
if len(payload) == 0 || err != nil {
return nil
}
// ffmpeg with `-tune zerolatency` enable option `-x264opts sliced-threads=1`
// and every NALU will be sliced to multiple NALUs
// Fix TP-Link Tapo TC70: sends SPS and PPS with packet.Marker = true
if packet.Marker {
switch NALUType(payload) {
case NALUTypeSPS, NALUTypePPS:
buf = append(buf, payload...)
return nil
}
}
if len(buf) == 0 {
// Amcrest IP4M-1051: 9, 7, 8, 6, 28...
// Amcrest IP4M-1051: 9, 6, 1
switch NALUType(payload) {
case NALUTypeIFrame:
// fix IFrame without SPS,PPS
buf = append(buf, ps...)
case NALUTypeSEI, NALUTypeAUD:
// fix ffmpeg with transcoding first frame
i := int(4 + binary.BigEndian.Uint32(payload))
// check if only one NAL (fix ffmpeg transcoding for Reolink RLC-510A)
if i == len(payload) {
return nil
}
payload = payload[i:]
if NALUType(payload) == NALUTypeIFrame {
buf = append(buf, ps...)
}
}
}
// collect all NALs for Access Unit
if !packet.Marker {
buffer = append(buffer, payload...)
buf = append(buf, payload...)
return nil
}
if buffer != nil {
payload = append(buffer, payload...)
buffer = nil
if len(buf) > 0 {
payload = append(buf, payload...)
buf = buf[:0]
}
//fmt.Printf("[AVC] %v, len: %d\n", Types(payload), len(payload))
switch NALUType(payload) {
case NALUTypeIFrame:
payload = Join(ps, payload)
case NALUTypeSEI:
// ffmpeg with transcoding
i := 4 + binary.BigEndian.Uint32(payload)
payload = payload[i:]
if NALUType(payload) == NALUTypeIFrame {
payload = Join(ps, payload)
}
}
//log.Printf("[AVC] %v, len: %d", Types(payload), len(payload))
clone := *packet
clone.Version = RTPPacketVersionAVC

3
pkg/httpflv/README.md Normal file
View File

@@ -0,0 +1,3 @@
## Useful links
- https://medium.com/@nate510/don-t-use-go-s-default-http-client-4804cb19f779

100
pkg/httpflv/httpflv.go Normal file
View File

@@ -0,0 +1,100 @@
package httpflv
import (
"bufio"
"errors"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/flv/flvio"
"github.com/deepch/vdk/utils/bits/pio"
"io"
"net/http"
)
func Dial(uri string) (*Conn, error) {
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
c := Conn{
conn: res.Body,
reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize),
buf: make([]byte, 256),
}
if _, err = io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil {
return nil, err
}
flags, n, err := flvio.ParseFileHeader(c.buf)
if err != nil {
return nil, err
}
if flags&flvio.FILE_HAS_VIDEO == 0 {
return nil, errors.New("not supported")
}
if _, err = c.reader.Discard(n); err != nil {
return nil, err
}
return &c, nil
}
type Conn struct {
conn io.ReadCloser
reader *bufio.Reader
buf []byte
}
func (c *Conn) Streams() ([]av.CodecData, error) {
for {
tag, _, err := flvio.ReadTag(c.reader, c.buf)
if err != nil {
return nil, err
}
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AAC_SEQHDR {
continue
}
stream, err := h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data)
if err != nil {
return nil, err
}
return []av.CodecData{stream}, nil
}
}
func (c *Conn) ReadPacket() (av.Packet, error) {
for {
tag, ts, err := flvio.ReadTag(c.reader, c.buf)
if err != nil {
return av.Packet{}, err
}
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AVC_NALU {
continue
}
return av.Packet{
Idx: 0,
Data: tag.Data,
CompositionTime: flvio.TsToTime(tag.CompositionTime),
IsKeyFrame: tag.FrameType == flvio.FRAME_KEY,
Time: flvio.TsToTime(ts),
}, nil
}
}
func (c *Conn) Close() (err error) {
return c.conn.Close()
}

View File

@@ -61,8 +61,16 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
lqt, cqt = MakeTables(q)
}
// https://www.rfc-editor.org/rfc/rfc2435#section-3.1.5
// The maximum width is 2040 pixels.
w := uint16(packet.Payload[6]) << 3
h := uint16(packet.Payload[7]) << 3
// fix 2560x1920 and 2560x1440
if w == 512 && (h == 1920 || h == 1440) {
w = 2560
}
//fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h)
header = MakeHeaders(t, w, h, lqt, cqt)
}

View File

@@ -5,15 +5,24 @@ import (
"encoding/hex"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/httpflv"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/rtmp"
"github.com/pion/rtp"
"strings"
"time"
)
// Conn for RTMP and RTMPT (flv over HTTP)
type Conn interface {
Streams() (streams []av.CodecData, err error)
ReadPacket() (pkt av.Packet, err error)
Close() (err error)
}
type Client struct {
streamer.Element
@@ -22,7 +31,7 @@ type Client struct {
medias []*streamer.Media
tracks []*streamer.Track
conn *rtmp.Conn
conn Conn
closed bool
receive int
@@ -33,7 +42,12 @@ func NewClient(uri string) *Client {
}
func (c *Client) Dial() (err error) {
c.conn, err = rtmp.Dial(c.URI)
if strings.HasPrefix(c.URI, "http") {
c.conn, err = httpflv.Dial(c.URI)
} else {
c.conn, err = rtmp.Dial(c.URI)
}
if err != nil {
return
}

View File

@@ -32,7 +32,7 @@ func (c *Client) MarshalJSON() ([]byte, error) {
v := map[string]interface{}{
streamer.JSONReceive: c.receive,
streamer.JSONType: "RTMP client producer",
streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(),
//streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(),
"url": c.URI,
}
for i, media := range c.medias {

View File

@@ -43,8 +43,6 @@ const (
ModeServerConsumer
)
const KeepAlive = time.Second * 25
type Conn struct {
streamer.Element
@@ -60,6 +58,7 @@ type Conn struct {
// internal
auth *tcp.Auth
closed bool
conn net.Conn
mode Mode
reader *bufio.Reader
@@ -115,9 +114,7 @@ func (c *Conn) Dial() (err error) {
_ = c.parseURI()
}
c.conn, err = net.DialTimeout(
"tcp", c.URL.Host, 10*time.Second,
)
c.conn, err = net.DialTimeout("tcp", c.URL.Host, time.Second*5)
if err != nil {
return
}
@@ -362,21 +359,25 @@ func (c *Conn) SetupMedia(
var res *tcp.Response
res, err = c.Do(req)
if err != nil {
// Dahua VTO2111D fail on this step because of backchannel
// some Dahua/Amcrest cameras fail here because two simultaneous
// backchannel connections
if c.Backchannel {
if err = c.Dial(); err != nil {
return nil, err
}
c.Backchannel = false
if err = c.Describe(); err != nil {
if err := c.Dial(); err != nil {
return nil, err
}
res, err = c.Do(req)
if err := c.Describe(); err != nil {
return nil, err
}
for _, newMedia := range c.Medias {
if newMedia.Control == media.Control {
return c.SetupMedia(newMedia, newMedia.Codecs[0])
}
}
}
if err != nil {
return nil, err
}
return nil, err
}
if c.Session == "" {
@@ -455,24 +456,19 @@ func (c *Conn) Teardown() (err error) {
}
func (c *Conn) Close() error {
if c.conn == nil {
if c.closed {
return nil
}
if err := c.Teardown(); err != nil {
return err
}
conn := c.conn
c.conn = nil
return conn.Close()
c.closed = true
return c.conn.Close()
}
const transport = "RTP/AVP/TCP;unicast;interleaved="
func (c *Conn) Accept() error {
//if c.state != StateServerInit {
// panic("wrong state")
//}
for {
req, err := tcp.ReadRequest(c.reader)
if err != nil {
@@ -575,7 +571,7 @@ func (c *Conn) Accept() error {
Request: req,
}
if tr[:len(transport)] == transport {
if strings.HasPrefix(tr, transport) {
c.Session = "1" // TODO: fixme
res.Header.Set("Transport", tr[:len(transport)+3])
} else {
@@ -598,16 +594,44 @@ func (c *Conn) Accept() error {
func (c *Conn) Handle() (err error) {
defer func() {
if c.conn == nil {
if c.closed {
err = nil
} else {
// may have gotten here because of the deadline
// so close the connection to stop keepalive
_ = c.conn.Close()
}
//c.Fire(streamer.StateNull)
}()
//c.Fire(streamer.StatePlaying)
ts := time.Now().Add(KeepAlive)
var timeout time.Duration
switch c.mode {
case ModeClientProducer:
// polling frames from remote RTSP Server (ex Camera)
timeout = time.Second * 5
go c.keepalive()
case ModeServerProducer:
// polling frames from remote RTSP Client (ex FFmpeg)
timeout = time.Second * 15
case ModeServerConsumer:
// pushing frames to remote RTSP Client (ex VLC)
timeout = time.Second * 60
default:
return fmt.Errorf("wrong RTSP conn mode: %d", c.mode)
}
for {
if c.closed {
return
}
if err = c.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return
}
// we can read:
// 1. RTP interleaved: `$` + 1B channel number + 2B size
// 2. RTSP response: RTSP/1.0 200 OK
@@ -685,16 +709,19 @@ func (c *Conn) Handle() (err error) {
c.Fire(msg)
}
}
}
// keep-alive
now := time.Now()
if now.After(ts) {
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
// don't need to wait respose on this request
if err = c.Request(req); err != nil {
return err
}
ts = now.Add(KeepAlive)
func (c *Conn) keepalive() {
// TODO: rewrite to RTCP
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
for {
time.Sleep(time.Second * 25)
if c.closed {
return
}
if err := c.Request(req); err != nil {
return
}
}
}
@@ -712,20 +739,16 @@ func (c *Conn) bindTrack(
track *streamer.Track, channel uint8, payloadType uint8,
) *streamer.Track {
push := func(packet *rtp.Packet) error {
if c.conn == nil {
if c.closed {
return nil
}
packet.Header.PayloadType = payloadType
//packet.Header.PayloadType = 100
//packet.Header.PayloadType = 8
//packet.Header.PayloadType = 106
size := packet.MarshalSize()
data := make([]byte, 4+size)
data[0] = '$'
data[1] = channel
//data[1] = 10
binary.BigEndian.PutUint16(data[2:], uint16(size))
if _, err := packet.MarshalTo(data[4:]); err != nil {

View File

@@ -2,6 +2,7 @@ package rtsp
import (
"encoding/json"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"strconv"
)
@@ -27,13 +28,16 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
}
func (c *Conn) Start() error {
if c.mode == ModeServerProducer {
return nil
switch c.mode {
case ModeClientProducer:
if err := c.Play(); err != nil {
return err
}
case ModeServerProducer:
default:
return fmt.Errorf("start wrong mode: %d", c.mode)
}
if err := c.Play(); err != nil {
return err
}
return c.Handle()
}

View File

@@ -75,13 +75,13 @@ func (m *Media) AV() bool {
return m.Kind == KindVideo || m.Kind == KindAudio
}
func (m *Media) MatchCodec(codec *Codec) bool {
func (m *Media) MatchCodec(codec *Codec) *Codec {
for _, c := range m.Codecs {
if c.Match(codec) {
return true
return c
}
}
return false
return nil
}
func (m *Media) MatchMedia(media *Media) *Codec {

View File

@@ -12,44 +12,54 @@ type WrapperFunc func(push WriterFunc) WriterFunc
type Track struct {
Codec *Codec
Direction string
Sink map[*Track]WriterFunc
mx sync.Mutex
sink map[*Track]WriterFunc
sinkMu sync.Mutex
}
func (t *Track) String() string {
s := t.Codec.String()
s += fmt.Sprintf(", sinks=%d", len(t.Sink))
s += fmt.Sprintf(", sinks=%d", len(t.sink))
return s
}
func (t *Track) WriteRTP(p *rtp.Packet) error {
t.mx.Lock()
for _, f := range t.Sink {
t.sinkMu.Lock()
for _, f := range t.sink {
_ = f(p)
}
t.mx.Unlock()
t.sinkMu.Unlock()
return nil
}
func (t *Track) Bind(w WriterFunc) *Track {
t.mx.Lock()
t.sinkMu.Lock()
if t.Sink == nil {
t.Sink = map[*Track]WriterFunc{}
if t.sink == nil {
t.sink = map[*Track]WriterFunc{}
}
clone := &Track{
Codec: t.Codec, Direction: t.Direction, Sink: t.Sink,
Codec: t.Codec, Direction: t.Direction, sink: t.sink,
}
t.Sink[clone] = w
t.sink[clone] = w
t.mx.Unlock()
t.sinkMu.Unlock()
return clone
}
func (t *Track) Unbind() {
t.mx.Lock()
delete(t.Sink, t)
t.mx.Unlock()
t.sinkMu.Lock()
delete(t.sink, t)
t.sinkMu.Unlock()
}
func (t *Track) GetSink(from *Track) {
t.sink = from.sink
}
func (t *Track) HasSink() bool {
t.sinkMu.Lock()
defer t.sinkMu.Unlock()
return len(t.sink) > 0
}

View File

@@ -9,6 +9,7 @@ import (
"net"
"strconv"
"strings"
"time"
)
func NewCandidate(address string) (string, error) {
@@ -38,7 +39,7 @@ func NewCandidate(address string) (string, error) {
func LookupIP(address string) (string, error) {
if strings.HasPrefix(address, "stun:") {
ip, err := GetPublicIP()
ip, err := GetCachedPublicIP()
if err != nil {
return "", err
}
@@ -63,11 +64,20 @@ func LookupIP(address string) (string, error) {
// GetPublicIP example from https://github.com/pion/stun
func GetPublicIP() (net.IP, error) {
c, err := stun.Dial("udp", "stun.l.google.com:19302")
conn, err := net.Dial("udp", "stun.l.google.com:19302")
if err != nil {
return nil, err
}
c, err := stun.NewClient(conn)
if err != nil {
return nil, err
}
if err = conn.SetDeadline(time.Now().Add(time.Second * 3)); err != nil {
return nil, err
}
var res stun.Event
message := stun.MustBuild(stun.TransactionID, stun.BindingRequest)
@@ -90,6 +100,24 @@ func GetPublicIP() (net.IP, error) {
return xorAddr.IP, nil
}
var cachedIP net.IP
var cachedTS time.Time
func GetCachedPublicIP() (net.IP, error) {
now := time.Now()
if now.After(cachedTS) {
newIP, err := GetPublicIP()
if err == nil {
cachedIP = newIP
cachedTS = now.Add(time.Minute * 5)
} else if cachedIP == nil {
return nil, err
}
}
return cachedIP, nil
}
func IsIP(host string) bool {
for _, i := range host {
if i >= 'A' {

View File

@@ -70,6 +70,7 @@
'<a href="api/stream.mp4?src={name}">mp4</a>',
'<a href="api/frame.mp4?src={name}">frame</a>',
`<a href="rtsp://${location.hostname}:8554/{name}">rtsp</a>`,
'<a href="api/stream.mjpeg?src={name}">mjpeg</a>',
'<a href="api/streams?src={name}">info</a>',
];

View File

@@ -51,11 +51,6 @@
pc.addIceCandidate({candidate: msg.value, sdpMid: ''});
} else if (msg.type === 'webrtc/answer') {
pc.setRemoteDescription({type: 'answer', sdp: msg.value});
pc.getTransceivers().forEach(t => {
if (t.receiver.track.kind === 'audio') {
t.currentDirection
}
})
}
}