BIG rewrite stream info

This commit is contained in:
Alex X
2024-06-15 16:46:03 +03:00
parent ecfe802065
commit 96504e2fb0
88 changed files with 1043 additions and 854 deletions

View File

@@ -8,6 +8,7 @@ import (
"net/url"
"os"
"os/exec"
"slices"
"strings"
"sync"
"time"
@@ -80,7 +81,7 @@ func execHandle(rawURL string) (core.Producer, error) {
return handleRTSP(rawURL, cmd, path)
}
func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
}
@@ -104,12 +105,17 @@ func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error
return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr)
}
if info, ok := prod.(core.Info); ok {
info.SetProtocol("pipe")
setRemoteInfo(info, source, cmd.Args)
}
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run pipe")
return prod, nil
}
func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error) {
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}
@@ -131,7 +137,7 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
ts := time.Now()
if err := cmd.Start(); err != nil {
log.Error().Err(err).Str("url", url).Msg("[exec]")
log.Error().Err(err).Str("source", source).Msg("[exec]")
return nil, err
}
@@ -143,13 +149,14 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
select {
case <-time.After(time.Second * 60):
_ = cmd.Process.Kill()
log.Error().Str("url", url).Msg("[exec] timeout")
log.Error().Str("source", source).Msg("[exec] timeout")
return nil, errors.New("exec: timeout")
case <-done:
// limit message size
return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr)
case prod := <-waiter:
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp")
setRemoteInfo(prod, source, cmd.Args)
prod.OnClose = func() error {
log.Debug().Msgf("[exec] kill rtsp")
return errors.Join(cmd.Process.Kill(), cmd.Wait())
@@ -210,3 +217,15 @@ func trimSpace(b []byte) []byte {
}
return b[start:stop]
}
func setRemoteInfo(info core.Info, source string, args []string) {
info.SetSource(source)
if i := slices.Index(args, "-i"); i > 0 && i < len(args)-1 {
rawURL := args[i+1]
if u, err := url.Parse(rawURL); err == nil && u.Host != "" {
info.SetRemoteAddr(u.Host)
info.SetURL(rawURL)
}
}
}

View File

