mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2025-10-05 00:12:48 +08:00
Compare commits
45 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
79f1dcfea3 | ||
![]() |
3feaf852af | ||
![]() |
76ec70d2a0 | ||
![]() |
6cef5faf27 | ||
![]() |
edb4e6eaad | ||
![]() |
116319f876 | ||
![]() |
a0e6005598 | ||
![]() |
fd580b6f2c | ||
![]() |
1837e7c86c | ||
![]() |
235f2fde0d | ||
![]() |
35087e0812 | ||
![]() |
da08d8e973 | ||
![]() |
757091e43d | ||
![]() |
a5c4854aeb | ||
![]() |
4b4deaaaf2 | ||
![]() |
553f5ff0d8 | ||
![]() |
25dc3664fd | ||
![]() |
8dd9991268 | ||
![]() |
d633d331bb | ||
![]() |
7d3fbf2ee0 | ||
![]() |
c44aaebd65 | ||
![]() |
d6259fc0e9 | ||
![]() |
5c657d557a | ||
![]() |
93be5cd92f | ||
![]() |
cf6a35d0c7 | ||
![]() |
af79e6054b | ||
![]() |
9f3d5e7460 | ||
![]() |
abbf180b1b | ||
![]() |
696588e52e | ||
![]() |
3e97ce8b2a | ||
![]() |
722b2827a1 | ||
![]() |
69598b508c | ||
![]() |
f49fcc4f68 | ||
![]() |
59347a409e | ||
![]() |
45b25d29b7 | ||
![]() |
49e861d1b0 | ||
![]() |
b1701e856a | ||
![]() |
a6260d0f56 | ||
![]() |
693d41be87 | ||
![]() |
222dc6a5c2 | ||
![]() |
8fde2b6fe5 | ||
![]() |
15e205cc01 | ||
![]() |
1db9ed4946 | ||
![]() |
fd83d151d2 | ||
![]() |
a36359f3dd |
9
.github/workflows/docker.yml
vendored
9
.github/workflows/docker.yml
vendored
@@ -52,7 +52,7 @@ jobs:
|
||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v3
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: .
|
||||
platforms: |
|
||||
@@ -63,9 +63,10 @@ jobs:
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
- name: Build and push Hardware
|
||||
uses: docker/build-push-action@v3
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: .
|
||||
file: hardware.Dockerfile
|
||||
@@ -73,3 +74,5 @@ jobs:
|
||||
push: ${{ github.event_name != 'pull_request' }}
|
||||
tags: ${{ steps.meta-hw.outputs.tags }}
|
||||
labels: ${{ steps.meta-hw.outputs.labels }}
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
2
.github/workflows/gh-pages.yml
vendored
2
.github/workflows/gh-pages.yml
vendored
@@ -34,4 +34,4 @@ jobs:
|
||||
path: './website'
|
||||
- name: Deploy to GitHub Pages
|
||||
id: deployment
|
||||
uses: actions/deploy-pages@v1
|
||||
uses: actions/deploy-pages@v2
|
||||
|
6
.github/workflows/test.yml
vendored
6
.github/workflows/test.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '1.19'
|
||||
go-version: '1.20'
|
||||
|
||||
- name: Build Go binary
|
||||
run: go build -ldflags "-s -w" -trimpath -o ./go2rtc
|
||||
@@ -76,7 +76,7 @@ jobs:
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v2
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v3
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: .
|
||||
platforms: linux/${{ matrix.platform }}
|
||||
@@ -89,7 +89,7 @@ jobs:
|
||||
|
||||
- name: Build and push Hardware
|
||||
if: matrix.platform == 'amd64'
|
||||
uses: docker/build-push-action@v3
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: .
|
||||
file: hardware.Dockerfile
|
||||
|
@@ -2,7 +2,7 @@
|
||||
|
||||
# 0. Prepare images
|
||||
ARG PYTHON_VERSION="3.11"
|
||||
ARG GO_VERSION="1.19"
|
||||
ARG GO_VERSION="1.20"
|
||||
ARG NGROK_VERSION="3"
|
||||
|
||||
FROM python:${PYTHON_VERSION}-alpine AS base
|
||||
|
@@ -121,6 +121,7 @@ func middlewareCORS(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Authorization")
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
@@ -16,7 +16,7 @@ import (
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
var Version = "1.3.1"
|
||||
var Version = "1.3.2"
|
||||
var UserAgent = "go2rtc/" + Version
|
||||
|
||||
var ConfigPath string
|
||||
|
@@ -12,8 +12,10 @@ import (
|
||||
const deviceInputPrefix = "-f v4l2"
|
||||
|
||||
func deviceInputSuffix(videoIdx, audioIdx int) string {
|
||||
video := findMedia(core.KindVideo, videoIdx)
|
||||
if video := findMedia(core.KindVideo, videoIdx); video != nil {
|
||||
return video.ID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func loadMedias() {
|
||||
|
@@ -51,7 +51,7 @@ var defaults = map[string]string{
|
||||
"rtsp/udp": "-fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -i {input}",
|
||||
|
||||
// output
|
||||
"output": "-user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}",
|
||||
"output": "-user_agent ffmpeg/go2rtc -rtsp_transport tcp -bufsize 8192k -f rtsp {output}",
|
||||
|
||||
// `-preset superfast` - we can't use ultrafast because it doesn't support `-profile main -level 4.1`
|
||||
// `-tune zerolatency` - for minimal latency
|
||||
|
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/webrtc"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -25,6 +24,7 @@ func initAPI() {
|
||||
|
||||
api.HandleFunc("/streams", ok)
|
||||
|
||||
// api from RTSPtoWeb
|
||||
api.HandleFunc("/stream/", func(w http.ResponseWriter, r *http.Request) {
|
||||
switch {
|
||||
// /stream/{id}/add
|
||||
@@ -40,13 +40,7 @@ func initAPI() {
|
||||
// 3. dynamic link to Hass camera
|
||||
stream := streams.Get(v.Name)
|
||||
if stream == nil {
|
||||
// check if it is rtsp link to go2rtc
|
||||
stream = rtspStream(v.Channels.First.Url)
|
||||
if stream != nil {
|
||||
streams.New(v.Name, stream)
|
||||
} else {
|
||||
stream = streams.New(v.Name, "{input}")
|
||||
}
|
||||
stream = streams.NewTemplate(v.Name, v.Channels.First.Url)
|
||||
}
|
||||
|
||||
stream.SetSource(v.Channels.First.Url)
|
||||
@@ -90,48 +84,6 @@ func initAPI() {
|
||||
_, _ = w.Write([]byte(s))
|
||||
}
|
||||
})
|
||||
|
||||
// api from RTSPtoWebRTC
|
||||
api.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
str := r.FormValue("sdp64")
|
||||
offer, err := base64.StdEncoding.DecodeString(str)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
src := r.FormValue("url")
|
||||
src, err = url.QueryUnescape(src)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
stream := streams.Get(src)
|
||||
if stream == nil {
|
||||
if stream = rtspStream(src); stream != nil {
|
||||
streams.New(src, stream)
|
||||
} else {
|
||||
stream = streams.New(src, src)
|
||||
}
|
||||
}
|
||||
|
||||
str, err = webrtc.ExchangeSDP(stream, string(offer), "WebRTC/Hass sync", r.UserAgent())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
v := struct {
|
||||
Answer string `json:"sdp64"`
|
||||
}{
|
||||
Answer: base64.StdEncoding.EncodeToString([]byte(str)),
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(v)
|
||||
})
|
||||
}
|
||||
|
||||
func HassioAddr() string {
|
||||
@@ -153,15 +105,6 @@ func HassioAddr() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func rtspStream(url string) *streams.Stream {
|
||||
if strings.HasPrefix(url, "rtsp://") {
|
||||
if i := strings.IndexByte(url[7:], '/'); i > 0 {
|
||||
return streams.Get(url[8+i:])
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type addJSON struct {
|
||||
Name string `json:"name"`
|
||||
Channels struct {
|
||||
|
@@ -10,7 +10,6 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
"github.com/rs/zerolog/log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -29,6 +28,7 @@ func Init() {
|
||||
|
||||
type Consumer interface {
|
||||
core.Consumer
|
||||
Listen(f core.EventFunc)
|
||||
Init() ([]byte, error)
|
||||
MimeCodecs() string
|
||||
Start()
|
||||
@@ -84,7 +84,7 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
session := &Session{cons: cons}
|
||||
|
||||
cons.(any).(*core.Listener).Listen(func(msg any) {
|
||||
cons.Listen(func(msg any) {
|
||||
if data, ok := msg.([]byte); ok {
|
||||
session.mu.Lock()
|
||||
session.segment = append(session.segment, data...)
|
||||
@@ -104,7 +104,7 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
cons.Start()
|
||||
|
||||
sid := strconv.FormatInt(time.Now().UnixNano(), 10)
|
||||
sid := core.RandString(8, 62)
|
||||
|
||||
// two segments important for Chromecast
|
||||
if medias != nil {
|
||||
|
@@ -1,6 +1,12 @@
|
||||
package mp4
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/AlexxIT/go2rtc/cmd/api"
|
||||
"github.com/AlexxIT/go2rtc/cmd/app"
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
@@ -8,10 +14,6 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/mp4"
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
"github.com/rs/zerolog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@@ -44,12 +46,15 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
exit := make(chan []byte)
|
||||
exit := make(chan []byte, 1)
|
||||
|
||||
cons := &mp4.Segment{OnlyKeyframe: true}
|
||||
cons.Listen(func(msg any) {
|
||||
if data, ok := msg.([]byte); ok && exit != nil {
|
||||
exit <- data
|
||||
select {
|
||||
case exit <- data:
|
||||
default:
|
||||
}
|
||||
exit = nil
|
||||
}
|
||||
})
|
||||
@@ -75,18 +80,13 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
|
||||
func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
log.Trace().Msgf("[mp4] %s %+v", r.Method, r.Header)
|
||||
|
||||
// Chrome has Safari in UA, so check first Chrome and later Safari
|
||||
query := r.URL.Query()
|
||||
|
||||
ua := r.UserAgent()
|
||||
if strings.Contains(ua, " Chrome/") {
|
||||
if r.Header.Values("Range") == nil {
|
||||
w.Header().Set("Content-Type", "video/mp4")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
} else if strings.Contains(ua, " Safari/") {
|
||||
if strings.Contains(ua, " Safari/") && !strings.Contains(ua, " Chrome/") && !query.Has("duration") {
|
||||
// auto redirect to HLS/fMP4 format, because Safari not support MP4 stream
|
||||
url := "stream.m3u8?" + r.URL.RawQuery
|
||||
if !r.URL.Query().Has("mp4") {
|
||||
if !query.Has("mp4") {
|
||||
url += "&mp4"
|
||||
}
|
||||
|
||||
@@ -94,14 +94,14 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
src := r.URL.Query().Get("src")
|
||||
src := query.Get("src")
|
||||
stream := streams.GetOrNew(src)
|
||||
if stream == nil {
|
||||
http.Error(w, api.StreamNotFound, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
exit := make(chan error)
|
||||
exit := make(chan error, 1) // Add buffer to prevent blocking
|
||||
|
||||
cons := &mp4.Consumer{
|
||||
RemoteAddr: tcp.RemoteAddr(r),
|
||||
@@ -109,10 +109,18 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
Medias: core.ParseQuery(r.URL.Query()),
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
cons.Listen(func(msg any) {
|
||||
if data, ok := msg.([]byte); ok {
|
||||
if _, err := w.Write(data); err != nil && exit != nil {
|
||||
exit <- err
|
||||
mu.Lock()
|
||||
_, err := w.Write(data)
|
||||
mu.Unlock()
|
||||
|
||||
if err != nil && exit != nil {
|
||||
select {
|
||||
case exit <- err:
|
||||
default:
|
||||
}
|
||||
exit = nil
|
||||
}
|
||||
}
|
||||
@@ -141,7 +149,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
|
||||
cons.Start()
|
||||
|
||||
var duration *time.Timer
|
||||
if s := r.URL.Query().Get("duration"); s != "" {
|
||||
if s := query.Get("duration"); s != "" {
|
||||
if i, _ := strconv.Atoi(s); i > 0 {
|
||||
duration = time.AfterFunc(time.Second*time.Duration(i), func() {
|
||||
if exit != nil {
|
||||
|
@@ -1,6 +1,11 @@
|
||||
package rtsp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/AlexxIT/go2rtc/cmd/app"
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
@@ -8,9 +13,6 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/rtsp"
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
"github.com/rs/zerolog"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@@ -123,6 +125,7 @@ func rtspHandler(url string) (core.Producer, error) {
|
||||
if !backchannel {
|
||||
return nil, err
|
||||
}
|
||||
log.Trace().Msgf("[rtsp] describe (backchannel=%t) err: %v", backchannel, err)
|
||||
|
||||
// second try without backchannel, we need to reconnect
|
||||
conn.Backchannel = false
|
||||
@@ -211,7 +214,9 @@ func tcpHandler(conn *rtsp.Conn) {
|
||||
})
|
||||
|
||||
if err := conn.Accept(); err != nil {
|
||||
if err != io.EOF {
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
}
|
||||
if closer != nil {
|
||||
closer()
|
||||
}
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/app/store"
|
||||
"github.com/rs/zerolog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
@@ -39,6 +40,20 @@ func New(name string, source any) *Stream {
|
||||
return stream
|
||||
}
|
||||
|
||||
func NewTemplate(name string, source any) *Stream {
|
||||
// check if source links to some stream name from go2rtc
|
||||
if rawURL, ok := source.(string); ok {
|
||||
if u, err := url.Parse(rawURL); err == nil && u.Scheme == "rtsp" {
|
||||
if stream, ok := streams[u.Path[1:]]; ok {
|
||||
streams[name] = stream
|
||||
return stream
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return New(name, "{input}")
|
||||
}
|
||||
|
||||
func GetOrNew(src string) *Stream {
|
||||
if stream, ok := streams[src]; ok {
|
||||
return stream
|
||||
@@ -85,11 +100,12 @@ func streamsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
if stream := Get(name); stream != nil {
|
||||
stream.SetSource(src)
|
||||
} else {
|
||||
New(name, src)
|
||||
// support {input} templates: https://github.com/AlexxIT/go2rtc#module-hass
|
||||
stream := Get(name)
|
||||
if stream == nil {
|
||||
stream = NewTemplate(name, src)
|
||||
}
|
||||
stream.SetSource(src)
|
||||
|
||||
case "POST":
|
||||
// with dst - redirect source to dst
|
||||
|
@@ -30,8 +30,6 @@ type Producer struct {
|
||||
receivers []*core.Receiver
|
||||
senders []*core.Receiver
|
||||
|
||||
lastErr error
|
||||
|
||||
state state
|
||||
mu sync.Mutex
|
||||
workerID int
|
||||
|
@@ -3,7 +3,6 @@ package streams
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -31,8 +30,6 @@ func NewStream(source any) *Stream {
|
||||
s.producers = append(s.producers, prod)
|
||||
}
|
||||
return s
|
||||
case *Stream:
|
||||
return source
|
||||
case map[string]any:
|
||||
return NewStream(source["url"])
|
||||
case nil:
|
||||
@@ -50,24 +47,28 @@ func (s *Stream) SetSource(source string) {
|
||||
|
||||
func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
|
||||
// support for multiple simultaneous requests from different consumers
|
||||
atomic.AddInt32(&s.requests, 1)
|
||||
consN := atomic.AddInt32(&s.requests, 1) - 1
|
||||
|
||||
var producers []*Producer // matched producers for consumer
|
||||
|
||||
var codecs string
|
||||
var statErrors []error
|
||||
var statMedias []*core.Media
|
||||
var statProds []*Producer // matched producers for consumer
|
||||
|
||||
// Step 1. Get consumer medias
|
||||
for _, consMedia := range cons.GetMedias() {
|
||||
log.Trace().Msgf("[streams] check cons=%d media=%s", consN, consMedia)
|
||||
|
||||
producers:
|
||||
for _, prod := range s.producers {
|
||||
for prodN, prod := range s.producers {
|
||||
if err = prod.Dial(); err != nil {
|
||||
log.Trace().Err(err).Msgf("[streams] skip prod=%s", prod.url)
|
||||
statErrors = append(statErrors, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Step 2. Get producer medias (not tracks yet)
|
||||
for _, prodMedia := range prod.GetMedias() {
|
||||
collectCodecs(prodMedia, &codecs)
|
||||
log.Trace().Msgf("[streams] check prod=%d media=%s", prodN, prodMedia)
|
||||
statMedias = append(statMedias, prodMedia)
|
||||
|
||||
// Step 3. Match consumer/producer codecs list
|
||||
prodCodec, consCodec := prodMedia.MatchMedia(consMedia)
|
||||
@@ -79,6 +80,8 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
|
||||
|
||||
switch prodMedia.Direction {
|
||||
case core.DirectionRecvonly:
|
||||
log.Trace().Msgf("[streams] match prod=%d => cons=%d", prodN, consN)
|
||||
|
||||
// Step 4. Get recvonly track from producer
|
||||
if track, err = prod.GetTrack(prodMedia, prodCodec); err != nil {
|
||||
log.Info().Err(err).Msg("[streams] can't get track")
|
||||
@@ -91,6 +94,8 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
|
||||
}
|
||||
|
||||
case core.DirectionSendonly:
|
||||
log.Trace().Msgf("[streams] match cons=%d => prod=%d", consN, prodN)
|
||||
|
||||
// Step 4. Get recvonly track from consumer (backchannel)
|
||||
if track, err = cons.(core.Producer).GetTrack(consMedia, consCodec); err != nil {
|
||||
log.Info().Err(err).Msg("[streams] can't get track")
|
||||
@@ -103,7 +108,7 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
producers = append(producers, prod)
|
||||
statProds = append(statProds, prod)
|
||||
|
||||
if !consMedia.MatchAll() {
|
||||
break producers
|
||||
@@ -117,18 +122,8 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
|
||||
s.stopProducers()
|
||||
}
|
||||
|
||||
if len(producers) == 0 {
|
||||
if len(codecs) > 0 {
|
||||
return errors.New("codecs not match: " + codecs)
|
||||
}
|
||||
|
||||
for i, producer := range s.producers {
|
||||
if producer.lastErr != nil {
|
||||
return fmt.Errorf("source %d error: %w", i, producer.lastErr)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("sources unavailable: %d", len(s.producers))
|
||||
if len(statProds) == 0 {
|
||||
return formatError(statMedias, statErrors)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
@@ -136,7 +131,7 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
|
||||
s.mu.Unlock()
|
||||
|
||||
// there may be duplicates, but that's not a problem
|
||||
for _, prod := range producers {
|
||||
for _, prod := range statProds {
|
||||
prod.start()
|
||||
}
|
||||
|
||||
@@ -185,6 +180,11 @@ producers:
|
||||
continue producers
|
||||
}
|
||||
}
|
||||
for _, track := range producer.senders {
|
||||
if len(track.Senders()) > 0 {
|
||||
continue producers
|
||||
}
|
||||
}
|
||||
producer.stop()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
@@ -208,9 +208,12 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(info)
|
||||
}
|
||||
|
||||
func collectCodecs(media *core.Media, codecs *string) {
|
||||
func formatError(statMedias []*core.Media, statErrors []error) error {
|
||||
var text string
|
||||
|
||||
for _, media := range statMedias {
|
||||
if media.Direction == core.DirectionRecvonly {
|
||||
return
|
||||
continue
|
||||
}
|
||||
|
||||
for _, codec := range media.Codecs {
|
||||
@@ -218,12 +221,34 @@ func collectCodecs(media *core.Media, codecs *string) {
|
||||
if name == core.CodecAAC {
|
||||
name = "AAC"
|
||||
}
|
||||
if strings.Contains(*codecs, name) {
|
||||
if strings.Contains(text, name) {
|
||||
continue
|
||||
}
|
||||
if len(*codecs) > 0 {
|
||||
*codecs += ","
|
||||
if len(text) > 0 {
|
||||
text += ","
|
||||
}
|
||||
*codecs += name
|
||||
text += name
|
||||
}
|
||||
}
|
||||
|
||||
if text != "" {
|
||||
return errors.New(text)
|
||||
}
|
||||
|
||||
for _, err := range statErrors {
|
||||
s := err.Error()
|
||||
if strings.Contains(text, s) {
|
||||
continue
|
||||
}
|
||||
if len(text) > 0 {
|
||||
text += ","
|
||||
}
|
||||
text += s
|
||||
}
|
||||
|
||||
if text != "" {
|
||||
return errors.New(text)
|
||||
}
|
||||
|
||||
return errors.New("unknown error")
|
||||
}
|
||||
|
19
cmd/streams/stream_test.go
Normal file
19
cmd/streams/stream_test.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package streams
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestTemplate(t *testing.T) {
|
||||
source1 := "does not matter"
|
||||
|
||||
stream1 := New("from_yaml", source1)
|
||||
require.Len(t, streams, 1)
|
||||
|
||||
stream2 := NewTemplate("camera.from_hass", "rtsp://localhost:8554/from_yaml?video")
|
||||
|
||||
require.Equal(t, stream1, stream2)
|
||||
require.Equal(t, stream2.producers[0].url, source1)
|
||||
require.Len(t, streams, 2)
|
||||
}
|
35
cmd/tcp/init.go
Normal file
35
cmd/tcp/init.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package tcp
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/AlexxIT/go2rtc/pkg/mpegts"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Init() {
|
||||
streams.HandleFunc("tcp", handle)
|
||||
}
|
||||
|
||||
func handle(rawURL string) (core.Producer, error) {
|
||||
u, err := url.Parse(rawURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := net.DialTimeout("tcp", u.Host, time.Second*3)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := &http.Request{URL: u}
|
||||
res := &http.Response{Body: conn, Request: req}
|
||||
client := mpegts.NewClient(res)
|
||||
if err := client.Handle(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client, nil
|
||||
}
|
@@ -1,7 +1,9 @@
|
||||
# syntax=docker/dockerfile:labs
|
||||
|
||||
# 0. Prepare images
|
||||
# only debian 12 (bookworm) has latest ffmpeg
|
||||
ARG DEBIAN_VERSION="bookworm-slim"
|
||||
ARG GO_VERSION="1.19-buster"
|
||||
ARG GO_VERSION="1.20-buster"
|
||||
ARG NGROK_VERSION="3"
|
||||
|
||||
FROM debian:${DEBIAN_VERSION} AS base
|
||||
@@ -16,37 +18,39 @@ WORKDIR /build
|
||||
|
||||
# Cache dependencies
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
RUN --mount=type=cache,target=/root/.cache/go-build go mod download
|
||||
|
||||
COPY . .
|
||||
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath
|
||||
RUN --mount=type=cache,target=/root/.cache/go-build CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath
|
||||
|
||||
|
||||
# 2. Collect all files
|
||||
FROM scratch AS rootfs
|
||||
|
||||
COPY --from=build /build/go2rtc /usr/local/bin/
|
||||
COPY --from=ngrok /bin/ngrok /usr/local/bin/
|
||||
COPY ./build/docker/run.sh /
|
||||
|
||||
COPY --link --from=build /build/go2rtc /usr/local/bin/
|
||||
COPY --link --from=ngrok /bin/ngrok /usr/local/bin/
|
||||
|
||||
# 3. Final image
|
||||
FROM base
|
||||
|
||||
# Prepare apt for buildkit cache
|
||||
RUN rm -f /etc/apt/apt.conf.d/docker-clean \
|
||||
&& echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' >/etc/apt/apt.conf.d/keep-cache
|
||||
# Install ffmpeg, bash (for run.sh), tini (for signal handling),
|
||||
# and other common tools for the echo source.
|
||||
# non-free for Intel QSV support (not used by go2rtc, just for tests)
|
||||
RUN echo 'deb http://deb.debian.org/debian bookworm non-free' > /etc/apt/sources.list.d/debian-non-free.list && \
|
||||
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked --mount=type=cache,target=/var/lib/apt,sharing=locked \
|
||||
echo 'deb http://deb.debian.org/debian bookworm non-free' > /etc/apt/sources.list.d/debian-non-free.list && \
|
||||
apt-get -y update && apt-get -y install tini ffmpeg python3 curl jq intel-media-va-driver-non-free
|
||||
|
||||
COPY --from=rootfs / /
|
||||
COPY --link --from=rootfs / /
|
||||
|
||||
|
||||
RUN chmod a+x /run.sh && mkdir -p /config
|
||||
|
||||
ENTRYPOINT ["/usr/bin/tini", "--"]
|
||||
|
||||
VOLUME /config
|
||||
WORKDIR /config
|
||||
# https://github.com/NVIDIA/nvidia-docker/wiki/Installation-(Native-GPU-Support)
|
||||
ENV NVIDIA_VISIBLE_DEVICES all
|
||||
ENV NVIDIA_DRIVER_CAPABILITIES compute,video,utility
|
||||
|
||||
CMD ["/run.sh"]
|
||||
CMD ["go2rtc", "-config", "/config/go2rtc.yaml"]
|
||||
|
2
main.go
2
main.go
@@ -24,6 +24,7 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/cmd/srtp"
|
||||
"github.com/AlexxIT/go2rtc/cmd/streams"
|
||||
"github.com/AlexxIT/go2rtc/cmd/tapo"
|
||||
"github.com/AlexxIT/go2rtc/cmd/tcp"
|
||||
"github.com/AlexxIT/go2rtc/cmd/webrtc"
|
||||
"github.com/AlexxIT/go2rtc/cmd/webtorrent"
|
||||
"os"
|
||||
@@ -49,6 +50,7 @@ func main() {
|
||||
isapi.Init()
|
||||
mpegts.Init()
|
||||
roborock.Init()
|
||||
tcp.Init()
|
||||
|
||||
srtp.Init()
|
||||
homekit.Init()
|
||||
|
@@ -82,6 +82,13 @@ func (m *Media) MatchAll() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *Media) Equal(media *Media) bool {
|
||||
if media.ID != "" {
|
||||
return m.ID == media.ID
|
||||
}
|
||||
return m.String() == media.String()
|
||||
}
|
||||
|
||||
func GetKind(name string) string {
|
||||
switch name {
|
||||
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG:
|
||||
|
@@ -18,7 +18,7 @@ type Receiver struct {
|
||||
ID byte // Channel for RTSP, PayloadType for MPEG-TS
|
||||
|
||||
senders map[*Sender]chan *rtp.Packet
|
||||
mu sync.Mutex
|
||||
mu sync.RWMutex
|
||||
bytes int
|
||||
}
|
||||
|
||||
@@ -32,9 +32,9 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) {
|
||||
t.mu.Lock()
|
||||
t.bytes += len(packet.Payload)
|
||||
for sender, buffer := range t.senders {
|
||||
if len(buffer) < cap(buffer) {
|
||||
buffer <- packet
|
||||
} else {
|
||||
select {
|
||||
case buffer <- packet:
|
||||
default:
|
||||
sender.overflow++
|
||||
}
|
||||
}
|
||||
@@ -42,11 +42,11 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) {
|
||||
}
|
||||
|
||||
func (t *Receiver) Senders() (senders []*Sender) {
|
||||
t.mu.Lock()
|
||||
t.mu.RLock()
|
||||
for sender := range t.senders {
|
||||
senders = append(senders, sender)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
t.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -73,12 +73,9 @@ func (t *Receiver) Replace(target *Receiver) {
|
||||
|
||||
func (t *Receiver) String() string {
|
||||
s := t.Codec.String() + ", bytes=" + strconv.Itoa(t.bytes)
|
||||
if t.mu.TryLock() {
|
||||
t.mu.RLock()
|
||||
s += fmt.Sprintf(", senders=%d", len(t.senders))
|
||||
t.mu.Unlock()
|
||||
} else {
|
||||
s += fmt.Sprintf(", senders=?")
|
||||
}
|
||||
t.mu.RUnlock()
|
||||
return s
|
||||
}
|
||||
|
||||
@@ -93,7 +90,7 @@ type Sender struct {
|
||||
Handler HandlerFunc
|
||||
|
||||
receivers []*Receiver
|
||||
mu sync.Mutex
|
||||
mu sync.RWMutex
|
||||
bytes int
|
||||
|
||||
overflow int
|
||||
@@ -127,7 +124,6 @@ func (s *Sender) HandleRTP(track *Receiver) {
|
||||
}
|
||||
track.senders[s] = buffer
|
||||
track.mu.Unlock()
|
||||
|
||||
s.mu.Lock()
|
||||
s.receivers = append(s.receivers, track)
|
||||
s.mu.Unlock()
|
||||
@@ -135,7 +131,9 @@ func (s *Sender) HandleRTP(track *Receiver) {
|
||||
go func() {
|
||||
// read packets from buffer channel until it will be closed
|
||||
for packet := range buffer {
|
||||
s.mu.Lock()
|
||||
s.bytes += len(packet.Payload)
|
||||
s.mu.Unlock()
|
||||
s.Handler(packet)
|
||||
}
|
||||
|
||||
@@ -171,12 +169,9 @@ func (s *Sender) Close() {
|
||||
|
||||
func (s *Sender) String() string {
|
||||
info := s.Codec.String() + ", bytes=" + strconv.Itoa(s.bytes)
|
||||
if s.mu.TryLock() {
|
||||
s.mu.RLock()
|
||||
info += ", receivers=" + strconv.Itoa(len(s.receivers))
|
||||
s.mu.Unlock()
|
||||
} else {
|
||||
info += ", receivers=?"
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
if s.overflow > 0 {
|
||||
info += ", overflow=" + strconv.Itoa(s.overflow)
|
||||
}
|
||||
|
@@ -5,16 +5,19 @@ import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
)
|
||||
|
||||
var Timeout = time.Second * 5
|
||||
|
||||
func NewClient(uri string) *Conn {
|
||||
return &Conn{uri: uri}
|
||||
}
|
||||
@@ -28,10 +31,6 @@ func (c *Conn) Dial() (err error) {
|
||||
c.URL.Host += ":554"
|
||||
}
|
||||
|
||||
// remove UserInfo from URL
|
||||
c.auth = tcp.NewAuth(c.URL.User)
|
||||
c.URL.User = nil
|
||||
|
||||
c.conn, err = net.DialTimeout("tcp", c.URL.Host, time.Second*5)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -53,50 +52,24 @@ func (c *Conn) Dial() (err error) {
|
||||
c.conn = tlsConn
|
||||
}
|
||||
|
||||
// remove UserInfo from URL
|
||||
c.auth = tcp.NewAuth(c.URL.User)
|
||||
c.URL.User = nil
|
||||
|
||||
c.reader = bufio.NewReader(c.conn)
|
||||
c.session = ""
|
||||
c.state = StateConn
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Request sends only Request
|
||||
func (c *Conn) Request(req *tcp.Request) error {
|
||||
if req.Proto == "" {
|
||||
req.Proto = ProtoRTSP
|
||||
}
|
||||
|
||||
if req.Header == nil {
|
||||
req.Header = make(map[string][]string)
|
||||
}
|
||||
|
||||
c.sequence++
|
||||
// important to send case sensitive CSeq
|
||||
// https://github.com/AlexxIT/go2rtc/issues/7
|
||||
req.Header["CSeq"] = []string{strconv.Itoa(c.sequence)}
|
||||
|
||||
c.auth.Write(req)
|
||||
|
||||
if c.Session != "" {
|
||||
req.Header.Set("Session", c.Session)
|
||||
}
|
||||
|
||||
if req.Body != nil {
|
||||
val := strconv.Itoa(len(req.Body))
|
||||
req.Header.Set("Content-Length", val)
|
||||
}
|
||||
|
||||
c.Fire(req)
|
||||
|
||||
return req.Write(c.conn)
|
||||
}
|
||||
|
||||
// Do send Request and receive and process Response
|
||||
// Do send WriteRequest and receive and process WriteResponse
|
||||
func (c *Conn) Do(req *tcp.Request) (*tcp.Response, error) {
|
||||
if err := c.Request(req); err != nil {
|
||||
if err := c.WriteRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := tcp.ReadResponse(c.reader)
|
||||
res, err := c.ReadResponse()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -126,40 +99,6 @@ func (c *Conn) Do(req *tcp.Request) (*tcp.Response, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (c *Conn) Response(res *tcp.Response) error {
|
||||
if res.Proto == "" {
|
||||
res.Proto = ProtoRTSP
|
||||
}
|
||||
|
||||
if res.Status == "" {
|
||||
res.Status = "200 OK"
|
||||
}
|
||||
|
||||
if res.Header == nil {
|
||||
res.Header = make(map[string][]string)
|
||||
}
|
||||
|
||||
if res.Request != nil && res.Request.Header != nil {
|
||||
seq := res.Request.Header.Get("CSeq")
|
||||
if seq != "" {
|
||||
res.Header.Set("CSeq", seq)
|
||||
}
|
||||
}
|
||||
|
||||
if c.Session != "" {
|
||||
res.Header.Set("Session", c.Session)
|
||||
}
|
||||
|
||||
if res.Body != nil {
|
||||
val := strconv.Itoa(len(res.Body))
|
||||
res.Header.Set("Content-Length", val)
|
||||
}
|
||||
|
||||
c.Fire(res)
|
||||
|
||||
return res.Write(c.conn)
|
||||
}
|
||||
|
||||
func (c *Conn) Options() error {
|
||||
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
|
||||
|
||||
@@ -211,11 +150,18 @@ func (c *Conn) Describe() error {
|
||||
}
|
||||
}
|
||||
|
||||
c.Medias, err = UnmarshalSDP(res.Body)
|
||||
medias, err := UnmarshalSDP(res.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: rewrite more smart
|
||||
if c.Medias == nil {
|
||||
c.Medias = medias
|
||||
} else if len(c.Medias) > len(medias) {
|
||||
c.Medias = c.Medias[:len(medias)]
|
||||
}
|
||||
|
||||
c.mode = core.ModeActiveProducer
|
||||
|
||||
return nil
|
||||
@@ -242,33 +188,12 @@ func (c *Conn) Announce() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) Setup() error {
|
||||
for _, media := range c.Medias {
|
||||
_, err := c.SetupMedia(media, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) SetupMedia(media *core.Media, first bool) (byte, error) {
|
||||
// TODO: rewrite recoonection and first flag
|
||||
if first {
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
}
|
||||
|
||||
if c.state != StateConn && c.state != StateSetup {
|
||||
return 0, fmt.Errorf("RTSP SETUP from wrong state: %s", c.state)
|
||||
}
|
||||
|
||||
func (c *Conn) SetupMedia(media *core.Media) (byte, error) {
|
||||
var transport string
|
||||
|
||||
// try to use media position as channel number
|
||||
for i, m := range c.Medias {
|
||||
if m.ID == media.ID {
|
||||
if m.Equal(media) {
|
||||
transport = fmt.Sprintf(
|
||||
// i - RTP (data channel)
|
||||
// i+1 - RTCP (control channel)
|
||||
@@ -303,37 +228,28 @@ func (c *Conn) SetupMedia(media *core.Media, first bool) (byte, error) {
|
||||
},
|
||||
}
|
||||
|
||||
var res *tcp.Response
|
||||
res, err = c.Do(req)
|
||||
res, err := c.Do(req)
|
||||
if err != nil {
|
||||
// some Dahua/Amcrest cameras fail here because two simultaneous
|
||||
// backchannel connections
|
||||
if c.Backchannel {
|
||||
c.Backchannel = false
|
||||
if err := c.Dial(); err != nil {
|
||||
if err = c.Reconnect(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := c.Describe(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, newMedia := range c.Medias {
|
||||
if newMedia.ID == media.ID {
|
||||
return c.SetupMedia(newMedia, false)
|
||||
}
|
||||
}
|
||||
return c.SetupMedia(media)
|
||||
}
|
||||
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if c.Session == "" {
|
||||
if c.session == "" {
|
||||
// Session: 216525287999;timeout=60
|
||||
if s := res.Header.Get("Session"); s != "" {
|
||||
if j := strings.IndexByte(s, ';'); j > 0 {
|
||||
s = s[:j]
|
||||
c.session, s, _ = strings.Cut(s, ";timeout=")
|
||||
if s != "" {
|
||||
c.keepalive, _ = strconv.Atoi(s)
|
||||
}
|
||||
c.Session = s
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,8 +267,6 @@ func (c *Conn) SetupMedia(media *core.Media, first bool) (byte, error) {
|
||||
}
|
||||
}
|
||||
|
||||
c.state = StateSetup
|
||||
|
||||
channel := core.Between(transport, "interleaved=", "-")
|
||||
i, err := strconv.Atoi(channel)
|
||||
if err != nil {
|
||||
@@ -363,36 +277,17 @@ func (c *Conn) SetupMedia(media *core.Media, first bool) (byte, error) {
|
||||
}
|
||||
|
||||
func (c *Conn) Play() (err error) {
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
|
||||
if c.state != StateSetup {
|
||||
return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state)
|
||||
}
|
||||
|
||||
req := &tcp.Request{Method: MethodPlay, URL: c.URL}
|
||||
if err = c.Request(req); err == nil {
|
||||
c.state = StatePlay
|
||||
}
|
||||
|
||||
return
|
||||
return c.WriteRequest(req)
|
||||
}
|
||||
|
||||
func (c *Conn) Teardown() (err error) {
|
||||
// allow TEARDOWN from any state (ex. ANNOUNCE > SETUP)
|
||||
req := &tcp.Request{Method: MethodTeardown, URL: c.URL}
|
||||
return c.Request(req)
|
||||
return c.WriteRequest(req)
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
|
||||
if c.state == StateNone {
|
||||
return nil
|
||||
}
|
||||
|
||||
_ = c.Teardown()
|
||||
c.state = StateNone
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
94
pkg/rtsp/client_test.go
Normal file
94
pkg/rtsp/client_test.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package rtsp
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTimeout(t *testing.T) {
|
||||
Timeout = time.Millisecond
|
||||
|
||||
ln, err := net.Listen("tcp", "localhost:0")
|
||||
require.Nil(t, err)
|
||||
|
||||
client := NewClient("rtsp://" + ln.Addr().String() + "/stream")
|
||||
client.Backchannel = true
|
||||
|
||||
err = client.Dial()
|
||||
require.Nil(t, err)
|
||||
|
||||
err = client.Describe()
|
||||
require.ErrorIs(t, err, os.ErrDeadlineExceeded)
|
||||
}
|
||||
|
||||
func TestMissedControl(t *testing.T) {
|
||||
Timeout = time.Millisecond
|
||||
|
||||
ln, err := net.Listen("tcp", "localhost:0")
|
||||
require.Nil(t, err)
|
||||
|
||||
go func() {
|
||||
conn, err := ln.Accept()
|
||||
require.Nil(t, err)
|
||||
|
||||
b := make([]byte, 8192)
|
||||
for {
|
||||
n, err := conn.Read(b)
|
||||
require.Nil(t, err)
|
||||
|
||||
req := string(b[:n])
|
||||
|
||||
switch req[:4] {
|
||||
case "DESC":
|
||||
_, _ = conn.Write([]byte(`RTSP/1.0 200 OK
|
||||
Cseq: 1
|
||||
Content-Length: 495
|
||||
Content-Type: application/sdp
|
||||
|
||||
v=0
|
||||
o=- 1 1 IN IP4 0.0.0.0
|
||||
s=go2rtc/1.2.0
|
||||
c=IN IP4 0.0.0.0
|
||||
t=0 0
|
||||
m=audio 0 RTP/AVP 96
|
||||
a=rtpmap:96 MPEG4-GENERIC/48000/2
|
||||
a=fmtp:96 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=119056E500
|
||||
m=audio 0 RTP/AVP 97
|
||||
a=rtpmap:97 OPUS/48000/2
|
||||
a=fmtp:97 sprop-stereo=1
|
||||
m=video 0 RTP/AVP 98
|
||||
a=rtpmap:98 H264/90000
|
||||
a=fmtp:98 packetization-mode=1; sprop-parameter-sets=Z2QAKaw0yAeAIn5cBagICAoAAAfQAAE4gdDAAjhAACOEF3lxoYAEcIAARwgu8uFA,aO48MAA=; profile-level-id=640029
|
||||
`))
|
||||
|
||||
case "SETU":
|
||||
_, _ = conn.Write([]byte(`RTSP/1.0 200 OK
|
||||
Transport: RTP/AVP/TCP;unicast;interleaved=4-5
|
||||
Cseq: 3
|
||||
Session: 1
|
||||
|
||||
`))
|
||||
|
||||
default:
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
client := NewClient("rtsp://" + ln.Addr().String() + "/stream")
|
||||
client.Backchannel = true
|
||||
|
||||
err = client.Dial()
|
||||
require.Nil(t, err)
|
||||
|
||||
err = client.Describe()
|
||||
require.Nil(t, err)
|
||||
require.Len(t, client.Medias, 3)
|
||||
|
||||
ch, err := client.SetupMedia(client.Medias[2], true)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, ch, byte(4))
|
||||
}
|
170
pkg/rtsp/conn.go
170
pkg/rtsp/conn.go
@@ -25,7 +25,6 @@ type Conn struct {
|
||||
SessionName string
|
||||
|
||||
Medias []*core.Media
|
||||
Session string
|
||||
UserAgent string
|
||||
URL *url.URL
|
||||
|
||||
@@ -33,13 +32,16 @@ type Conn struct {
|
||||
|
||||
auth *tcp.Auth
|
||||
conn net.Conn
|
||||
keepalive int
|
||||
mode core.Mode
|
||||
state State
|
||||
stateMu sync.Mutex
|
||||
reader *bufio.Reader
|
||||
sequence int
|
||||
session string
|
||||
uri string
|
||||
|
||||
state State
|
||||
stateMu sync.Mutex
|
||||
|
||||
receivers []*core.Receiver
|
||||
senders []*core.Sender
|
||||
|
||||
@@ -68,13 +70,12 @@ func (s State) String() string {
|
||||
case StateNone:
|
||||
return "NONE"
|
||||
case StateConn:
|
||||
|
||||
return "CONN"
|
||||
case StateSetup:
|
||||
return "SETUP"
|
||||
return MethodSetup
|
||||
case StatePlay:
|
||||
return "PLAY"
|
||||
case StateHandle:
|
||||
return "HANDLE"
|
||||
return MethodPlay
|
||||
}
|
||||
return strconv.Itoa(int(s))
|
||||
}
|
||||
@@ -84,38 +85,24 @@ const (
|
||||
StateConn
|
||||
StateSetup
|
||||
StatePlay
|
||||
StateHandle
|
||||
)
|
||||
|
||||
func (c *Conn) Handle() (err error) {
|
||||
c.stateMu.Lock()
|
||||
|
||||
switch c.state {
|
||||
case StateNone: // Close after PLAY and before Handle is OK (because SETUP after PLAY)
|
||||
case StatePlay:
|
||||
c.state = StateHandle
|
||||
default:
|
||||
err = fmt.Errorf("RTSP HANDLE from wrong state: %s", c.state)
|
||||
|
||||
c.state = StateNone
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
|
||||
ok := c.state == StateHandle
|
||||
|
||||
c.stateMu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
var timeout time.Duration
|
||||
|
||||
var keepaliveDT time.Duration
|
||||
var keepaliveTS time.Time
|
||||
|
||||
switch c.mode {
|
||||
case core.ModeActiveProducer:
|
||||
// polling frames from remote RTSP Server (ex Camera)
|
||||
go c.keepalive()
|
||||
if c.keepalive > 5 {
|
||||
keepaliveDT = time.Duration(c.keepalive-5) * time.Second
|
||||
} else {
|
||||
keepaliveDT = 25 * time.Second
|
||||
}
|
||||
keepaliveTS = time.Now().Add(keepaliveDT)
|
||||
|
||||
// polling frames from remote RTSP Server (ex Camera)
|
||||
if len(c.receivers) > 0 {
|
||||
// if we receiving video/audio from camera
|
||||
timeout = time.Second * 5
|
||||
@@ -137,7 +124,9 @@ func (c *Conn) Handle() (err error) {
|
||||
}
|
||||
|
||||
for c.state != StateNone {
|
||||
if err = c.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
|
||||
ts := time.Now()
|
||||
|
||||
if err = c.conn.SetReadDeadline(ts.Add(timeout)); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -158,7 +147,7 @@ func (c *Conn) Handle() (err error) {
|
||||
switch string(buf4) {
|
||||
case "RTSP":
|
||||
var res *tcp.Response
|
||||
if res, err = tcp.ReadResponse(c.reader); err != nil {
|
||||
if res, err = c.ReadResponse(); err != nil {
|
||||
return
|
||||
}
|
||||
c.Fire(res)
|
||||
@@ -166,13 +155,15 @@ func (c *Conn) Handle() (err error) {
|
||||
|
||||
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
|
||||
var req *tcp.Request
|
||||
if req, err = tcp.ReadRequest(c.reader); err != nil {
|
||||
if req, err = c.ReadRequest(); err != nil {
|
||||
return
|
||||
}
|
||||
c.Fire(req)
|
||||
continue
|
||||
|
||||
default:
|
||||
c.Fire("RTSP wrong input")
|
||||
|
||||
for i := 0; ; i++ {
|
||||
// search next start symbol
|
||||
if _, err = c.reader.ReadBytes('$'); err != nil {
|
||||
@@ -204,8 +195,6 @@ func (c *Conn) Handle() (err error) {
|
||||
return fmt.Errorf("RTSP wrong input")
|
||||
}
|
||||
}
|
||||
|
||||
c.Fire("RTSP wrong input")
|
||||
}
|
||||
} else {
|
||||
// hope that the odd channels are always RTCP
|
||||
@@ -254,21 +243,106 @@ func (c *Conn) Handle() (err error) {
|
||||
|
||||
c.Fire(msg)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) keepalive() {
|
||||
// TODO: rewrite to RTCP
|
||||
if keepaliveDT != 0 && ts.After(keepaliveTS) {
|
||||
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
|
||||
for {
|
||||
time.Sleep(time.Second * 25)
|
||||
if c.state == StateNone {
|
||||
if err = c.WriteRequest(req); err != nil {
|
||||
return
|
||||
}
|
||||
if err := c.Request(req); err != nil {
|
||||
|
||||
keepaliveTS = ts.Add(keepaliveDT)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) WriteRequest(req *tcp.Request) error {
|
||||
if req.Proto == "" {
|
||||
req.Proto = ProtoRTSP
|
||||
}
|
||||
|
||||
if req.Header == nil {
|
||||
req.Header = make(map[string][]string)
|
||||
}
|
||||
|
||||
c.sequence++
|
||||
// important to send case sensitive CSeq
|
||||
// https://github.com/AlexxIT/go2rtc/issues/7
|
||||
req.Header["CSeq"] = []string{strconv.Itoa(c.sequence)}
|
||||
|
||||
c.auth.Write(req)
|
||||
|
||||
if c.session != "" {
|
||||
req.Header.Set("Session", c.session)
|
||||
}
|
||||
|
||||
if req.Body != nil {
|
||||
val := strconv.Itoa(len(req.Body))
|
||||
req.Header.Set("Content-Length", val)
|
||||
}
|
||||
|
||||
c.Fire(req)
|
||||
|
||||
if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return req.Write(c.conn)
|
||||
}
|
||||
|
||||
func (c *Conn) ReadRequest() (*tcp.Request, error) {
|
||||
if err := c.conn.SetReadDeadline(time.Now().Add(Timeout)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tcp.ReadRequest(c.reader)
|
||||
}
|
||||
|
||||
func (c *Conn) WriteResponse(res *tcp.Response) error {
|
||||
if res.Proto == "" {
|
||||
res.Proto = ProtoRTSP
|
||||
}
|
||||
|
||||
if res.Status == "" {
|
||||
res.Status = "200 OK"
|
||||
}
|
||||
|
||||
if res.Header == nil {
|
||||
res.Header = make(map[string][]string)
|
||||
}
|
||||
|
||||
if res.Request != nil && res.Request.Header != nil {
|
||||
seq := res.Request.Header.Get("CSeq")
|
||||
if seq != "" {
|
||||
res.Header.Set("CSeq", seq)
|
||||
}
|
||||
}
|
||||
|
||||
if c.session != "" {
|
||||
if res.Request != nil && res.Request.Method == MethodSetup {
|
||||
res.Header.Set("Session", c.session+";timeout=60")
|
||||
} else {
|
||||
res.Header.Set("Session", c.session)
|
||||
}
|
||||
}
|
||||
|
||||
if res.Body != nil {
|
||||
val := strconv.Itoa(len(res.Body))
|
||||
res.Header.Set("Content-Length", val)
|
||||
}
|
||||
|
||||
c.Fire(res)
|
||||
|
||||
if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return res.Write(c.conn)
|
||||
}
|
||||
|
||||
func (c *Conn) ReadResponse() (*tcp.Response, error) {
|
||||
if err := c.conn.SetReadDeadline(time.Now().Add(Timeout)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tcp.ReadResponse(c.reader)
|
||||
}
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/h265"
|
||||
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
|
||||
"github.com/pion/rtp"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (c *Conn) GetMedias() []*core.Media {
|
||||
@@ -28,9 +29,20 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
|
||||
|
||||
switch c.mode {
|
||||
case core.ModeActiveProducer: // backchannel
|
||||
if channel, err = c.SetupMedia(media, true); err != nil {
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
|
||||
if c.state == StatePlay {
|
||||
if err = c.Reconnect(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if channel, err = c.SetupMedia(media); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.state = StateSetup
|
||||
|
||||
case core.ModePassiveConsumer:
|
||||
channel = byte(len(c.senders)) * 2
|
||||
@@ -76,6 +88,10 @@ func (c *Conn) packetWriter(codec *core.Codec, channel uint8) core.HandlerFunc {
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
n, err := c.conn.Write(data)
|
||||
if err != nil {
|
||||
return
|
||||
|
@@ -2,13 +2,15 @@ package rtsp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/sdp/v3"
|
||||
"io"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/sdp/v3"
|
||||
)
|
||||
|
||||
type RTCP struct {
|
||||
@@ -23,12 +25,6 @@ s=-
|
||||
t=0 0`
|
||||
|
||||
func UnmarshalSDP(rawSDP []byte) ([]*core.Media, error) {
|
||||
// fix bug from Reolink Doorbell
|
||||
if i := bytes.Index(rawSDP, []byte("a=sendonlym=")); i > 0 {
|
||||
rawSDP = append(rawSDP[:i+11], rawSDP[i+10:]...)
|
||||
rawSDP[i+10] = '\n'
|
||||
}
|
||||
|
||||
sd := &sdp.SessionDescription{}
|
||||
if err := sd.Unmarshal(rawSDP); err != nil {
|
||||
// fix multiple `s=` https://github.com/AlexxIT/WebRTC/issues/417
|
||||
@@ -38,10 +34,18 @@ func UnmarshalSDP(rawSDP []byte) ([]*core.Media, error) {
|
||||
// fix SDP header for some cameras
|
||||
if i := bytes.Index(rawSDP, []byte("\nm=")); i > 0 {
|
||||
rawSDP = append([]byte(sdpHeader), rawSDP[i:]...)
|
||||
sd = &sdp.SessionDescription{}
|
||||
err = sd.Unmarshal(rawSDP)
|
||||
}
|
||||
|
||||
// Fix invalid media type (errSDPInvalidValue) caused by
|
||||
// some TP-LINK IP camera, e.g. TL-IPC44GW
|
||||
rawSDP = bytes.ReplaceAll(rawSDP, []byte("m=application/TP-LINK "), []byte("m=application "))
|
||||
|
||||
if err == io.EOF {
|
||||
rawSDP = append(rawSDP, '\n')
|
||||
}
|
||||
|
||||
sd = &sdp.SessionDescription{}
|
||||
err = sd.Unmarshal(rawSDP)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -2,7 +2,7 @@ package rtsp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"errors"
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
)
|
||||
|
||||
@@ -15,51 +15,86 @@ func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, e
|
||||
}
|
||||
}
|
||||
|
||||
switch c.state {
|
||||
case StateConn, StateSetup:
|
||||
default:
|
||||
return nil, fmt.Errorf("RTSP GetTrack from wrong state: %s", c.state)
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
|
||||
if c.state == StatePlay {
|
||||
if err := c.Reconnect(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
channel, err := c.SetupMedia(media, true)
|
||||
channel, err := c.SetupMedia(media)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.state = StateSetup
|
||||
|
||||
track := core.NewReceiver(media, codec)
|
||||
track.ID = byte(channel)
|
||||
track.ID = channel
|
||||
c.receivers = append(c.receivers, track)
|
||||
|
||||
return track, nil
|
||||
}
|
||||
|
||||
func (c *Conn) Start() error {
|
||||
func (c *Conn) Start() (err error) {
|
||||
core.Assert(c.mode == core.ModeActiveProducer || c.mode == core.ModePassiveProducer)
|
||||
|
||||
for {
|
||||
ok := false
|
||||
|
||||
c.stateMu.Lock()
|
||||
switch c.state {
|
||||
case StateNone:
|
||||
err = nil
|
||||
case StateConn:
|
||||
err = errors.New("start from CONN state")
|
||||
case StateSetup:
|
||||
switch c.mode {
|
||||
case core.ModeActiveProducer:
|
||||
if err := c.Play(); err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.Play()
|
||||
case core.ModePassiveProducer:
|
||||
err = nil
|
||||
default:
|
||||
return fmt.Errorf("start wrong mode: %d", c.mode)
|
||||
err = errors.New("start from wrong mode: " + c.mode.String())
|
||||
}
|
||||
|
||||
if err := c.Handle(); c.state != StateNone {
|
||||
_ = c.conn.Close()
|
||||
return err
|
||||
if err == nil {
|
||||
c.state = StatePlay
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
c.stateMu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
return nil
|
||||
// Handler can return different states:
|
||||
// 1. None after PLAY should exit without error
|
||||
// 2. Play after PLAY should exit from Start with error
|
||||
// 3. Setup after PLAY should Play once again
|
||||
err = c.Handle()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) Stop() error {
|
||||
func (c *Conn) Stop() (err error) {
|
||||
for _, receiver := range c.receivers {
|
||||
receiver.Close()
|
||||
}
|
||||
for _, sender := range c.senders {
|
||||
sender.Close()
|
||||
}
|
||||
return c.Close()
|
||||
|
||||
c.stateMu.Lock()
|
||||
if c.state != StateNone {
|
||||
c.state = StateNone
|
||||
err = c.Close()
|
||||
}
|
||||
c.stateMu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) MarshalJSON() ([]byte, error) {
|
||||
@@ -82,3 +117,32 @@ func (c *Conn) MarshalJSON() ([]byte, error) {
|
||||
|
||||
return json.Marshal(info)
|
||||
}
|
||||
|
||||
func (c *Conn) Reconnect() error {
|
||||
c.Fire("RTSP reconnect")
|
||||
|
||||
// close current session
|
||||
_ = c.Close()
|
||||
|
||||
// start new session
|
||||
if err := c.Dial(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.Describe(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// restore previous medias
|
||||
for _, receiver := range c.receivers {
|
||||
if _, err := c.SetupMedia(receiver.Media); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, sender := range c.senders {
|
||||
if _, err := c.SetupMedia(sender.Media); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -1,10 +1,7 @@
|
||||
package rtsp
|
||||
|
||||
import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -79,63 +76,34 @@ a=fmtp:96 packetization-mode=1;profile-level-id=64001F;sprop-parameter-sets=Z0IA
|
||||
|
||||
func TestBugSDP3(t *testing.T) {
|
||||
s := `v=0
|
||||
o=- 1675775048103026 1 IN IP4 192.168.1.123
|
||||
o=- 1680614126554766 1 IN IP4 192.168.0.3
|
||||
s=Session streamed by "preview"
|
||||
t=0 0
|
||||
a=tool:LIVE555 Streaming Media v2020.08.12
|
||||
a=tool:BC Streaming Media v202210012022.10.01
|
||||
a=type:broadcast
|
||||
a=control:*
|
||||
a=range:npt=0-
|
||||
a=range:npt=now-
|
||||
a=x-qt-text-nam:Session streamed by "preview"
|
||||
m=video 0 RTP/AVP 96
|
||||
c=IN IP4 0.0.0.0
|
||||
b=AS:8192
|
||||
a=rtpmap:96 H264/90000
|
||||
a=range:npt=now-
|
||||
a=fmtp:96 packetization-mode=1;profile-level-id=640033;sprop-parameter-sets=Z2QAM6wVFKAoAPGQ,aO48sA==
|
||||
a=recvonly
|
||||
a=control:track1
|
||||
m=audio 0 RTP/AVP 8
|
||||
a=control:track2
|
||||
a=rtpmap:8 PCMA/8000
|
||||
a=sendonlym=audio 0 RTP/AVP 98
|
||||
m=audio 0 RTP/AVP 97
|
||||
c=IN IP4 0.0.0.0
|
||||
b=AS:8192
|
||||
a=rtpmap:98 MPEG4-GENERIC/16000
|
||||
a=fmtp:98 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408;
|
||||
a=rtpmap:97 MPEG4-GENERIC/16000
|
||||
a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408;
|
||||
a=recvonly
|
||||
a=control:track2
|
||||
m=audio 0 RTP/AVP 8
|
||||
a=control:track3
|
||||
`
|
||||
a=rtpmap:8 PCMA/8000
|
||||
a=sendonly`
|
||||
medias, err := UnmarshalSDP([]byte(s))
|
||||
assert.Nil(t, err)
|
||||
assert.Len(t, medias, 3)
|
||||
}
|
||||
|
||||
func TestBugSDP4(t *testing.T) {
|
||||
s := `v=0
|
||||
o=- 1676583297494652 1676583297494652 IN IP4 192.168.1.58
|
||||
s=Media Presentation
|
||||
e=NONE
|
||||
b=AS:5050
|
||||
t=0 0
|
||||
a=control:rtsp://192.168.1.58:554/h264_stream/
|
||||
m=video 0 RTP/AVP 96
|
||||
b=AS:5000
|
||||
a=control:rtsp://192.168.1.58:554/h264_stream/trackID=1
|
||||
a=rtpmap:96 H265/90000
|
||||
a=fmtp:96 profile-level-id=420029; packetization-mode=1; sprop-parameter-sets=
|
||||
a=Media_header:MEDIAINFO=494D4B48010100000400050000000000000000000000000000000000000000000000000000000000;
|
||||
a=appversion:1.0
|
||||
`
|
||||
s = strings.ReplaceAll(s, "\n", "\r\n")
|
||||
medias, err := UnmarshalSDP([]byte(s))
|
||||
assert.Nil(t, err)
|
||||
|
||||
codec := medias[0].Codecs[0]
|
||||
assert.Equal(t, core.CodecH264, codec.Name)
|
||||
|
||||
sps, _ := h264.GetParameterSet(codec.FmtpLine)
|
||||
assert.Nil(t, sps)
|
||||
|
||||
profile := h264.GetProfileLevelID(codec.FmtpLine)
|
||||
assert.Equal(t, "420029", profile)
|
||||
}
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -25,7 +26,7 @@ func (c *Conn) Auth(username, password string) {
|
||||
|
||||
func (c *Conn) Accept() error {
|
||||
for {
|
||||
req, err := tcp.ReadRequest(c.reader)
|
||||
req, err := c.ReadRequest()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -42,7 +43,7 @@ func (c *Conn) Accept() error {
|
||||
Status: "401 Unauthorized",
|
||||
Header: map[string][]string{"Www-Authenticate": {`Basic realm="go2rtc"`}},
|
||||
}
|
||||
if err = c.Response(res); err != nil {
|
||||
if err = c.WriteResponse(res); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
@@ -58,7 +59,7 @@ func (c *Conn) Accept() error {
|
||||
},
|
||||
Request: req,
|
||||
}
|
||||
if err = c.Response(res); err != nil {
|
||||
if err = c.WriteResponse(res); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -83,7 +84,7 @@ func (c *Conn) Accept() error {
|
||||
c.Fire(MethodAnnounce)
|
||||
|
||||
res := &tcp.Response{Request: req}
|
||||
if err = c.Response(res); err != nil {
|
||||
if err = c.WriteResponse(res); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -96,7 +97,7 @@ func (c *Conn) Accept() error {
|
||||
Status: "404 Not Found",
|
||||
Request: req,
|
||||
}
|
||||
return c.Response(res)
|
||||
return c.WriteResponse(res)
|
||||
}
|
||||
|
||||
res := &tcp.Response{
|
||||
@@ -108,11 +109,12 @@ func (c *Conn) Accept() error {
|
||||
|
||||
// convert tracks to real output medias medias
|
||||
var medias []*core.Media
|
||||
for _, track := range c.senders {
|
||||
for i, track := range c.senders {
|
||||
media := &core.Media{
|
||||
Kind: core.GetKind(track.Codec.Name),
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{track.Codec},
|
||||
ID: "trackID=" + strconv.Itoa(i),
|
||||
}
|
||||
medias = append(medias, media)
|
||||
}
|
||||
@@ -122,7 +124,7 @@ func (c *Conn) Accept() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.Response(res); err != nil {
|
||||
if err = c.WriteResponse(res); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -136,27 +138,24 @@ func (c *Conn) Accept() error {
|
||||
|
||||
const transport = "RTP/AVP/TCP;unicast;interleaved="
|
||||
if strings.HasPrefix(tr, transport) {
|
||||
c.Session = "1" // TODO: fixme
|
||||
c.session = core.RandString(8, 10)
|
||||
c.state = StateSetup
|
||||
res.Header.Set("Transport", tr[:len(transport)+3])
|
||||
} else {
|
||||
res.Status = "461 Unsupported transport"
|
||||
}
|
||||
|
||||
if err = c.Response(res); err != nil {
|
||||
if err = c.WriteResponse(res); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case MethodRecord, MethodPlay:
|
||||
res := &tcp.Response{Request: req}
|
||||
if err = c.Response(res); err == nil {
|
||||
c.state = StatePlay
|
||||
}
|
||||
return err
|
||||
return c.WriteResponse(res)
|
||||
|
||||
case MethodTeardown:
|
||||
res := &tcp.Response{Request: req}
|
||||
_ = c.Response(res)
|
||||
_ = c.WriteResponse(res)
|
||||
c.state = StateNone
|
||||
return c.conn.Close()
|
||||
|
||||
|
Reference in New Issue
Block a user