mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2025-09-27 04:36:12 +08:00
Compare commits
49 Commits
v0.1-beta.
...
v0.1-rc.2
Author | SHA1 | Date | |
---|---|---|---|
![]() |
49f6233bde | ||
![]() |
78c5c70c73 | ||
![]() |
32651c74ab | ||
![]() |
5c64d1f847 | ||
![]() |
717af29630 | ||
![]() |
ea18475d31 | ||
![]() |
701a9c69ec | ||
![]() |
c06253c8b2 | ||
![]() |
3a07e9fa03 | ||
![]() |
e1bc30fab3 | ||
![]() |
d16ae0972f | ||
![]() |
8b93c97e69 | ||
![]() |
d8158bc1e3 | ||
![]() |
f4f588d2c6 | ||
![]() |
e287b52808 | ||
![]() |
ff96257252 | ||
![]() |
909f21b7e4 | ||
![]() |
7d6a5b44f8 | ||
![]() |
278f7696b6 | ||
![]() |
3cbf2465ae | ||
![]() |
e9ea7a0b1f | ||
![]() |
0231fc3a90 | ||
![]() |
9ef2633840 | ||
![]() |
5a8df3e90a | ||
![]() |
a31cbec3eb | ||
![]() |
54f547977e | ||
![]() |
65d91e02bd | ||
![]() |
7fc3f0f641 | ||
![]() |
7725d5ed31 | ||
![]() |
6c1b9daa8b | ||
![]() |
6d432574bf | ||
![]() |
616f69c88b | ||
![]() |
f72440712b | ||
![]() |
ceed146fb8 | ||
![]() |
f17dadbbbf | ||
![]() |
3d4514eab9 | ||
![]() |
2629dccb81 | ||
![]() |
04f1aa2900 | ||
![]() |
0dacdea1c3 | ||
![]() |
24082b1616 | ||
![]() |
7964b1743b | ||
![]() |
49773a1ece | ||
![]() |
c97a48a73f | ||
![]() |
e03231ebb4 | ||
![]() |
649525a842 | ||
![]() |
d411c1a25c | ||
![]() |
2f0bcf4ae0 | ||
![]() |
831c504cab | ||
![]() |
12925a6bc5 |
10
README.md
10
README.md
@@ -172,6 +172,8 @@ streams:
|
||||
- rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1
|
||||
```
|
||||
|
||||
**PS.** For disable bachannel just add `#backchannel=0` to end of RTSP link.
|
||||
|
||||
#### Source: RTMP
|
||||
|
||||
You can get stream from RTMP server, for example [Frigate](https://docs.frigate.video/configuration/rtmp). Support ONLY `H264` video codec without audio.
|
||||
@@ -385,13 +387,13 @@ ngrok:
|
||||
command: ...
|
||||
```
|
||||
|
||||
**Own TCP-tunnel**
|
||||
**Hard tech way 1. Own TCP-tunnel**
|
||||
|
||||
If you have personal VPS, you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config.
|
||||
If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config.
|
||||
|
||||
**Using TURN-server**
|
||||
**Hard tech way 2. Using TURN-server**
|
||||
|
||||
TODO...
|
||||
If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can install TURN server (e.g. [coturn](https://github.com/coturn/coturn), config [example](https://github.com/AlexxIT/WebRTC/wiki/Coturn-Example)).
|
||||
|
||||
```yaml
|
||||
webrtc:
|
||||
|
@@ -85,10 +85,15 @@ var wsHandlers = make(map[string]WSHandler)
|
||||
|
||||
func streamsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
src := r.URL.Query().Get("src")
|
||||
name := r.URL.Query().Get("name")
|
||||
|
||||
if name == "" {
|
||||
name = src
|
||||
}
|
||||
|
||||
switch r.Method {
|
||||
case "PUT":
|
||||
streams.New(src, src)
|
||||
streams.New(name, src)
|
||||
return
|
||||
case "DELETE":
|
||||
streams.Delete(src)
|
||||
|
@@ -64,22 +64,14 @@ func (ctx *Context) Close() {
|
||||
|
||||
func (ctx *Context) Write(msg interface{}) {
|
||||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
|
||||
var err error
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case *streamer.Message:
|
||||
err = ctx.Conn.WriteJSON(msg)
|
||||
case []byte:
|
||||
err = ctx.Conn.WriteMessage(websocket.BinaryMessage, msg)
|
||||
default:
|
||||
return
|
||||
if data, ok := msg.([]byte); ok {
|
||||
_ = ctx.Conn.WriteMessage(websocket.BinaryMessage, data)
|
||||
} else {
|
||||
_ = ctx.Conn.WriteJSON(msg)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
//panic(err) // TODO: fix panic
|
||||
}
|
||||
ctx.mu.Unlock()
|
||||
}
|
||||
|
||||
func (ctx *Context) Error(err error) {
|
||||
|
@@ -3,6 +3,7 @@ package app
|
||||
import (
|
||||
"flag"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io"
|
||||
"os"
|
||||
@@ -30,10 +31,18 @@ func Init() {
|
||||
}
|
||||
}
|
||||
|
||||
log.Logger = NewLogger(cfg.Mod["format"], cfg.Mod["level"])
|
||||
|
||||
modules = cfg.Mod
|
||||
|
||||
path, _ := os.Getwd()
|
||||
log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH).
|
||||
Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]")
|
||||
}
|
||||
|
||||
func NewLogger(format string, level string) zerolog.Logger {
|
||||
var writer io.Writer = os.Stdout
|
||||
|
||||
// styles
|
||||
format := cfg.Mod["format"]
|
||||
if format != "json" {
|
||||
writer = zerolog.ConsoleWriter{
|
||||
Out: writer, TimeFormat: "15:04:05.000",
|
||||
@@ -43,18 +52,12 @@ func Init() {
|
||||
|
||||
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
|
||||
|
||||
lvl, err := zerolog.ParseLevel(cfg.Mod["level"])
|
||||
lvl, err := zerolog.ParseLevel(level)
|
||||
if err != nil || lvl == zerolog.NoLevel {
|
||||
lvl = zerolog.InfoLevel
|
||||
}
|
||||
|
||||
log = zerolog.New(writer).With().Timestamp().Logger().Level(lvl)
|
||||
|
||||
modules = cfg.Mod
|
||||
|
||||
path, _ := os.Getwd()
|
||||
log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH).
|
||||
Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]")
|
||||
return zerolog.New(writer).With().Timestamp().Logger().Level(lvl)
|
||||
}
|
||||
|
||||
func LoadConfig(v interface{}) {
|
||||
@@ -68,15 +71,13 @@ func LoadConfig(v interface{}) {
|
||||
func GetLogger(module string) zerolog.Logger {
|
||||
if s, ok := modules[module]; ok {
|
||||
lvl, err := zerolog.ParseLevel(s)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("[log]")
|
||||
return log
|
||||
if err == nil {
|
||||
return log.Level(lvl)
|
||||
}
|
||||
|
||||
return log.Level(lvl)
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
}
|
||||
|
||||
return log
|
||||
return log.Logger
|
||||
}
|
||||
|
||||
// internal
|
||||
@@ -84,8 +85,5 @@ func GetLogger(module string) zerolog.Logger {
|
||||
// data - config content
|
||||
var data []byte
|
||||
|
||||
// log - main logger
|
||||
var log zerolog.Logger
|
||||
|
||||
// modules log levels
|
||||
var modules map[string]string
|
||||
|
@@ -21,6 +21,7 @@ var stackSkip = [][]byte{
|
||||
[]byte("created by net/http.(*Server).Serve"), // TODO: why two?
|
||||
|
||||
[]byte("created by github.com/AlexxIT/go2rtc/cmd/rtsp.Init"),
|
||||
[]byte("created by github.com/AlexxIT/go2rtc/cmd/srtp.Init"),
|
||||
|
||||
// webrtc/api.go
|
||||
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),
|
||||
|
@@ -14,6 +14,7 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -23,22 +24,22 @@ func Init() {
|
||||
return
|
||||
}
|
||||
|
||||
rtsp.OnProducer = func(prod streamer.Producer) bool {
|
||||
if conn := prod.(*pkg.Conn); conn != nil {
|
||||
if waiter := waiters[conn.URL.Path]; waiter != nil {
|
||||
waiter <- prod
|
||||
return true
|
||||
}
|
||||
rtsp.HandleFunc(func(conn *pkg.Conn) bool {
|
||||
waitersMu.Lock()
|
||||
waiter := waiters[conn.URL.Path]
|
||||
waitersMu.Unlock()
|
||||
|
||||
if waiter == nil {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
waiter <- conn
|
||||
return true
|
||||
})
|
||||
|
||||
streams.HandleFunc("exec", Handle)
|
||||
|
||||
log = app.GetLogger("exec")
|
||||
|
||||
// TODO: add sync.Mutex
|
||||
waiters = map[string]chan streamer.Producer{}
|
||||
}
|
||||
|
||||
func Handle(url string) (streamer.Producer, error) {
|
||||
@@ -60,8 +61,15 @@ func Handle(url string) (streamer.Producer, error) {
|
||||
|
||||
ch := make(chan streamer.Producer)
|
||||
|
||||
waitersMu.Lock()
|
||||
waiters[path] = ch
|
||||
defer delete(waiters, path)
|
||||
waitersMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
waitersMu.Lock()
|
||||
delete(waiters, path)
|
||||
waitersMu.Unlock()
|
||||
}()
|
||||
|
||||
log.Debug().Str("url", url).Msg("[exec] run")
|
||||
|
||||
@@ -86,4 +94,5 @@ func Handle(url string) (streamer.Producer, error) {
|
||||
// internal
|
||||
|
||||
var log zerolog.Logger
|
||||
var waiters map[string]chan streamer.Producer
|
||||
var waiters = map[string]chan streamer.Producer{}
|
||||
var waitersMu sync.Mutex
|
||||
|
@@ -23,7 +23,7 @@ func Init() {
|
||||
// inputs
|
||||
"file": "-re -stream_loop -1 -i {input}",
|
||||
"http": "-fflags nobuffer -flags low_delay -i {input}",
|
||||
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -i {input}",
|
||||
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -timeout 5000000 -i {input}",
|
||||
|
||||
// output
|
||||
"output": "-rtsp_transport tcp -f rtsp {output}",
|
||||
|
@@ -55,6 +55,10 @@ func initAPI() {
|
||||
// /stream/{id}/channel/0/webrtc
|
||||
default:
|
||||
i := strings.IndexByte(r.RequestURI[8:], '/')
|
||||
if i <= 0 {
|
||||
log.Warn().Msgf("wrong request: %s", r.RequestURI)
|
||||
return
|
||||
}
|
||||
name := r.RequestURI[8 : 8+i]
|
||||
|
||||
stream := streams.Get(name)
|
||||
|
@@ -10,12 +10,47 @@ import (
|
||||
)
|
||||
|
||||
func Init() {
|
||||
api.HandleFunc("api/stream.mjpeg", handler)
|
||||
api.HandleFunc("api/frame.jpeg", handlerKeyframe)
|
||||
api.HandleFunc("api/stream.mjpeg", handlerStream)
|
||||
}
|
||||
|
||||
func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
|
||||
src := r.URL.Query().Get("src")
|
||||
stream := streams.GetOrNew(src)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
exit := make(chan []byte)
|
||||
|
||||
cons := &mjpeg.Consumer{}
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg := msg.(type) {
|
||||
case []byte:
|
||||
exit <- msg
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
data := <-exit
|
||||
|
||||
stream.RemoveConsumer(cons)
|
||||
|
||||
w.Header().Set("Content-Type", "image/jpeg")
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
|
||||
|
||||
if _, err := w.Write(data); err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
}
|
||||
}
|
||||
|
||||
const header = "--frame\r\nContent-Type: image/jpeg\r\nContent-Length: "
|
||||
|
||||
func handler(w http.ResponseWriter, r *http.Request) {
|
||||
func handlerStream(w http.ResponseWriter, r *http.Request) {
|
||||
src := r.URL.Query().Get("src")
|
||||
stream := streams.GetOrNew(src)
|
||||
if stream == nil {
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@@ -35,35 +36,29 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
exit := make(chan []byte)
|
||||
|
||||
cons := &mp4.Consumer{}
|
||||
cons := &mp4.Keyframe{}
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg := msg.(type) {
|
||||
case []byte:
|
||||
exit <- msg
|
||||
if data, ok := msg.([]byte); ok && exit != nil {
|
||||
exit <- data
|
||||
exit = nil
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] add consumer")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
defer stream.RemoveConsumer(cons)
|
||||
data := <-exit
|
||||
|
||||
w.Header().Set("Content-Type", cons.MimeType())
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] init")
|
||||
return
|
||||
}
|
||||
data = append(data, <-exit...)
|
||||
stream.RemoveConsumer(cons)
|
||||
|
||||
// Apple Safari won't show frame without length
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
|
||||
w.Header().Set("Content-Type", cons.MimeType)
|
||||
|
||||
if _, err := w.Write(data); err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] add consumer")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,20 +75,20 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
exit := make(chan struct{})
|
||||
exit := make(chan error)
|
||||
|
||||
cons := &mp4.Consumer{}
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg := msg.(type) {
|
||||
case []byte:
|
||||
if _, err := w.Write(msg); err != nil {
|
||||
exit <- struct{}{}
|
||||
if data, ok := msg.([]byte); ok {
|
||||
if _, err := w.Write(data); err != nil && exit != nil {
|
||||
exit <- err
|
||||
exit = nil
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] add consumer")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -103,18 +98,36 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] init")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = w.Write(data); err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] write")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
<-exit
|
||||
cons.Start()
|
||||
|
||||
log.Trace().Msg("[api.mp4] close")
|
||||
var duration *time.Timer
|
||||
if s := r.URL.Query().Get("duration"); s != "" {
|
||||
if i, _ := strconv.Atoi(s); i > 0 {
|
||||
duration = time.AfterFunc(time.Second*time.Duration(i), func() {
|
||||
if exit != nil {
|
||||
exit <- nil
|
||||
exit = nil
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
err = <-exit
|
||||
|
||||
log.Trace().Err(err).Caller().Send()
|
||||
|
||||
if duration != nil {
|
||||
duration.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func isChromeFirst(w http.ResponseWriter, r *http.Request) bool {
|
||||
|
@@ -9,6 +9,8 @@ import (
|
||||
|
||||
const MsgTypeMSE = "mse" // fMP4
|
||||
|
||||
const packetSize = 8192
|
||||
|
||||
func handlerWS(ctx *api.Context, msg *streamer.Message) {
|
||||
src := ctx.Request.URL.Query().Get("src")
|
||||
stream := streams.GetOrNew(src)
|
||||
@@ -21,14 +23,17 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) {
|
||||
cons.RemoteAddr = ctx.Request.RemoteAddr
|
||||
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg.(type) {
|
||||
case *streamer.Message, []byte:
|
||||
ctx.Write(msg)
|
||||
if data, ok := msg.([]byte); ok {
|
||||
for len(data) > packetSize {
|
||||
ctx.Write(data[:packetSize])
|
||||
data = data[packetSize:]
|
||||
}
|
||||
ctx.Write(data)
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Warn().Err(err).Msg("[api.mse] add consumer")
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
ctx.Error(err)
|
||||
return
|
||||
}
|
||||
@@ -37,16 +42,16 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) {
|
||||
stream.RemoveConsumer(cons)
|
||||
})
|
||||
|
||||
ctx.Write(&streamer.Message{
|
||||
Type: MsgTypeMSE, Value: cons.MimeType(),
|
||||
})
|
||||
ctx.Write(&streamer.Message{Type: MsgTypeMSE, Value: cons.MimeType()})
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("[api.mse] init")
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
ctx.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Write(data)
|
||||
|
||||
cons.Start()
|
||||
}
|
||||
|
@@ -8,6 +8,8 @@ import (
|
||||
|
||||
func Init() {
|
||||
streams.HandleFunc("rtmp", handle)
|
||||
streams.HandleFunc("http", handle)
|
||||
streams.HandleFunc("https", handle)
|
||||
}
|
||||
|
||||
func handle(url string) (streamer.Producer, error) {
|
||||
|
210
cmd/rtsp/rtsp.go
210
cmd/rtsp/rtsp.go
@@ -32,20 +32,43 @@ func Init() {
|
||||
|
||||
// RTSP server support
|
||||
address := conf.Mod.Listen
|
||||
if address != "" {
|
||||
_, Port, _ = net.SplitHostPort(address)
|
||||
|
||||
go worker(address)
|
||||
if address == "" {
|
||||
return
|
||||
}
|
||||
|
||||
ln, err := net.Listen("tcp", address)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[rtsp] listen")
|
||||
return
|
||||
}
|
||||
|
||||
_, Port, _ = net.SplitHostPort(address)
|
||||
|
||||
log.Info().Str("addr", address).Msg("[rtsp] listen")
|
||||
|
||||
go func() {
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
go tcpHandler(conn)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
type Handler func(conn *rtsp.Conn) bool
|
||||
|
||||
func HandleFunc(handler Handler) {
|
||||
handlers = append(handlers, handler)
|
||||
}
|
||||
|
||||
var Port string
|
||||
|
||||
var OnProducer func(conn streamer.Producer) bool // TODO: maybe rewrite...
|
||||
|
||||
// internal
|
||||
|
||||
var log zerolog.Logger
|
||||
var handlers []Handler
|
||||
|
||||
func rtspHandler(url string) (streamer.Producer, error) {
|
||||
backchannel := true
|
||||
@@ -84,10 +107,10 @@ func rtspHandler(url string) (streamer.Producer, error) {
|
||||
}
|
||||
|
||||
// second try without backchannel, we need to reconnect
|
||||
conn.Backchannel = false
|
||||
if err = conn.Dial(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.Backchannel = false
|
||||
if err = conn.Describe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -96,101 +119,89 @@ func rtspHandler(url string) (streamer.Producer, error) {
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func worker(address string) {
|
||||
srv, err := tcp.NewServer(address)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[rtsp] listen")
|
||||
return
|
||||
}
|
||||
func tcpHandler(c net.Conn) {
|
||||
var name string
|
||||
var closer func()
|
||||
|
||||
log.Info().Str("addr", address).Msg("[rtsp] listen")
|
||||
trace := log.Trace().Enabled()
|
||||
|
||||
srv.Listen(func(msg interface{}) {
|
||||
switch msg.(type) {
|
||||
case net.Conn:
|
||||
var name string
|
||||
var onDisconnect func()
|
||||
conn := rtsp.NewServer(c)
|
||||
conn.Listen(func(msg interface{}) {
|
||||
if trace {
|
||||
switch msg := msg.(type) {
|
||||
case *tcp.Request:
|
||||
log.Trace().Msgf("[rtsp] server request:\n%s", msg)
|
||||
case *tcp.Response:
|
||||
log.Trace().Msgf("[rtsp] server response:\n%s", msg)
|
||||
}
|
||||
}
|
||||
|
||||
trace := log.Trace().Enabled()
|
||||
switch msg {
|
||||
case rtsp.MethodDescribe:
|
||||
name = conn.URL.Path[1:]
|
||||
|
||||
conn := rtsp.NewServer(msg.(net.Conn))
|
||||
conn.Listen(func(msg interface{}) {
|
||||
if trace {
|
||||
switch msg := msg.(type) {
|
||||
case *tcp.Request:
|
||||
log.Trace().Msgf("[rtsp] server request:\n%s", msg)
|
||||
case *tcp.Response:
|
||||
log.Trace().Msgf("[rtsp] server response:\n%s", msg)
|
||||
}
|
||||
}
|
||||
|
||||
switch msg {
|
||||
case rtsp.MethodDescribe:
|
||||
name = conn.URL.Path[1:]
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
|
||||
|
||||
stream := streams.Get(name) // TODO: rewrite
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
initMedias(conn)
|
||||
|
||||
if err = stream.AddConsumer(conn); err != nil {
|
||||
log.Warn().Err(err).Str("stream", name).Msg("[rtsp]")
|
||||
return
|
||||
}
|
||||
|
||||
onDisconnect = func() {
|
||||
stream.RemoveConsumer(conn)
|
||||
}
|
||||
|
||||
case rtsp.MethodAnnounce:
|
||||
if OnProducer != nil {
|
||||
if OnProducer(conn) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
name = conn.URL.Path[1:]
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
|
||||
|
||||
stream := streams.Get(name)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
stream.AddProducer(conn)
|
||||
|
||||
onDisconnect = func() {
|
||||
stream.RemoveProducer(conn)
|
||||
}
|
||||
|
||||
case streamer.StatePlaying:
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] start")
|
||||
}
|
||||
})
|
||||
|
||||
if err = conn.Accept(); err != nil {
|
||||
log.Warn().Err(err).Msg("[rtsp] accept")
|
||||
stream := streams.Get(name)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = conn.Handle(); err != nil {
|
||||
//log.Warn().Err(err).Msg("[rtsp] handle server")
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
|
||||
|
||||
initMedias(conn)
|
||||
|
||||
if err := stream.AddConsumer(conn); err != nil {
|
||||
log.Warn().Err(err).Str("stream", name).Msg("[rtsp]")
|
||||
return
|
||||
}
|
||||
|
||||
if onDisconnect != nil {
|
||||
onDisconnect()
|
||||
closer = func() {
|
||||
stream.RemoveConsumer(conn)
|
||||
}
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] disconnect")
|
||||
case rtsp.MethodAnnounce:
|
||||
name = conn.URL.Path[1:]
|
||||
|
||||
stream := streams.Get(name)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
|
||||
|
||||
stream.AddProducer(conn)
|
||||
|
||||
closer = func() {
|
||||
stream.RemoveProducer(conn)
|
||||
}
|
||||
|
||||
case streamer.StatePlaying:
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] start")
|
||||
}
|
||||
})
|
||||
|
||||
srv.Serve()
|
||||
if err := conn.Accept(); err != nil {
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
for _, handler := range handlers {
|
||||
if handler(conn) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if closer != nil {
|
||||
if err := conn.Handle(); err != nil {
|
||||
log.Debug().Err(err).Caller().Send()
|
||||
}
|
||||
|
||||
closer()
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] disconnect")
|
||||
}
|
||||
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func initMedias(conn *rtsp.Conn) {
|
||||
@@ -198,16 +209,27 @@ func initMedias(conn *rtsp.Conn) {
|
||||
for key, value := range conn.URL.Query() {
|
||||
switch key {
|
||||
case streamer.KindVideo, streamer.KindAudio:
|
||||
for _, value := range value {
|
||||
for _, name := range value {
|
||||
name = strings.ToUpper(name)
|
||||
|
||||
// check aliases
|
||||
switch name {
|
||||
case "COPY":
|
||||
name = "" // pass empty codecs list
|
||||
case "MJPEG":
|
||||
name = streamer.CodecJPEG
|
||||
case "AAC":
|
||||
name = streamer.CodecAAC
|
||||
}
|
||||
|
||||
media := &streamer.Media{
|
||||
Kind: key, Direction: streamer.DirectionRecvonly,
|
||||
}
|
||||
|
||||
switch value {
|
||||
case "", "copy": // pass empty codecs list
|
||||
default:
|
||||
codec := streamer.NewCodec(value)
|
||||
media.Codecs = append(media.Codecs, codec)
|
||||
// empty codecs match all codecs
|
||||
if name != "" {
|
||||
// empty clock rate and channels match any values
|
||||
media.Codecs = []*streamer.Codec{{Name: name}}
|
||||
}
|
||||
|
||||
conn.Medias = append(conn.Medias, media)
|
||||
|
@@ -4,30 +4,36 @@ import (
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Handler func(url string) (streamer.Producer, error)
|
||||
|
||||
var handlers map[string]Handler
|
||||
var handlers = map[string]Handler{}
|
||||
var handlersMu sync.Mutex
|
||||
|
||||
func HandleFunc(scheme string, handler Handler) {
|
||||
if handlers == nil {
|
||||
handlers = make(map[string]Handler)
|
||||
}
|
||||
handlersMu.Lock()
|
||||
handlers[scheme] = handler
|
||||
handlersMu.Unlock()
|
||||
}
|
||||
|
||||
func getHandler(url string) Handler {
|
||||
i := strings.IndexByte(url, ':')
|
||||
if i <= 0 { // TODO: i < 4 ?
|
||||
return nil
|
||||
}
|
||||
handlersMu.Lock()
|
||||
defer handlersMu.Unlock()
|
||||
return handlers[url[:i]]
|
||||
}
|
||||
|
||||
func HasProducer(url string) bool {
|
||||
i := strings.IndexByte(url, ':')
|
||||
if i <= 0 { // TODO: i < 4 ?
|
||||
return false
|
||||
}
|
||||
return handlers[url[:i]] != nil
|
||||
return getHandler(url) != nil
|
||||
}
|
||||
|
||||
func GetProducer(url string) (streamer.Producer, error) {
|
||||
i := strings.IndexByte(url, ':')
|
||||
handler := handlers[url[:i]]
|
||||
handler := getHandler(url)
|
||||
if handler == nil {
|
||||
return nil, fmt.Errorf("unsupported scheme: %s", url)
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type state byte
|
||||
@@ -24,8 +25,9 @@ type Producer struct {
|
||||
element streamer.Producer
|
||||
tracks []*streamer.Track
|
||||
|
||||
state state
|
||||
mx sync.Mutex
|
||||
state state
|
||||
mu sync.Mutex
|
||||
restart *time.Timer
|
||||
}
|
||||
|
||||
func (p *Producer) SetSource(s string) {
|
||||
@@ -36,16 +38,16 @@ func (p *Producer) SetSource(s string) {
|
||||
}
|
||||
|
||||
func (p *Producer) GetMedias() []*streamer.Media {
|
||||
p.mx.Lock()
|
||||
defer p.mx.Unlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.state == stateNone {
|
||||
log.Debug().Str("url", p.url).Msg("[streams] probe producer")
|
||||
log.Debug().Msgf("[streams] probe producer url=%s", p.url)
|
||||
|
||||
var err error
|
||||
p.element, err = GetProducer(p.url)
|
||||
if err != nil || p.element == nil {
|
||||
log.Error().Err(err).Str("url", p.url).Msg("[streams] probe producer")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -56,59 +58,124 @@ func (p *Producer) GetMedias() []*streamer.Media {
|
||||
}
|
||||
|
||||
func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
|
||||
p.mx.Lock()
|
||||
defer p.mx.Unlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.state == stateMedias {
|
||||
p.state = stateTracks
|
||||
if p.state == stateNone {
|
||||
return nil
|
||||
}
|
||||
|
||||
track := p.element.GetTrack(media, codec)
|
||||
|
||||
for _, t := range p.tracks {
|
||||
if track == t {
|
||||
for _, track := range p.tracks {
|
||||
if track.Codec == codec {
|
||||
return track
|
||||
}
|
||||
}
|
||||
|
||||
// can't get new tracks after start
|
||||
if p.state == stateStart {
|
||||
return nil
|
||||
}
|
||||
|
||||
track := p.element.GetTrack(media, codec)
|
||||
if track == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
p.tracks = append(p.tracks, track)
|
||||
|
||||
if p.state == stateMedias {
|
||||
p.state = stateTracks
|
||||
}
|
||||
|
||||
return track
|
||||
}
|
||||
|
||||
// internals
|
||||
|
||||
func (p *Producer) start() {
|
||||
p.mx.Lock()
|
||||
defer p.mx.Unlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.state != stateTracks {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().Str("url", p.url).Msg("[streams] start producer")
|
||||
log.Debug().Msgf("[streams] start producer url=%s", p.url)
|
||||
|
||||
p.state = stateStart
|
||||
go func() {
|
||||
// safe read element while mu locked
|
||||
if err := p.element.Start(); err != nil {
|
||||
log.Warn().Err(err).Str("url", p.url).Msg("[streams] start")
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
}
|
||||
p.reconnect()
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Producer) reconnect() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.state != stateStart {
|
||||
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
|
||||
|
||||
var err error
|
||||
p.element, err = GetProducer(p.url)
|
||||
if err != nil || p.element == nil {
|
||||
log.Debug().Err(err).Caller().Send()
|
||||
// TODO: dynamic timeout
|
||||
p.restart = time.AfterFunc(30*time.Second, p.reconnect)
|
||||
return
|
||||
}
|
||||
|
||||
medias := p.element.GetMedias()
|
||||
|
||||
// convert all old producer tracks to new tracks
|
||||
for i, oldTrack := range p.tracks {
|
||||
// match new element medias with old track codec
|
||||
for _, media := range medias {
|
||||
codec := media.MatchCodec(oldTrack.Codec)
|
||||
if codec == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// move sink from old track to new track
|
||||
newTrack := p.element.GetTrack(media, codec)
|
||||
newTrack.GetSink(oldTrack)
|
||||
p.tracks[i] = newTrack
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err = p.element.Start(); err != nil {
|
||||
log.Debug().Err(err).Caller().Send()
|
||||
}
|
||||
p.reconnect()
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Producer) stop() {
|
||||
p.mx.Lock()
|
||||
p.mu.Lock()
|
||||
|
||||
log.Debug().Str("url", p.url).Msg("[streams] stop producer")
|
||||
log.Debug().Msgf("[streams] stop producer url=%s", p.url)
|
||||
|
||||
if p.element != nil {
|
||||
_ = p.element.Stop()
|
||||
p.element = nil
|
||||
} else {
|
||||
log.Warn().Str("url", p.url).Msg("[streams] stop empty producer")
|
||||
}
|
||||
p.tracks = nil
|
||||
p.state = stateNone
|
||||
if p.restart != nil {
|
||||
p.restart.Stop()
|
||||
p.restart = nil
|
||||
}
|
||||
|
||||
p.mx.Unlock()
|
||||
p.state = stateNone
|
||||
p.tracks = nil
|
||||
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
@@ -14,6 +15,7 @@ type Consumer struct {
|
||||
type Stream struct {
|
||||
producers []*Producer
|
||||
consumers []*Consumer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewStream(source interface{}) *Stream {
|
||||
@@ -51,18 +53,19 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
ic := len(s.consumers)
|
||||
|
||||
consumer := &Consumer{element: cons}
|
||||
var producers []*Producer // matched producers for consumer
|
||||
|
||||
// Step 1. Get consumer medias
|
||||
for icc, consMedia := range cons.GetMedias() {
|
||||
log.Trace().Stringer("media", consMedia).
|
||||
Msgf("[streams] consumer:%d:%d candidate", ic, icc)
|
||||
Msgf("[streams] consumer=%d candidate=%d", ic, icc)
|
||||
|
||||
producers:
|
||||
for ip, prod := range s.producers {
|
||||
// Step 2. Get producer medias (not tracks yet)
|
||||
for ipc, prodMedia := range prod.GetMedias() {
|
||||
log.Trace().Stringer("media", prodMedia).
|
||||
Msgf("[streams] producer:%d:%d candidate", ip, ipc)
|
||||
Msgf("[streams] producer=%d candidate=%d", ip, ipc)
|
||||
|
||||
// Step 3. Match consumer/producer codecs list
|
||||
prodCodec := prodMedia.MatchMedia(consMedia)
|
||||
@@ -81,20 +84,23 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
consTrack := consumer.element.AddTrack(consMedia, prodTrack)
|
||||
|
||||
consumer.tracks = append(consumer.tracks, consTrack)
|
||||
producers = append(producers, prod)
|
||||
break producers
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// can't match tracks for consumer
|
||||
if len(consumer.tracks) == 0 {
|
||||
if len(producers) == 0 {
|
||||
return errors.New("couldn't find the matching tracks")
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.consumers = append(s.consumers, consumer)
|
||||
s.mu.Unlock()
|
||||
|
||||
for _, prod := range s.producers {
|
||||
// there may be duplicates, but that's not a problem
|
||||
for _, prod := range producers {
|
||||
prod.start()
|
||||
}
|
||||
|
||||
@@ -102,6 +108,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
}
|
||||
|
||||
func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
|
||||
s.mu.Lock()
|
||||
for i, consumer := range s.consumers {
|
||||
if consumer == nil {
|
||||
log.Warn().Msgf("empty consumer: %+v\n", s)
|
||||
@@ -127,7 +134,7 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
|
||||
|
||||
var sink bool
|
||||
for _, track := range producer.tracks {
|
||||
if len(track.Sink) > 0 {
|
||||
if track.HasSink() {
|
||||
sink = true
|
||||
}
|
||||
}
|
||||
@@ -135,38 +142,44 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
|
||||
producer.stop()
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Stream) AddProducer(prod streamer.Producer) {
|
||||
producer := &Producer{element: prod, state: stateTracks}
|
||||
s.mu.Lock()
|
||||
s.producers = append(s.producers, producer)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Stream) RemoveProducer(prod streamer.Producer) {
|
||||
s.mu.Lock()
|
||||
for i, producer := range s.producers {
|
||||
if producer.element == prod {
|
||||
s.removeProducer(i)
|
||||
break
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Stream) Active() bool {
|
||||
if len(s.consumers) > 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, prod := range s.producers {
|
||||
if prod.element != nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
//func (s *Stream) Active() bool {
|
||||
// if len(s.consumers) > 0 {
|
||||
// return true
|
||||
// }
|
||||
//
|
||||
// for _, prod := range s.producers {
|
||||
// if prod.element != nil {
|
||||
// return true
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return false
|
||||
//}
|
||||
|
||||
func (s *Stream) MarshalJSON() ([]byte, error) {
|
||||
var v []interface{}
|
||||
s.mu.Lock()
|
||||
for _, prod := range s.producers {
|
||||
if prod.element != nil {
|
||||
v = append(v, prod.element)
|
||||
@@ -176,6 +189,7 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
|
||||
// cons.element always not nil
|
||||
v = append(v, cons.element)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
if len(v) == 0 {
|
||||
v = nil
|
||||
}
|
||||
|
@@ -8,7 +8,9 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/webrtc"
|
||||
pion "github.com/pion/webrtc/v3"
|
||||
"github.com/rs/zerolog"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@@ -55,6 +57,8 @@ func Init() {
|
||||
|
||||
api.HandleWS(webrtc.MsgTypeOffer, offerHandler)
|
||||
api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler)
|
||||
|
||||
api.HandleFunc("api/webrtc", syncHandler)
|
||||
}
|
||||
|
||||
var Port string
|
||||
@@ -137,6 +141,32 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
|
||||
ctx.Consumer = conn
|
||||
}
|
||||
|
||||
func syncHandler(w http.ResponseWriter, r *http.Request) {
|
||||
url := r.URL.Query().Get("src")
|
||||
stream := streams.Get(url)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// get offer
|
||||
offer, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
answer, err := ExchangeSDP(stream, string(offer), r.UserAgent())
|
||||
if err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
// send SDP to client
|
||||
if _, err = w.Write([]byte(answer)); err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
}
|
||||
}
|
||||
|
||||
func ExchangeSDP(
|
||||
stream *streams.Stream, offer string, userAgent string,
|
||||
) (answer string, err error) {
|
||||
|
57
pkg/aac/rtp.go
Normal file
57
pkg/aac/rtp.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package aac
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
const RTPPacketVersionAAC = 0
|
||||
|
||||
func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
// support ONLY 2 bytes header size!
|
||||
// streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408
|
||||
headersSize := binary.BigEndian.Uint16(packet.Payload) >> 3
|
||||
|
||||
//log.Printf("[RTP/AAC] units: %d, size: %4d, ts: %10d, %t", headersSize/2, len(packet.Payload), packet.Timestamp, packet.Marker)
|
||||
|
||||
clone := *packet
|
||||
clone.Version = RTPPacketVersionAAC
|
||||
clone.Payload = packet.Payload[2+headersSize:]
|
||||
return push(&clone)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func RTPPay(mtu uint16) streamer.WrapperFunc {
|
||||
sequencer := rtp.NewRandomSequencer()
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
if packet.Version != RTPPacketVersionAAC {
|
||||
return push(packet)
|
||||
}
|
||||
|
||||
// support ONLY one unit in payload
|
||||
size := uint16(len(packet.Payload))
|
||||
// 2 bytes header size + 2 bytes first payload size
|
||||
payload := make([]byte, 2+2+size)
|
||||
payload[1] = 16 // header size in bits
|
||||
binary.BigEndian.PutUint16(payload[2:], size<<3)
|
||||
copy(payload[4:], packet.Payload)
|
||||
|
||||
clone := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
SequenceNumber: sequencer.NextSequenceNumber(),
|
||||
Timestamp: packet.Timestamp,
|
||||
},
|
||||
Payload: payload,
|
||||
}
|
||||
return push(&clone)
|
||||
}
|
||||
}
|
||||
}
|
@@ -6,12 +6,6 @@ import (
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
const PayloadTypeAVC = 255
|
||||
|
||||
func IsAVC(codec *streamer.Codec) bool {
|
||||
return codec.PayloadType == PayloadTypeAVC
|
||||
}
|
||||
|
||||
func EncodeAVC(nals ...[]byte) (avc []byte) {
|
||||
var i, n int
|
||||
|
||||
|
@@ -8,11 +8,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
NALUTypePFrame = 1
|
||||
NALUTypeIFrame = 5
|
||||
NALUTypeSEI = 6
|
||||
NALUTypeSPS = 7
|
||||
NALUTypePPS = 8
|
||||
NALUTypePFrame = 1 // Coded slice of a non-IDR picture
|
||||
NALUTypeIFrame = 5 // Coded slice of an IDR picture
|
||||
NALUTypeSEI = 6 // Supplemental enhancement information (SEI)
|
||||
NALUTypeSPS = 7 // Sequence parameter set
|
||||
NALUTypePPS = 8 // Picture parameter set
|
||||
NALUTypeAUD = 9 // Access unit delimiter
|
||||
)
|
||||
|
||||
func NALUType(b []byte) byte {
|
||||
|
108
pkg/h264/rtp.go
108
pkg/h264/rtp.go
@@ -15,47 +15,62 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
|
||||
sps, pps := GetParameterSet(track.Codec.FmtpLine)
|
||||
ps := EncodeAVC(sps, pps)
|
||||
|
||||
var buffer []byte
|
||||
buf := make([]byte, 0, 512*1024) // 512K
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
//nalUnitType := packet.Payload[0] & 0x1F
|
||||
//fmt.Printf(
|
||||
// "[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d\n",
|
||||
// track.Codec.Name, nalUnitType, len(packet.Payload), packet.Timestamp,
|
||||
// packet.PayloadType, packet.SSRC, packet.SequenceNumber,
|
||||
//)
|
||||
//log.Printf("[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, %v", track.Codec.Name, packet.Payload[0]&0x1F, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker)
|
||||
|
||||
payload, err := depack.Unmarshal(packet.Payload)
|
||||
if len(payload) == 0 || err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ffmpeg with `-tune zerolatency` enable option `-x264opts sliced-threads=1`
|
||||
// and every NALU will be sliced to multiple NALUs
|
||||
// Fix TP-Link Tapo TC70: sends SPS and PPS with packet.Marker = true
|
||||
if packet.Marker {
|
||||
switch NALUType(payload) {
|
||||
case NALUTypeSPS, NALUTypePPS:
|
||||
buf = append(buf, payload...)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if len(buf) == 0 {
|
||||
// Amcrest IP4M-1051: 9, 7, 8, 6, 28...
|
||||
// Amcrest IP4M-1051: 9, 6, 1
|
||||
switch NALUType(payload) {
|
||||
case NALUTypeIFrame:
|
||||
// fix IFrame without SPS,PPS
|
||||
buf = append(buf, ps...)
|
||||
case NALUTypeSEI, NALUTypeAUD:
|
||||
// fix ffmpeg with transcoding first frame
|
||||
i := int(4 + binary.BigEndian.Uint32(payload))
|
||||
|
||||
// check if only one NAL (fix ffmpeg transcoding for Reolink RLC-510A)
|
||||
if i == len(payload) {
|
||||
return nil
|
||||
}
|
||||
|
||||
payload = payload[i:]
|
||||
|
||||
if NALUType(payload) == NALUTypeIFrame {
|
||||
buf = append(buf, ps...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// collect all NALs for Access Unit
|
||||
if !packet.Marker {
|
||||
buffer = append(buffer, payload...)
|
||||
buf = append(buf, payload...)
|
||||
return nil
|
||||
}
|
||||
|
||||
if buffer != nil {
|
||||
payload = append(buffer, payload...)
|
||||
buffer = nil
|
||||
if len(buf) > 0 {
|
||||
payload = append(buf, payload...)
|
||||
buf = buf[:0]
|
||||
}
|
||||
|
||||
//fmt.Printf("[AVC] %v, len: %d\n", Types(payload), len(payload))
|
||||
|
||||
switch NALUType(payload) {
|
||||
case NALUTypeIFrame:
|
||||
payload = Join(ps, payload)
|
||||
case NALUTypeSEI:
|
||||
// ffmpeg with transcoding
|
||||
i := 4 + binary.BigEndian.Uint32(payload)
|
||||
payload = payload[i:]
|
||||
if NALUType(payload) == NALUTypeIFrame {
|
||||
payload = Join(ps, payload)
|
||||
}
|
||||
}
|
||||
//log.Printf("[AVC] %v, len: %d", Types(payload), len(payload))
|
||||
|
||||
clone := *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
@@ -72,29 +87,28 @@ func RTPPay(mtu uint16) streamer.WrapperFunc {
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
if packet.Version == RTPPacketVersionAVC {
|
||||
payloads := payloader.Payload(mtu, packet.Payload)
|
||||
last := len(payloads) - 1
|
||||
for i, payload := range payloads {
|
||||
clone := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: i == last,
|
||||
//PayloadType: packet.PayloadType,
|
||||
SequenceNumber: sequencer.NextSequenceNumber(),
|
||||
Timestamp: packet.Timestamp,
|
||||
//SSRC: packet.SSRC,
|
||||
},
|
||||
Payload: payload,
|
||||
}
|
||||
if err := push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
if packet.Version != RTPPacketVersionAVC {
|
||||
return push(packet)
|
||||
}
|
||||
|
||||
return push(packet)
|
||||
payloads := payloader.Payload(mtu, packet.Payload)
|
||||
last := len(payloads) - 1
|
||||
for i, payload := range payloads {
|
||||
clone := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: i == last,
|
||||
SequenceNumber: sequencer.NextSequenceNumber(),
|
||||
Timestamp: packet.Timestamp,
|
||||
},
|
||||
Payload: payload,
|
||||
}
|
||||
if err := push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
3
pkg/httpflv/README.md
Normal file
3
pkg/httpflv/README.md
Normal file
@@ -0,0 +1,3 @@
|
||||
## Useful links
|
||||
|
||||
- https://medium.com/@nate510/don-t-use-go-s-default-http-client-4804cb19f779
|
100
pkg/httpflv/httpflv.go
Normal file
100
pkg/httpflv/httpflv.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package httpflv
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/flv/flvio"
|
||||
"github.com/deepch/vdk/utils/bits/pio"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func Dial(uri string) (*Conn, error) {
|
||||
req, err := http.NewRequest("GET", uri, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := Conn{
|
||||
conn: res.Body,
|
||||
reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize),
|
||||
buf: make([]byte, 256),
|
||||
}
|
||||
|
||||
if _, err = io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
flags, n, err := flvio.ParseFileHeader(c.buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if flags&flvio.FILE_HAS_VIDEO == 0 {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
|
||||
if _, err = c.reader.Discard(n); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
type Conn struct {
|
||||
conn io.ReadCloser
|
||||
reader *bufio.Reader
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func (c *Conn) Streams() ([]av.CodecData, error) {
|
||||
for {
|
||||
tag, _, err := flvio.ReadTag(c.reader, c.buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AAC_SEQHDR {
|
||||
continue
|
||||
}
|
||||
|
||||
stream, err := h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return []av.CodecData{stream}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) ReadPacket() (av.Packet, error) {
|
||||
for {
|
||||
tag, ts, err := flvio.ReadTag(c.reader, c.buf)
|
||||
if err != nil {
|
||||
return av.Packet{}, err
|
||||
}
|
||||
|
||||
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AVC_NALU {
|
||||
continue
|
||||
}
|
||||
|
||||
return av.Packet{
|
||||
Idx: 0,
|
||||
Data: tag.Data,
|
||||
CompositionTime: flvio.TsToTime(tag.CompositionTime),
|
||||
IsKeyFrame: tag.FrameType == flvio.FRAME_KEY,
|
||||
Time: flvio.TsToTime(ts),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) Close() (err error) {
|
||||
return c.conn.Close()
|
||||
}
|
@@ -6,7 +6,6 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/fmp4/fmp4io"
|
||||
@@ -162,9 +161,12 @@ func (c *Client) getTracks() error {
|
||||
continue
|
||||
}
|
||||
|
||||
codec := streamer.NewCodec(streamer.CodecH264)
|
||||
codec.FmtpLine = "profile-level-id=" + msg.CodecString[i+1:]
|
||||
codec.PayloadType = h264.PayloadTypeAVC
|
||||
codec := &streamer.Codec{
|
||||
Name: streamer.CodecH264,
|
||||
ClockRate: 90000,
|
||||
FmtpLine: "profile-level-id=" + msg.CodecString[i+1:],
|
||||
PayloadType: streamer.PayloadTypeMP4,
|
||||
}
|
||||
|
||||
i = bytes.Index(msg.Data, []byte("avcC")) - 4
|
||||
if i < 0 {
|
||||
|
@@ -61,8 +61,16 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
|
||||
lqt, cqt = MakeTables(q)
|
||||
}
|
||||
|
||||
// https://www.rfc-editor.org/rfc/rfc2435#section-3.1.5
|
||||
// The maximum width is 2040 pixels.
|
||||
w := uint16(packet.Payload[6]) << 3
|
||||
h := uint16(packet.Payload[7]) << 3
|
||||
|
||||
// fix 2560x1920 and 2560x1440
|
||||
if w == 512 && (h == 1920 || h == 1440) {
|
||||
w = 2560
|
||||
}
|
||||
|
||||
//fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h)
|
||||
header = MakeHeaders(t, w, h, lqt, cqt)
|
||||
}
|
||||
|
@@ -3,6 +3,8 @@ package mp4
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/deepch/vdk/format/mp4/mp4io"
|
||||
"github.com/deepch/vdk/format/mp4f"
|
||||
"github.com/deepch/vdk/format/mp4f/mp4fio"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -37,25 +39,17 @@ func MOOV() *mp4io.Movie {
|
||||
SelectionDuration: time0,
|
||||
CurrentTime: time0,
|
||||
},
|
||||
MovieExtend: &mp4io.MovieExtend{
|
||||
Tracks: []*mp4io.TrackExtend{
|
||||
{
|
||||
TrackId: 1,
|
||||
DefaultSampleDescIdx: 1,
|
||||
DefaultSampleDuration: 40,
|
||||
},
|
||||
},
|
||||
},
|
||||
MovieExtend: &mp4io.MovieExtend{},
|
||||
}
|
||||
}
|
||||
|
||||
func TRAK() *mp4io.Track {
|
||||
func TRAK(id int) *mp4io.Track {
|
||||
return &mp4io.Track{
|
||||
// trak > tkhd
|
||||
Header: &mp4io.TrackHeader{
|
||||
TrackId: int32(1), // change me
|
||||
Flags: 0x0007, // 7 ENABLED IN-MOVIE IN-PREVIEW
|
||||
Duration: 0, // OK
|
||||
TrackId: int32(id),
|
||||
Flags: 0x0007, // 7 ENABLED IN-MOVIE IN-PREVIEW
|
||||
Duration: 0, // OK
|
||||
Matrix: matrix,
|
||||
CreateTime: time0,
|
||||
ModifyTime: time0,
|
||||
@@ -92,3 +86,15 @@ func TRAK() *mp4io.Track {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func ESDS(conf []byte) *mp4f.FDummy {
|
||||
esds := &mp4fio.ElemStreamDesc{DecConfig: conf}
|
||||
|
||||
b := make([]byte, esds.Len())
|
||||
esds.Marshal(b)
|
||||
|
||||
return &mp4f.FDummy{
|
||||
Data: b,
|
||||
Tag_: mp4io.Tag(uint32(mp4io.ESDS)),
|
||||
}
|
||||
}
|
||||
|
@@ -2,7 +2,7 @@ package mp4
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/aac"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h265"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
@@ -28,44 +28,37 @@ func (c *Consumer) GetMedias() []*streamer.Media {
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecH264, ClockRate: 90000},
|
||||
{Name: streamer.CodecH265, ClockRate: 90000},
|
||||
{Name: streamer.CodecH264},
|
||||
{Name: streamer.CodecH265},
|
||||
},
|
||||
},
|
||||
{
|
||||
Kind: streamer.KindAudio,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecAAC},
|
||||
},
|
||||
},
|
||||
//{
|
||||
// Kind: streamer.KindAudio,
|
||||
// Direction: streamer.DirectionRecvonly,
|
||||
// Codecs: []*streamer.Codec{
|
||||
// {Name: streamer.CodecAAC, ClockRate: 16000},
|
||||
// },
|
||||
//},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
trackID := byte(len(c.codecs))
|
||||
c.codecs = append(c.codecs, track.Codec)
|
||||
|
||||
codec := track.Codec
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
c.codecs = append(c.codecs, track.Codec)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.muxer == nil {
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.start {
|
||||
if h264.IsKeyframe(packet.Payload) {
|
||||
c.start = true
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(packet)
|
||||
buf := c.muxer.Marshal(trackID, packet)
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
|
||||
@@ -73,7 +66,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
|
||||
}
|
||||
|
||||
var wrapper streamer.WrapperFunc
|
||||
if h264.IsAVC(codec) {
|
||||
if codec.IsMP4() {
|
||||
wrapper = h264.RepairAVC(track)
|
||||
} else {
|
||||
wrapper = h264.RTPDepay(track)
|
||||
@@ -83,39 +76,51 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecH265:
|
||||
c.codecs = append(c.codecs, track.Codec)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.start {
|
||||
if h265.IsKeyframe(packet.Payload) {
|
||||
c.start = true
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(packet)
|
||||
buf := c.muxer.Marshal(trackID, packet)
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(codec) {
|
||||
if !codec.IsMP4() {
|
||||
wrapper := h265.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecAAC:
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(trackID, packet)
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !codec.IsMP4() {
|
||||
wrapper := aac.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
||||
|
||||
fmt.Printf("[rtmp] unsupported codec: %+v\n", track.Codec)
|
||||
|
||||
return nil
|
||||
panic("unsupported codec")
|
||||
}
|
||||
|
||||
func (c *Consumer) MimeType() string {
|
||||
@@ -123,12 +128,14 @@ func (c *Consumer) MimeType() string {
|
||||
}
|
||||
|
||||
func (c *Consumer) Init() ([]byte, error) {
|
||||
if c.muxer == nil {
|
||||
c.muxer = &Muxer{}
|
||||
}
|
||||
c.muxer = &Muxer{}
|
||||
return c.muxer.GetInit(c.codecs)
|
||||
}
|
||||
|
||||
func (c *Consumer) Start() {
|
||||
c.start = true
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
func (c *Consumer) MarshalJSON() ([]byte, error) {
|
||||
|
85
pkg/mp4/keyframe.go
Normal file
85
pkg/mp4/keyframe.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h265"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
type Keyframe struct {
|
||||
streamer.Element
|
||||
|
||||
MimeType string
|
||||
}
|
||||
|
||||
func (c *Keyframe) GetMedias() []*streamer.Media {
|
||||
return []*streamer.Media{
|
||||
{
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecH264},
|
||||
{Name: streamer.CodecH265},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
muxer := &Muxer{}
|
||||
|
||||
codecs := []*streamer.Codec{track.Codec}
|
||||
|
||||
init, err := muxer.GetInit(codecs)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.MimeType = muxer.MimeType(codecs)
|
||||
|
||||
switch track.Codec.Name {
|
||||
case streamer.CodecH264:
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if !h264.IsKeyframe(packet.Payload) {
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := muxer.Marshal(0, packet)
|
||||
c.Fire(append(init, buf...))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var wrapper streamer.WrapperFunc
|
||||
if track.Codec.IsMP4() {
|
||||
wrapper = h264.RepairAVC(track)
|
||||
} else {
|
||||
wrapper = h264.RTPDepay(track)
|
||||
}
|
||||
push = wrapper(push)
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecH265:
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if !h265.IsKeyframe(packet.Payload) {
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := muxer.Marshal(0, packet)
|
||||
c.Fire(append(init, buf...))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !track.Codec.IsMP4() {
|
||||
wrapper := h265.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
||||
|
||||
panic("unsupported codec")
|
||||
}
|
111
pkg/mp4/muxer.go
111
pkg/mp4/muxer.go
@@ -2,10 +2,13 @@ package mp4
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h265"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/aacparser"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/codec/h265parser"
|
||||
"github.com/deepch/vdk/format/fmp4/fmp4io"
|
||||
@@ -16,22 +19,28 @@ import (
|
||||
|
||||
type Muxer struct {
|
||||
fragIndex uint32
|
||||
dts uint64
|
||||
pts uint32
|
||||
data []byte
|
||||
total int
|
||||
dts []uint64
|
||||
pts []uint32
|
||||
//data []byte
|
||||
//total int
|
||||
}
|
||||
|
||||
func (m *Muxer) MimeType(codecs []*streamer.Codec) string {
|
||||
s := `video/mp4; codecs="`
|
||||
|
||||
for _, codec := range codecs {
|
||||
for i, codec := range codecs {
|
||||
if i > 0 {
|
||||
s += ","
|
||||
}
|
||||
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
s += "avc1." + h264.GetProfileLevelID(codec.FmtpLine)
|
||||
case streamer.CodecH265:
|
||||
// +Safari +Chrome +Edge -iOS15 -Android13
|
||||
s += "hvc1.1.6.L93.B0" // hev1.1.6.L93.B0
|
||||
case streamer.CodecAAC:
|
||||
s += "mp4a.40.2"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +50,7 @@ func (m *Muxer) MimeType(codecs []*streamer.Codec) string {
|
||||
func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
moov := MOOV()
|
||||
|
||||
for _, codec := range codecs {
|
||||
for i, codec := range codecs {
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
sps, pps := h264.GetParameterSet(codec.FmtpLine)
|
||||
@@ -59,11 +68,14 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
width := codecData.Width()
|
||||
height := codecData.Height()
|
||||
|
||||
trak := TRAK()
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak := TRAK(i + 1)
|
||||
trak.Header.TrackWidth = float64(width)
|
||||
trak.Header.TrackHeight = float64(height)
|
||||
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
trak.Media.Info.Video = &mp4io.VideoMediaInfo{
|
||||
Flags: 0x000001,
|
||||
}
|
||||
@@ -81,11 +93,6 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
},
|
||||
}
|
||||
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
|
||||
moov.Tracks = append(moov.Tracks, trak)
|
||||
|
||||
case streamer.CodecH265:
|
||||
@@ -102,11 +109,14 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
width := codecData.Width()
|
||||
height := codecData.Height()
|
||||
|
||||
trak := TRAK()
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak := TRAK(i + 1)
|
||||
trak.Header.TrackWidth = float64(width)
|
||||
trak.Header.TrackHeight = float64(height)
|
||||
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
trak.Media.Info.Video = &mp4io.VideoMediaInfo{
|
||||
Flags: 0x000001,
|
||||
}
|
||||
@@ -124,13 +134,52 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
},
|
||||
}
|
||||
|
||||
moov.Tracks = append(moov.Tracks, trak)
|
||||
|
||||
case streamer.CodecAAC:
|
||||
s := streamer.Between(codec.FmtpLine, "config=", ";")
|
||||
b, err := hex.DecodeString(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
codecData, err := aacparser.ParseMPEG4AudioConfigBytes(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
trak := TRAK(i + 1)
|
||||
trak.Header.AlternateGroup = 1
|
||||
trak.Header.Duration = 0
|
||||
trak.Header.Volume = 1
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
SubType: [4]byte{'s', 'o', 'u', 'n'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
trak.Media.Info.Sound = &mp4io.SoundMediaInfo{}
|
||||
|
||||
trak.Media.Info.Sample.SampleDesc.MP4ADesc = &mp4io.MP4ADesc{
|
||||
DataRefIdx: 1,
|
||||
NumberOfChannels: int16(codecData.ChannelLayout.Count()),
|
||||
SampleSize: int16(av.FLTP.BytesPerSample() * 4),
|
||||
SampleRate: float64(codecData.SampleRate),
|
||||
Unknowns: []mp4io.Atom{ESDS(b)},
|
||||
}
|
||||
|
||||
moov.Tracks = append(moov.Tracks, trak)
|
||||
}
|
||||
|
||||
trex := &mp4io.TrackExtend{
|
||||
TrackId: uint32(i + 1),
|
||||
DefaultSampleDescIdx: 1,
|
||||
DefaultSampleDuration: 0,
|
||||
}
|
||||
moov.MovieExtend.Tracks = append(moov.MovieExtend.Tracks, trex)
|
||||
|
||||
m.pts = append(m.pts, 0)
|
||||
m.dts = append(m.dts, 0)
|
||||
}
|
||||
|
||||
data := make([]byte, moov.Len())
|
||||
@@ -139,14 +188,12 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
return append(FTYP(), data...), nil
|
||||
}
|
||||
|
||||
func (m *Muxer) Rewind() {
|
||||
m.dts = 0
|
||||
m.pts = 0
|
||||
}
|
||||
|
||||
func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
trackID := uint8(1)
|
||||
//func (m *Muxer) Rewind() {
|
||||
// m.dts = 0
|
||||
// m.pts = 0
|
||||
//}
|
||||
|
||||
func (m *Muxer) Marshal(trackID byte, packet *rtp.Packet) []byte {
|
||||
run := &mp4fio.TrackFragRun{
|
||||
Flags: 0x000b05,
|
||||
FirstSampleFlags: uint32(fmp4io.SampleNoDependencies),
|
||||
@@ -161,12 +208,12 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
Tracks: []*mp4fio.TrackFrag{
|
||||
{
|
||||
Header: &mp4fio.TrackFragHeader{
|
||||
Data: []byte{0x00, 0x02, 0x00, 0x20, 0x00, 0x00, 0x00, trackID, 0x01, 0x01, 0x00, 0x00},
|
||||
Data: []byte{0x00, 0x02, 0x00, 0x20, 0x00, 0x00, 0x00, trackID + 1, 0x01, 0x01, 0x00, 0x00},
|
||||
},
|
||||
DecodeTime: &mp4fio.TrackFragDecodeTime{
|
||||
Version: 1,
|
||||
Flags: 0,
|
||||
Time: m.dts,
|
||||
Time: m.dts[trackID],
|
||||
},
|
||||
Run: run,
|
||||
},
|
||||
@@ -179,12 +226,12 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
}
|
||||
|
||||
newTime := packet.Timestamp
|
||||
if m.pts > 0 {
|
||||
if m.pts[trackID] > 0 {
|
||||
//m.dts += uint64(newTime - m.pts)
|
||||
entry.Duration = newTime - m.pts
|
||||
m.dts += uint64(entry.Duration)
|
||||
entry.Duration = newTime - m.pts[trackID]
|
||||
m.dts[trackID] += uint64(entry.Duration)
|
||||
}
|
||||
m.pts = newTime
|
||||
m.pts[trackID] = newTime
|
||||
|
||||
// important before moof.Len()
|
||||
run.Entries = append(run.Entries, entry)
|
||||
@@ -204,7 +251,7 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
|
||||
m.fragIndex++
|
||||
|
||||
m.total += moofLen + mdatLen
|
||||
//m.total += moofLen + mdatLen
|
||||
|
||||
return buf
|
||||
}
|
||||
|
164
pkg/mp4f/consumer.go
Normal file
164
pkg/mp4f/consumer.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package mp4f
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/aacparser"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/mp4f"
|
||||
"github.com/pion/rtp"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
streamer.Element
|
||||
|
||||
UserAgent string
|
||||
RemoteAddr string
|
||||
|
||||
muxer *mp4f.Muxer
|
||||
streams []av.CodecData
|
||||
mimeType string
|
||||
start bool
|
||||
|
||||
send int
|
||||
}
|
||||
|
||||
func (c *Consumer) GetMedias() []*streamer.Media {
|
||||
return []*streamer.Media{
|
||||
{
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecH264, ClockRate: 90000},
|
||||
},
|
||||
},
|
||||
{
|
||||
Kind: streamer.KindAudio,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecAAC, ClockRate: 16000},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
codec := track.Codec
|
||||
trackID := int8(len(c.streams))
|
||||
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
sps, pps := h264.GetParameterSet(codec.FmtpLine)
|
||||
stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.mimeType += "avc1." + h264.GetProfileLevelID(codec.FmtpLine)
|
||||
c.streams = append(c.streams, stream)
|
||||
|
||||
pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond}
|
||||
|
||||
ts2time := time.Second / time.Duration(codec.ClockRate)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
|
||||
pkt.Data = packet.Payload
|
||||
newTime := time.Duration(packet.Timestamp) * ts2time
|
||||
if pkt.Time > 0 {
|
||||
pkt.Duration = newTime - pkt.Time
|
||||
}
|
||||
pkt.Time = newTime
|
||||
|
||||
ready, buf, _ := c.muxer.WritePacket(pkt, false)
|
||||
if ready {
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(codec) {
|
||||
wrapper := h264.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecAAC:
|
||||
stream, _ := aacparser.NewCodecDataFromMPEG4AudioConfigBytes([]byte{20, 8})
|
||||
|
||||
c.mimeType += ",mp4a.40.2"
|
||||
c.streams = append(c.streams, stream)
|
||||
|
||||
pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond}
|
||||
|
||||
ts2time := time.Second / time.Duration(codec.ClockRate)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
|
||||
pkt.Data = packet.Payload
|
||||
newTime := time.Duration(packet.Timestamp) * ts2time
|
||||
if pkt.Time > 0 {
|
||||
pkt.Duration = newTime - pkt.Time
|
||||
}
|
||||
pkt.Time = newTime
|
||||
|
||||
ready, buf, _ := c.muxer.WritePacket(pkt, false)
|
||||
if ready {
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
||||
|
||||
panic("unsupported codec")
|
||||
}
|
||||
|
||||
func (c *Consumer) MimeType() string {
|
||||
return `video/mp4; codecs="` + c.mimeType + `"`
|
||||
}
|
||||
|
||||
func (c *Consumer) Init() ([]byte, error) {
|
||||
c.muxer = mp4f.NewMuxer(nil)
|
||||
if err := c.muxer.WriteHeader(c.streams); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, data := c.muxer.GetInit(c.streams)
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (c *Consumer) Start() {
|
||||
c.start = true
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
func (c *Consumer) MarshalJSON() ([]byte, error) {
|
||||
v := map[string]interface{}{
|
||||
"type": "MSE server consumer",
|
||||
"send": c.send,
|
||||
"remote_addr": c.RemoteAddr,
|
||||
"user_agent": c.UserAgent,
|
||||
}
|
||||
|
||||
return json.Marshal(v)
|
||||
}
|
@@ -4,16 +4,24 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/httpflv"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/aacparser"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/rtmp"
|
||||
"github.com/pion/rtp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Conn for RTMP and RTMPT (flv over HTTP)
|
||||
type Conn interface {
|
||||
Streams() (streams []av.CodecData, err error)
|
||||
ReadPacket() (pkt av.Packet, err error)
|
||||
Close() (err error)
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
streamer.Element
|
||||
|
||||
@@ -22,7 +30,7 @@ type Client struct {
|
||||
medias []*streamer.Media
|
||||
tracks []*streamer.Track
|
||||
|
||||
conn *rtmp.Conn
|
||||
conn Conn
|
||||
closed bool
|
||||
|
||||
receive int
|
||||
@@ -33,7 +41,12 @@ func NewClient(uri string) *Client {
|
||||
}
|
||||
|
||||
func (c *Client) Dial() (err error) {
|
||||
c.conn, err = rtmp.Dial(c.URI)
|
||||
if strings.HasPrefix(c.URI, "http") {
|
||||
c.conn, err = httpflv.Dial(c.URI)
|
||||
} else {
|
||||
c.conn, err = rtmp.Dial(c.URI)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -60,7 +73,7 @@ func (c *Client) Dial() (err error) {
|
||||
Name: streamer.CodecH264,
|
||||
ClockRate: 90000,
|
||||
FmtpLine: fmtp,
|
||||
PayloadType: h264.PayloadTypeAVC,
|
||||
PayloadType: streamer.PayloadTypeMP4,
|
||||
}
|
||||
|
||||
media := &streamer.Media{
|
||||
@@ -79,17 +92,13 @@ func (c *Client) Dial() (err error) {
|
||||
// TODO: fix support
|
||||
cd := stream.(aacparser.CodecData)
|
||||
|
||||
// a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588
|
||||
fmtp := fmt.Sprintf(
|
||||
"config=%s",
|
||||
hex.EncodeToString(cd.ConfigBytes),
|
||||
)
|
||||
|
||||
codec := &streamer.Codec{
|
||||
Name: streamer.CodecAAC,
|
||||
ClockRate: uint32(cd.Config.SampleRate),
|
||||
Channels: uint16(cd.Config.ChannelConfig),
|
||||
FmtpLine: fmtp,
|
||||
// a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588
|
||||
FmtpLine: "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" + hex.EncodeToString(cd.ConfigBytes),
|
||||
PayloadType: streamer.PayloadTypeMP4,
|
||||
}
|
||||
|
||||
media := &streamer.Media{
|
||||
|
@@ -32,7 +32,7 @@ func (c *Client) MarshalJSON() ([]byte, error) {
|
||||
v := map[string]interface{}{
|
||||
streamer.JSONReceive: c.receive,
|
||||
streamer.JSONType: "RTMP client producer",
|
||||
streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(),
|
||||
//streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(),
|
||||
"url": c.URI,
|
||||
}
|
||||
for i, media := range c.medias {
|
||||
|
118
pkg/rtsp/conn.go
118
pkg/rtsp/conn.go
@@ -7,6 +7,7 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/aac"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
@@ -43,8 +44,6 @@ const (
|
||||
ModeServerConsumer
|
||||
)
|
||||
|
||||
const KeepAlive = time.Second * 25
|
||||
|
||||
type Conn struct {
|
||||
streamer.Element
|
||||
|
||||
@@ -60,6 +59,7 @@ type Conn struct {
|
||||
// internal
|
||||
|
||||
auth *tcp.Auth
|
||||
closed bool
|
||||
conn net.Conn
|
||||
mode Mode
|
||||
reader *bufio.Reader
|
||||
@@ -115,9 +115,7 @@ func (c *Conn) Dial() (err error) {
|
||||
_ = c.parseURI()
|
||||
}
|
||||
|
||||
c.conn, err = net.DialTimeout(
|
||||
"tcp", c.URL.Host, 10*time.Second,
|
||||
)
|
||||
c.conn, err = net.DialTimeout("tcp", c.URL.Host, time.Second*5)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -362,21 +360,25 @@ func (c *Conn) SetupMedia(
|
||||
var res *tcp.Response
|
||||
res, err = c.Do(req)
|
||||
if err != nil {
|
||||
// Dahua VTO2111D fail on this step because of backchannel
|
||||
// some Dahua/Amcrest cameras fail here because two simultaneous
|
||||
// backchannel connections
|
||||
if c.Backchannel {
|
||||
if err = c.Dial(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Backchannel = false
|
||||
if err = c.Describe(); err != nil {
|
||||
if err := c.Dial(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err = c.Do(req)
|
||||
if err := c.Describe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, newMedia := range c.Medias {
|
||||
if newMedia.Control == media.Control {
|
||||
return c.SetupMedia(newMedia, newMedia.Codecs[0])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if c.Session == "" {
|
||||
@@ -455,24 +457,19 @@ func (c *Conn) Teardown() (err error) {
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
if c.conn == nil {
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
if err := c.Teardown(); err != nil {
|
||||
return err
|
||||
}
|
||||
conn := c.conn
|
||||
c.conn = nil
|
||||
return conn.Close()
|
||||
c.closed = true
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
const transport = "RTP/AVP/TCP;unicast;interleaved="
|
||||
|
||||
func (c *Conn) Accept() error {
|
||||
//if c.state != StateServerInit {
|
||||
// panic("wrong state")
|
||||
//}
|
||||
|
||||
for {
|
||||
req, err := tcp.ReadRequest(c.reader)
|
||||
if err != nil {
|
||||
@@ -575,7 +572,7 @@ func (c *Conn) Accept() error {
|
||||
Request: req,
|
||||
}
|
||||
|
||||
if tr[:len(transport)] == transport {
|
||||
if strings.HasPrefix(tr, transport) {
|
||||
c.Session = "1" // TODO: fixme
|
||||
res.Header.Set("Transport", tr[:len(transport)+3])
|
||||
} else {
|
||||
@@ -598,16 +595,44 @@ func (c *Conn) Accept() error {
|
||||
|
||||
func (c *Conn) Handle() (err error) {
|
||||
defer func() {
|
||||
if c.conn == nil {
|
||||
if c.closed {
|
||||
err = nil
|
||||
} else {
|
||||
// may have gotten here because of the deadline
|
||||
// so close the connection to stop keepalive
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
//c.Fire(streamer.StateNull)
|
||||
}()
|
||||
|
||||
//c.Fire(streamer.StatePlaying)
|
||||
ts := time.Now().Add(KeepAlive)
|
||||
var timeout time.Duration
|
||||
|
||||
switch c.mode {
|
||||
case ModeClientProducer:
|
||||
// polling frames from remote RTSP Server (ex Camera)
|
||||
timeout = time.Second * 5
|
||||
go c.keepalive()
|
||||
|
||||
case ModeServerProducer:
|
||||
// polling frames from remote RTSP Client (ex FFmpeg)
|
||||
timeout = time.Second * 15
|
||||
|
||||
case ModeServerConsumer:
|
||||
// pushing frames to remote RTSP Client (ex VLC)
|
||||
timeout = time.Second * 60
|
||||
|
||||
default:
|
||||
return fmt.Errorf("wrong RTSP conn mode: %d", c.mode)
|
||||
}
|
||||
|
||||
for {
|
||||
if c.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if err = c.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// we can read:
|
||||
// 1. RTP interleaved: `$` + 1B channel number + 2B size
|
||||
// 2. RTSP response: RTSP/1.0 200 OK
|
||||
@@ -685,16 +710,19 @@ func (c *Conn) Handle() (err error) {
|
||||
|
||||
c.Fire(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// keep-alive
|
||||
now := time.Now()
|
||||
if now.After(ts) {
|
||||
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
|
||||
// don't need to wait respose on this request
|
||||
if err = c.Request(req); err != nil {
|
||||
return err
|
||||
}
|
||||
ts = now.Add(KeepAlive)
|
||||
func (c *Conn) keepalive() {
|
||||
// TODO: rewrite to RTCP
|
||||
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
|
||||
for {
|
||||
time.Sleep(time.Second * 25)
|
||||
if c.closed {
|
||||
return
|
||||
}
|
||||
if err := c.Request(req); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -712,20 +740,16 @@ func (c *Conn) bindTrack(
|
||||
track *streamer.Track, channel uint8, payloadType uint8,
|
||||
) *streamer.Track {
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if c.conn == nil {
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
packet.Header.PayloadType = payloadType
|
||||
//packet.Header.PayloadType = 100
|
||||
//packet.Header.PayloadType = 8
|
||||
//packet.Header.PayloadType = 106
|
||||
|
||||
size := packet.MarshalSize()
|
||||
|
||||
data := make([]byte, 4+size)
|
||||
data[0] = '$'
|
||||
data[1] = channel
|
||||
//data[1] = 10
|
||||
binary.BigEndian.PutUint16(data[2:], uint16(size))
|
||||
|
||||
if _, err := packet.MarshalTo(data[4:]); err != nil {
|
||||
@@ -741,9 +765,15 @@ func (c *Conn) bindTrack(
|
||||
return nil
|
||||
}
|
||||
|
||||
if h264.IsAVC(track.Codec) {
|
||||
wrapper := h264.RTPPay(1500)
|
||||
push = wrapper(push)
|
||||
if track.Codec.IsMP4() {
|
||||
switch track.Codec.Name {
|
||||
case streamer.CodecH264:
|
||||
wrapper := h264.RTPPay(1500)
|
||||
push = wrapper(push)
|
||||
case streamer.CodecAAC:
|
||||
wrapper := aac.RTPPay(1500)
|
||||
push = wrapper(push)
|
||||
}
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
|
@@ -2,6 +2,7 @@ package rtsp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strconv"
|
||||
)
|
||||
@@ -27,13 +28,16 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
|
||||
}
|
||||
|
||||
func (c *Conn) Start() error {
|
||||
if c.mode == ModeServerProducer {
|
||||
return nil
|
||||
switch c.mode {
|
||||
case ModeClientProducer:
|
||||
if err := c.Play(); err != nil {
|
||||
return err
|
||||
}
|
||||
case ModeServerProducer:
|
||||
default:
|
||||
return fmt.Errorf("start wrong mode: %d", c.mode)
|
||||
}
|
||||
|
||||
if err := c.Play(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.Handle()
|
||||
}
|
||||
|
||||
|
@@ -35,6 +35,8 @@ const (
|
||||
CodecMPA = "MPA" // payload: 14
|
||||
)
|
||||
|
||||
const PayloadTypeMP4 byte = 255
|
||||
|
||||
func GetKind(name string) string {
|
||||
switch name {
|
||||
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG:
|
||||
@@ -75,13 +77,13 @@ func (m *Media) AV() bool {
|
||||
return m.Kind == KindVideo || m.Kind == KindAudio
|
||||
}
|
||||
|
||||
func (m *Media) MatchCodec(codec *Codec) bool {
|
||||
func (m *Media) MatchCodec(codec *Codec) *Codec {
|
||||
for _, c := range m.Codecs {
|
||||
if c.Match(codec) {
|
||||
return true
|
||||
return c
|
||||
}
|
||||
}
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Media) MatchMedia(media *Media) *Codec {
|
||||
@@ -127,22 +129,6 @@ type Codec struct {
|
||||
PayloadType uint8
|
||||
}
|
||||
|
||||
func NewCodec(name string) *Codec {
|
||||
name = strings.ToUpper(name)
|
||||
switch name {
|
||||
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG:
|
||||
return &Codec{Name: name, ClockRate: 90000}
|
||||
case CodecPCMU, CodecPCMA:
|
||||
return &Codec{Name: name, ClockRate: 8000}
|
||||
case CodecOpus:
|
||||
return &Codec{Name: name, ClockRate: 48000, Channels: 2}
|
||||
case "MJPEG":
|
||||
return &Codec{Name: CodecJPEG, ClockRate: 90000}
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("unsupported codec: %s", name))
|
||||
}
|
||||
|
||||
func (c *Codec) String() string {
|
||||
s := fmt.Sprintf("%d %s/%d", c.PayloadType, c.Name, c.ClockRate)
|
||||
if c.Channels > 0 {
|
||||
@@ -151,6 +137,10 @@ func (c *Codec) String() string {
|
||||
return s
|
||||
}
|
||||
|
||||
func (c *Codec) IsMP4() bool {
|
||||
return c.PayloadType == PayloadTypeMP4
|
||||
}
|
||||
|
||||
func (c *Codec) Clone() *Codec {
|
||||
clone := *c
|
||||
return &clone
|
||||
|
@@ -12,44 +12,54 @@ type WrapperFunc func(push WriterFunc) WriterFunc
|
||||
type Track struct {
|
||||
Codec *Codec
|
||||
Direction string
|
||||
Sink map[*Track]WriterFunc
|
||||
mx sync.Mutex
|
||||
sink map[*Track]WriterFunc
|
||||
sinkMu sync.RWMutex
|
||||
}
|
||||
|
||||
func (t *Track) String() string {
|
||||
s := t.Codec.String()
|
||||
s += fmt.Sprintf(", sinks=%d", len(t.Sink))
|
||||
s += fmt.Sprintf(", sinks=%d", len(t.sink))
|
||||
return s
|
||||
}
|
||||
|
||||
func (t *Track) WriteRTP(p *rtp.Packet) error {
|
||||
t.mx.Lock()
|
||||
for _, f := range t.Sink {
|
||||
t.sinkMu.RLock()
|
||||
for _, f := range t.sink {
|
||||
_ = f(p)
|
||||
}
|
||||
t.mx.Unlock()
|
||||
t.sinkMu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Track) Bind(w WriterFunc) *Track {
|
||||
t.mx.Lock()
|
||||
t.sinkMu.Lock()
|
||||
|
||||
if t.Sink == nil {
|
||||
t.Sink = map[*Track]WriterFunc{}
|
||||
if t.sink == nil {
|
||||
t.sink = map[*Track]WriterFunc{}
|
||||
}
|
||||
|
||||
clone := &Track{
|
||||
Codec: t.Codec, Direction: t.Direction, Sink: t.Sink,
|
||||
Codec: t.Codec, Direction: t.Direction, sink: t.sink,
|
||||
}
|
||||
t.Sink[clone] = w
|
||||
t.sink[clone] = w
|
||||
|
||||
t.mx.Unlock()
|
||||
t.sinkMu.Unlock()
|
||||
|
||||
return clone
|
||||
}
|
||||
|
||||
func (t *Track) Unbind() {
|
||||
t.mx.Lock()
|
||||
delete(t.Sink, t)
|
||||
t.mx.Unlock()
|
||||
t.sinkMu.Lock()
|
||||
delete(t.sink, t)
|
||||
t.sinkMu.Unlock()
|
||||
}
|
||||
|
||||
func (t *Track) GetSink(from *Track) {
|
||||
t.sink = from.sink
|
||||
}
|
||||
|
||||
func (t *Track) HasSink() bool {
|
||||
t.sinkMu.RLock()
|
||||
defer t.sinkMu.RUnlock()
|
||||
return len(t.sink) > 0
|
||||
}
|
||||
|
@@ -57,7 +57,7 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.
|
||||
wrapper := h264.RTPPay(1200)
|
||||
push = wrapper(push)
|
||||
|
||||
if h264.IsAVC(codec) {
|
||||
if codec.IsMP4() {
|
||||
wrapper = h264.RepairAVC(track)
|
||||
} else {
|
||||
wrapper = h264.RTPDepay(track)
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewCandidate(address string) (string, error) {
|
||||
@@ -38,7 +39,7 @@ func NewCandidate(address string) (string, error) {
|
||||
|
||||
func LookupIP(address string) (string, error) {
|
||||
if strings.HasPrefix(address, "stun:") {
|
||||
ip, err := GetPublicIP()
|
||||
ip, err := GetCachedPublicIP()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -63,11 +64,20 @@ func LookupIP(address string) (string, error) {
|
||||
|
||||
// GetPublicIP example from https://github.com/pion/stun
|
||||
func GetPublicIP() (net.IP, error) {
|
||||
c, err := stun.Dial("udp", "stun.l.google.com:19302")
|
||||
conn, err := net.Dial("udp", "stun.l.google.com:19302")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := stun.NewClient(conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = conn.SetDeadline(time.Now().Add(time.Second * 3)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res stun.Event
|
||||
|
||||
message := stun.MustBuild(stun.TransactionID, stun.BindingRequest)
|
||||
@@ -90,6 +100,24 @@ func GetPublicIP() (net.IP, error) {
|
||||
return xorAddr.IP, nil
|
||||
}
|
||||
|
||||
var cachedIP net.IP
|
||||
var cachedTS time.Time
|
||||
|
||||
func GetCachedPublicIP() (net.IP, error) {
|
||||
now := time.Now()
|
||||
if now.After(cachedTS) {
|
||||
newIP, err := GetPublicIP()
|
||||
if err == nil {
|
||||
cachedIP = newIP
|
||||
cachedTS = now.Add(time.Minute * 5)
|
||||
} else if cachedIP == nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return cachedIP, nil
|
||||
}
|
||||
|
||||
func IsIP(host string) bool {
|
||||
for _, i := range host {
|
||||
if i >= 'A' {
|
||||
|
@@ -53,3 +53,5 @@ pc.ontrack = ev => {
|
||||
- https://www.webrtc-experiment.com/DetectRTC/
|
||||
- https://divtable.com/table-styler/
|
||||
- https://www.chromium.org/audio-video/
|
||||
- https://web.dev/i18n/en/fast-playback-with-preload/#manual_buffering
|
||||
- https://developer.mozilla.org/en-US/docs/Web/API/Media_Source_Extensions_API
|
||||
|
@@ -70,6 +70,7 @@
|
||||
'<a href="api/stream.mp4?src={name}">mp4</a>',
|
||||
'<a href="api/frame.mp4?src={name}">frame</a>',
|
||||
`<a href="rtsp://${location.hostname}:8554/{name}">rtsp</a>`,
|
||||
'<a href="api/stream.mjpeg?src={name}">mjpeg</a>',
|
||||
'<a href="api/streams?src={name}">info</a>',
|
||||
];
|
||||
|
||||
|
122
www/mse.html
122
www/mse.html
@@ -25,93 +25,75 @@
|
||||
<!-- muted is important for autoplay -->
|
||||
<video id="video" autoplay controls playsinline muted></video>
|
||||
<script>
|
||||
const video = document.querySelector('#video');
|
||||
|
||||
// support api_path
|
||||
const baseUrl = location.origin + location.pathname.substr(
|
||||
0, location.pathname.lastIndexOf("/")
|
||||
);
|
||||
const ws = new WebSocket(`ws${baseUrl.substr(4)}/api/ws${location.search}`);
|
||||
ws.binaryType = "arraybuffer";
|
||||
const video = document.querySelector('#video');
|
||||
|
||||
let mediaSource;
|
||||
function init() {
|
||||
let mediaSource, sourceBuffer, queueBuffer = [];
|
||||
|
||||
ws.onopen = () => {
|
||||
console.log("Start WS");
|
||||
const ws = new WebSocket(`ws${baseUrl.substr(4)}/api/ws${location.search}`);
|
||||
ws.binaryType = "arraybuffer";
|
||||
|
||||
// https://web.dev/i18n/en/fast-playback-with-preload/#manual_buffering
|
||||
// https://developer.mozilla.org/en-US/docs/Web/API/Media_Source_Extensions_API
|
||||
mediaSource = new MediaSource();
|
||||
video.src = URL.createObjectURL(mediaSource);
|
||||
mediaSource.onsourceopen = () => {
|
||||
console.debug("mediaSource.onsourceopen");
|
||||
|
||||
mediaSource.onsourceopen = null;
|
||||
URL.revokeObjectURL(video.src);
|
||||
ws.send(JSON.stringify({"type": "mse"}));
|
||||
ws.onopen = () => {
|
||||
mediaSource = new MediaSource();
|
||||
video.src = URL.createObjectURL(mediaSource);
|
||||
mediaSource.onsourceopen = () => {
|
||||
mediaSource.onsourceopen = null;
|
||||
URL.revokeObjectURL(video.src);
|
||||
ws.send(JSON.stringify({"type": "mse"}));
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
let sourceBuffer, queueBuffer = [];
|
||||
ws.onmessage = ev => {
|
||||
if (typeof ev.data === 'string') {
|
||||
const data = JSON.parse(ev.data);
|
||||
console.debug("ws.onmessage", data);
|
||||
|
||||
ws.onmessage = ev => {
|
||||
if (typeof ev.data === 'string') {
|
||||
const data = JSON.parse(ev.data);
|
||||
console.debug("ws.onmessage", data);
|
||||
|
||||
if (data.type === "mse") {
|
||||
sourceBuffer = mediaSource.addSourceBuffer(data.value);
|
||||
// important: segments supports TrackFragDecodeTime
|
||||
// sequence supports only TrackFragRunEntry Duration
|
||||
sourceBuffer.mode = "segments";
|
||||
sourceBuffer.onupdateend = () => {
|
||||
if (!sourceBuffer.updating && queueBuffer.length > 0) {
|
||||
sourceBuffer.appendBuffer(queueBuffer.shift());
|
||||
if (data.type === "mse") {
|
||||
sourceBuffer = mediaSource.addSourceBuffer(data.value);
|
||||
sourceBuffer.mode = "segments"; // segments or sequence
|
||||
sourceBuffer.onupdateend = () => {
|
||||
if (!sourceBuffer.updating && queueBuffer.length > 0) {
|
||||
try {
|
||||
sourceBuffer.appendBuffer(queueBuffer.shift());
|
||||
} catch (e) {
|
||||
// console.warn(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (sourceBuffer.updating) {
|
||||
queueBuffer.push(ev.data)
|
||||
} else if (sourceBuffer.updating || queueBuffer.length > 0) {
|
||||
queueBuffer.push(ev.data);
|
||||
} else {
|
||||
sourceBuffer.appendBuffer(ev.data);
|
||||
try {
|
||||
sourceBuffer.appendBuffer(ev.data);
|
||||
} catch (e) {
|
||||
// console.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (video.seekable.length > 0) {
|
||||
const delay = video.seekable.end(video.seekable.length - 1) - video.currentTime;
|
||||
if (delay < 1) {
|
||||
video.playbackRate = 1;
|
||||
} else if (delay > 10) {
|
||||
video.playbackRate = 10;
|
||||
} else if (delay > 2) {
|
||||
video.playbackRate = Math.floor(delay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
video.onpause = () => {
|
||||
ws.close();
|
||||
setTimeout(init, 0);
|
||||
}
|
||||
}
|
||||
|
||||
let offsetTime = 1, noWaiting = 0;
|
||||
|
||||
setInterval(() => {
|
||||
if (video.paused || video.seekable.length === 0) return;
|
||||
|
||||
if (noWaiting < 0) {
|
||||
offsetTime = Math.min(offsetTime * 1.1, 5);
|
||||
console.debug("offset time up:", offsetTime);
|
||||
} else if (noWaiting >= 30) {
|
||||
noWaiting = 0;
|
||||
offsetTime = Math.max(offsetTime * 0.9, 0.5);
|
||||
console.debug("offset time down:", offsetTime);
|
||||
}
|
||||
noWaiting += 1;
|
||||
|
||||
const endTime = video.seekable.end(video.seekable.length - 1);
|
||||
let playbackRate = (endTime - video.currentTime) / offsetTime;
|
||||
if (playbackRate < 0.1) {
|
||||
// video.currentTime = endTime - offsetTime;
|
||||
playbackRate = 0.1;
|
||||
} else if (playbackRate > 10) {
|
||||
// video.currentTime = endTime - offsetTime;
|
||||
playbackRate = 10;
|
||||
}
|
||||
// https://github.com/GoogleChrome/developer.chrome.com/issues/135
|
||||
video.playbackRate = playbackRate;
|
||||
}, 1000);
|
||||
|
||||
video.onwaiting = () => {
|
||||
const endTime = video.seekable.end(video.seekable.length - 1);
|
||||
video.currentTime = endTime - offsetTime;
|
||||
noWaiting = -1;
|
||||
}
|
||||
init();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
|
@@ -25,12 +25,12 @@
|
||||
<body>
|
||||
<video id="video" autoplay controls playsinline muted></video>
|
||||
<script>
|
||||
const baseUrl = location.origin + location.pathname.substr(
|
||||
0, location.pathname.lastIndexOf("/")
|
||||
);
|
||||
|
||||
function init(stream) {
|
||||
// support api_path
|
||||
const baseUrl = location.origin + location.pathname.substr(
|
||||
0, location.pathname.lastIndexOf("/")
|
||||
);
|
||||
|
||||
const ws = new WebSocket(`ws${baseUrl.substr(4)}/api/ws${location.search}`);
|
||||
ws.onopen = () => {
|
||||
console.debug('ws.onopen');
|
||||
@@ -51,11 +51,6 @@
|
||||
pc.addIceCandidate({candidate: msg.value, sdpMid: ''});
|
||||
} else if (msg.type === 'webrtc/answer') {
|
||||
pc.setRemoteDescription({type: 'answer', sdp: msg.value});
|
||||
pc.getTransceivers().forEach(t => {
|
||||
if (t.receiver.track.kind === 'audio') {
|
||||
t.currentDirection
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user