@@ -13,7 +13,7 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
url string
query url.Values
ffmpeg core.Producer
@@ -31,7 +31,8 @@ func NewProducer(url string) (core.Producer, error) {
return nil, errors.New("ffmpeg: unsupported params: " + url[i:])
}
p.Type = "FFmpeg producer"
p.ID = core.NewID()
p.FormatName = "ffmpeg"
p.Medias = []*core.Media{
{
// we can support only audio, because don't know FmtpLine for H264 and PayloadType for MJPEG
@@ -81,7 +82,7 @@ func (p *Producer) Stop() error {
func (p *Producer) MarshalJSON() ([]byte, error) {
if p.ffmpeg == nil {
return json.Marshal(p.SuperProducer)
return json.Marshal(p.Connection)
}
return json.Marshal(p.ffmpeg)
}

View File

@@ -63,7 +63,7 @@ func apiStream(w http.ResponseWriter, r *http.Request) {
return
}
s, err = webrtc.ExchangeSDP(stream, string(offer), "WebRTC/Hass sync", r.UserAgent())
s, err = webrtc.ExchangeSDP(stream, string(offer), "hass/webrtc", r.UserAgent())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return

View File

@@ -12,7 +12,6 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)
@@ -63,15 +62,13 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
medias := mp4.ParseQuery(r.URL.Query())
if medias != nil {
c := mp4.NewConsumer(medias)
c.Type = "HLS/fMP4 consumer"
c.RemoteAddr = tcp.RemoteAddr(r)
c.UserAgent = r.UserAgent()
c.FormatName = "hls/fmp4"
c.WithRequest(r)
cons = c
} else {
c := mpegts.NewConsumer()
c.Type = "HLS/TS consumer"
c.RemoteAddr = tcp.RemoteAddr(r)
c.UserAgent = r.UserAgent()
c.FormatName = "hls/mpegts"
c.WithRequest(r)
cons = c
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
@@ -20,9 +19,8 @@ func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
codecs := msg.String()
medias := mp4.ParseCodecs(codecs, true)
cons := mp4.NewConsumer(medias)
cons.Type = "HLS/fMP4 consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.FormatName = "hls/fmp4"
cons.WithRequest(tr.Request)
log.Trace().Msgf("[hls] new ws consumer codecs=%s", codecs)

View File

@@ -11,9 +11,9 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/hls"
"github.com/AlexxIT/go2rtc/pkg/image"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/multipart"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
@@ -45,6 +45,21 @@ func handleHTTP(rawURL string) (core.Producer, error) {
}
}
prod, err := do(req)
if err != nil {
return nil, err
}
if info, ok := prod.(core.Info); ok {
info.SetProtocol("http")
info.SetRemoteAddr(req.URL.Host) // TODO: rewrite to net.Conn
info.SetURL(rawURL)
}
return prod, nil
}
func do(req *http.Request) (core.Producer, error) {
res, err := tcp.Do(req)
if err != nil {
return nil, err
@@ -66,14 +81,12 @@ func handleHTTP(rawURL string) (core.Producer, error) {
}
switch {
case ct == "image/jpeg":
return mjpeg.NewClient(res), nil
case ct == "multipart/x-mixed-replace":
return multipart.Open(res.Body)
case ct == "application/vnd.apple.mpegurl" || ext == "m3u8":
return hls.OpenURL(req.URL, res.Body)
case ct == "image/jpeg":
return image.Open(res)
case ct == "multipart/x-mixed-replace":
return mpjpeg.Open(res.Body)
}
return magic.Open(res.Body)

View File

@@ -17,7 +17,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/y4m"
"github.com/rs/zerolog"
)
@@ -44,8 +44,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
}
cons := magic.NewKeyframe()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
@@ -100,8 +99,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) {
}
cons := mjpeg.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Msg("[api.mjpeg] add consumer")
@@ -117,7 +115,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) {
wr := mjpeg.NewWriter(w)
_, _ = cons.WriteTo(wr)
} else {
cons.Type = "ASCII passive consumer "
cons.FormatName = "ascii"
query := r.URL.Query()
wr := ascii.NewWriter(w, query.Get("color"), query.Get("back"), query.Get("text"))
@@ -135,17 +133,16 @@ func inputMjpeg(w http.ResponseWriter, r *http.Request) {
return
}
res := &http.Response{Body: r.Body, Header: r.Header, Request: r}
res.Header.Set("Content-Type", "multipart/mixed;boundary=")
prod, _ := mpjpeg.Open(r.Body)
prod.WithRequest(r)
client := mjpeg.NewClient(res)
stream.AddProducer(client)
stream.AddProducer(prod)
if err := client.Start(); err != nil && err != io.EOF {
if err := prod.Start(); err != nil && err != io.EOF {
log.Warn().Err(err).Caller().Send()
}
stream.RemoveProducer(client)
stream.RemoveProducer(prod)
}
func handlerWS(tr *ws.Transport, _ *ws.Message) error {
@@ -155,8 +152,7 @@ func handlerWS(tr *ws.Transport, _ *ws.Message) error {
}
cons := mjpeg.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.WithRequest(tr.Request)
if err := stream.AddConsumer(cons); err != nil {
log.Debug().Err(err).Msg("[mjpeg] add consumer")
@@ -183,8 +179,7 @@ func apiStreamY4M(w http.ResponseWriter, r *http.Request) {
}
cons := y4m.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()

View File

@@ -13,7 +13,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)
@@ -100,9 +99,9 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
medias := mp4.ParseQuery(r.URL.Query())
cons := mp4.NewConsumer(medias)
cons.Type = "MP4/HTTP active consumer"
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.FormatName = "mp4"
cons.Protocol = "http"
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()

View File

@@ -8,7 +8,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
@@ -24,9 +23,8 @@ func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
}
cons := mp4.NewConsumer(medias)
cons.Type = "MSE/WebSocket active consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.FormatName = "mse/fmp4"
cons.WithRequest(tr.Request)
if err := stream.AddConsumer(cons); err != nil {
log.Debug().Err(err).Msg("[mp4] add consumer")
@@ -57,9 +55,7 @@ func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error {
}
cons := mp4.NewKeyframe(medias)
cons.Type = "MP4/WebSocket active consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.WithRequest(tr.Request)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()

View File

@@ -6,7 +6,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func apiStreamAAC(w http.ResponseWriter, r *http.Request) {
@@ -18,8 +17,7 @@ func apiStreamAAC(w http.ResponseWriter, r *http.Request) {
}
cons := aac.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@@ -6,7 +6,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func Init() {
@@ -31,8 +30,7 @@ func outputMpegTS(w http.ResponseWriter, r *http.Request) {
}
cons := mpegts.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@@ -12,7 +12,6 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
"github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)
@@ -128,11 +127,7 @@ func tcpHandle(netConn net.Conn) error {
var log zerolog.Logger
func streamsHandle(url string) (core.Producer, error) {
client, err := rtmp.DialPlay(url)
if err != nil {
return nil, err
}
return client, nil
return rtmp.DialPlay(url)
}
func streamsConsumerHandle(url string) (core.Consumer, func(), error) {
@@ -165,9 +160,7 @@ func outputFLV(w http.ResponseWriter, r *http.Request) {
}
cons := flv.NewConsumer()
cons.Type = "HTTP-FLV consumer"
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()

View File

@@ -6,7 +6,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/pkg/probe"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func apiStreams(w http.ResponseWriter, r *http.Request) {
@@ -30,8 +29,7 @@ func apiStreams(w http.ResponseWriter, r *http.Request) {
cons := probe.NewProbe(query)
if len(cons.Medias) != 0 {
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return

View File

@@ -7,7 +7,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
)
type Handler func(url string) (core.Producer, error)
type Handler func(source string) (core.Producer, error)
var handlers = map[string]Handler{}

View File

@@ -41,7 +41,7 @@ func streamsHandler(rawURL string) (core.Producer, error) {
// https://aws.amazon.com/kinesis/video-streams/
// https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/what-is-kvswebrtc.html
// https://github.com/orgs/awslabs/repositories?q=kinesis+webrtc
return kinesisClient(rawURL, query, "WebRTC/Kinesis")
return kinesisClient(rawURL, query, "webrtc/kinesis")
} else if format == "openipc" {
return openIPCClient(rawURL, query)
} else {
@@ -86,8 +86,9 @@ func go2rtcClient(url string) (core.Producer, error) {
var connMu sync.Mutex
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/WebSocket async"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws"
prod.URL = url
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:
@@ -180,8 +181,9 @@ func whepClient(url string) (core.Producer, error) {
}
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/WHEP sync"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "http"
prod.URL = url
medias := []*core.Media{
{Kind: core.KindVideo, Direction: core.DirectionRecvonly},

View File

@@ -34,7 +34,7 @@ func (k kinesisResponse) String() string {
return fmt.Sprintf("type=%s, payload=%s", k.Type, k.Payload)
}
func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer, error) {
func kinesisClient(rawURL string, query url.Values, format string) (core.Producer, error) {
// 1. Connect to signalign server
conn, _, err := websocket.DefaultDialer.Dial(rawURL, nil)
if err != nil {
@@ -79,8 +79,10 @@ func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer,
}
prod := webrtc.NewConn(pc)
prod.Desc = desc
prod.FormatName = format
prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws"
prod.URL = rawURL
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:
@@ -216,5 +218,5 @@ func wyzeClient(rawURL string) (core.Producer, error) {
"ice_servers": []string{string(kvs.Servers)},
}
return kinesisClient(kvs.URL, query, "WebRTC/Wyze")
return kinesisClient(kvs.URL, query, "webrtc/wyze")
}

View File

@@ -193,8 +193,10 @@ func milestoneClient(rawURL string, query url.Values) (core.Producer, error) {
}
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/Milestone"
prod.FormatName = "webrtc/milestone"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "http"
prod.URL = rawURL
offer, err := mc.GetOffer()
if err != nil {

View File

@@ -53,8 +53,10 @@ func openIPCClient(rawURL string, query url.Values) (core.Producer, error) {
var connState core.Waiter
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/OpenIPC"
prod.FormatName = "webrtc/openipc"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws"
prod.URL = rawURL
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:

View File

@@ -100,11 +100,11 @@ func outputWebRTC(w http.ResponseWriter, r *http.Request) {
switch mediaType {
case "application/json":
desc = "WebRTC/JSON sync"
desc = "webrtc/json"
case MimeSDP:
desc = "WebRTC/WHEP sync"
desc = "webrtc/whep"
default:
desc = "WebRTC/HTTP sync"
desc = "webrtc/post"
}
answer, err := ExchangeSDP(stream, offer, desc, r.UserAgent())
@@ -168,8 +168,8 @@ func inputWebRTC(w http.ResponseWriter, r *http.Request) {
// create new webrtc instance
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/WHIP sync"
prod.Mode = core.ModePassiveProducer
prod.Protocol = "http"
prod.UserAgent = r.UserAgent()
if err = prod.SetOffer(string(offer)); err != nil {

View File

@@ -117,8 +117,8 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) error {
defer sendAnswer.Done(nil)
conn := webrtc.NewConn(pc)
conn.Desc = "WebRTC/WebSocket async"
conn.Mode = mode
conn.Protocol = "ws"
conn.UserAgent = tr.Request.UserAgent()
conn.Listen(func(msg any) {
switch msg := msg.(type) {
@@ -207,8 +207,9 @@ func ExchangeSDP(stream *streams.Stream, offer, desc, userAgent string) (answer
// create new webrtc instance
conn := webrtc.NewConn(pc)
conn.Desc = desc
conn.FormatName = desc
conn.UserAgent = userAgent
conn.Protocol = "http"
conn.Listen(func(msg any) {
switch msg := msg.(type) {
case pion.PeerConnectionState:

View File

@@ -47,7 +47,7 @@ func Init() {
if stream == nil {
return "", errors.New(api.StreamNotFound)
}
return webrtc.ExchangeSDP(stream, offer, "WebRTC/WebTorrent sync", "")
return webrtc.ExchangeSDP(stream, offer, "webtorrent", "")
},
}

View File

@@ -1,3 +1,85 @@
# Notes
go2rtc tries to name formats, protocols and codecs the same way they are named in FFmpeg.
Some formats and protocols go2rtc supports exclusively. They have no equivalent in FFmpeg.
## Producers (input)
- The initiator of the connection can be go2rtc - **Source protocols**
- The initiator of the connection can be an external program - **Ingress protocols**
- Codecs can be incoming - **Recevers codecs**
- Codecs can be outgoing (two way audio) - **Senders codecs**
| Format | Source protocols | Ingress protocols | Recevers codecs | Senders codecs | Example |
|--------------|------------------|-------------------|------------------------------|--------------------|---------------|
| adts | http,tcp,pipe | http | aac | | `http:` |
| bubble | http | | h264,hevc,pcm_alaw | | `bubble:` |
| dvrip | tcp | | h264,hevc,pcm_alaw,pcm_mulaw | pcm_alaw | `dvrip:` |
| flv | http,tcp,pipe | http | h264,aac | | `http:` |
| gopro | http+udp | | TODO | | `gopro:` |
| hass/webrtc | ws+udp,tcp | | TODO | | `hass:` |
| hls/mpegts | http | | h264,h265,aac,opus | | `http:` |
| homekit | homekit+udp | | h264,eld* | | `homekit:` |
| isapi | http | | | pcm_alaw,pcm_mulaw | `isapi:` |
| ivideon | ws | | h264 | | `ivideon:` |
| kasa | http | | h264,pcm_mulaw | | `kasa:` |
| h264 | http,tcp,pipe | http | h264 | | `http:` |
| hevc | http,tcp,pipe | http | hevc | | `http:` |
| mjpeg | http,tcp,pipe | http | mjpeg | | `http:` |
| mpjpeg | http,tcp,pipe | http | mjpeg | | `http:` |
| mpegts | http,tcp,pipe | http | h264,hevc,aac,opus | | `http:` |
| nest/webrtc | http+udp | | TODO | | `nest:` |
| roborock | mqtt+udp | | h264,opus | opus | `roborock:` |
| rtmp | rtmp | rtmp | h264,aac | | `rtmp:` |
| rtsp | rtsp+tcp,ws | rtsp+tcp | h264,hevc,aac,pcm*,opus | pcm*,opus | `rtsp:` |
| stdin | pipe | | | pcm_alaw,pcm_mulaw | `stdin:` |
| tapo | http | | h264,pcma | pcm_alaw | `tapo:` |
| wav | http,tcp,pipe | http | pcm_alaw,pcm_mulaw | | `http:` |
| webrtc* | TODO | TODO | h264,pcm_alaw,pcm_mulaw,opus | pcm_alaw,pcm_mulaw | `webrtc:` |
| webtorrent | TODO | TODO | TODO | TODO | `webtorrent:` |
| yuv4mpegpipe | http,tcp,pipe | http | rawvideo | | `http:` |
- **eld** - rare variant of aac codec
- **pcm** - pcm_alaw pcm_mulaw pcm_s16be pcm_s16le
- **webrtc** - webrtc/kinesis, webrtc/openipc, webrtc/milestone, webrtc/wyze, webrtc/whep
## Consumers (output)
| Format | Protocol | Send codecs | Recv codecs | Example |
|--------------|-------------|------------------------------|-------------------------|---------------------------------------|
| adts | http | aac | | `GET /api/stream.adts` |
| ascii | http | mjpeg | | `GET /api/stream.ascii` |
| flv | http | h264,aac | | `GET /api/stream.flv` |
| hls/mpegts | http | h264,hevc,aac | | `GET /api/stream.m3u8` |
| hls/fmp4 | http | h264,hevc,aac,pcm*,opus | | `GET /api/stream.m3u8?mp4` |
| homekit | homekit+udp | h264,opus | | Apple HomeKit app |
| mjpeg | ws | mjpeg | | `{"type":"mjpeg"}` -> `/api/ws` |
| mpjpeg | http | mjpeg | | `GET /api/stream.mjpeg` |
| mp4 | http | h264,hevc,aac,pcm*,opus | | `GET /api/stream.mp4` |
| mse/fmp4 | ws | h264,hevc,aac,pcm*,opus | | `{"type":"mse"}` -> `/api/ws` |
| mpegts | http | h264,hevc,aac | | `GET /api/stream.ts` |
| rtmp | rtmp | h264,aac | | `rtmp://localhost:1935/{stream_name}` |
| rtsp | rtsp+tcp | h264,hevc,aac,pcm*,opus | | `rtsp://localhost:8554/{stream_name}` |
| webrtc | TODO | h264,pcm_alaw,pcm_mulaw,opus | pcm_alaw,pcm_mulaw,opus | `{"type":"webrtc"}` -> `/api/ws` |
| yuv4mpegpipe | http | rawvideo | | `GET /api/stream.y4m` |
- **pcm** - pcm_alaw pcm_mulaw pcm_s16be pcm_s16le
## Snapshots
| Format | Protocol | Send codecs | Example |
|--------|----------|-------------|-----------------------|
| jpeg | http | mjpeg | `GET /api/frame.jpeg` |
| mp4 | http | h264,hevc | `GET /api/frame.mp4` |
## Developers
File naming:
- `pkg/{format}/producer.go` - producer for this format (also if support backchannel)
- `pkg/{format}/consumer.go` - consumer for this format
- `pkg/{format}/backchanel.go` - producer with only backchannel func
## Useful links
- https://www.wowza.com/blog/streaming-protocols

View File

@@ -8,15 +8,12 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
}
func NewConsumer() *Consumer {
cons := &Consumer{
wr: core.NewWriteBuffer(nil),
}
cons.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
@@ -25,7 +22,16 @@ func NewConsumer() *Consumer {
},
},
}
return cons
wr := core.NewWriteBuffer(nil)
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "adts",
Medias: medias,
Transport: wr,
},
wr: wr,
}
}
func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
@@ -51,8 +57,3 @@ func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Re
func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}

View File

@@ -10,9 +10,8 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *bufio.Reader
cl io.Closer
}
func Open(r io.Reader) (*Producer, error) {
@@ -23,18 +22,22 @@ func Open(r io.Reader) (*Producer, error) {
return nil, err
}
codec := ADTSToCodec(b)
prod := &Producer{rd: rd, cl: r.(io.Closer)}
prod.Type = "ADTS producer"
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
Codecs: []*core.Codec{ADTSToCodec(b)},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "adts",
Medias: medias,
Transport: r,
},
rd: rd,
}, nil
}
func (c *Producer) Start() error {
@@ -66,8 +69,3 @@ func (c *Producer) Start() error {
c.Receivers[0].WriteRTP(pkt)
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.cl.Close()
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/pion/rtp"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener

View File

@@ -65,11 +65,16 @@ func (c *Client) Stop() error {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Bubble active producer",
Medias: c.medias,
Recv: c.recv,
Receivers: c.receivers,
info := &core.Connection{
ID: core.ID(c),
FormatName: "bubble",
Protocol: "http",
Medias: c.medias,
Recv: c.recv,
Receivers: c.receivers,
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
return json.Marshal(info)
}

View File

@@ -46,7 +46,7 @@ func FFmpegCodecName(name string) string {
case CodecH264:
return "h264"
case CodecH265:
return "h265"
return "hevc"
case CodecJPEG:
return "mjpeg"
case CodecRAW:

139
pkg/core/connection.go Normal file
View File

@@ -0,0 +1,139 @@
package core
import (
"io"
"net/http"
"reflect"
"sync/atomic"
)
func NewID() uint32 {
return id.Add(1)
}
// Deprecated: use NewID instead
func ID(v any) uint32 {
p := uintptr(reflect.ValueOf(v).UnsafePointer())
return 0x8000_0000 | uint32(p)
}
var id atomic.Uint32
type Info interface {
SetProtocol(string)
SetRemoteAddr(string)
SetSource(string)
SetURL(string)
WithRequest(*http.Request)
}
// Connection just like webrtc.PeerConnection
// - ID and RemoteAddr used for building Connection(s) graph
// - FormatName, Protocol, RemoteAddr, Source, URL, SDP, UserAgent used for info about Connection
// - FormatName and Protocol has FFmpeg compatible names
// - Transport used for auto closing on Stop
type Connection struct {
ID uint32 `json:"id,omitempty"`
FormatName string `json:"format_name,omitempty"` // rtsp, webrtc, mp4, mjpeg, mpjpeg...
Protocol string `json:"protocol,omitempty"` // tcp, udp, http, ws, pipe...
RemoteAddr string `json:"remote_addr,omitempty"` // host:port other info
Source string `json:"source,omitempty"`
URL string `json:"url,omitempty"`
SDP string `json:"sdp,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
Medias []*Media `json:"medias,omitempty"`
Receivers []*Receiver `json:"receivers,omitempty"`
Senders []*Sender `json:"senders,omitempty"`
Recv int `json:"bytes_recv,omitempty"`
Send int `json:"bytes_send,omitempty"`
Transport any `json:"-"`
}
func (c *Connection) GetMedias() []*Media {
return c.Medias
}
func (c *Connection) GetTrack(media *Media, codec *Codec) (*Receiver, error) {
for _, receiver := range c.Receivers {
if receiver.Codec == codec {
return receiver, nil
}
}
receiver := NewReceiver(media, codec)
c.Receivers = append(c.Receivers, receiver)
return receiver, nil
}
func (c *Connection) Stop() error {
for _, receiver := range c.Receivers {
receiver.Close()
}
for _, sender := range c.Senders {
sender.Close()
}
if closer, ok := c.Transport.(io.Closer); ok {
return closer.Close()
}
return nil
}
// Deprecated:
func (c *Connection) Codecs() []*Codec {
codecs := make([]*Codec, len(c.Senders))
for i, sender := range c.Senders {
codecs[i] = sender.Codec
}
return codecs
}
func (c *Connection) SetProtocol(s string) {
c.Protocol = s
}
func (c *Connection) SetRemoteAddr(s string) {
if c.RemoteAddr == "" {
c.RemoteAddr = s
} else {
c.RemoteAddr += " forward " + c.RemoteAddr
}
}
func (c *Connection) SetSource(s string) {
c.Source = s
}
func (c *Connection) SetURL(s string) {
c.URL = s
}
func (c *Connection) WithRequest(r *http.Request) {
if r.Header.Get("Upgrade") == "websocket" {
c.Protocol = "ws"
} else {
c.Protocol = "http"
}
c.RemoteAddr = r.RemoteAddr
if remote := r.Header.Get("X-Forwarded-For"); remote != "" {
c.RemoteAddr += " forwarded " + remote
}
c.UserAgent = r.UserAgent()
}
// Create like os.Create, init Consumer with existing Transport
func Create(w io.Writer) (*Connection, error) {
return &Connection{Transport: w}, nil
}
// Open like os.Open, init Producer from existing Transport
func Open(r io.Reader) (*Connection, error) {
return &Connection{Transport: r}, nil
}
// Dial like net.Dial, init Producer via Dialing
func Dial(rawURL string) (*Connection, error) {
return &Connection{}, nil
}

View File

@@ -1,5 +1,7 @@
package core
import "encoding/json"
const (
DirectionRecvonly = "recvonly"
DirectionSendonly = "sendonly"
@@ -90,89 +92,6 @@ func (m Mode) String() string {
return "unknown"
}
type Info struct {
Type string `json:"type,omitempty"`
URL string `json:"url,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
SDP string `json:"sdp,omitempty"`
Medias []*Media `json:"medias,omitempty"`
Receivers []*Receiver `json:"receivers,omitempty"`
Senders []*Sender `json:"senders,omitempty"`
Recv int `json:"recv,omitempty"`
Send int `json:"send,omitempty"`
}
const (
UnsupportedCodec = "unsupported codec"
WrongMediaDirection = "wrong media direction"
)
type SuperProducer struct {
Type string `json:"type,omitempty"`
URL string `json:"url,omitempty"`
SDP string `json:"sdp,omitempty"`
Medias []*Media `json:"medias,omitempty"`
Receivers []*Receiver `json:"receivers,omitempty"`
Recv int `json:"recv,omitempty"`
}
func (s *SuperProducer) GetMedias() []*Media {
return s.Medias
}
func (s *SuperProducer) GetTrack(media *Media, codec *Codec) (*Receiver, error) {
for _, receiver := range s.Receivers {
if receiver.Codec == codec {
return receiver, nil
}
}
receiver := NewReceiver(media, codec)
s.Receivers = append(s.Receivers, receiver)
return receiver, nil
}
func (s *SuperProducer) Close() error {
for _, receiver := range s.Receivers {
receiver.Close()
}
return nil
}
type SuperConsumer struct {
Type string `json:"type,omitempty"`
URL string `json:"url,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
SDP string `json:"sdp,omitempty"`
Medias []*Media `json:"medias,omitempty"`
Senders []*Sender `json:"senders,omitempty"`
Send int `json:"send,omitempty"`
}
func (s *SuperConsumer) GetMedias() []*Media {
return s.Medias
}
func (s *SuperConsumer) AddTrack(media *Media, codec *Codec, track *Receiver) error {
return nil
}
//func (b *SuperConsumer) WriteTo(w io.Writer) (n int64, err error) {
// return 0, nil
//}
func (s *SuperConsumer) Close() error {
for _, sender := range s.Senders {
sender.Close()
}
return nil
}
func (s *SuperConsumer) Codecs() []*Codec {
codecs := make([]*Codec, len(s.Senders))
for i, sender := range s.Senders {
codecs[i] = sender.Codec
}
return codecs
func (m Mode) MarshalJSON() ([]byte, error) {
return json.Marshal(m.String())
}

View File

@@ -92,7 +92,7 @@ func (m *Media) Equal(media *Media) bool {
func GetKind(name string) string {
switch name {
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG:
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG, CodecRAW:
return KindVideo
case CodecPCMU, CodecPCMA, CodecAAC, CodecOpus, CodecG722, CodecMP3, CodecPCM, CodecPCML, CodecELD, CodecFLAC:
return KindAudio

View File

@@ -23,10 +23,11 @@ type Filter func(handler HandlerFunc) HandlerFunc
// Node - Receiver or Sender or Filter (transform)
type Node struct {
Codec *Codec `json:"codec"`
Input HandlerFunc `json:"-"`
Output HandlerFunc `json:"-"`
Codec *Codec
Input HandlerFunc
Output HandlerFunc
id uint32
childs []*Node
parent *Node

View File

@@ -1,6 +1,7 @@
package core
import (
"encoding/json"
"errors"
"github.com/pion/rtp"
@@ -22,7 +23,7 @@ type Receiver struct {
func NewReceiver(media *Media, codec *Codec) *Receiver {
r := &Receiver{
Node: Node{Codec: codec},
Node: Node{id: NewID(), Codec: codec},
Media: media,
}
r.Input = func(packet *Packet) {
@@ -91,7 +92,7 @@ func NewSender(media *Media, codec *Codec) *Sender {
buf := make(chan *Packet, bufSize)
s := &Sender{
Node: Node{Codec: codec},
Node: Node{id: NewID(), Codec: codec},
Media: media,
buf: buf,
}
@@ -171,3 +172,43 @@ func (s *Sender) Close() {
s.Node.Close()
}
func (r *Receiver) MarshalJSON() ([]byte, error) {
v := struct {
ID uint32 `json:"id"`
Codec *Codec `json:"codec"`
Childs []uint32 `json:"childs,omitempty"`
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
}{
ID: r.Node.id,
Codec: r.Node.Codec,
Bytes: r.Bytes,
Packets: r.Packets,
}
for _, child := range r.childs {
v.Childs = append(v.Childs, child.id)
}
return json.Marshal(v)
}
func (s *Sender) MarshalJSON() ([]byte, error) {
v := struct {
ID uint32 `json:"id"`
Codec *Codec `json:"codec"`
Parent uint32 `json:"parent,omitempty"`
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
Drops int `json:"drops,omitempty"`
}{
ID: s.Node.id,
Codec: s.Node.Codec,
Bytes: s.Bytes,
Packets: s.Packets,
Drops: s.Drops,
}
if s.parent != nil {
v.Parent = s.parent.id
}
return json.Marshal(v)
}

View File

@@ -8,16 +8,16 @@ import (
"github.com/pion/rtp"
)
type Consumer struct {
core.SuperConsumer
type Backchannel struct {
core.Connection
client *Client
}
func (c *Consumer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (c *Consumer) Start() error {
func (c *Backchannel) Start() error {
if err := c.client.conn.SetReadDeadline(time.Time{}); err != nil {
return err
}
@@ -30,12 +30,7 @@ func (c *Consumer) Start() error {
}
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.client.Close()
}
func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
func (c *Backchannel) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
if err := c.client.Talk(); err != nil {
return err
}

View File

@@ -8,17 +8,22 @@ func Dial(url string) (core.Producer, error) {
return nil, err
}
conn := core.Connection{
ID: core.NewID(),
FormatName: "dvrip",
Protocol: "tcp",
RemoteAddr: client.conn.RemoteAddr().String(),
Transport: client.conn,
}
if client.stream != "" {
prod := &Producer{client: client}
prod.Type = "DVRIP active producer"
prod := &Producer{Connection: conn, client: client}
if err := prod.probe(); err != nil {
return nil, err
}
return prod, nil
} else {
cons := &Consumer{client: client}
cons.Type = "DVRIP active consumer"
cons.Medias = []*core.Media{
conn.Medias = []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
@@ -29,6 +34,6 @@ func Dial(url string) (core.Producer, error) {
},
},
}
return cons, nil
return &Backchannel{Connection: conn, client: client}, nil
}
}

View File

@@ -15,7 +15,7 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
client *Client
@@ -92,10 +92,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
return c.client.Close()
}
func (c *Producer) probe() error {
if err := c.client.Play(); err != nil {
return err

View File

@@ -10,17 +10,13 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
muxer *Muxer
}
func NewConsumer() *Consumer {
c := &Consumer{
wr: core.NewWriteBuffer(nil),
muxer: &Muxer{},
}
c.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
@@ -36,7 +32,17 @@ func NewConsumer() *Consumer {
},
},
}
return c
wr := core.NewWriteBuffer(nil)
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "flv",
Medias: medias,
Transport: wr,
},
wr: wr,
muxer: &Muxer{},
}
}
func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
@@ -86,8 +92,3 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
}
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}

View File

@@ -15,18 +15,24 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
video, audio *core.Receiver
}
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{rd: core.NewReadBuffer(rd)}
prod := &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "flv",
Transport: rd,
},
rd: core.NewReadBuffer(rd),
}
if err := prod.probe(); err != nil {
return nil, err
}
prod.Type = "FLV producer"
return prod, nil
}
@@ -57,7 +63,7 @@ const (
)
func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
receiver, _ := c.SuperProducer.GetTrack(media, codec)
receiver, _ := c.Connection.GetTrack(media, codec)
if media.Kind == core.KindVideo {
c.video = receiver
} else {
@@ -117,11 +123,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
func (c *Producer) probe() error {
if err := c.readHeader(); err != nil {
return err

View File

@@ -8,11 +8,10 @@ import (
"net/url"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
)
func Dial(rawURL string) (core.Producer, error) {
func Dial(rawURL string) (*mpegts.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
@@ -32,7 +31,15 @@ func Dial(rawURL string) (core.Producer, error) {
return nil, err
}
return mpegts.Open(r)
prod, err := mpegts.Open(r)
if err != nil {
return nil, err
}
prod.FormatName = "gopro"
prod.RemoteAddr = u.Host
return prod, nil
}
type listener struct {

View File

@@ -61,8 +61,10 @@ func NewClient(rawURL string) (*Client, error) {
}
conn := webrtc.NewConn(pc)
conn.Desc = "Hass"
conn.FormatName = "hass/webrtc"
conn.Mode = core.ModeActiveProducer
conn.Protocol = "ws"
conn.URL = rawURL
// https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields
medias := []*core.Media{

View File

@@ -4,14 +4,19 @@ import (
"io"
"net/url"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
)
func OpenURL(u *url.URL, body io.ReadCloser) (core.Producer, error) {
func OpenURL(u *url.URL, body io.ReadCloser) (*mpegts.Producer, error) {
rd, err := NewReader(u, body)
if err != nil {
return nil, err
}
return mpegts.Open(rd)
prod, err := mpegts.Open(rd)
if err != nil {
return nil, err
}
prod.FormatName = "hls/mpegts"
prod.RemoteAddr = u.Host
return prod, nil
}

View File

@@ -16,7 +16,7 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
conn net.Conn
srtp *srtp.Server
@@ -29,28 +29,31 @@ type Consumer struct {
}
func NewConsumer(conn net.Conn, server *srtp.Server) *Consumer {
return &Consumer{
SuperConsumer: core.SuperConsumer{
Type: "HomeKit passive consumer",
RemoteAddr: conn.RemoteAddr().String(),
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecH264},
},
},
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecOpus},
},
},
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecH264},
},
},
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecOpus},
},
},
}
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "homekit",
Protocol: "udp",
RemoteAddr: conn.RemoteAddr().String(),
Medias: medias,
Transport: conn,
},
conn: conn,
srtp: server,
}
@@ -175,11 +178,10 @@ func (c *Consumer) WriteTo(io.Writer) (int64, error) {
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
if c.deadline != nil {
c.deadline.Reset(0)
}
return c.SuperConsumer.Close()
return c.Connection.Stop()
}
func (c *Consumer) srtpEndpoint() *srtp.Endpoint {

View File

@@ -15,8 +15,9 @@ import (
"github.com/pion/rtp"
)
// Deprecated: rename to Producer
type Client struct {
core.SuperProducer
core.Connection
hap *hap.Client
srtp *srtp.Server
@@ -52,9 +53,12 @@ func Dial(rawURL string, server *srtp.Server) (*Client, error) {
}
client := &Client{
SuperProducer: core.SuperProducer{
Type: "HomeKit active producer",
URL: conn.URL(),
Connection: core.Connection{
ID: core.NewID(),
FormatName: "homekit",
Protocol: "udp",
Source: conn.URL(),
Transport: conn,
},
hap: conn,
srtp: server,
@@ -93,7 +97,6 @@ func (c *Client) GetMedias() []*core.Media {
return nil
}
c.URL = c.hap.URL()
c.SDP = fmt.Sprintf("%+v\n%+v", c.videoConfig, c.audioConfig)
c.Medias = []*core.Media{
@@ -175,8 +178,6 @@ func (c *Client) Start() error {
}
func (c *Client) Stop() error {
_ = c.SuperProducer.Close()
if c.videoSession != nil && c.videoSession.Remote != nil {
c.srtp.DelSession(c.videoSession)
}
@@ -184,7 +185,7 @@ func (c *Client) Stop() error {
c.srtp.DelSession(c.audioSession)
}
return c.hap.Close()
return c.Connection.Stop()
}
func (c *Client) trackByKind(kind string) *core.Receiver {

92
pkg/image/producer.go Normal file
View File

@@ -0,0 +1,92 @@
package image
import (
"errors"
"io"
"net/http"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
)
type Producer struct {
core.Connection
closed bool
res *http.Response
}
func Open(res *http.Response) (*Producer, error) {
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "image",
Protocol: "http",
RemoteAddr: res.Request.URL.Host,
Transport: res.Body,
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
},
},
res: res,
}, nil
}
func (c *Producer) Start() error {
body, err := io.ReadAll(c.res.Body)
if err != nil {
return err
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.Receivers[0].WriteRTP(pkt)
c.Recv += len(body)
req := c.res.Request
for !c.closed {
res, err := tcp.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return errors.New("wrong status: " + res.Status)
}
body, err = io.ReadAll(res.Body)
if err != nil {
return err
}
c.Recv += len(body)
pkt = &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.Receivers[0].WriteRTP(pkt)
}
return nil
}
func (c *Producer) Stop() error {
c.closed = true
return c.Connection.Stop()
}

View File

@@ -2,6 +2,7 @@ package isapi
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
@@ -51,10 +52,15 @@ func (c *Client) Stop() (err error) {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "ISAPI active consumer",
Medias: c.medias,
Send: c.send,
info := &core.Connection{
ID: core.ID(c),
FormatName: "isapi",
Protocol: "http",
Medias: c.medias,
Send: c.send,
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}

View File

@@ -11,6 +11,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener

View File

@@ -26,6 +26,7 @@ const (
StateHandle
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener

View File

@@ -2,6 +2,7 @@ package ivideon
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
@@ -32,11 +33,16 @@ func (c *Client) Stop() error {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Ivideon active producer",
URL: c.ID,
Medias: c.medias,
Recv: c.recv,
info := &core.Connection{
ID: core.ID(c),
FormatName: "ivideon",
Protocol: "ws",
URL: c.ID,
Medias: c.medias,
Recv: c.recv,
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
if c.receiver != nil {
info.Receivers = []*core.Receiver{c.receiver}

View File

@@ -12,13 +12,13 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/AlexxIT/go2rtc/pkg/multipart"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
reader *bufio.Reader
@@ -65,11 +65,18 @@ func Dial(url string) (*Producer, error) {
rd.Reader = httputil.NewChunkedReader(buf)
}
prod := &Producer{rd: core.NewReadBuffer(rd)}
prod := &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "kasa",
Protocol: "http",
Transport: rd,
},
rd: core.NewReadBuffer(rd),
}
if err = prod.probe(); err != nil {
return nil, err
}
prod.Type = "Kasa producer"
return prod, nil
}
@@ -90,7 +97,7 @@ func (c *Producer) Start() error {
}
for {
header, body, err := multipart.Next(c.reader)
header, body, err := mpjpeg.Next(c.reader)
if err != nil {
return err
}
@@ -128,11 +135,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
const (
MimeVideo = "video/x-h264"
MimeG711U = "audio/g711u"
@@ -151,7 +153,7 @@ func (c *Producer) probe() error {
timeout := time.Now().Add(core.ProbeTimeout)
for (waitVideo || waitAudio) && time.Now().Before(timeout) {
header, body, err := multipart.Next(c.reader)
header, body, err := mpjpeg.Next(c.reader)
if err != nil {
return err
}

View File

@@ -13,7 +13,7 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
}
@@ -28,26 +28,35 @@ func Open(r io.Reader) (*Producer, error) {
buf = annexb.EncodeToAVCC(buf, false) // won't break original buffer
var codec *core.Codec
var format string
switch {
case h264.NALUType(buf) == h264.NALUTypeSPS:
codec = h264.AVCCToCodec(buf)
format = "h264"
case h265.NALUType(buf) == h265.NALUTypeVPS:
codec = h265.AVCCToCodec(buf)
format = "hevc"
default:
return nil, errors.New("bitstream: unsupported header: " + hex.EncodeToString(buf[:8]))
}
prod := &Producer{rd: rd}
prod.Type = "Bitstream producer"
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: format,
Medias: medias,
Transport: r,
},
rd: rd,
}, nil
}
func (c *Producer) Start() error {
@@ -84,8 +93,3 @@ func (c *Producer) Start() error {
}
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}

View File

@@ -12,26 +12,32 @@ import (
)
type Keyframe struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
}
// Deprecated: should be rewritten
func NewKeyframe() *Keyframe {
return &Keyframe{
core.SuperConsumer{
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecJPEG},
{Name: core.CodecH264},
{Name: core.CodecH265},
},
},
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecJPEG},
{Name: core.CodecH264},
{Name: core.CodecH265},
},
},
core.NewWriteBuffer(nil),
}
wr := core.NewWriteBuffer(nil)
return &Keyframe{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "keyframe",
Medias: medias,
Transport: wr,
},
wr: wr,
}
}
@@ -98,8 +104,3 @@ func (k *Keyframe) CodecName() string {
func (k *Keyframe) WriteTo(wr io.Writer) (int64, error) {
return k.wr.WriteTo(wr)
}
func (k *Keyframe) Stop() error {
_ = k.SuperConsumer.Close()
return k.wr.Close()
}

View File

@@ -9,14 +9,12 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
}
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{rd: core.NewReadBuffer(rd)}
prod.Type = "MJPEG producer"
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
@@ -29,7 +27,15 @@ func Open(rd io.Reader) (*Producer, error) {
},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mjpeg",
Medias: medias,
Transport: rd,
},
rd: core.NewReadBuffer(rd),
}, nil
}
func (c *Producer) Start() error {
@@ -70,8 +76,3 @@ func (c *Producer) Start() error {
buf = buf[i:]
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}

View File

@@ -13,7 +13,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/magic/bitstream"
"github.com/AlexxIT/go2rtc/pkg/magic/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/multipart"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/wav"
"github.com/AlexxIT/go2rtc/pkg/y4m"
)
@@ -26,29 +26,31 @@ func Open(r io.Reader) (core.Producer, error) {
return nil, err
}
switch {
case string(b) == annexb.StartCode:
switch string(b) {
case annexb.StartCode:
return bitstream.Open(rd)
case string(b) == wav.FourCC:
case wav.FourCC:
return wav.Open(rd)
case string(b) == y4m.FourCC:
case y4m.FourCC:
return y4m.Open(rd)
}
case bytes.HasPrefix(b, []byte{0xFF, 0xD8}):
return mjpeg.Open(rd)
case bytes.HasPrefix(b, []byte(flv.Signature)):
switch string(b[:3]) {
case flv.Signature:
return flv.Open(rd)
}
case bytes.HasPrefix(b, []byte("--")):
return multipart.Open(rd)
case b[0] == 0xFF && (b[1] == 0xF1 || b[1] == 0xF9):
switch string(b[:2]) {
case "\xFF\xD8":
return mjpeg.Open(rd)
case "\xFF\xF1", "\xFF\xF9":
return aac.Open(rd)
case "--":
return mpjpeg.Open(rd)
}
case b[0] == mpegts.SyncByte:
switch b[0] {
case mpegts.SyncByte:
return mpegts.Open(rd)
}

View File

@@ -1,75 +0,0 @@
package mjpeg
import (
"errors"
"io"
"net/http"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
)
type Client struct {
core.Listener
UserAgent string
RemoteAddr string
closed bool
res *http.Response
medias []*core.Media
receiver *core.Receiver
recv int
}
func NewClient(res *http.Response) *Client {
return &Client{res: res}
}
func (c *Client) Handle() error {
body, err := io.ReadAll(c.res.Body)
if err != nil {
return err
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.receiver.WriteRTP(pkt)
c.recv += len(body)
req := c.res.Request
for !c.closed {
res, err := tcp.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return errors.New("wrong status: " + res.Status)
}
body, err = io.ReadAll(res.Body)
if err != nil {
return err
}
c.recv += len(body)
if c.receiver != nil {
pkt = &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.receiver.WriteRTP(pkt)
}
}
return nil
}

View File

@@ -8,26 +8,30 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
}
func NewConsumer() *Consumer {
return &Consumer{
core.SuperConsumer{
Type: "MJPEG passive consumer",
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecJPEG},
{Name: core.CodecRAW},
},
},
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecJPEG},
{Name: core.CodecRAW},
},
},
core.NewWriteBuffer(nil),
}
wr := core.NewWriteBuffer(nil)
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mjpeg",
Medias: medias,
Transport: wr,
},
wr: wr,
}
}
@@ -53,8 +57,3 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}

View File

@@ -1,61 +0,0 @@
package mjpeg
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (c *Client) GetMedias() []*core.Media {
if c.medias == nil {
c.medias = []*core.Media{{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
}}
}
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if c.receiver == nil {
c.receiver = core.NewReceiver(media, codec)
}
return c.receiver, nil
}
func (c *Client) Start() error {
// https://github.com/AlexxIT/go2rtc/issues/278
return c.Handle()
}
func (c *Client) Stop() error {
if c.receiver != nil {
c.receiver.Close()
}
// important for close reader/writer gorutines
_ = c.res.Body.Close()
c.closed = true
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "JPEG active producer",
URL: c.res.Request.URL.String(),
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
Medias: c.medias,
Recv: c.recv,
}
if c.receiver != nil {
info.Receivers = []*core.Receiver{c.receiver}
}
return json.Marshal(info)
}

View File

@@ -14,7 +14,7 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
muxer *Muxer
mu sync.Mutex
@@ -47,12 +47,17 @@ func NewConsumer(medias []*core.Media) *Consumer {
}
}
cons := &Consumer{
wr := core.NewWriteBuffer(nil)
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mp4",
Medias: medias,
Transport: wr,
},
muxer: &Muxer{},
wr: core.NewWriteBuffer(nil),
wr: wr,
}
cons.Medias = medias
return cons
}
func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
@@ -182,8 +187,3 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}

View File

@@ -10,11 +10,12 @@ import (
)
type Keyframe struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
muxer *Muxer
}
// Deprecated: should be rewritten
func NewKeyframe(medias []*core.Media) *Keyframe {
if medias == nil {
medias = []*core.Media{
@@ -29,9 +30,15 @@ func NewKeyframe(medias []*core.Media) *Keyframe {
}
}
wr := core.NewWriteBuffer(nil)
cons := &Keyframe{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mp4",
Transport: wr,
},
muxer: &Muxer{},
wr: core.NewWriteBuffer(nil),
wr: wr,
}
cons.Medias = medias
return cons
@@ -95,8 +102,3 @@ func (c *Keyframe) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
func (c *Keyframe) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Keyframe) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}

View File

@@ -11,17 +11,13 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
muxer *Muxer
wr *core.WriteBuffer
}
func NewConsumer() *Consumer {
c := &Consumer{
muxer: NewMuxer(),
wr: core.NewWriteBuffer(nil),
}
c.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
@@ -38,7 +34,17 @@ func NewConsumer() *Consumer {
},
},
}
return c
wr := core.NewWriteBuffer(nil)
return &Consumer{
core.Connection{
ID: core.NewID(),
FormatName: "mpegts",
Medias: medias,
Transport: wr,
},
NewMuxer(),
wr,
}
}
func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
@@ -110,14 +116,9 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}
func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) {
if codec.ClockRate == ClockRate {
return
}
rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate)
}
//func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) {
// if codec.ClockRate == ClockRate {
// return
// }
// rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate)
//}

View File

@@ -13,12 +13,19 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
}
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{rd: core.NewReadBuffer(rd)}
prod := &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mpegts",
Transport: rd,
},
rd: core.NewReadBuffer(rd),
}
if err := prod.probe(); err != nil {
return nil, err
}
@@ -26,7 +33,7 @@ func Open(rd io.Reader) (*Producer, error) {
}
func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
receiver, _ := c.SuperProducer.GetTrack(media, codec)
receiver, _ := c.Connection.GetTrack(media, codec)
receiver.ID = StreamType(codec)
return receiver, nil
}
@@ -40,6 +47,8 @@ func (c *Producer) Start() error {
return err
}
c.Recv += len(pkt.Payload)
//log.Printf("[mpegts] size: %6d, muxer: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType)
for _, receiver := range c.Receivers {
@@ -52,11 +61,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
func (c *Producer) probe() error {
c.rd.BufferSize = core.ProbeSize
defer c.rd.Reset()

View File

@@ -1,4 +1,4 @@
package multipart
package mpjpeg
import (
"bufio"

65
pkg/mpjpeg/producer.go Normal file
View File

@@ -0,0 +1,65 @@
package mpjpeg
import (
"bufio"
"errors"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Producer struct {
core.Connection
rd *bufio.Reader
}
func Open(rd io.Reader) (*Producer, error) {
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mpjpeg", // Multipart JPEG
Transport: rd,
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
},
},
}, nil
}
func (c *Producer) Start() error {
if len(c.Receivers) != 1 {
return errors.New("mjpeg: no receivers")
}
rd := bufio.NewReader(c.Transport.(io.Reader))
mjpeg := c.Receivers[0]
for {
_, body, err := Next(rd)
if err != nil {
return err
}
c.Recv += len(body)
if mjpeg != nil {
packet := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
mjpeg.WriteRTP(packet)
}
}
}

View File

@@ -1,68 +0,0 @@
package multipart
import (
"bufio"
"errors"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Producer struct {
core.SuperProducer
closer io.Closer
reader *bufio.Reader
}
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{
closer: rd.(io.Closer),
reader: bufio.NewReader(rd),
}
prod.Medias = []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
}
prod.Type = "Multipart producer"
return prod, nil
}
func (c *Producer) Start() error {
if len(c.Receivers) != 1 {
return errors.New("mjpeg: no receivers")
}
mjpeg := c.Receivers[0]
for {
_, body, err := Next(c.reader)
if err != nil {
return err
}
c.Recv += len(body)
if mjpeg != nil {
packet := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
mjpeg.WriteRTP(packet)
}
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.closer.Close()
}

View File

@@ -48,8 +48,10 @@ func Dial(rawURL string) (*Client, error) {
}
conn := webrtc.NewConn(pc)
conn.Desc = "Nest"
conn.FormatName = "nest/webrtc"
conn.Mode = core.ModeActiveProducer
conn.Protocol = "http"
conn.URL = rawURL
// https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields
medias := []*core.Media{

View File

@@ -8,17 +8,11 @@ import (
)
type Probe struct {
Type string `json:"type,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
Medias []*core.Media `json:"medias,omitempty"`
Receivers []*core.Receiver `json:"receivers,omitempty"`
Senders []*core.Sender `json:"senders,omitempty"`
core.Connection
}
func NewProbe(query url.Values) *Probe {
c := &Probe{Type: "probe"}
c.Medias = core.ParseQuery(query)
medias := core.ParseQuery(query)
for _, value := range query["microphone"] {
media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly}
@@ -32,10 +26,16 @@ func NewProbe(query url.Values) *Probe {
media.Codecs = append(media.Codecs, &core.Codec{Name: name})
}
c.Medias = append(c.Medias, media)
medias = append(medias, media)
}
return c
return &Probe{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "probe",
Medias: medias,
},
}
}
func (p *Probe) GetMedias() []*core.Media {

View File

@@ -18,6 +18,7 @@ import (
pion "github.com/pion/webrtc/v3"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener
@@ -110,8 +111,10 @@ func (c *Client) Connect() error {
var sendOffer sync.WaitGroup
c.conn = webrtc.NewConn(pc)
c.conn.Desc = "Roborock"
c.conn.FormatName = "roborock"
c.conn.Mode = core.ModeActiveProducer
c.conn.Protocol = "mqtt"
c.conn.URL = c.url
c.conn.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:

View File

@@ -8,10 +8,11 @@ import (
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func DialPlay(rawURL string) (core.Producer, error) {
func DialPlay(rawURL string) (*flv.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
@@ -22,16 +23,16 @@ func DialPlay(rawURL string) (core.Producer, error) {
return nil, err
}
rtmpConn, err := NewClient(conn, u)
client, err := NewClient(conn, u)
if err != nil {
return nil, err
}
if err = rtmpConn.play(); err != nil {
if err = client.play(); err != nil {
return nil, err
}
return rtmpConn.Producer()
return client.Producer()
}
func DialPublish(rawURL string) (io.Writer, error) {

View File

@@ -1,11 +1,10 @@
package rtmp
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
)
func (c *Conn) Producer() (core.Producer, error) {
func (c *Conn) Producer() (*flv.Producer, error) {
c.rdBuf = []byte{
'F', 'L', 'V', // signature
1, // version
@@ -13,7 +12,17 @@ func (c *Conn) Producer() (core.Producer, error) {
0, 0, 0, 9, // header size
}
return flv.Open(c)
prod, err := flv.Open(c)
if err != nil {
return nil, err
}
prod.FormatName = "rtmp"
prod.Protocol = "rtmp"
prod.RemoteAddr = c.conn.RemoteAddr().String()
prod.URL = c.url
return prod, nil
}
// Read - convert RTMP to FLV format

View File

@@ -20,7 +20,13 @@ import (
var Timeout = time.Second * 5
func NewClient(uri string) *Conn {
return &Conn{uri: uri}
return &Conn{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "rtsp",
},
uri: uri,
}
}
func (c *Conn) Dial() (err error) {
@@ -36,8 +42,10 @@ func (c *Conn) Dial() (err error) {
timeout = time.Second * time.Duration(c.Timeout)
}
conn, err = tcp.Dial(c.URL, timeout)
c.Protocol = "rtsp+tcp"
} else {
conn, err = websocket.Dial(c.Transport)
c.Protocol = "ws"
}
if err != nil {
return
@@ -53,6 +61,10 @@ func (c *Conn) Dial() (err error) {
c.sequence = 0
c.state = StateConn
c.Connection.RemoteAddr = conn.RemoteAddr().String()
c.Connection.Transport = conn
c.Connection.URL = c.uri
return nil
}
@@ -143,7 +155,7 @@ func (c *Conn) Describe() error {
}
}
c.sdp = string(res.Body) // for info
c.SDP = string(res.Body) // for info
medias, err := UnmarshalSDP(res.Body)
if err != nil {

View File

@@ -18,6 +18,7 @@ import (
)
type Conn struct {
core.Connection
core.Listener
// public
@@ -30,9 +31,7 @@ type Conn struct {
Timeout int
Transport string // custom transport support, ex. RTSP over WebSocket
Medias []*core.Media
UserAgent string
URL *url.URL
URL *url.URL
// internal
@@ -44,19 +43,10 @@ type Conn struct {
reader *bufio.Reader
sequence int
session string
sdp string
uri string
state State
stateMu sync.Mutex
receivers []*core.Receiver
senders []*core.Sender
// stats
recv int
send int
}
const (
@@ -114,7 +104,7 @@ func (c *Conn) Handle() (err error) {
// polling frames from remote RTSP Server (ex Camera)
timeout = time.Second * 5
if len(c.receivers) == 0 {
if len(c.Receivers) == 0 {
// if we only send audio to camera
// https://github.com/AlexxIT/go2rtc/issues/659
timeout += keepaliveDT
@@ -239,7 +229,7 @@ func (c *Conn) Handle() (err error) {
return
}
c.recv += int(size)
c.Recv += int(size)
if channelID&1 == 0 {
packet := &rtp.Packet{}
@@ -247,7 +237,7 @@ func (c *Conn) Handle() (err error) {
return
}
for _, receiver := range c.receivers {
for _, receiver := range c.Receivers {
if receiver.ID == channelID {
receiver.WriteRTP(packet)
break

View File

@@ -18,15 +18,6 @@ func (c *Conn) GetMedias() []*core.Media {
}
func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) (err error) {
core.Assert(media.Direction == core.DirectionSendonly)
for _, sender := range c.senders {
if sender.Codec == codec {
sender.HandleRTP(track)
return
}
}
var channel byte
switch c.mode {
@@ -47,12 +38,12 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
c.state = StateSetup
case core.ModePassiveConsumer:
channel = byte(len(c.senders)) * 2
channel = byte(len(c.Senders)) * 2
// for consumer is better to use original track codec
codec = track.Codec.Clone()
// generate new payload type, starting from 96
codec.PayloadType = byte(96 + len(c.senders))
codec.PayloadType = byte(96 + len(c.Senders))
default:
panic(core.Caller())
@@ -70,7 +61,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
sender.HandleRTP(track)
c.senders = append(c.senders, sender)
c.Senders = append(c.Senders, sender)
return nil
}
@@ -99,7 +90,7 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.
}
//log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf))
if _, err := c.conn.Write(buf[:n]); err == nil {
c.send += n
c.Send += n
}
n = 0
}

View File

@@ -10,7 +10,7 @@ import (
func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
core.Assert(media.Direction == core.DirectionRecvonly)
for _, track := range c.receivers {
for _, track := range c.Receivers {
if track.Codec == codec {
return track, nil
}
@@ -34,7 +34,7 @@ func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, e
track := core.NewReceiver(media, codec)
track.ID = channel
c.receivers = append(c.receivers, track)
c.Receivers = append(c.Receivers, track)
return track, nil
}
@@ -81,10 +81,10 @@ func (c *Conn) Start() (err error) {
}
func (c *Conn) Stop() (err error) {
for _, receiver := range c.receivers {
for _, receiver := range c.Receivers {
receiver.Close()
}
for _, sender := range c.senders {
for _, sender := range c.Senders {
sender.Close()
}
@@ -99,25 +99,7 @@ func (c *Conn) Stop() (err error) {
}
func (c *Conn) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "RTSP " + c.mode.String(),
SDP: c.sdp,
UserAgent: c.UserAgent,
Medias: c.Medias,
Receivers: c.receivers,
Senders: c.senders,
Recv: c.recv,
Send: c.send,
}
if c.URL != nil {
info.URL = c.URL.String()
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
return json.Marshal(info)
return json.Marshal(c.Connection)
}
func (c *Conn) Reconnect() error {
@@ -135,12 +117,12 @@ func (c *Conn) Reconnect() error {
}
// restore previous medias
for _, receiver := range c.receivers {
for _, receiver := range c.Receivers {
if _, err := c.SetupMedia(receiver.Media); err != nil {
return err
}
}
for _, sender := range c.senders {
for _, sender := range c.Senders {
if _, err := c.SetupMedia(sender.Media); err != nil {
return err
}

View File

@@ -14,10 +14,16 @@ import (
)
func NewServer(conn net.Conn) *Conn {
c := new(Conn)
c.conn = conn
c.reader = bufio.NewReader(conn)
return c
return &Conn{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "rtsp",
Protocol: "rtsp+tcp",
RemoteAddr: conn.RemoteAddr().String(),
},
conn: conn,
reader: bufio.NewReader(conn),
}
}
func (c *Conn) Auth(username, password string) {
@@ -70,7 +76,7 @@ func (c *Conn) Accept() error {
return errors.New("wrong content type")
}
c.sdp = string(req.Body) // for info
c.SDP = string(req.Body) // for info
c.Medias, err = UnmarshalSDP(req.Body)
if err != nil {
@@ -81,7 +87,7 @@ func (c *Conn) Accept() error {
for i, media := range c.Medias {
track := core.NewReceiver(media, media.Codecs[0])
track.ID = byte(i * 2)
c.receivers = append(c.receivers, track)
c.Receivers = append(c.Receivers, track)
}
c.mode = core.ModePassiveProducer
@@ -96,7 +102,7 @@ func (c *Conn) Accept() error {
c.mode = core.ModePassiveConsumer
c.Fire(MethodDescribe)
if c.senders == nil {
if c.Senders == nil {
res := &tcp.Response{
Status: "404 Not Found",
Request: req,
@@ -113,7 +119,7 @@ func (c *Conn) Accept() error {
// convert tracks to real output medias medias
var medias []*core.Media
for i, track := range c.senders {
for i, track := range c.Senders {
media := &core.Media{
Kind: core.GetKind(track.Codec.Name),
Direction: core.DirectionRecvonly,
@@ -128,7 +134,7 @@ func (c *Conn) Accept() error {
return err
}
c.sdp = string(res.Body) // for info
c.SDP = string(res.Body) // for info
if err = c.WriteResponse(res); err != nil {
return err
@@ -148,9 +154,9 @@ func (c *Conn) Accept() error {
c.state = StateSetup
if c.mode == core.ModePassiveConsumer {
if i := reqTrackID(req); i >= 0 && i < len(c.senders) {
if i := reqTrackID(req); i >= 0 && i < len(c.Senders) {
// mark sender as SETUP
c.senders[i].Media.ID = MethodSetup
c.Senders[i].Media.ID = MethodSetup
tr = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1)
res.Header.Set("Transport", tr)
} else {
@@ -170,7 +176,7 @@ func (c *Conn) Accept() error {
case MethodRecord, MethodPlay:
if c.mode == core.ModePassiveConsumer {
// stop unconfigured senders
for _, track := range c.senders {
for _, track := range c.Senders {
if track.Media.ID != MethodSetup {
track.Close()
}

View File

@@ -49,10 +49,12 @@ func (c *Client) Stop() (err error) {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Exec active consumer",
Medias: c.medias,
Send: c.send,
info := &core.Connection{
ID: core.ID(c),
FormatName: "exec",
Protocol: "pipe",
Medias: c.medias,
Send: c.send,
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}

View File

@@ -6,6 +6,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
cmd *exec.Cmd

View File

@@ -23,6 +23,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener

View File

@@ -2,6 +2,7 @@ package tapo
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
)
@@ -74,15 +75,20 @@ func (c *Client) Stop() error {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Tapo active producer",
Medias: c.medias,
Recv: c.recv,
Receivers: c.receivers,
Send: c.send,
info := &core.Connection{
ID: core.ID(c),
FormatName: "tapo",
Protocol: "http",
Medias: c.medias,
Recv: c.recv,
Receivers: c.receivers,
Send: c.send,
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}
}
if c.conn1 != nil {
info.RemoteAddr = c.conn1.RemoteAddr().String()
}
return json.Marshal(info)
}

View File

@@ -1,12 +0,0 @@
package tcp
import (
"net/http"
)
func RemoteAddr(r *http.Request) string {
if remote := r.Header.Get("X-Forwarded-For"); remote != "" {
return remote + ", " + r.RemoteAddr
}
return r.RemoteAddr
}

View File

@@ -54,22 +54,27 @@ func Open(r io.Reader) (*Producer, error) {
return nil, errors.New("waw: unsupported codec")
}
prod := &Producer{rd: rd, cl: r.(io.Closer)}
prod.Type = "WAV producer"
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "wav",
Medias: medias,
Transport: r,
},
rd: rd,
}, nil
}
type Producer struct {
core.SuperProducer
core.Connection
rd *bufio.Reader
cl io.Closer
}
func (c *Producer) Start() error {
@@ -106,11 +111,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.cl.Close()
}
func readChunk(r io.Reader) (chunkID string, data []byte, err error) {
b := make([]byte, 8)
if _, err = io.ReadFull(r, b); err != nil {

View File

@@ -71,7 +71,7 @@ func (c *Conn) SetAnswer(answer string) (err error) {
return
}
c.medias = UnmarshalMedias(sd.MediaDescriptions)
c.Medias = UnmarshalMedias(sd.MediaDescriptions)
return nil
}

View File

@@ -1,6 +1,9 @@
package webrtc
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
@@ -10,28 +13,25 @@ import (
)
type Conn struct {
core.Connection
core.Listener
UserAgent string
Desc string
Mode core.Mode
Mode core.Mode `json:"mode"`
pc *webrtc.PeerConnection
medias []*core.Media
receivers []*core.Receiver
senders []*core.Sender
recv int
send int
offer string
remote string
closed core.Waiter
}
func NewConn(pc *webrtc.PeerConnection) *Conn {
c := &Conn{pc: pc}
c := &Conn{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "webrtc",
},
pc: pc,
}
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
// last candidate will be empty
@@ -50,7 +50,15 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
}
pc.SCTP().Transport().ICETransport().OnSelectedCandidatePairChange(
func(pair *webrtc.ICECandidatePair) {
c.remote = pair.Remote.String()
c.Protocol += "+" + pair.Remote.Protocol.String()
c.RemoteAddr = fmt.Sprintf(
"%s:%d %s", sanitizeIP6(pair.Remote.Address), pair.Remote.Port, pair.Remote.Typ,
)
if pair.Remote.RelatedAddress != "" {
c.RemoteAddr += fmt.Sprintf(
" %s:%d", sanitizeIP6(pair.Remote.RelatedAddress), pair.Remote.RelatedPort,
)
}
},
)
})
@@ -92,7 +100,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
return
}
c.recv += n
c.Recv += n
packet := &rtp.Packet{}
if err := packet.Unmarshal(b[:n]); err != nil {
@@ -121,7 +129,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
switch state {
case webrtc.PeerConnectionStateConnected:
for _, sender := range c.senders {
for _, sender := range c.Senders {
sender.Start()
}
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
@@ -134,6 +142,10 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
return c
}
func (c *Conn) MarshalJSON() ([]byte, error) {
return json.Marshal(c.Connection)
}
func (c *Conn) Close() error {
c.closed.Done(nil)
return c.pc.Close()
@@ -172,7 +184,7 @@ func (c *Conn) getMediaCodec(remote *webrtc.TrackRemote) (*core.Media, *core.Cod
}
// search Media for this MID
for _, media := range c.medias {
for _, media := range c.Medias {
if media.ID != tr.Mid() || media.Direction != core.DirectionRecvonly {
continue
}
@@ -194,3 +206,10 @@ func (c *Conn) getMediaCodec(remote *webrtc.TrackRemote) (*core.Media, *core.Cod
return nil, nil
}
func sanitizeIP6(host string) string {
if strings.IndexByte(host, ':') > 0 {
return "[" + host + "]"
}
return host
}

View File

@@ -1,7 +1,6 @@
package webrtc
import (
"encoding/json"
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
@@ -12,13 +11,13 @@ import (
)
func (c *Conn) GetMedias() []*core.Media {
return WithResampling(c.medias)
return WithResampling(c.Medias)
}
func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
core.Assert(media.Direction == core.DirectionSendonly)
for _, sender := range c.senders {
for _, sender := range c.Senders {
if sender.Codec == codec {
sender.Bind(track)
return nil
@@ -42,7 +41,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
sender := core.NewSender(media, codec)
sender.Handler = func(packet *rtp.Packet) {
c.send += packet.MarshalSize()
c.Send += packet.MarshalSize()
//important to send with remote PayloadType
_ = localTrack.WriteRTP(payloadType, packet)
}
@@ -85,20 +84,6 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
sender.HandleRTP(track)
}
c.senders = append(c.senders, sender)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *Conn) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: c.Desc + " " + c.Mode.String(),
RemoteAddr: c.remote,
UserAgent: c.UserAgent,
Medias: c.medias,
Receivers: c.receivers,
Senders: c.senders,
Recv: c.recv,
Send: c.send,
}
return json.Marshal(info)
}

View File

@@ -8,7 +8,7 @@ import (
func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
core.Assert(media.Direction == core.DirectionRecvonly)
for _, track := range c.receivers {
for _, track := range c.Receivers {
if track.Codec == codec {
return track, nil
}
@@ -39,7 +39,7 @@ func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, e
}
track := core.NewReceiver(media, codec)
c.receivers = append(c.receivers, track)
c.Receivers = append(c.Receivers, track)
return track, nil
}
@@ -47,13 +47,3 @@ func (c *Conn) Start() error {
c.closed.Wait()
return nil
}
func (c *Conn) Stop() error {
for _, receiver := range c.receivers {
receiver.Close()
}
for _, sender := range c.senders {
sender.Close()
}
return c.pc.Close()
}

View File

@@ -42,7 +42,7 @@ func (c *Conn) SetOffer(offer string) (err error) {
}
}
c.medias = UnmarshalMedias(sd.MediaDescriptions)
c.Medias = UnmarshalMedias(sd.MediaDescriptions)
return
}
@@ -57,7 +57,7 @@ func (c *Conn) GetAnswer() (answer string, err error) {
// disable transceivers if we don't have track, make direction=inactive
transeivers:
for _, tr := range c.pc.GetTransceivers() {
for _, sender := range c.senders {
for _, sender := range c.Senders {
if sender.Media.ID == tr.Mid() {
continue transeivers
}

View File

@@ -3,19 +3,21 @@ package webtorrent
import (
"encoding/base64"
"fmt"
"strconv"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/gorilla/websocket"
pion "github.com/pion/webrtc/v3"
"strconv"
"time"
)
func NewClient(tracker, share, pwd string, pc *pion.PeerConnection) (*webrtc.Conn, error) {
// 1. Create WebRTC producer
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/WebTorrent sync"
prod.FormatName = "webtorrent"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws"
medias := []*core.Media{
{Kind: core.KindVideo, Direction: core.DirectionRecvonly},

View File

@@ -9,14 +9,17 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
}
func NewConsumer() *Consumer {
wr := core.NewWriteBuffer(nil)
return &Consumer{
core.SuperConsumer{
Type: "YUV4MPEG2 passive consumer",
core.Connection{
ID: core.NewID(),
Transport: wr,
FormatName: "yuv4mpegpipe",
Medias: []*core.Media{
{
Kind: core.KindVideo,
@@ -27,7 +30,7 @@ func NewConsumer() *Consumer {
},
},
},
core.NewWriteBuffer(nil),
wr,
}
}
@@ -60,8 +63,3 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}

View File

@@ -2,7 +2,6 @@ package y4m
import (
"bufio"
"bytes"
"errors"
"io"
@@ -19,41 +18,13 @@ func Open(r io.Reader) (*Producer, error) {
b = b[:len(b)-1] // remove \n
sdp := string(b)
var fmtp string
for b != nil {
// YUV4MPEG2 W1280 H720 F24:1 Ip A1:1 C420mpeg2 XYSCSS=420MPEG2
// https://manned.org/yuv4mpeg.5
// https://github.com/FFmpeg/FFmpeg/blob/master/libavformat/yuv4mpegenc.c
key := b[0]
var value string
if i := bytes.IndexByte(b, ' '); i > 0 {
value = string(b[1:i])
b = b[i+1:]
} else {
value = string(b[1:])
b = nil
}
switch key {
case 'W':
fmtp = "width=" + value
case 'H':
fmtp += ";height=" + value
case 'C':
fmtp += ";colorspace=" + value
}
}
fmtp := ParseHeader(b)
if GetSize(fmtp) == 0 {
return nil, errors.New("y4m: unsupported format: " + sdp)
return nil, errors.New("y4m: unsupported format: " + string(b))
}
prod := &Producer{rd: rd, cl: r.(io.Closer)}
prod.Type = "YUV4MPEG2 producer"
prod.SDP = sdp
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
@@ -67,14 +38,21 @@ func Open(r io.Reader) (*Producer, error) {
},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "yuv4mpegpipe",
Medias: medias,
SDP: string(b),
Transport: r,
},
rd: rd,
}, nil
}
type Producer struct {
core.SuperProducer
core.Connection
rd *bufio.Reader
cl io.Closer
}
func (c *Producer) Start() error {
@@ -103,8 +81,3 @@ func (c *Producer) Start() error {
c.Receivers[0].WriteRTP(pkt)
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.cl.Close()
}

View File

@@ -1,6 +1,7 @@
package y4m
import (
"bytes"
"image"
"github.com/AlexxIT/go2rtc/pkg/core"
@@ -10,6 +11,34 @@ const FourCC = "YUV4"
const frameHdr = "FRAME\n"
func ParseHeader(b []byte) (fmtp string) {
for b != nil {
// YUV4MPEG2 W1280 H720 F24:1 Ip A1:1 C420mpeg2 XYSCSS=420MPEG2
// https://manned.org/yuv4mpeg.5
// https://github.com/FFmpeg/FFmpeg/blob/master/libavformat/yuv4mpegenc.c
key := b[0]
var value string
if i := bytes.IndexByte(b, ' '); i > 0 {
value = string(b[1:i])
b = b[i+1:]
} else {
value = string(b[1:])
b = nil
}
switch key {
case 'W':
fmtp = "width=" + value
case 'H':
fmtp += ";height=" + value
case 'C':
fmtp += ";colorspace=" + value
}
}
return
}
func GetSize(fmtp string) int {
w := core.Atoi(core.Between(fmtp, "width=", ";"))
h := core.Atoi(core.Between(fmtp, "height=", ";"))