mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2025-09-27 04:36:12 +08:00
Compare commits
60 Commits
v0.1-beta.
...
v0.1-rc.2
Author | SHA1 | Date | |
---|---|---|---|
![]() |
49f6233bde | ||
![]() |
78c5c70c73 | ||
![]() |
32651c74ab | ||
![]() |
5c64d1f847 | ||
![]() |
717af29630 | ||
![]() |
ea18475d31 | ||
![]() |
701a9c69ec | ||
![]() |
c06253c8b2 | ||
![]() |
3a07e9fa03 | ||
![]() |
e1bc30fab3 | ||
![]() |
d16ae0972f | ||
![]() |
8b93c97e69 | ||
![]() |
d8158bc1e3 | ||
![]() |
f4f588d2c6 | ||
![]() |
e287b52808 | ||
![]() |
ff96257252 | ||
![]() |
909f21b7e4 | ||
![]() |
7d6a5b44f8 | ||
![]() |
278f7696b6 | ||
![]() |
3cbf2465ae | ||
![]() |
e9ea7a0b1f | ||
![]() |
0231fc3a90 | ||
![]() |
9ef2633840 | ||
![]() |
5a8df3e90a | ||
![]() |
a31cbec3eb | ||
![]() |
54f547977e | ||
![]() |
65d91e02bd | ||
![]() |
7fc3f0f641 | ||
![]() |
7725d5ed31 | ||
![]() |
6c1b9daa8b | ||
![]() |
6d432574bf | ||
![]() |
616f69c88b | ||
![]() |
f72440712b | ||
![]() |
ceed146fb8 | ||
![]() |
f17dadbbbf | ||
![]() |
3d4514eab9 | ||
![]() |
2629dccb81 | ||
![]() |
04f1aa2900 | ||
![]() |
0dacdea1c3 | ||
![]() |
24082b1616 | ||
![]() |
7964b1743b | ||
![]() |
49773a1ece | ||
![]() |
c97a48a73f | ||
![]() |
e03231ebb4 | ||
![]() |
649525a842 | ||
![]() |
d411c1a25c | ||
![]() |
2f0bcf4ae0 | ||
![]() |
831c504cab | ||
![]() |
12925a6bc5 | ||
![]() |
e50e929150 | ||
![]() |
d0c87e0379 | ||
![]() |
247b61790e | ||
![]() |
2ec618334a | ||
![]() |
6f9976c806 | ||
![]() |
17b3a4cf3a | ||
![]() |
ba30f46c02 | ||
![]() |
4134f2a89c | ||
![]() |
a81160bea1 | ||
![]() |
80392acb78 | ||
![]() |
5afac513b4 |
34
README.md
34
README.md
@@ -172,6 +172,8 @@ streams:
|
||||
- rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1
|
||||
```
|
||||
|
||||
**PS.** For disable bachannel just add `#backchannel=0` to end of RTSP link.
|
||||
|
||||
#### Source: RTMP
|
||||
|
||||
You can get stream from RTMP server, for example [Frigate](https://docs.frigate.video/configuration/rtmp). Support ONLY `H264` video codec without audio.
|
||||
@@ -385,13 +387,13 @@ ngrok:
|
||||
command: ...
|
||||
```
|
||||
|
||||
**Own TCP-tunnel**
|
||||
**Hard tech way 1. Own TCP-tunnel**
|
||||
|
||||
If you have personal VPS, you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config.
|
||||
If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can create TCP-tunnel and setup in the same way as "Static public IP". But use your VPS IP-address in YAML config.
|
||||
|
||||
**Using TURN-server**
|
||||
**Hard tech way 2. Using TURN-server**
|
||||
|
||||
TODO...
|
||||
If you have personal [VPS](https://en.wikipedia.org/wiki/Virtual_private_server), you can install TURN server (e.g. [coturn](https://github.com/coturn/coturn), config [example](https://github.com/AlexxIT/WebRTC/wiki/Coturn-Example)).
|
||||
|
||||
```yaml
|
||||
webrtc:
|
||||
@@ -547,6 +549,30 @@ If you need Web interface protection without Home Assistant Add-on - you need to
|
||||
|
||||
PS. Additionally WebRTC opens a lot of random UDP ports for transmit encrypted media. They work without problems on the local network. And sometimes work for external access, even if you haven't opened ports on your router. But for stable external WebRTC access, you need to configure the TCP port.
|
||||
|
||||
## Codecs madness
|
||||
|
||||
`AVC/H.264` codec can be played almost anywhere. But `HEVC/H.265` has a lot of limitations in supporting with different devices and browsers. It's all about patents and money, you can't do anything about it.
|
||||
|
||||
Device | WebRTC | MSE | MP4
|
||||
-------|--------|-----|----
|
||||
*latency* | best | medium | bad
|
||||
Desktop Chrome | H264 | H264, H265* | H264, H265*
|
||||
Desktop Safari | H264, H265* | H264 | no
|
||||
Desktop Edge | H264 | H264, H265* | H264, H265*
|
||||
Desktop Firefox | H264 | H264 | H264
|
||||
Desktop Opera | no | H264 | H264
|
||||
iPhone Safari | H264, H265* | no | no
|
||||
iPad Safari | H264, H265* | H264 | no
|
||||
Android Chrome | H264 | H264 | H264
|
||||
masOS Hass App | no | no | no
|
||||
|
||||
- WebRTC audio codecs: `PCMU/8000`, `PCMA/8000`, `OPUS/48000/2`
|
||||
- MSE/MP4 audio codecs: not supported yet (should be: `AAC`)
|
||||
- Chrome H265: [read this](https://github.com/StaZhu/enable-chromium-hevc-hardware-decoding)
|
||||
- Edge H265: [read this](https://www.reddit.com/r/MicrosoftEdge/comments/v9iw8k/enable_hevc_support_in_edge/)
|
||||
- Desktop Safari H265: Menu > Develop > Experimental > WebRTC H265
|
||||
- iOS Safari H265: Settings > Safari > Advanced > Experimental > WebRTC H265
|
||||
|
||||
## FAQ
|
||||
|
||||
**Q. What's the difference between go2rtc, WebRTC Camera and RTSPtoWebRTC?**
|
||||
|
@@ -85,10 +85,15 @@ var wsHandlers = make(map[string]WSHandler)
|
||||
|
||||
func streamsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
src := r.URL.Query().Get("src")
|
||||
name := r.URL.Query().Get("name")
|
||||
|
||||
if name == "" {
|
||||
name = src
|
||||
}
|
||||
|
||||
switch r.Method {
|
||||
case "PUT":
|
||||
streams.New(src, src)
|
||||
streams.New(name, src)
|
||||
return
|
||||
case "DELETE":
|
||||
streams.Delete(src)
|
||||
|
@@ -64,22 +64,14 @@ func (ctx *Context) Close() {
|
||||
|
||||
func (ctx *Context) Write(msg interface{}) {
|
||||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
|
||||
var err error
|
||||
|
||||
switch msg := msg.(type) {
|
||||
case *streamer.Message:
|
||||
err = ctx.Conn.WriteJSON(msg)
|
||||
case []byte:
|
||||
err = ctx.Conn.WriteMessage(websocket.BinaryMessage, msg)
|
||||
default:
|
||||
return
|
||||
if data, ok := msg.([]byte); ok {
|
||||
_ = ctx.Conn.WriteMessage(websocket.BinaryMessage, data)
|
||||
} else {
|
||||
_ = ctx.Conn.WriteJSON(msg)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
//panic(err) // TODO: fix panic
|
||||
}
|
||||
ctx.mu.Unlock()
|
||||
}
|
||||
|
||||
func (ctx *Context) Error(err error) {
|
||||
|
@@ -3,6 +3,7 @@ package app
|
||||
import (
|
||||
"flag"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io"
|
||||
"os"
|
||||
@@ -30,10 +31,18 @@ func Init() {
|
||||
}
|
||||
}
|
||||
|
||||
log.Logger = NewLogger(cfg.Mod["format"], cfg.Mod["level"])
|
||||
|
||||
modules = cfg.Mod
|
||||
|
||||
path, _ := os.Getwd()
|
||||
log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH).
|
||||
Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]")
|
||||
}
|
||||
|
||||
func NewLogger(format string, level string) zerolog.Logger {
|
||||
var writer io.Writer = os.Stdout
|
||||
|
||||
// styles
|
||||
format := cfg.Mod["format"]
|
||||
if format != "json" {
|
||||
writer = zerolog.ConsoleWriter{
|
||||
Out: writer, TimeFormat: "15:04:05.000",
|
||||
@@ -43,18 +52,12 @@ func Init() {
|
||||
|
||||
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
|
||||
|
||||
lvl, err := zerolog.ParseLevel(cfg.Mod["level"])
|
||||
lvl, err := zerolog.ParseLevel(level)
|
||||
if err != nil || lvl == zerolog.NoLevel {
|
||||
lvl = zerolog.InfoLevel
|
||||
}
|
||||
|
||||
log = zerolog.New(writer).With().Timestamp().Logger().Level(lvl)
|
||||
|
||||
modules = cfg.Mod
|
||||
|
||||
path, _ := os.Getwd()
|
||||
log.Debug().Str("os", runtime.GOOS).Str("arch", runtime.GOARCH).
|
||||
Str("cwd", path).Int("conf_size", len(data)).Msgf("[app]")
|
||||
return zerolog.New(writer).With().Timestamp().Logger().Level(lvl)
|
||||
}
|
||||
|
||||
func LoadConfig(v interface{}) {
|
||||
@@ -68,15 +71,13 @@ func LoadConfig(v interface{}) {
|
||||
func GetLogger(module string) zerolog.Logger {
|
||||
if s, ok := modules[module]; ok {
|
||||
lvl, err := zerolog.ParseLevel(s)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("[log]")
|
||||
return log
|
||||
if err == nil {
|
||||
return log.Level(lvl)
|
||||
}
|
||||
|
||||
return log.Level(lvl)
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
}
|
||||
|
||||
return log
|
||||
return log.Logger
|
||||
}
|
||||
|
||||
// internal
|
||||
@@ -84,8 +85,5 @@ func GetLogger(module string) zerolog.Logger {
|
||||
// data - config content
|
||||
var data []byte
|
||||
|
||||
// log - main logger
|
||||
var log zerolog.Logger
|
||||
|
||||
// modules log levels
|
||||
var modules map[string]string
|
||||
|
@@ -21,6 +21,7 @@ var stackSkip = [][]byte{
|
||||
[]byte("created by net/http.(*Server).Serve"), // TODO: why two?
|
||||
|
||||
[]byte("created by github.com/AlexxIT/go2rtc/cmd/rtsp.Init"),
|
||||
[]byte("created by github.com/AlexxIT/go2rtc/cmd/srtp.Init"),
|
||||
|
||||
// webrtc/api.go
|
||||
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),
|
||||
|
@@ -14,6 +14,7 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -23,22 +24,22 @@ func Init() {
|
||||
return
|
||||
}
|
||||
|
||||
rtsp.OnProducer = func(prod streamer.Producer) bool {
|
||||
if conn := prod.(*pkg.Conn); conn != nil {
|
||||
if waiter := waiters[conn.URL.Path]; waiter != nil {
|
||||
waiter <- prod
|
||||
return true
|
||||
}
|
||||
rtsp.HandleFunc(func(conn *pkg.Conn) bool {
|
||||
waitersMu.Lock()
|
||||
waiter := waiters[conn.URL.Path]
|
||||
waitersMu.Unlock()
|
||||
|
||||
if waiter == nil {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
waiter <- conn
|
||||
return true
|
||||
})
|
||||
|
||||
streams.HandleFunc("exec", Handle)
|
||||
|
||||
log = app.GetLogger("exec")
|
||||
|
||||
// TODO: add sync.Mutex
|
||||
waiters = map[string]chan streamer.Producer{}
|
||||
}
|
||||
|
||||
func Handle(url string) (streamer.Producer, error) {
|
||||
@@ -60,8 +61,15 @@ func Handle(url string) (streamer.Producer, error) {
|
||||
|
||||
ch := make(chan streamer.Producer)
|
||||
|
||||
waitersMu.Lock()
|
||||
waiters[path] = ch
|
||||
defer delete(waiters, path)
|
||||
waitersMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
waitersMu.Lock()
|
||||
delete(waiters, path)
|
||||
waitersMu.Unlock()
|
||||
}()
|
||||
|
||||
log.Debug().Str("url", url).Msg("[exec] run")
|
||||
|
||||
@@ -86,4 +94,5 @@ func Handle(url string) (streamer.Producer, error) {
|
||||
// internal
|
||||
|
||||
var log zerolog.Logger
|
||||
var waiters map[string]chan streamer.Producer
|
||||
var waiters = map[string]chan streamer.Producer{}
|
||||
var waitersMu sync.Mutex
|
||||
|
@@ -23,7 +23,7 @@ func Init() {
|
||||
// inputs
|
||||
"file": "-re -stream_loop -1 -i {input}",
|
||||
"http": "-fflags nobuffer -flags low_delay -i {input}",
|
||||
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -i {input}",
|
||||
"rtsp": "-fflags nobuffer -flags low_delay -rtsp_transport tcp -timeout 5000000 -i {input}",
|
||||
|
||||
// output
|
||||
"output": "-rtsp_transport tcp -f rtsp {output}",
|
||||
@@ -121,7 +121,7 @@ func Init() {
|
||||
|
||||
for _, audio := range query["audio"] {
|
||||
if audio == "copy" {
|
||||
s += " -codec:v copy"
|
||||
s += " -codec:a copy"
|
||||
} else {
|
||||
s += " " + tpl[audio]
|
||||
}
|
||||
|
@@ -55,6 +55,10 @@ func initAPI() {
|
||||
// /stream/{id}/channel/0/webrtc
|
||||
default:
|
||||
i := strings.IndexByte(r.RequestURI[8:], '/')
|
||||
if i <= 0 {
|
||||
log.Warn().Msgf("wrong request: %s", r.RequestURI)
|
||||
return
|
||||
}
|
||||
name := r.RequestURI[8 : 8+i]
|
||||
|
||||
stream := streams.Get(name)
|
||||
|
@@ -10,12 +10,47 @@ import (
|
||||
)
|
||||
|
||||
func Init() {
|
||||
api.HandleFunc("api/stream.mjpeg", handler)
|
||||
api.HandleFunc("api/frame.jpeg", handlerKeyframe)
|
||||
api.HandleFunc("api/stream.mjpeg", handlerStream)
|
||||
}
|
||||
|
||||
func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
|
||||
src := r.URL.Query().Get("src")
|
||||
stream := streams.GetOrNew(src)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
exit := make(chan []byte)
|
||||
|
||||
cons := &mjpeg.Consumer{}
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg := msg.(type) {
|
||||
case []byte:
|
||||
exit <- msg
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
data := <-exit
|
||||
|
||||
stream.RemoveConsumer(cons)
|
||||
|
||||
w.Header().Set("Content-Type", "image/jpeg")
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
|
||||
|
||||
if _, err := w.Write(data); err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
}
|
||||
}
|
||||
|
||||
const header = "--frame\r\nContent-Type: image/jpeg\r\nContent-Length: "
|
||||
|
||||
func handler(w http.ResponseWriter, r *http.Request) {
|
||||
func handlerStream(w http.ResponseWriter, r *http.Request) {
|
||||
src := r.URL.Query().Get("src")
|
||||
stream := streams.GetOrNew(src)
|
||||
if stream == nil {
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@@ -35,35 +36,29 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
exit := make(chan []byte)
|
||||
|
||||
cons := &mp4.Consumer{}
|
||||
cons := &mp4.Keyframe{}
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg := msg.(type) {
|
||||
case []byte:
|
||||
exit <- msg
|
||||
if data, ok := msg.([]byte); ok && exit != nil {
|
||||
exit <- data
|
||||
exit = nil
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] add consumer")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
defer stream.RemoveConsumer(cons)
|
||||
data := <-exit
|
||||
|
||||
w.Header().Set("Content-Type", cons.MimeType())
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] init")
|
||||
return
|
||||
}
|
||||
data = append(data, <-exit...)
|
||||
stream.RemoveConsumer(cons)
|
||||
|
||||
// Apple Safari won't show frame without length
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
|
||||
w.Header().Set("Content-Type", cons.MimeType)
|
||||
|
||||
if _, err := w.Write(data); err != nil {
|
||||
log.Error().Err(err).Msg("[api.keyframe] add consumer")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,20 +75,20 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
exit := make(chan struct{})
|
||||
exit := make(chan error)
|
||||
|
||||
cons := &mp4.Consumer{}
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg := msg.(type) {
|
||||
case []byte:
|
||||
if _, err := w.Write(msg); err != nil {
|
||||
exit <- struct{}{}
|
||||
if data, ok := msg.([]byte); ok {
|
||||
if _, err := w.Write(data); err != nil && exit != nil {
|
||||
exit <- err
|
||||
exit = nil
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] add consumer")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -103,18 +98,36 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] init")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = w.Write(data); err != nil {
|
||||
log.Error().Err(err).Msg("[api.mp4] write")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
<-exit
|
||||
cons.Start()
|
||||
|
||||
log.Trace().Msg("[api.mp4] close")
|
||||
var duration *time.Timer
|
||||
if s := r.URL.Query().Get("duration"); s != "" {
|
||||
if i, _ := strconv.Atoi(s); i > 0 {
|
||||
duration = time.AfterFunc(time.Second*time.Duration(i), func() {
|
||||
if exit != nil {
|
||||
exit <- nil
|
||||
exit = nil
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
err = <-exit
|
||||
|
||||
log.Trace().Err(err).Caller().Send()
|
||||
|
||||
if duration != nil {
|
||||
duration.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func isChromeFirst(w http.ResponseWriter, r *http.Request) bool {
|
||||
|
@@ -9,6 +9,8 @@ import (
|
||||
|
||||
const MsgTypeMSE = "mse" // fMP4
|
||||
|
||||
const packetSize = 8192
|
||||
|
||||
func handlerWS(ctx *api.Context, msg *streamer.Message) {
|
||||
src := ctx.Request.URL.Query().Get("src")
|
||||
stream := streams.GetOrNew(src)
|
||||
@@ -21,14 +23,17 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) {
|
||||
cons.RemoteAddr = ctx.Request.RemoteAddr
|
||||
|
||||
cons.Listen(func(msg interface{}) {
|
||||
switch msg.(type) {
|
||||
case *streamer.Message, []byte:
|
||||
ctx.Write(msg)
|
||||
if data, ok := msg.([]byte); ok {
|
||||
for len(data) > packetSize {
|
||||
ctx.Write(data[:packetSize])
|
||||
data = data[packetSize:]
|
||||
}
|
||||
ctx.Write(data)
|
||||
}
|
||||
})
|
||||
|
||||
if err := stream.AddConsumer(cons); err != nil {
|
||||
log.Warn().Err(err).Msg("[api.mse] add consumer")
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
ctx.Error(err)
|
||||
return
|
||||
}
|
||||
@@ -37,16 +42,16 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) {
|
||||
stream.RemoveConsumer(cons)
|
||||
})
|
||||
|
||||
ctx.Write(&streamer.Message{
|
||||
Type: MsgTypeMSE, Value: cons.MimeType(),
|
||||
})
|
||||
ctx.Write(&streamer.Message{Type: MsgTypeMSE, Value: cons.MimeType()})
|
||||
|
||||
data, err := cons.Init()
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("[api.mse] init")
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
ctx.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Write(data)
|
||||
|
||||
cons.Start()
|
||||
}
|
||||
|
@@ -8,6 +8,8 @@ import (
|
||||
|
||||
func Init() {
|
||||
streams.HandleFunc("rtmp", handle)
|
||||
streams.HandleFunc("http", handle)
|
||||
streams.HandleFunc("https", handle)
|
||||
}
|
||||
|
||||
func handle(url string) (streamer.Producer, error) {
|
||||
|
210
cmd/rtsp/rtsp.go
210
cmd/rtsp/rtsp.go
@@ -32,20 +32,43 @@ func Init() {
|
||||
|
||||
// RTSP server support
|
||||
address := conf.Mod.Listen
|
||||
if address != "" {
|
||||
_, Port, _ = net.SplitHostPort(address)
|
||||
|
||||
go worker(address)
|
||||
if address == "" {
|
||||
return
|
||||
}
|
||||
|
||||
ln, err := net.Listen("tcp", address)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[rtsp] listen")
|
||||
return
|
||||
}
|
||||
|
||||
_, Port, _ = net.SplitHostPort(address)
|
||||
|
||||
log.Info().Str("addr", address).Msg("[rtsp] listen")
|
||||
|
||||
go func() {
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
go tcpHandler(conn)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
type Handler func(conn *rtsp.Conn) bool
|
||||
|
||||
func HandleFunc(handler Handler) {
|
||||
handlers = append(handlers, handler)
|
||||
}
|
||||
|
||||
var Port string
|
||||
|
||||
var OnProducer func(conn streamer.Producer) bool // TODO: maybe rewrite...
|
||||
|
||||
// internal
|
||||
|
||||
var log zerolog.Logger
|
||||
var handlers []Handler
|
||||
|
||||
func rtspHandler(url string) (streamer.Producer, error) {
|
||||
backchannel := true
|
||||
@@ -84,10 +107,10 @@ func rtspHandler(url string) (streamer.Producer, error) {
|
||||
}
|
||||
|
||||
// second try without backchannel, we need to reconnect
|
||||
conn.Backchannel = false
|
||||
if err = conn.Dial(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.Backchannel = false
|
||||
if err = conn.Describe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -96,101 +119,89 @@ func rtspHandler(url string) (streamer.Producer, error) {
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func worker(address string) {
|
||||
srv, err := tcp.NewServer(address)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("[rtsp] listen")
|
||||
return
|
||||
}
|
||||
func tcpHandler(c net.Conn) {
|
||||
var name string
|
||||
var closer func()
|
||||
|
||||
log.Info().Str("addr", address).Msg("[rtsp] listen")
|
||||
trace := log.Trace().Enabled()
|
||||
|
||||
srv.Listen(func(msg interface{}) {
|
||||
switch msg.(type) {
|
||||
case net.Conn:
|
||||
var name string
|
||||
var onDisconnect func()
|
||||
conn := rtsp.NewServer(c)
|
||||
conn.Listen(func(msg interface{}) {
|
||||
if trace {
|
||||
switch msg := msg.(type) {
|
||||
case *tcp.Request:
|
||||
log.Trace().Msgf("[rtsp] server request:\n%s", msg)
|
||||
case *tcp.Response:
|
||||
log.Trace().Msgf("[rtsp] server response:\n%s", msg)
|
||||
}
|
||||
}
|
||||
|
||||
trace := log.Trace().Enabled()
|
||||
switch msg {
|
||||
case rtsp.MethodDescribe:
|
||||
name = conn.URL.Path[1:]
|
||||
|
||||
conn := rtsp.NewServer(msg.(net.Conn))
|
||||
conn.Listen(func(msg interface{}) {
|
||||
if trace {
|
||||
switch msg := msg.(type) {
|
||||
case *tcp.Request:
|
||||
log.Trace().Msgf("[rtsp] server request:\n%s", msg)
|
||||
case *tcp.Response:
|
||||
log.Trace().Msgf("[rtsp] server response:\n%s", msg)
|
||||
}
|
||||
}
|
||||
|
||||
switch msg {
|
||||
case rtsp.MethodDescribe:
|
||||
name = conn.URL.Path[1:]
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
|
||||
|
||||
stream := streams.Get(name) // TODO: rewrite
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
initMedias(conn)
|
||||
|
||||
if err = stream.AddConsumer(conn); err != nil {
|
||||
log.Warn().Err(err).Str("stream", name).Msg("[rtsp]")
|
||||
return
|
||||
}
|
||||
|
||||
onDisconnect = func() {
|
||||
stream.RemoveConsumer(conn)
|
||||
}
|
||||
|
||||
case rtsp.MethodAnnounce:
|
||||
if OnProducer != nil {
|
||||
if OnProducer(conn) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
name = conn.URL.Path[1:]
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
|
||||
|
||||
stream := streams.Get(name)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
stream.AddProducer(conn)
|
||||
|
||||
onDisconnect = func() {
|
||||
stream.RemoveProducer(conn)
|
||||
}
|
||||
|
||||
case streamer.StatePlaying:
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] start")
|
||||
}
|
||||
})
|
||||
|
||||
if err = conn.Accept(); err != nil {
|
||||
log.Warn().Err(err).Msg("[rtsp] accept")
|
||||
stream := streams.Get(name)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = conn.Handle(); err != nil {
|
||||
//log.Warn().Err(err).Msg("[rtsp] handle server")
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] new consumer")
|
||||
|
||||
initMedias(conn)
|
||||
|
||||
if err := stream.AddConsumer(conn); err != nil {
|
||||
log.Warn().Err(err).Str("stream", name).Msg("[rtsp]")
|
||||
return
|
||||
}
|
||||
|
||||
if onDisconnect != nil {
|
||||
onDisconnect()
|
||||
closer = func() {
|
||||
stream.RemoveConsumer(conn)
|
||||
}
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] disconnect")
|
||||
case rtsp.MethodAnnounce:
|
||||
name = conn.URL.Path[1:]
|
||||
|
||||
stream := streams.Get(name)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] new producer")
|
||||
|
||||
stream.AddProducer(conn)
|
||||
|
||||
closer = func() {
|
||||
stream.RemoveProducer(conn)
|
||||
}
|
||||
|
||||
case streamer.StatePlaying:
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] start")
|
||||
}
|
||||
})
|
||||
|
||||
srv.Serve()
|
||||
if err := conn.Accept(); err != nil {
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
for _, handler := range handlers {
|
||||
if handler(conn) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if closer != nil {
|
||||
if err := conn.Handle(); err != nil {
|
||||
log.Debug().Err(err).Caller().Send()
|
||||
}
|
||||
|
||||
closer()
|
||||
|
||||
log.Debug().Str("stream", name).Msg("[rtsp] disconnect")
|
||||
}
|
||||
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func initMedias(conn *rtsp.Conn) {
|
||||
@@ -198,16 +209,27 @@ func initMedias(conn *rtsp.Conn) {
|
||||
for key, value := range conn.URL.Query() {
|
||||
switch key {
|
||||
case streamer.KindVideo, streamer.KindAudio:
|
||||
for _, value := range value {
|
||||
for _, name := range value {
|
||||
name = strings.ToUpper(name)
|
||||
|
||||
// check aliases
|
||||
switch name {
|
||||
case "COPY":
|
||||
name = "" // pass empty codecs list
|
||||
case "MJPEG":
|
||||
name = streamer.CodecJPEG
|
||||
case "AAC":
|
||||
name = streamer.CodecAAC
|
||||
}
|
||||
|
||||
media := &streamer.Media{
|
||||
Kind: key, Direction: streamer.DirectionRecvonly,
|
||||
}
|
||||
|
||||
switch value {
|
||||
case "", "copy": // pass empty codecs list
|
||||
default:
|
||||
codec := streamer.NewCodec(value)
|
||||
media.Codecs = append(media.Codecs, codec)
|
||||
// empty codecs match all codecs
|
||||
if name != "" {
|
||||
// empty clock rate and channels match any values
|
||||
media.Codecs = []*streamer.Codec{{Name: name}}
|
||||
}
|
||||
|
||||
conn.Medias = append(conn.Medias, media)
|
||||
|
@@ -4,30 +4,36 @@ import (
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Handler func(url string) (streamer.Producer, error)
|
||||
|
||||
var handlers map[string]Handler
|
||||
var handlers = map[string]Handler{}
|
||||
var handlersMu sync.Mutex
|
||||
|
||||
func HandleFunc(scheme string, handler Handler) {
|
||||
if handlers == nil {
|
||||
handlers = make(map[string]Handler)
|
||||
}
|
||||
handlersMu.Lock()
|
||||
handlers[scheme] = handler
|
||||
handlersMu.Unlock()
|
||||
}
|
||||
|
||||
func getHandler(url string) Handler {
|
||||
i := strings.IndexByte(url, ':')
|
||||
if i <= 0 { // TODO: i < 4 ?
|
||||
return nil
|
||||
}
|
||||
handlersMu.Lock()
|
||||
defer handlersMu.Unlock()
|
||||
return handlers[url[:i]]
|
||||
}
|
||||
|
||||
func HasProducer(url string) bool {
|
||||
i := strings.IndexByte(url, ':')
|
||||
if i <= 0 { // TODO: i < 4 ?
|
||||
return false
|
||||
}
|
||||
return handlers[url[:i]] != nil
|
||||
return getHandler(url) != nil
|
||||
}
|
||||
|
||||
func GetProducer(url string) (streamer.Producer, error) {
|
||||
i := strings.IndexByte(url, ':')
|
||||
handler := handlers[url[:i]]
|
||||
handler := getHandler(url)
|
||||
if handler == nil {
|
||||
return nil, fmt.Errorf("unsupported scheme: %s", url)
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type state byte
|
||||
@@ -24,8 +25,9 @@ type Producer struct {
|
||||
element streamer.Producer
|
||||
tracks []*streamer.Track
|
||||
|
||||
state state
|
||||
mx sync.Mutex
|
||||
state state
|
||||
mu sync.Mutex
|
||||
restart *time.Timer
|
||||
}
|
||||
|
||||
func (p *Producer) SetSource(s string) {
|
||||
@@ -36,16 +38,16 @@ func (p *Producer) SetSource(s string) {
|
||||
}
|
||||
|
||||
func (p *Producer) GetMedias() []*streamer.Media {
|
||||
p.mx.Lock()
|
||||
defer p.mx.Unlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.state == stateNone {
|
||||
log.Debug().Str("url", p.url).Msg("[streams] probe producer")
|
||||
log.Debug().Msgf("[streams] probe producer url=%s", p.url)
|
||||
|
||||
var err error
|
||||
p.element, err = GetProducer(p.url)
|
||||
if err != nil || p.element == nil {
|
||||
log.Error().Err(err).Str("url", p.url).Msg("[streams] probe producer")
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -56,59 +58,124 @@ func (p *Producer) GetMedias() []*streamer.Media {
|
||||
}
|
||||
|
||||
func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
|
||||
p.mx.Lock()
|
||||
defer p.mx.Unlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.state == stateMedias {
|
||||
p.state = stateTracks
|
||||
if p.state == stateNone {
|
||||
return nil
|
||||
}
|
||||
|
||||
track := p.element.GetTrack(media, codec)
|
||||
|
||||
for _, t := range p.tracks {
|
||||
if track == t {
|
||||
for _, track := range p.tracks {
|
||||
if track.Codec == codec {
|
||||
return track
|
||||
}
|
||||
}
|
||||
|
||||
// can't get new tracks after start
|
||||
if p.state == stateStart {
|
||||
return nil
|
||||
}
|
||||
|
||||
track := p.element.GetTrack(media, codec)
|
||||
if track == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
p.tracks = append(p.tracks, track)
|
||||
|
||||
if p.state == stateMedias {
|
||||
p.state = stateTracks
|
||||
}
|
||||
|
||||
return track
|
||||
}
|
||||
|
||||
// internals
|
||||
|
||||
func (p *Producer) start() {
|
||||
p.mx.Lock()
|
||||
defer p.mx.Unlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.state != stateTracks {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().Str("url", p.url).Msg("[streams] start producer")
|
||||
log.Debug().Msgf("[streams] start producer url=%s", p.url)
|
||||
|
||||
p.state = stateStart
|
||||
go func() {
|
||||
// safe read element while mu locked
|
||||
if err := p.element.Start(); err != nil {
|
||||
log.Warn().Err(err).Str("url", p.url).Msg("[streams] start")
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
}
|
||||
p.reconnect()
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Producer) reconnect() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.state != stateStart {
|
||||
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
|
||||
|
||||
var err error
|
||||
p.element, err = GetProducer(p.url)
|
||||
if err != nil || p.element == nil {
|
||||
log.Debug().Err(err).Caller().Send()
|
||||
// TODO: dynamic timeout
|
||||
p.restart = time.AfterFunc(30*time.Second, p.reconnect)
|
||||
return
|
||||
}
|
||||
|
||||
medias := p.element.GetMedias()
|
||||
|
||||
// convert all old producer tracks to new tracks
|
||||
for i, oldTrack := range p.tracks {
|
||||
// match new element medias with old track codec
|
||||
for _, media := range medias {
|
||||
codec := media.MatchCodec(oldTrack.Codec)
|
||||
if codec == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// move sink from old track to new track
|
||||
newTrack := p.element.GetTrack(media, codec)
|
||||
newTrack.GetSink(oldTrack)
|
||||
p.tracks[i] = newTrack
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err = p.element.Start(); err != nil {
|
||||
log.Debug().Err(err).Caller().Send()
|
||||
}
|
||||
p.reconnect()
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Producer) stop() {
|
||||
p.mx.Lock()
|
||||
p.mu.Lock()
|
||||
|
||||
log.Debug().Str("url", p.url).Msg("[streams] stop producer")
|
||||
log.Debug().Msgf("[streams] stop producer url=%s", p.url)
|
||||
|
||||
if p.element != nil {
|
||||
_ = p.element.Stop()
|
||||
p.element = nil
|
||||
} else {
|
||||
log.Warn().Str("url", p.url).Msg("[streams] stop empty producer")
|
||||
}
|
||||
p.tracks = nil
|
||||
p.state = stateNone
|
||||
if p.restart != nil {
|
||||
p.restart.Stop()
|
||||
p.restart = nil
|
||||
}
|
||||
|
||||
p.mx.Unlock()
|
||||
p.state = stateNone
|
||||
p.tracks = nil
|
||||
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
@@ -14,6 +15,7 @@ type Consumer struct {
|
||||
type Stream struct {
|
||||
producers []*Producer
|
||||
consumers []*Consumer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewStream(source interface{}) *Stream {
|
||||
@@ -51,18 +53,19 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
ic := len(s.consumers)
|
||||
|
||||
consumer := &Consumer{element: cons}
|
||||
var producers []*Producer // matched producers for consumer
|
||||
|
||||
// Step 1. Get consumer medias
|
||||
for icc, consMedia := range cons.GetMedias() {
|
||||
log.Trace().Stringer("media", consMedia).
|
||||
Msgf("[streams] consumer:%d:%d candidate", ic, icc)
|
||||
Msgf("[streams] consumer=%d candidate=%d", ic, icc)
|
||||
|
||||
producers:
|
||||
for ip, prod := range s.producers {
|
||||
// Step 2. Get producer medias (not tracks yet)
|
||||
for ipc, prodMedia := range prod.GetMedias() {
|
||||
log.Trace().Stringer("media", prodMedia).
|
||||
Msgf("[streams] producer:%d:%d candidate", ip, ipc)
|
||||
Msgf("[streams] producer=%d candidate=%d", ip, ipc)
|
||||
|
||||
// Step 3. Match consumer/producer codecs list
|
||||
prodCodec := prodMedia.MatchMedia(consMedia)
|
||||
@@ -81,20 +84,23 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
consTrack := consumer.element.AddTrack(consMedia, prodTrack)
|
||||
|
||||
consumer.tracks = append(consumer.tracks, consTrack)
|
||||
producers = append(producers, prod)
|
||||
break producers
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// can't match tracks for consumer
|
||||
if len(consumer.tracks) == 0 {
|
||||
if len(producers) == 0 {
|
||||
return errors.New("couldn't find the matching tracks")
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.consumers = append(s.consumers, consumer)
|
||||
s.mu.Unlock()
|
||||
|
||||
for _, prod := range s.producers {
|
||||
// there may be duplicates, but that's not a problem
|
||||
for _, prod := range producers {
|
||||
prod.start()
|
||||
}
|
||||
|
||||
@@ -102,6 +108,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
}
|
||||
|
||||
func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
|
||||
s.mu.Lock()
|
||||
for i, consumer := range s.consumers {
|
||||
if consumer == nil {
|
||||
log.Warn().Msgf("empty consumer: %+v\n", s)
|
||||
@@ -127,7 +134,7 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
|
||||
|
||||
var sink bool
|
||||
for _, track := range producer.tracks {
|
||||
if len(track.Sink) > 0 {
|
||||
if track.HasSink() {
|
||||
sink = true
|
||||
}
|
||||
}
|
||||
@@ -135,38 +142,44 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
|
||||
producer.stop()
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Stream) AddProducer(prod streamer.Producer) {
|
||||
producer := &Producer{element: prod, state: stateTracks}
|
||||
s.mu.Lock()
|
||||
s.producers = append(s.producers, producer)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Stream) RemoveProducer(prod streamer.Producer) {
|
||||
s.mu.Lock()
|
||||
for i, producer := range s.producers {
|
||||
if producer.element == prod {
|
||||
s.removeProducer(i)
|
||||
break
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Stream) Active() bool {
|
||||
if len(s.consumers) > 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, prod := range s.producers {
|
||||
if prod.element != nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
//func (s *Stream) Active() bool {
|
||||
// if len(s.consumers) > 0 {
|
||||
// return true
|
||||
// }
|
||||
//
|
||||
// for _, prod := range s.producers {
|
||||
// if prod.element != nil {
|
||||
// return true
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return false
|
||||
//}
|
||||
|
||||
func (s *Stream) MarshalJSON() ([]byte, error) {
|
||||
var v []interface{}
|
||||
s.mu.Lock()
|
||||
for _, prod := range s.producers {
|
||||
if prod.element != nil {
|
||||
v = append(v, prod.element)
|
||||
@@ -176,6 +189,7 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
|
||||
// cons.element always not nil
|
||||
v = append(v, cons.element)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
if len(v) == 0 {
|
||||
v = nil
|
||||
}
|
||||
|
@@ -8,7 +8,9 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/webrtc"
|
||||
pion "github.com/pion/webrtc/v3"
|
||||
"github.com/rs/zerolog"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@@ -55,6 +57,8 @@ func Init() {
|
||||
|
||||
api.HandleWS(webrtc.MsgTypeOffer, offerHandler)
|
||||
api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler)
|
||||
|
||||
api.HandleFunc("api/webrtc", syncHandler)
|
||||
}
|
||||
|
||||
var Port string
|
||||
@@ -137,6 +141,32 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
|
||||
ctx.Consumer = conn
|
||||
}
|
||||
|
||||
func syncHandler(w http.ResponseWriter, r *http.Request) {
|
||||
url := r.URL.Query().Get("src")
|
||||
stream := streams.Get(url)
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// get offer
|
||||
offer, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
answer, err := ExchangeSDP(stream, string(offer), r.UserAgent())
|
||||
if err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
return
|
||||
}
|
||||
|
||||
// send SDP to client
|
||||
if _, err = w.Write([]byte(answer)); err != nil {
|
||||
log.Error().Err(err).Caller().Send()
|
||||
}
|
||||
}
|
||||
|
||||
func ExchangeSDP(
|
||||
stream *streams.Stream, offer string, userAgent string,
|
||||
) (answer string, err error) {
|
||||
|
57
pkg/aac/rtp.go
Normal file
57
pkg/aac/rtp.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package aac
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
const RTPPacketVersionAAC = 0
|
||||
|
||||
func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
// support ONLY 2 bytes header size!
|
||||
// streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408
|
||||
headersSize := binary.BigEndian.Uint16(packet.Payload) >> 3
|
||||
|
||||
//log.Printf("[RTP/AAC] units: %d, size: %4d, ts: %10d, %t", headersSize/2, len(packet.Payload), packet.Timestamp, packet.Marker)
|
||||
|
||||
clone := *packet
|
||||
clone.Version = RTPPacketVersionAAC
|
||||
clone.Payload = packet.Payload[2+headersSize:]
|
||||
return push(&clone)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func RTPPay(mtu uint16) streamer.WrapperFunc {
|
||||
sequencer := rtp.NewRandomSequencer()
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
if packet.Version != RTPPacketVersionAAC {
|
||||
return push(packet)
|
||||
}
|
||||
|
||||
// support ONLY one unit in payload
|
||||
size := uint16(len(packet.Payload))
|
||||
// 2 bytes header size + 2 bytes first payload size
|
||||
payload := make([]byte, 2+2+size)
|
||||
payload[1] = 16 // header size in bits
|
||||
binary.BigEndian.PutUint16(payload[2:], size<<3)
|
||||
copy(payload[4:], packet.Payload)
|
||||
|
||||
clone := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
SequenceNumber: sequencer.NextSequenceNumber(),
|
||||
Timestamp: packet.Timestamp,
|
||||
},
|
||||
Payload: payload,
|
||||
}
|
||||
return push(&clone)
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,3 +1,11 @@
|
||||
# H264
|
||||
|
||||
Access Unit (AU) can contain one or multiple NAL Unit:
|
||||
|
||||
1. [SEI,] SPS, PPS, IFrame, [IFrame...]
|
||||
2. BFrame, [BFrame...]
|
||||
3. IFrame, [IFrame...]
|
||||
|
||||
## RTP H264
|
||||
|
||||
Camera | NALu
|
||||
|
@@ -6,55 +6,38 @@ import (
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
const PayloadTypeAVC = 255
|
||||
func EncodeAVC(nals ...[]byte) (avc []byte) {
|
||||
var i, n int
|
||||
|
||||
func IsAVC(codec *streamer.Codec) bool {
|
||||
return codec.PayloadType == PayloadTypeAVC
|
||||
}
|
||||
for _, nal := range nals {
|
||||
if i = len(nal); i > 0 {
|
||||
n += 4 + i
|
||||
}
|
||||
}
|
||||
|
||||
avc = make([]byte, n)
|
||||
|
||||
n = 0
|
||||
for _, nal := range nals {
|
||||
if i = len(nal); i > 0 {
|
||||
binary.BigEndian.PutUint32(avc[n:], uint32(i))
|
||||
n += 4 + copy(avc[n+4:], nal)
|
||||
}
|
||||
}
|
||||
|
||||
func EncodeAVC(raw []byte) (avc []byte) {
|
||||
avc = make([]byte, len(raw)+4)
|
||||
binary.BigEndian.PutUint32(avc, uint32(len(raw)))
|
||||
copy(avc[4:], raw)
|
||||
return
|
||||
}
|
||||
|
||||
func RepairAVC(track *streamer.Track) streamer.WrapperFunc {
|
||||
sps, pps := GetParameterSet(track.Codec.FmtpLine)
|
||||
sps = EncodeAVC(sps)
|
||||
pps = EncodeAVC(pps)
|
||||
ps := EncodeAVC(sps, pps)
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) (err error) {
|
||||
naluType := NALUType(packet.Payload)
|
||||
switch naluType {
|
||||
case NALUTypeSPS:
|
||||
sps = packet.Payload
|
||||
return
|
||||
case NALUTypePPS:
|
||||
pps = packet.Payload
|
||||
return
|
||||
if NALUType(packet.Payload) == NALUTypeIFrame {
|
||||
packet.Payload = Join(ps, packet.Payload)
|
||||
}
|
||||
|
||||
var clone rtp.Packet
|
||||
|
||||
if naluType == NALUTypeIFrame {
|
||||
clone = *packet
|
||||
clone.Payload = sps
|
||||
if err = push(&clone); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Payload = pps
|
||||
if err = push(&clone); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Payload = packet.Payload
|
||||
return push(&clone)
|
||||
return push(packet)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -63,12 +46,12 @@ func SplitAVC(data []byte) [][]byte {
|
||||
var nals [][]byte
|
||||
for {
|
||||
// get AVC length
|
||||
size := int(binary.BigEndian.Uint32(data))
|
||||
size := int(binary.BigEndian.Uint32(data)) + 4
|
||||
|
||||
// check if multiple items in one packet
|
||||
if size+4 < len(data) {
|
||||
nals = append(nals, data[:size+4])
|
||||
data = data[size+4:]
|
||||
if size < len(data) {
|
||||
nals = append(nals, data[:size])
|
||||
data = data[size:]
|
||||
} else {
|
||||
nals = append(nals, data)
|
||||
break
|
||||
@@ -76,3 +59,18 @@ func SplitAVC(data []byte) [][]byte {
|
||||
}
|
||||
return nals
|
||||
}
|
||||
|
||||
func Types(data []byte) []byte {
|
||||
var types []byte
|
||||
for {
|
||||
types = append(types, NALUType(data))
|
||||
|
||||
size := 4 + int(binary.BigEndian.Uint32(data))
|
||||
if size < len(data) {
|
||||
data = data[size:]
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return types
|
||||
}
|
||||
|
@@ -2,24 +2,49 @@ package h264
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
NALUTypePFrame = 1
|
||||
NALUTypeIFrame = 5
|
||||
NALUTypeSEI = 6
|
||||
NALUTypeSPS = 7
|
||||
NALUTypePPS = 8
|
||||
NALUTypePFrame = 1 // Coded slice of a non-IDR picture
|
||||
NALUTypeIFrame = 5 // Coded slice of an IDR picture
|
||||
NALUTypeSEI = 6 // Supplemental enhancement information (SEI)
|
||||
NALUTypeSPS = 7 // Sequence parameter set
|
||||
NALUTypePPS = 8 // Picture parameter set
|
||||
NALUTypeAUD = 9 // Access unit delimiter
|
||||
)
|
||||
|
||||
func NALUType(b []byte) byte {
|
||||
return b[4] & 0x1F
|
||||
}
|
||||
|
||||
// IsKeyframe - check if any NALU in one AU is Keyframe
|
||||
func IsKeyframe(b []byte) bool {
|
||||
return NALUType(b) == NALUTypeIFrame
|
||||
for {
|
||||
switch NALUType(b) {
|
||||
case NALUTypePFrame:
|
||||
return false
|
||||
case NALUTypeIFrame:
|
||||
return true
|
||||
}
|
||||
|
||||
size := int(binary.BigEndian.Uint32(b)) + 4
|
||||
if size < len(b) {
|
||||
b = b[size:]
|
||||
continue
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Join(ps, iframe []byte) []byte {
|
||||
b := make([]byte, len(ps)+len(iframe))
|
||||
i := copy(b, ps)
|
||||
copy(b[i:], iframe)
|
||||
return b
|
||||
}
|
||||
|
||||
func GetProfileLevelID(fmtp string) string {
|
||||
|
172
pkg/h264/rtp.go
172
pkg/h264/rtp.go
@@ -13,97 +13,69 @@ func RTPDepay(track *streamer.Track) streamer.WrapperFunc {
|
||||
depack := &codecs.H264Packet{IsAVC: true}
|
||||
|
||||
sps, pps := GetParameterSet(track.Codec.FmtpLine)
|
||||
sps = EncodeAVC(sps)
|
||||
pps = EncodeAVC(pps)
|
||||
ps := EncodeAVC(sps, pps)
|
||||
|
||||
var buffer []byte
|
||||
buf := make([]byte, 0, 512*1024) // 512K
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
//nalUnitType := packet.Payload[0] & 0x1F
|
||||
//fmt.Printf(
|
||||
// "[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d\n",
|
||||
// track.Codec.Name, nalUnitType, len(packet.Payload), packet.Timestamp,
|
||||
// packet.PayloadType, packet.SSRC, packet.SequenceNumber,
|
||||
//)
|
||||
//log.Printf("[RTP] codec: %s, nalu: %2d, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, %v", track.Codec.Name, packet.Payload[0]&0x1F, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker)
|
||||
|
||||
data, err := depack.Unmarshal(packet.Payload)
|
||||
if len(data) == 0 || err != nil {
|
||||
payload, err := depack.Unmarshal(packet.Payload)
|
||||
if len(payload) == 0 || err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
unitType := NALUType(data)
|
||||
//fmt.Printf("[H264] nalu: %2d, size: %6d\n", unitType, len(data))
|
||||
|
||||
// multiple 5 and 1 in one payload is OK
|
||||
if unitType != NALUTypeIFrame && unitType != NALUTypePFrame {
|
||||
i := int(binary.BigEndian.Uint32(data)) + 4
|
||||
if i < len(data) {
|
||||
data0 := data[:i] // NAL Unit with AVC header
|
||||
data = data[i:]
|
||||
switch unitType {
|
||||
case NALUTypeSPS:
|
||||
sps = data0
|
||||
continue
|
||||
case NALUTypePPS:
|
||||
pps = data0
|
||||
continue
|
||||
case NALUTypeSEI:
|
||||
// some unnecessary text information
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch unitType {
|
||||
case NALUTypeSPS:
|
||||
sps = data
|
||||
return nil
|
||||
case NALUTypePPS:
|
||||
pps = data
|
||||
return nil
|
||||
case NALUTypeSEI:
|
||||
// some unnecessary text information
|
||||
// Fix TP-Link Tapo TC70: sends SPS and PPS with packet.Marker = true
|
||||
if packet.Marker {
|
||||
switch NALUType(payload) {
|
||||
case NALUTypeSPS, NALUTypePPS:
|
||||
buf = append(buf, payload...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ffmpeg with `-tune zerolatency` enable option `-x264opts sliced-threads=1`
|
||||
// and every NALU will be sliced to multiple NALUs
|
||||
if !packet.Marker {
|
||||
buffer = append(buffer, data...)
|
||||
return nil
|
||||
}
|
||||
|
||||
if buffer != nil {
|
||||
buffer = append(buffer, data...)
|
||||
data = buffer
|
||||
buffer = nil
|
||||
}
|
||||
|
||||
var clone rtp.Packet
|
||||
|
||||
if unitType == NALUTypeIFrame {
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = sps
|
||||
if err = push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = pps
|
||||
if err = push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
clone = *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = data
|
||||
return push(&clone)
|
||||
}
|
||||
|
||||
if len(buf) == 0 {
|
||||
// Amcrest IP4M-1051: 9, 7, 8, 6, 28...
|
||||
// Amcrest IP4M-1051: 9, 6, 1
|
||||
switch NALUType(payload) {
|
||||
case NALUTypeIFrame:
|
||||
// fix IFrame without SPS,PPS
|
||||
buf = append(buf, ps...)
|
||||
case NALUTypeSEI, NALUTypeAUD:
|
||||
// fix ffmpeg with transcoding first frame
|
||||
i := int(4 + binary.BigEndian.Uint32(payload))
|
||||
|
||||
// check if only one NAL (fix ffmpeg transcoding for Reolink RLC-510A)
|
||||
if i == len(payload) {
|
||||
return nil
|
||||
}
|
||||
|
||||
payload = payload[i:]
|
||||
|
||||
if NALUType(payload) == NALUTypeIFrame {
|
||||
buf = append(buf, ps...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// collect all NALs for Access Unit
|
||||
if !packet.Marker {
|
||||
buf = append(buf, payload...)
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(buf) > 0 {
|
||||
payload = append(buf, payload...)
|
||||
buf = buf[:0]
|
||||
}
|
||||
|
||||
//log.Printf("[AVC] %v, len: %d", Types(payload), len(payload))
|
||||
|
||||
clone := *packet
|
||||
clone.Version = RTPPacketVersionAVC
|
||||
clone.Payload = payload
|
||||
return push(&clone)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -115,28 +87,28 @@ func RTPPay(mtu uint16) streamer.WrapperFunc {
|
||||
|
||||
return func(push streamer.WriterFunc) streamer.WriterFunc {
|
||||
return func(packet *rtp.Packet) error {
|
||||
if packet.Version == RTPPacketVersionAVC {
|
||||
payloads := payloader.Payload(mtu, packet.Payload)
|
||||
for i, payload := range payloads {
|
||||
clone := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: i == len(payloads)-1,
|
||||
//PayloadType: packet.PayloadType,
|
||||
SequenceNumber: sequencer.NextSequenceNumber(),
|
||||
Timestamp: packet.Timestamp,
|
||||
//SSRC: packet.SSRC,
|
||||
},
|
||||
Payload: payload,
|
||||
}
|
||||
if err := push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
if packet.Version != RTPPacketVersionAVC {
|
||||
return push(packet)
|
||||
}
|
||||
|
||||
return push(packet)
|
||||
payloads := payloader.Payload(mtu, packet.Payload)
|
||||
last := len(payloads) - 1
|
||||
for i, payload := range payloads {
|
||||
clone := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: i == last,
|
||||
SequenceNumber: sequencer.NextSequenceNumber(),
|
||||
Timestamp: packet.Timestamp,
|
||||
},
|
||||
Payload: payload,
|
||||
}
|
||||
if err := push(&clone); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
3
pkg/httpflv/README.md
Normal file
3
pkg/httpflv/README.md
Normal file
@@ -0,0 +1,3 @@
|
||||
## Useful links
|
||||
|
||||
- https://medium.com/@nate510/don-t-use-go-s-default-http-client-4804cb19f779
|
100
pkg/httpflv/httpflv.go
Normal file
100
pkg/httpflv/httpflv.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package httpflv
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/flv/flvio"
|
||||
"github.com/deepch/vdk/utils/bits/pio"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func Dial(uri string) (*Conn, error) {
|
||||
req, err := http.NewRequest("GET", uri, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := Conn{
|
||||
conn: res.Body,
|
||||
reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize),
|
||||
buf: make([]byte, 256),
|
||||
}
|
||||
|
||||
if _, err = io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
flags, n, err := flvio.ParseFileHeader(c.buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if flags&flvio.FILE_HAS_VIDEO == 0 {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
|
||||
if _, err = c.reader.Discard(n); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
type Conn struct {
|
||||
conn io.ReadCloser
|
||||
reader *bufio.Reader
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func (c *Conn) Streams() ([]av.CodecData, error) {
|
||||
for {
|
||||
tag, _, err := flvio.ReadTag(c.reader, c.buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AAC_SEQHDR {
|
||||
continue
|
||||
}
|
||||
|
||||
stream, err := h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return []av.CodecData{stream}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) ReadPacket() (av.Packet, error) {
|
||||
for {
|
||||
tag, ts, err := flvio.ReadTag(c.reader, c.buf)
|
||||
if err != nil {
|
||||
return av.Packet{}, err
|
||||
}
|
||||
|
||||
if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AVC_NALU {
|
||||
continue
|
||||
}
|
||||
|
||||
return av.Packet{
|
||||
Idx: 0,
|
||||
Data: tag.Data,
|
||||
CompositionTime: flvio.TsToTime(tag.CompositionTime),
|
||||
IsKeyFrame: tag.FrameType == flvio.FRAME_KEY,
|
||||
Time: flvio.TsToTime(ts),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) Close() (err error) {
|
||||
return c.conn.Close()
|
||||
}
|
@@ -6,7 +6,6 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/fmp4/fmp4io"
|
||||
@@ -162,9 +161,12 @@ func (c *Client) getTracks() error {
|
||||
continue
|
||||
}
|
||||
|
||||
codec := streamer.NewCodec(streamer.CodecH264)
|
||||
codec.FmtpLine = "profile-level-id=" + msg.CodecString[i+1:]
|
||||
codec.PayloadType = h264.PayloadTypeAVC
|
||||
codec := &streamer.Codec{
|
||||
Name: streamer.CodecH264,
|
||||
ClockRate: 90000,
|
||||
FmtpLine: "profile-level-id=" + msg.CodecString[i+1:],
|
||||
PayloadType: streamer.PayloadTypeMP4,
|
||||
}
|
||||
|
||||
i = bytes.Index(msg.Data, []byte("avcC")) - 4
|
||||
if i < 0 {
|
||||
@@ -245,14 +247,12 @@ func (c *Client) worker() {
|
||||
time.Sleep(d)
|
||||
|
||||
// can be SPS, PPS and IFrame in one packet
|
||||
for _, payload := range h264.SplitAVC(data[:entry.Size]) {
|
||||
packet := &rtp.Packet{
|
||||
// ivideon clockrate=1000, RTP clockrate=90000
|
||||
Header: rtp.Header{Timestamp: ts * 90},
|
||||
Payload: payload,
|
||||
}
|
||||
_ = track.WriteRTP(packet)
|
||||
packet := &rtp.Packet{
|
||||
// ivideon clockrate=1000, RTP clockrate=90000
|
||||
Header: rtp.Header{Timestamp: ts * 90},
|
||||
Payload: data[:entry.Size],
|
||||
}
|
||||
_ = track.WriteRTP(packet)
|
||||
|
||||
data = data[entry.Size:]
|
||||
ts += entry.Duration
|
||||
|
@@ -61,8 +61,16 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
|
||||
lqt, cqt = MakeTables(q)
|
||||
}
|
||||
|
||||
// https://www.rfc-editor.org/rfc/rfc2435#section-3.1.5
|
||||
// The maximum width is 2040 pixels.
|
||||
w := uint16(packet.Payload[6]) << 3
|
||||
h := uint16(packet.Payload[7]) << 3
|
||||
|
||||
// fix 2560x1920 and 2560x1440
|
||||
if w == 512 && (h == 1920 || h == 1440) {
|
||||
w = 2560
|
||||
}
|
||||
|
||||
//fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h)
|
||||
header = MakeHeaders(t, w, h, lqt, cqt)
|
||||
}
|
||||
|
@@ -3,6 +3,8 @@ package mp4
|
||||
import (
|
||||
"encoding/binary"
|
||||
"github.com/deepch/vdk/format/mp4/mp4io"
|
||||
"github.com/deepch/vdk/format/mp4f"
|
||||
"github.com/deepch/vdk/format/mp4f/mp4fio"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -37,25 +39,17 @@ func MOOV() *mp4io.Movie {
|
||||
SelectionDuration: time0,
|
||||
CurrentTime: time0,
|
||||
},
|
||||
MovieExtend: &mp4io.MovieExtend{
|
||||
Tracks: []*mp4io.TrackExtend{
|
||||
{
|
||||
TrackId: 1,
|
||||
DefaultSampleDescIdx: 1,
|
||||
DefaultSampleDuration: 40,
|
||||
},
|
||||
},
|
||||
},
|
||||
MovieExtend: &mp4io.MovieExtend{},
|
||||
}
|
||||
}
|
||||
|
||||
func TRAK() *mp4io.Track {
|
||||
func TRAK(id int) *mp4io.Track {
|
||||
return &mp4io.Track{
|
||||
// trak > tkhd
|
||||
Header: &mp4io.TrackHeader{
|
||||
TrackId: int32(1), // change me
|
||||
Flags: 0x0007, // 7 ENABLED IN-MOVIE IN-PREVIEW
|
||||
Duration: 0, // OK
|
||||
TrackId: int32(id),
|
||||
Flags: 0x0007, // 7 ENABLED IN-MOVIE IN-PREVIEW
|
||||
Duration: 0, // OK
|
||||
Matrix: matrix,
|
||||
CreateTime: time0,
|
||||
ModifyTime: time0,
|
||||
@@ -92,3 +86,15 @@ func TRAK() *mp4io.Track {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func ESDS(conf []byte) *mp4f.FDummy {
|
||||
esds := &mp4fio.ElemStreamDesc{DecConfig: conf}
|
||||
|
||||
b := make([]byte, esds.Len())
|
||||
esds.Marshal(b)
|
||||
|
||||
return &mp4f.FDummy{
|
||||
Data: b,
|
||||
Tag_: mp4io.Tag(uint32(mp4io.ESDS)),
|
||||
}
|
||||
}
|
||||
|
@@ -2,7 +2,7 @@ package mp4
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/aac"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h265"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
@@ -28,90 +28,99 @@ func (c *Consumer) GetMedias() []*streamer.Media {
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecH264, ClockRate: 90000},
|
||||
{Name: streamer.CodecH265, ClockRate: 90000},
|
||||
{Name: streamer.CodecH264},
|
||||
{Name: streamer.CodecH265},
|
||||
},
|
||||
},
|
||||
{
|
||||
Kind: streamer.KindAudio,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecAAC},
|
||||
},
|
||||
},
|
||||
//{
|
||||
// Kind: streamer.KindAudio,
|
||||
// Direction: streamer.DirectionRecvonly,
|
||||
// Codecs: []*streamer.Codec{
|
||||
// {Name: streamer.CodecAAC, ClockRate: 16000},
|
||||
// },
|
||||
//},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
trackID := byte(len(c.codecs))
|
||||
c.codecs = append(c.codecs, track.Codec)
|
||||
|
||||
codec := track.Codec
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
c.codecs = append(c.codecs, track.Codec)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch h264.NALUType(packet.Payload) {
|
||||
case h264.NALUTypeIFrame:
|
||||
c.start = true
|
||||
case h264.NALUTypePFrame:
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(packet)
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(codec) {
|
||||
wrapper := h264.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecH265:
|
||||
c.codecs = append(c.codecs, track.Codec)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.start {
|
||||
if h265.IsKeyframe(packet.Payload) {
|
||||
c.start = true
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(packet)
|
||||
buf := c.muxer.Marshal(trackID, packet)
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(codec) {
|
||||
var wrapper streamer.WrapperFunc
|
||||
if codec.IsMP4() {
|
||||
wrapper = h264.RepairAVC(track)
|
||||
} else {
|
||||
wrapper = h264.RTPDepay(track)
|
||||
}
|
||||
push = wrapper(push)
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecH265:
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(trackID, packet)
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !codec.IsMP4() {
|
||||
wrapper := h265.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecAAC:
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := c.muxer.Marshal(trackID, packet)
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !codec.IsMP4() {
|
||||
wrapper := aac.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
||||
|
||||
fmt.Printf("[rtmp] unsupported codec: %+v\n", track.Codec)
|
||||
|
||||
return nil
|
||||
panic("unsupported codec")
|
||||
}
|
||||
|
||||
func (c *Consumer) MimeType() string {
|
||||
@@ -119,12 +128,14 @@ func (c *Consumer) MimeType() string {
|
||||
}
|
||||
|
||||
func (c *Consumer) Init() ([]byte, error) {
|
||||
if c.muxer == nil {
|
||||
c.muxer = &Muxer{}
|
||||
}
|
||||
c.muxer = &Muxer{}
|
||||
return c.muxer.GetInit(c.codecs)
|
||||
}
|
||||
|
||||
func (c *Consumer) Start() {
|
||||
c.start = true
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
func (c *Consumer) MarshalJSON() ([]byte, error) {
|
||||
|
85
pkg/mp4/keyframe.go
Normal file
85
pkg/mp4/keyframe.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h265"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
type Keyframe struct {
|
||||
streamer.Element
|
||||
|
||||
MimeType string
|
||||
}
|
||||
|
||||
func (c *Keyframe) GetMedias() []*streamer.Media {
|
||||
return []*streamer.Media{
|
||||
{
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecH264},
|
||||
{Name: streamer.CodecH265},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
muxer := &Muxer{}
|
||||
|
||||
codecs := []*streamer.Codec{track.Codec}
|
||||
|
||||
init, err := muxer.GetInit(codecs)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.MimeType = muxer.MimeType(codecs)
|
||||
|
||||
switch track.Codec.Name {
|
||||
case streamer.CodecH264:
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if !h264.IsKeyframe(packet.Payload) {
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := muxer.Marshal(0, packet)
|
||||
c.Fire(append(init, buf...))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var wrapper streamer.WrapperFunc
|
||||
if track.Codec.IsMP4() {
|
||||
wrapper = h264.RepairAVC(track)
|
||||
} else {
|
||||
wrapper = h264.RTPDepay(track)
|
||||
}
|
||||
push = wrapper(push)
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecH265:
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if !h265.IsKeyframe(packet.Payload) {
|
||||
return nil
|
||||
}
|
||||
|
||||
buf := muxer.Marshal(0, packet)
|
||||
c.Fire(append(init, buf...))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !track.Codec.IsMP4() {
|
||||
wrapper := h265.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
||||
|
||||
panic("unsupported codec")
|
||||
}
|
116
pkg/mp4/muxer.go
116
pkg/mp4/muxer.go
@@ -2,10 +2,13 @@ package mp4
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h265"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/aacparser"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/codec/h265parser"
|
||||
"github.com/deepch/vdk/format/fmp4/fmp4io"
|
||||
@@ -16,22 +19,28 @@ import (
|
||||
|
||||
type Muxer struct {
|
||||
fragIndex uint32
|
||||
dts uint64
|
||||
pts uint32
|
||||
data []byte
|
||||
total int
|
||||
dts []uint64
|
||||
pts []uint32
|
||||
//data []byte
|
||||
//total int
|
||||
}
|
||||
|
||||
func (m *Muxer) MimeType(codecs []*streamer.Codec) string {
|
||||
s := `video/mp4; codecs="`
|
||||
|
||||
for _, codec := range codecs {
|
||||
for i, codec := range codecs {
|
||||
if i > 0 {
|
||||
s += ","
|
||||
}
|
||||
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
s += "avc1." + h264.GetProfileLevelID(codec.FmtpLine)
|
||||
case streamer.CodecH265:
|
||||
// +Safari +Chrome +Edge -iOS15 -Android13
|
||||
s += "hvc1.1.6.L93.B0" // hev1.1.6.L93.B0
|
||||
case streamer.CodecAAC:
|
||||
s += "mp4a.40.2"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,15 +50,16 @@ func (m *Muxer) MimeType(codecs []*streamer.Codec) string {
|
||||
func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
moov := MOOV()
|
||||
|
||||
for _, codec := range codecs {
|
||||
for i, codec := range codecs {
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
sps, pps := h264.GetParameterSet(codec.FmtpLine)
|
||||
if sps == nil {
|
||||
return nil, fmt.Errorf("empty SPS: %#v", codec)
|
||||
// some dummy SPS and PPS not a problem
|
||||
sps = []byte{0x67, 0x42, 0x00, 0x0a, 0xf8, 0x41, 0xa2}
|
||||
pps = []byte{0x68, 0xce, 0x38, 0x80}
|
||||
}
|
||||
|
||||
// TODO: remove
|
||||
codecData, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -58,11 +68,14 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
width := codecData.Width()
|
||||
height := codecData.Height()
|
||||
|
||||
trak := TRAK()
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak := TRAK(i + 1)
|
||||
trak.Header.TrackWidth = float64(width)
|
||||
trak.Header.TrackHeight = float64(height)
|
||||
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
trak.Media.Info.Video = &mp4io.VideoMediaInfo{
|
||||
Flags: 0x000001,
|
||||
}
|
||||
@@ -80,11 +93,6 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
},
|
||||
}
|
||||
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
|
||||
moov.Tracks = append(moov.Tracks, trak)
|
||||
|
||||
case streamer.CodecH265:
|
||||
@@ -101,11 +109,14 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
width := codecData.Width()
|
||||
height := codecData.Height()
|
||||
|
||||
trak := TRAK()
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak := TRAK(i + 1)
|
||||
trak.Header.TrackWidth = float64(width)
|
||||
trak.Header.TrackHeight = float64(height)
|
||||
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
trak.Media.Info.Video = &mp4io.VideoMediaInfo{
|
||||
Flags: 0x000001,
|
||||
}
|
||||
@@ -123,13 +134,52 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
},
|
||||
}
|
||||
|
||||
moov.Tracks = append(moov.Tracks, trak)
|
||||
|
||||
case streamer.CodecAAC:
|
||||
s := streamer.Between(codec.FmtpLine, "config=", ";")
|
||||
b, err := hex.DecodeString(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
codecData, err := aacparser.ParseMPEG4AudioConfigBytes(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
trak := TRAK(i + 1)
|
||||
trak.Header.AlternateGroup = 1
|
||||
trak.Header.Duration = 0
|
||||
trak.Header.Volume = 1
|
||||
trak.Media.Header.TimeScale = int32(codec.ClockRate)
|
||||
|
||||
trak.Media.Handler = &mp4io.HandlerRefer{
|
||||
SubType: [4]byte{'v', 'i', 'd', 'e'},
|
||||
SubType: [4]byte{'s', 'o', 'u', 'n'},
|
||||
Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0},
|
||||
}
|
||||
trak.Media.Info.Sound = &mp4io.SoundMediaInfo{}
|
||||
|
||||
trak.Media.Info.Sample.SampleDesc.MP4ADesc = &mp4io.MP4ADesc{
|
||||
DataRefIdx: 1,
|
||||
NumberOfChannels: int16(codecData.ChannelLayout.Count()),
|
||||
SampleSize: int16(av.FLTP.BytesPerSample() * 4),
|
||||
SampleRate: float64(codecData.SampleRate),
|
||||
Unknowns: []mp4io.Atom{ESDS(b)},
|
||||
}
|
||||
|
||||
moov.Tracks = append(moov.Tracks, trak)
|
||||
}
|
||||
|
||||
trex := &mp4io.TrackExtend{
|
||||
TrackId: uint32(i + 1),
|
||||
DefaultSampleDescIdx: 1,
|
||||
DefaultSampleDuration: 0,
|
||||
}
|
||||
moov.MovieExtend.Tracks = append(moov.MovieExtend.Tracks, trex)
|
||||
|
||||
m.pts = append(m.pts, 0)
|
||||
m.dts = append(m.dts, 0)
|
||||
}
|
||||
|
||||
data := make([]byte, moov.Len())
|
||||
@@ -138,14 +188,12 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
|
||||
return append(FTYP(), data...), nil
|
||||
}
|
||||
|
||||
func (m *Muxer) Rewind() {
|
||||
m.dts = 0
|
||||
m.pts = 0
|
||||
}
|
||||
|
||||
func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
trackID := uint8(1)
|
||||
//func (m *Muxer) Rewind() {
|
||||
// m.dts = 0
|
||||
// m.pts = 0
|
||||
//}
|
||||
|
||||
func (m *Muxer) Marshal(trackID byte, packet *rtp.Packet) []byte {
|
||||
run := &mp4fio.TrackFragRun{
|
||||
Flags: 0x000b05,
|
||||
FirstSampleFlags: uint32(fmp4io.SampleNoDependencies),
|
||||
@@ -160,12 +208,12 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
Tracks: []*mp4fio.TrackFrag{
|
||||
{
|
||||
Header: &mp4fio.TrackFragHeader{
|
||||
Data: []byte{0x00, 0x02, 0x00, 0x20, 0x00, 0x00, 0x00, trackID, 0x01, 0x01, 0x00, 0x00},
|
||||
Data: []byte{0x00, 0x02, 0x00, 0x20, 0x00, 0x00, 0x00, trackID + 1, 0x01, 0x01, 0x00, 0x00},
|
||||
},
|
||||
DecodeTime: &mp4fio.TrackFragDecodeTime{
|
||||
Version: 1,
|
||||
Flags: 0,
|
||||
Time: m.dts,
|
||||
Time: m.dts[trackID],
|
||||
},
|
||||
Run: run,
|
||||
},
|
||||
@@ -178,12 +226,12 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
}
|
||||
|
||||
newTime := packet.Timestamp
|
||||
if m.pts > 0 {
|
||||
if m.pts[trackID] > 0 {
|
||||
//m.dts += uint64(newTime - m.pts)
|
||||
entry.Duration = newTime - m.pts
|
||||
m.dts += uint64(entry.Duration)
|
||||
entry.Duration = newTime - m.pts[trackID]
|
||||
m.dts[trackID] += uint64(entry.Duration)
|
||||
}
|
||||
m.pts = newTime
|
||||
m.pts[trackID] = newTime
|
||||
|
||||
// important before moof.Len()
|
||||
run.Entries = append(run.Entries, entry)
|
||||
@@ -203,7 +251,7 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte {
|
||||
|
||||
m.fragIndex++
|
||||
|
||||
m.total += moofLen + mdatLen
|
||||
//m.total += moofLen + mdatLen
|
||||
|
||||
return buf
|
||||
}
|
||||
|
164
pkg/mp4f/consumer.go
Normal file
164
pkg/mp4f/consumer.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package mp4f
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/aacparser"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/mp4f"
|
||||
"github.com/pion/rtp"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
streamer.Element
|
||||
|
||||
UserAgent string
|
||||
RemoteAddr string
|
||||
|
||||
muxer *mp4f.Muxer
|
||||
streams []av.CodecData
|
||||
mimeType string
|
||||
start bool
|
||||
|
||||
send int
|
||||
}
|
||||
|
||||
func (c *Consumer) GetMedias() []*streamer.Media {
|
||||
return []*streamer.Media{
|
||||
{
|
||||
Kind: streamer.KindVideo,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecH264, ClockRate: 90000},
|
||||
},
|
||||
},
|
||||
{
|
||||
Kind: streamer.KindAudio,
|
||||
Direction: streamer.DirectionRecvonly,
|
||||
Codecs: []*streamer.Codec{
|
||||
{Name: streamer.CodecAAC, ClockRate: 16000},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
|
||||
codec := track.Codec
|
||||
trackID := int8(len(c.streams))
|
||||
|
||||
switch codec.Name {
|
||||
case streamer.CodecH264:
|
||||
sps, pps := h264.GetParameterSet(codec.FmtpLine)
|
||||
stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.mimeType += "avc1." + h264.GetProfileLevelID(codec.FmtpLine)
|
||||
c.streams = append(c.streams, stream)
|
||||
|
||||
pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond}
|
||||
|
||||
ts2time := time.Second / time.Duration(codec.ClockRate)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if packet.Version != h264.RTPPacketVersionAVC {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
|
||||
pkt.Data = packet.Payload
|
||||
newTime := time.Duration(packet.Timestamp) * ts2time
|
||||
if pkt.Time > 0 {
|
||||
pkt.Duration = newTime - pkt.Time
|
||||
}
|
||||
pkt.Time = newTime
|
||||
|
||||
ready, buf, _ := c.muxer.WritePacket(pkt, false)
|
||||
if ready {
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !h264.IsAVC(codec) {
|
||||
wrapper := h264.RTPDepay(track)
|
||||
push = wrapper(push)
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
|
||||
case streamer.CodecAAC:
|
||||
stream, _ := aacparser.NewCodecDataFromMPEG4AudioConfigBytes([]byte{20, 8})
|
||||
|
||||
c.mimeType += ",mp4a.40.2"
|
||||
c.streams = append(c.streams, stream)
|
||||
|
||||
pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond}
|
||||
|
||||
ts2time := time.Second / time.Duration(codec.ClockRate)
|
||||
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if !c.start {
|
||||
return nil
|
||||
}
|
||||
|
||||
pkt.Data = packet.Payload
|
||||
newTime := time.Duration(packet.Timestamp) * ts2time
|
||||
if pkt.Time > 0 {
|
||||
pkt.Duration = newTime - pkt.Time
|
||||
}
|
||||
pkt.Time = newTime
|
||||
|
||||
ready, buf, _ := c.muxer.WritePacket(pkt, false)
|
||||
if ready {
|
||||
c.send += len(buf)
|
||||
c.Fire(buf)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
}
|
||||
|
||||
panic("unsupported codec")
|
||||
}
|
||||
|
||||
func (c *Consumer) MimeType() string {
|
||||
return `video/mp4; codecs="` + c.mimeType + `"`
|
||||
}
|
||||
|
||||
func (c *Consumer) Init() ([]byte, error) {
|
||||
c.muxer = mp4f.NewMuxer(nil)
|
||||
if err := c.muxer.WriteHeader(c.streams); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, data := c.muxer.GetInit(c.streams)
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (c *Consumer) Start() {
|
||||
c.start = true
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
func (c *Consumer) MarshalJSON() ([]byte, error) {
|
||||
v := map[string]interface{}{
|
||||
"type": "MSE server consumer",
|
||||
"send": c.send,
|
||||
"remote_addr": c.RemoteAddr,
|
||||
"user_agent": c.UserAgent,
|
||||
}
|
||||
|
||||
return json.Marshal(v)
|
||||
}
|
@@ -4,16 +4,24 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/httpflv"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/deepch/vdk/av"
|
||||
"github.com/deepch/vdk/codec/aacparser"
|
||||
"github.com/deepch/vdk/codec/h264parser"
|
||||
"github.com/deepch/vdk/format/rtmp"
|
||||
"github.com/pion/rtp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Conn for RTMP and RTMPT (flv over HTTP)
|
||||
type Conn interface {
|
||||
Streams() (streams []av.CodecData, err error)
|
||||
ReadPacket() (pkt av.Packet, err error)
|
||||
Close() (err error)
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
streamer.Element
|
||||
|
||||
@@ -22,7 +30,7 @@ type Client struct {
|
||||
medias []*streamer.Media
|
||||
tracks []*streamer.Track
|
||||
|
||||
conn *rtmp.Conn
|
||||
conn Conn
|
||||
closed bool
|
||||
|
||||
receive int
|
||||
@@ -33,7 +41,12 @@ func NewClient(uri string) *Client {
|
||||
}
|
||||
|
||||
func (c *Client) Dial() (err error) {
|
||||
c.conn, err = rtmp.Dial(c.URI)
|
||||
if strings.HasPrefix(c.URI, "http") {
|
||||
c.conn, err = httpflv.Dial(c.URI)
|
||||
} else {
|
||||
c.conn, err = rtmp.Dial(c.URI)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -47,16 +60,20 @@ func (c *Client) Dial() (err error) {
|
||||
for _, stream := range streams {
|
||||
switch stream.Type() {
|
||||
case av.H264:
|
||||
cd := stream.(h264parser.CodecData)
|
||||
fmtp := "sprop-parameter-sets=" +
|
||||
base64.StdEncoding.EncodeToString(cd.RecordInfo.SPS[0]) + "," +
|
||||
base64.StdEncoding.EncodeToString(cd.RecordInfo.PPS[0])
|
||||
info := stream.(h264parser.CodecData).RecordInfo
|
||||
|
||||
fmtp := fmt.Sprintf(
|
||||
"profile-level-id=%02X%02X%02X;sprop-parameter-sets=%s,%s",
|
||||
info.AVCProfileIndication, info.ProfileCompatibility, info.AVCLevelIndication,
|
||||
base64.StdEncoding.EncodeToString(info.SPS[0]),
|
||||
base64.StdEncoding.EncodeToString(info.PPS[0]),
|
||||
)
|
||||
|
||||
codec := &streamer.Codec{
|
||||
Name: streamer.CodecH264,
|
||||
ClockRate: 90000,
|
||||
FmtpLine: fmtp,
|
||||
PayloadType: h264.PayloadTypeAVC,
|
||||
PayloadType: streamer.PayloadTypeMP4,
|
||||
}
|
||||
|
||||
media := &streamer.Media{
|
||||
@@ -75,17 +92,13 @@ func (c *Client) Dial() (err error) {
|
||||
// TODO: fix support
|
||||
cd := stream.(aacparser.CodecData)
|
||||
|
||||
// a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588
|
||||
fmtp := fmt.Sprintf(
|
||||
"config=%s",
|
||||
hex.EncodeToString(cd.ConfigBytes),
|
||||
)
|
||||
|
||||
codec := &streamer.Codec{
|
||||
Name: streamer.CodecAAC,
|
||||
ClockRate: uint32(cd.Config.SampleRate),
|
||||
Channels: uint16(cd.Config.ChannelConfig),
|
||||
FmtpLine: fmtp,
|
||||
// a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588
|
||||
FmtpLine: "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" + hex.EncodeToString(cd.ConfigBytes),
|
||||
PayloadType: streamer.PayloadTypeMP4,
|
||||
}
|
||||
|
||||
media := &streamer.Media{
|
||||
@@ -129,22 +142,14 @@ func (c *Client) Handle() (err error) {
|
||||
|
||||
track := c.tracks[int(pkt.Idx)]
|
||||
|
||||
timestamp := uint32(pkt.Time / time.Duration(track.Codec.ClockRate))
|
||||
// convert seconds to RTP timestamp
|
||||
timestamp := uint32(pkt.Time * time.Duration(track.Codec.ClockRate) / time.Second)
|
||||
|
||||
var payloads [][]byte
|
||||
if track.Codec.Name == streamer.CodecH264 {
|
||||
payloads = h264.SplitAVC(pkt.Data)
|
||||
} else {
|
||||
payloads = [][]byte{pkt.Data}
|
||||
}
|
||||
|
||||
for _, payload := range payloads {
|
||||
packet := &rtp.Packet{
|
||||
Header: rtp.Header{Timestamp: timestamp},
|
||||
Payload: payload,
|
||||
}
|
||||
_ = track.WriteRTP(packet)
|
||||
packet := &rtp.Packet{
|
||||
Header: rtp.Header{Timestamp: timestamp},
|
||||
Payload: pkt.Data,
|
||||
}
|
||||
_ = track.WriteRTP(packet)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -32,7 +32,7 @@ func (c *Client) MarshalJSON() ([]byte, error) {
|
||||
v := map[string]interface{}{
|
||||
streamer.JSONReceive: c.receive,
|
||||
streamer.JSONType: "RTMP client producer",
|
||||
streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(),
|
||||
//streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(),
|
||||
"url": c.URI,
|
||||
}
|
||||
for i, media := range c.medias {
|
||||
|
126
pkg/rtsp/conn.go
126
pkg/rtsp/conn.go
@@ -7,6 +7,7 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/aac"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
@@ -43,8 +44,6 @@ const (
|
||||
ModeServerConsumer
|
||||
)
|
||||
|
||||
const KeepAlive = time.Second * 25
|
||||
|
||||
type Conn struct {
|
||||
streamer.Element
|
||||
|
||||
@@ -60,6 +59,7 @@ type Conn struct {
|
||||
// internal
|
||||
|
||||
auth *tcp.Auth
|
||||
closed bool
|
||||
conn net.Conn
|
||||
mode Mode
|
||||
reader *bufio.Reader
|
||||
@@ -115,9 +115,7 @@ func (c *Conn) Dial() (err error) {
|
||||
_ = c.parseURI()
|
||||
}
|
||||
|
||||
c.conn, err = net.DialTimeout(
|
||||
"tcp", c.URL.Host, 10*time.Second,
|
||||
)
|
||||
c.conn, err = net.DialTimeout("tcp", c.URL.Host, time.Second*5)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -362,21 +360,25 @@ func (c *Conn) SetupMedia(
|
||||
var res *tcp.Response
|
||||
res, err = c.Do(req)
|
||||
if err != nil {
|
||||
// Dahua VTO2111D fail on this step because of backchannel
|
||||
// some Dahua/Amcrest cameras fail here because two simultaneous
|
||||
// backchannel connections
|
||||
if c.Backchannel {
|
||||
if err = c.Dial(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Backchannel = false
|
||||
if err = c.Describe(); err != nil {
|
||||
if err := c.Dial(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err = c.Do(req)
|
||||
if err := c.Describe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, newMedia := range c.Medias {
|
||||
if newMedia.Control == media.Control {
|
||||
return c.SetupMedia(newMedia, newMedia.Codecs[0])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if c.Session == "" {
|
||||
@@ -392,12 +394,16 @@ func (c *Conn) SetupMedia(
|
||||
// we send our `interleaved`, but camera can answer with another
|
||||
|
||||
// Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7
|
||||
// Transport: RTP/AVP/TCP;unicast;destination=192.168.1.123;source=192.168.10.12;interleaved=0
|
||||
// Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0
|
||||
// Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1
|
||||
s := res.Header.Get("Transport")
|
||||
// TODO: rewrite
|
||||
if !strings.HasPrefix(s, "RTP/AVP/TCP;") {
|
||||
return nil, fmt.Errorf("wrong transport: %s", s)
|
||||
// Escam Q6 has a bug:
|
||||
// Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1
|
||||
if !strings.Contains(s, ";interleaved=") {
|
||||
return nil, fmt.Errorf("wrong transport: %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
i := strings.Index(s, "interleaved=")
|
||||
@@ -451,24 +457,19 @@ func (c *Conn) Teardown() (err error) {
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
if c.conn == nil {
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
if err := c.Teardown(); err != nil {
|
||||
return err
|
||||
}
|
||||
conn := c.conn
|
||||
c.conn = nil
|
||||
return conn.Close()
|
||||
c.closed = true
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
const transport = "RTP/AVP/TCP;unicast;interleaved="
|
||||
|
||||
func (c *Conn) Accept() error {
|
||||
//if c.state != StateServerInit {
|
||||
// panic("wrong state")
|
||||
//}
|
||||
|
||||
for {
|
||||
req, err := tcp.ReadRequest(c.reader)
|
||||
if err != nil {
|
||||
@@ -571,7 +572,7 @@ func (c *Conn) Accept() error {
|
||||
Request: req,
|
||||
}
|
||||
|
||||
if tr[:len(transport)] == transport {
|
||||
if strings.HasPrefix(tr, transport) {
|
||||
c.Session = "1" // TODO: fixme
|
||||
res.Header.Set("Transport", tr[:len(transport)+3])
|
||||
} else {
|
||||
@@ -594,16 +595,44 @@ func (c *Conn) Accept() error {
|
||||
|
||||
func (c *Conn) Handle() (err error) {
|
||||
defer func() {
|
||||
if c.conn == nil {
|
||||
if c.closed {
|
||||
err = nil
|
||||
} else {
|
||||
// may have gotten here because of the deadline
|
||||
// so close the connection to stop keepalive
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
//c.Fire(streamer.StateNull)
|
||||
}()
|
||||
|
||||
//c.Fire(streamer.StatePlaying)
|
||||
ts := time.Now().Add(KeepAlive)
|
||||
var timeout time.Duration
|
||||
|
||||
switch c.mode {
|
||||
case ModeClientProducer:
|
||||
// polling frames from remote RTSP Server (ex Camera)
|
||||
timeout = time.Second * 5
|
||||
go c.keepalive()
|
||||
|
||||
case ModeServerProducer:
|
||||
// polling frames from remote RTSP Client (ex FFmpeg)
|
||||
timeout = time.Second * 15
|
||||
|
||||
case ModeServerConsumer:
|
||||
// pushing frames to remote RTSP Client (ex VLC)
|
||||
timeout = time.Second * 60
|
||||
|
||||
default:
|
||||
return fmt.Errorf("wrong RTSP conn mode: %d", c.mode)
|
||||
}
|
||||
|
||||
for {
|
||||
if c.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if err = c.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// we can read:
|
||||
// 1. RTP interleaved: `$` + 1B channel number + 2B size
|
||||
// 2. RTSP response: RTSP/1.0 200 OK
|
||||
@@ -681,16 +710,19 @@ func (c *Conn) Handle() (err error) {
|
||||
|
||||
c.Fire(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// keep-alive
|
||||
now := time.Now()
|
||||
if now.After(ts) {
|
||||
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
|
||||
// don't need to wait respose on this request
|
||||
if err = c.Request(req); err != nil {
|
||||
return err
|
||||
}
|
||||
ts = now.Add(KeepAlive)
|
||||
func (c *Conn) keepalive() {
|
||||
// TODO: rewrite to RTCP
|
||||
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
|
||||
for {
|
||||
time.Sleep(time.Second * 25)
|
||||
if c.closed {
|
||||
return
|
||||
}
|
||||
if err := c.Request(req); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -708,20 +740,16 @@ func (c *Conn) bindTrack(
|
||||
track *streamer.Track, channel uint8, payloadType uint8,
|
||||
) *streamer.Track {
|
||||
push := func(packet *rtp.Packet) error {
|
||||
if c.conn == nil {
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
packet.Header.PayloadType = payloadType
|
||||
//packet.Header.PayloadType = 100
|
||||
//packet.Header.PayloadType = 8
|
||||
//packet.Header.PayloadType = 106
|
||||
|
||||
size := packet.MarshalSize()
|
||||
|
||||
data := make([]byte, 4+size)
|
||||
data[0] = '$'
|
||||
data[1] = channel
|
||||
//data[1] = 10
|
||||
binary.BigEndian.PutUint16(data[2:], uint16(size))
|
||||
|
||||
if _, err := packet.MarshalTo(data[4:]); err != nil {
|
||||
@@ -737,9 +765,15 @@ func (c *Conn) bindTrack(
|
||||
return nil
|
||||
}
|
||||
|
||||
if h264.IsAVC(track.Codec) {
|
||||
wrapper := h264.RTPPay(1500)
|
||||
push = wrapper(push)
|
||||
if track.Codec.IsMP4() {
|
||||
switch track.Codec.Name {
|
||||
case streamer.CodecH264:
|
||||
wrapper := h264.RTPPay(1500)
|
||||
push = wrapper(push)
|
||||
case streamer.CodecAAC:
|
||||
wrapper := aac.RTPPay(1500)
|
||||
push = wrapper(push)
|
||||
}
|
||||
}
|
||||
|
||||
return track.Bind(push)
|
||||
|
@@ -2,6 +2,7 @@ package rtsp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strconv"
|
||||
)
|
||||
@@ -27,13 +28,16 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
|
||||
}
|
||||
|
||||
func (c *Conn) Start() error {
|
||||
if c.mode == ModeServerProducer {
|
||||
return nil
|
||||
switch c.mode {
|
||||
case ModeClientProducer:
|
||||
if err := c.Play(); err != nil {
|
||||
return err
|
||||
}
|
||||
case ModeServerProducer:
|
||||
default:
|
||||
return fmt.Errorf("start wrong mode: %d", c.mode)
|
||||
}
|
||||
|
||||
if err := c.Play(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.Handle()
|
||||
}
|
||||
|
||||
|
@@ -35,6 +35,8 @@ const (
|
||||
CodecMPA = "MPA" // payload: 14
|
||||
)
|
||||
|
||||
const PayloadTypeMP4 byte = 255
|
||||
|
||||
func GetKind(name string) string {
|
||||
switch name {
|
||||
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG:
|
||||
@@ -75,13 +77,13 @@ func (m *Media) AV() bool {
|
||||
return m.Kind == KindVideo || m.Kind == KindAudio
|
||||
}
|
||||
|
||||
func (m *Media) MatchCodec(codec *Codec) bool {
|
||||
func (m *Media) MatchCodec(codec *Codec) *Codec {
|
||||
for _, c := range m.Codecs {
|
||||
if c.Match(codec) {
|
||||
return true
|
||||
return c
|
||||
}
|
||||
}
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Media) MatchMedia(media *Media) *Codec {
|
||||
@@ -127,22 +129,6 @@ type Codec struct {
|
||||
PayloadType uint8
|
||||
}
|
||||
|
||||
func NewCodec(name string) *Codec {
|
||||
name = strings.ToUpper(name)
|
||||
switch name {
|
||||
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG:
|
||||
return &Codec{Name: name, ClockRate: 90000}
|
||||
case CodecPCMU, CodecPCMA:
|
||||
return &Codec{Name: name, ClockRate: 8000}
|
||||
case CodecOpus:
|
||||
return &Codec{Name: name, ClockRate: 48000, Channels: 2}
|
||||
case "MJPEG":
|
||||
return &Codec{Name: CodecJPEG, ClockRate: 90000}
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("unsupported codec: %s", name))
|
||||
}
|
||||
|
||||
func (c *Codec) String() string {
|
||||
s := fmt.Sprintf("%d %s/%d", c.PayloadType, c.Name, c.ClockRate)
|
||||
if c.Channels > 0 {
|
||||
@@ -151,6 +137,10 @@ func (c *Codec) String() string {
|
||||
return s
|
||||
}
|
||||
|
||||
func (c *Codec) IsMP4() bool {
|
||||
return c.PayloadType == PayloadTypeMP4
|
||||
}
|
||||
|
||||
func (c *Codec) Clone() *Codec {
|
||||
clone := *c
|
||||
return &clone
|
||||
|
@@ -12,44 +12,54 @@ type WrapperFunc func(push WriterFunc) WriterFunc
|
||||
type Track struct {
|
||||
Codec *Codec
|
||||
Direction string
|
||||
Sink map[*Track]WriterFunc
|
||||
mx sync.Mutex
|
||||
sink map[*Track]WriterFunc
|
||||
sinkMu sync.RWMutex
|
||||
}
|
||||
|
||||
func (t *Track) String() string {
|
||||
s := t.Codec.String()
|
||||
s += fmt.Sprintf(", sinks=%d", len(t.Sink))
|
||||
s += fmt.Sprintf(", sinks=%d", len(t.sink))
|
||||
return s
|
||||
}
|
||||
|
||||
func (t *Track) WriteRTP(p *rtp.Packet) error {
|
||||
t.mx.Lock()
|
||||
for _, f := range t.Sink {
|
||||
t.sinkMu.RLock()
|
||||
for _, f := range t.sink {
|
||||
_ = f(p)
|
||||
}
|
||||
t.mx.Unlock()
|
||||
t.sinkMu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Track) Bind(w WriterFunc) *Track {
|
||||
t.mx.Lock()
|
||||
t.sinkMu.Lock()
|
||||
|
||||
if t.Sink == nil {
|
||||
t.Sink = map[*Track]WriterFunc{}
|
||||
if t.sink == nil {
|
||||
t.sink = map[*Track]WriterFunc{}
|
||||
}
|
||||
|
||||
clone := &Track{
|
||||
Codec: t.Codec, Direction: t.Direction, Sink: t.Sink,
|
||||
Codec: t.Codec, Direction: t.Direction, sink: t.sink,
|
||||
}
|
||||
t.Sink[clone] = w
|
||||
t.sink[clone] = w
|
||||
|
||||
t.mx.Unlock()
|
||||
t.sinkMu.Unlock()
|
||||
|
||||
return clone
|
||||
}
|
||||
|
||||
func (t *Track) Unbind() {
|
||||
t.mx.Lock()
|
||||
delete(t.Sink, t)
|
||||
t.mx.Unlock()
|
||||
t.sinkMu.Lock()
|
||||
delete(t.sink, t)
|
||||
t.sinkMu.Unlock()
|
||||
}
|
||||
|
||||
func (t *Track) GetSink(from *Track) {
|
||||
t.sink = from.sink
|
||||
}
|
||||
|
||||
func (t *Track) HasSink() bool {
|
||||
t.sinkMu.RLock()
|
||||
defer t.sinkMu.RUnlock()
|
||||
return len(t.sink) > 0
|
||||
}
|
||||
|
@@ -57,7 +57,7 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.
|
||||
wrapper := h264.RTPPay(1200)
|
||||
push = wrapper(push)
|
||||
|
||||
if h264.IsAVC(codec) {
|
||||
if codec.IsMP4() {
|
||||
wrapper = h264.RepairAVC(track)
|
||||
} else {
|
||||
wrapper = h264.RTPDepay(track)
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewCandidate(address string) (string, error) {
|
||||
@@ -38,7 +39,7 @@ func NewCandidate(address string) (string, error) {
|
||||
|
||||
func LookupIP(address string) (string, error) {
|
||||
if strings.HasPrefix(address, "stun:") {
|
||||
ip, err := GetPublicIP()
|
||||
ip, err := GetCachedPublicIP()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -63,11 +64,20 @@ func LookupIP(address string) (string, error) {
|
||||
|
||||
// GetPublicIP example from https://github.com/pion/stun
|
||||
func GetPublicIP() (net.IP, error) {
|
||||
c, err := stun.Dial("udp", "stun.l.google.com:19302")
|
||||
conn, err := net.Dial("udp", "stun.l.google.com:19302")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := stun.NewClient(conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = conn.SetDeadline(time.Now().Add(time.Second * 3)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res stun.Event
|
||||
|
||||
message := stun.MustBuild(stun.TransactionID, stun.BindingRequest)
|
||||
@@ -90,6 +100,24 @@ func GetPublicIP() (net.IP, error) {
|
||||
return xorAddr.IP, nil
|
||||
}
|
||||
|
||||
var cachedIP net.IP
|
||||
var cachedTS time.Time
|
||||
|
||||
func GetCachedPublicIP() (net.IP, error) {
|
||||
now := time.Now()
|
||||
if now.After(cachedTS) {
|
||||
newIP, err := GetPublicIP()
|
||||
if err == nil {
|
||||
cachedIP = newIP
|
||||
cachedTS = now.Add(time.Minute * 5)
|
||||
} else if cachedIP == nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return cachedIP, nil
|
||||
}
|
||||
|
||||
func IsIP(host string) bool {
|
||||
for _, i := range host {
|
||||
if i >= 'A' {
|
||||
|
@@ -53,3 +53,5 @@ pc.ontrack = ev => {
|
||||
- https://www.webrtc-experiment.com/DetectRTC/
|
||||
- https://divtable.com/table-styler/
|
||||
- https://www.chromium.org/audio-video/
|
||||
- https://web.dev/i18n/en/fast-playback-with-preload/#manual_buffering
|
||||
- https://developer.mozilla.org/en-US/docs/Web/API/Media_Source_Extensions_API
|
||||
|
@@ -70,6 +70,7 @@
|
||||
'<a href="api/stream.mp4?src={name}">mp4</a>',
|
||||
'<a href="api/frame.mp4?src={name}">frame</a>',
|
||||
`<a href="rtsp://${location.hostname}:8554/{name}">rtsp</a>`,
|
||||
'<a href="api/stream.mjpeg?src={name}">mjpeg</a>',
|
||||
'<a href="api/streams?src={name}">info</a>',
|
||||
];
|
||||
|
||||
|
122
www/mse.html
122
www/mse.html
@@ -25,93 +25,75 @@
|
||||
<!-- muted is important for autoplay -->
|
||||
<video id="video" autoplay controls playsinline muted></video>
|
||||
<script>
|
||||
const video = document.querySelector('#video');
|
||||
|
||||
// support api_path
|
||||
const baseUrl = location.origin + location.pathname.substr(
|
||||
0, location.pathname.lastIndexOf("/")
|
||||
);
|
||||
const ws = new WebSocket(`ws${baseUrl.substr(4)}/api/ws${location.search}`);
|
||||
ws.binaryType = "arraybuffer";
|
||||
const video = document.querySelector('#video');
|
||||
|
||||
let mediaSource;
|
||||
function init() {
|
||||
let mediaSource, sourceBuffer, queueBuffer = [];
|
||||
|
||||
ws.onopen = () => {
|
||||
console.log("Start WS");
|
||||
const ws = new WebSocket(`ws${baseUrl.substr(4)}/api/ws${location.search}`);
|
||||
ws.binaryType = "arraybuffer";
|
||||
|
||||
// https://web.dev/i18n/en/fast-playback-with-preload/#manual_buffering
|
||||
// https://developer.mozilla.org/en-US/docs/Web/API/Media_Source_Extensions_API
|
||||
mediaSource = new MediaSource();
|
||||
video.src = URL.createObjectURL(mediaSource);
|
||||
mediaSource.onsourceopen = () => {
|
||||
console.debug("mediaSource.onsourceopen");
|
||||
|
||||
mediaSource.onsourceopen = null;
|
||||
URL.revokeObjectURL(video.src);
|
||||
ws.send(JSON.stringify({"type": "mse"}));
|
||||
ws.onopen = () => {
|
||||
mediaSource = new MediaSource();
|
||||
video.src = URL.createObjectURL(mediaSource);
|
||||
mediaSource.onsourceopen = () => {
|
||||
mediaSource.onsourceopen = null;
|
||||
URL.revokeObjectURL(video.src);
|
||||
ws.send(JSON.stringify({"type": "mse"}));
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
let sourceBuffer, queueBuffer = [];
|
||||
ws.onmessage = ev => {
|
||||
if (typeof ev.data === 'string') {
|
||||
const data = JSON.parse(ev.data);
|
||||
console.debug("ws.onmessage", data);
|
||||
|
||||
ws.onmessage = ev => {
|
||||
if (typeof ev.data === 'string') {
|
||||
const data = JSON.parse(ev.data);
|
||||
console.debug("ws.onmessage", data);
|
||||
|
||||
if (data.type === "mse") {
|
||||
sourceBuffer = mediaSource.addSourceBuffer(data.value);
|
||||
// important: segments supports TrackFragDecodeTime
|
||||
// sequence supports only TrackFragRunEntry Duration
|
||||
sourceBuffer.mode = "segments";
|
||||
sourceBuffer.onupdateend = () => {
|
||||
if (!sourceBuffer.updating && queueBuffer.length > 0) {
|
||||
sourceBuffer.appendBuffer(queueBuffer.shift());
|
||||
if (data.type === "mse") {
|
||||
sourceBuffer = mediaSource.addSourceBuffer(data.value);
|
||||
sourceBuffer.mode = "segments"; // segments or sequence
|
||||
sourceBuffer.onupdateend = () => {
|
||||
if (!sourceBuffer.updating && queueBuffer.length > 0) {
|
||||
try {
|
||||
sourceBuffer.appendBuffer(queueBuffer.shift());
|
||||
} catch (e) {
|
||||
// console.warn(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (sourceBuffer.updating) {
|
||||
queueBuffer.push(ev.data)
|
||||
} else if (sourceBuffer.updating || queueBuffer.length > 0) {
|
||||
queueBuffer.push(ev.data);
|
||||
} else {
|
||||
sourceBuffer.appendBuffer(ev.data);
|
||||
try {
|
||||
sourceBuffer.appendBuffer(ev.data);
|
||||
} catch (e) {
|
||||
// console.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (video.seekable.length > 0) {
|
||||
const delay = video.seekable.end(video.seekable.length - 1) - video.currentTime;
|
||||
if (delay < 1) {
|
||||
video.playbackRate = 1;
|
||||
} else if (delay > 10) {
|
||||
video.playbackRate = 10;
|
||||
} else if (delay > 2) {
|
||||
video.playbackRate = Math.floor(delay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
video.onpause = () => {
|
||||
ws.close();
|
||||
setTimeout(init, 0);
|
||||
}
|
||||
}
|
||||
|
||||
let offsetTime = 1, noWaiting = 0;
|
||||
|
||||
setInterval(() => {
|
||||
if (video.paused || video.seekable.length === 0) return;
|
||||
|
||||
if (noWaiting < 0) {
|
||||
offsetTime = Math.min(offsetTime * 1.1, 5);
|
||||
console.debug("offset time up:", offsetTime);
|
||||
} else if (noWaiting >= 30) {
|
||||
noWaiting = 0;
|
||||
offsetTime = Math.max(offsetTime * 0.9, 0.5);
|
||||
console.debug("offset time down:", offsetTime);
|
||||
}
|
||||
noWaiting += 1;
|
||||
|
||||
const endTime = video.seekable.end(video.seekable.length - 1);
|
||||
let playbackRate = (endTime - video.currentTime) / offsetTime;
|
||||
if (playbackRate < 0.1) {
|
||||
// video.currentTime = endTime - offsetTime;
|
||||
playbackRate = 0.1;
|
||||
} else if (playbackRate > 10) {
|
||||
// video.currentTime = endTime - offsetTime;
|
||||
playbackRate = 10;
|
||||
}
|
||||
// https://github.com/GoogleChrome/developer.chrome.com/issues/135
|
||||
video.playbackRate = playbackRate;
|
||||
}, 1000);
|
||||
|
||||
video.onwaiting = () => {
|
||||
const endTime = video.seekable.end(video.seekable.length - 1);
|
||||
video.currentTime = endTime - offsetTime;
|
||||
noWaiting = -1;
|
||||
}
|
||||
init();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
|
@@ -25,12 +25,12 @@
|
||||
<body>
|
||||
<video id="video" autoplay controls playsinline muted></video>
|
||||
<script>
|
||||
const baseUrl = location.origin + location.pathname.substr(
|
||||
0, location.pathname.lastIndexOf("/")
|
||||
);
|
||||
|
||||
function init(stream) {
|
||||
// support api_path
|
||||
const baseUrl = location.origin + location.pathname.substr(
|
||||
0, location.pathname.lastIndexOf("/")
|
||||
);
|
||||
|
||||
const ws = new WebSocket(`ws${baseUrl.substr(4)}/api/ws${location.search}`);
|
||||
ws.onopen = () => {
|
||||
console.debug('ws.onopen');
|
||||
@@ -51,11 +51,6 @@
|
||||
pc.addIceCandidate({candidate: msg.value, sdpMid: ''});
|
||||
} else if (msg.type === 'webrtc/answer') {
|
||||
pc.setRemoteDescription({type: 'answer', sdp: msg.value});
|
||||
pc.getTransceivers().forEach(t => {
|
||||
if (t.receiver.track.kind === 'audio') {
|
||||
t.currentDirection
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user