diff --git a/cmd/api/api.go b/cmd/api/api.go index 0f0a162d..cb4f1215 100644 --- a/cmd/api/api.go +++ b/cmd/api/api.go @@ -3,7 +3,6 @@ package api import ( "encoding/json" "github.com/AlexxIT/go2rtc/cmd/app" - "github.com/AlexxIT/go2rtc/cmd/streams" "github.com/rs/zerolog" "net" "net/http" @@ -41,7 +40,6 @@ func Init() { HandleFunc("api", apiHandler) HandleFunc("api/config", configHandler) HandleFunc("api/exit", exitHandler) - HandleFunc("api/streams", streamsHandler) HandleFunc("api/ws", apiWS) // ensure we can listen without errors @@ -124,32 +122,3 @@ func exitHandler(w http.ResponseWriter, r *http.Request) { code, _ := strconv.Atoi(s) os.Exit(code) } - -func streamsHandler(w http.ResponseWriter, r *http.Request) { - src := r.URL.Query().Get("src") - name := r.URL.Query().Get("name") - - if name == "" { - name = src - } - - switch r.Method { - case "PUT": - streams.New(name, src) - return - case "DELETE": - streams.Delete(src) - return - } - - var v interface{} - if src != "" { - v = streams.Get(src) - } else { - v = streams.All() - } - - e := json.NewEncoder(w) - e.SetIndent("", " ") - _ = e.Encode(v) -} diff --git a/cmd/mjpeg/mjpeg.go b/cmd/mjpeg/mjpeg.go index fe68e17d..fa9b7fdf 100644 --- a/cmd/mjpeg/mjpeg.go +++ b/cmd/mjpeg/mjpeg.go @@ -27,7 +27,10 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { exit := make(chan []byte) - cons := &mjpeg.Consumer{} + cons := &mjpeg.Consumer{ + RemoteAddr: r.RemoteAddr, + UserAgent: r.UserAgent(), + } cons.Listen(func(msg interface{}) { switch msg := msg.(type) { case []byte: @@ -68,7 +71,10 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { flusher := w.(http.Flusher) - cons := &mjpeg.Consumer{} + cons := &mjpeg.Consumer{ + RemoteAddr: r.RemoteAddr, + UserAgent: r.UserAgent(), + } cons.Listen(func(msg interface{}) { switch msg := msg.(type) { case []byte: @@ -109,7 +115,10 @@ func handlerWS(tr *api.Transport, _ *api.Message) error { return errors.New(api.StreamNotFound) } - cons := &mjpeg.Consumer{} + cons := &mjpeg.Consumer{ + RemoteAddr: tr.Request.RemoteAddr, + UserAgent: tr.Request.UserAgent(), + } cons.Listen(func(msg interface{}) { if data, ok := msg.([]byte); ok { tr.Write(data) diff --git a/cmd/mp4/mp4.go b/cmd/mp4/mp4.go index 55dfb047..7f50aeaa 100644 --- a/cmd/mp4/mp4.go +++ b/cmd/mp4/mp4.go @@ -80,7 +80,10 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { exit := make(chan error) - cons := &mp4.Consumer{} + cons := &mp4.Consumer{ + RemoteAddr: r.RemoteAddr, + UserAgent: r.UserAgent(), + } cons.Listen(func(msg interface{}) { if data, ok := msg.([]byte); ok { if _, err := w.Write(data); err != nil && exit != nil { diff --git a/cmd/mp4/ws.go b/cmd/mp4/ws.go index 8b01b00f..e5c4dc04 100644 --- a/cmd/mp4/ws.go +++ b/cmd/mp4/ws.go @@ -18,7 +18,10 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error { return errors.New(api.StreamNotFound) } - cons := &mp4.Consumer{} + cons := &mp4.Consumer{ + RemoteAddr: tr.Request.RemoteAddr, + UserAgent: tr.Request.UserAgent(), + } cons.UserAgent = tr.Request.UserAgent() cons.RemoteAddr = tr.Request.RemoteAddr diff --git a/cmd/streams/consumer.go b/cmd/streams/consumer.go new file mode 100644 index 00000000..9a855af2 --- /dev/null +++ b/cmd/streams/consumer.go @@ -0,0 +1,15 @@ +package streams + +import ( + "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/streamer" +) + +type Consumer struct { + element streamer.Consumer + tracks []*streamer.Track +} + +func (c *Consumer) MarshalJSON() ([]byte, error) { + return json.Marshal(c.element) +} diff --git a/cmd/streams/producer.go b/cmd/streams/producer.go index 083bb143..22261ec4 100644 --- a/cmd/streams/producer.go +++ b/cmd/streams/producer.go @@ -1,6 +1,7 @@ package streams import ( + "encoding/json" "github.com/AlexxIT/go2rtc/pkg/streamer" "strings" "sync" @@ -91,6 +92,15 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea return track } +func (p *Producer) MarshalJSON() ([]byte, error) { + if p.element != nil { + return json.Marshal(p.element) + } + + info := streamer.Info{URL: p.url} + return json.Marshal(info) +} + // internals func (p *Producer) start() { diff --git a/cmd/streams/stream.go b/cmd/streams/stream.go index 859e4f5b..82cbfaf7 100644 --- a/cmd/streams/stream.go +++ b/cmd/streams/stream.go @@ -10,11 +10,6 @@ import ( "sync/atomic" ) -type Consumer struct { - element streamer.Consumer - tracks []*streamer.Track -} - type Stream struct { producers []*Producer consumers []*Consumer @@ -199,24 +194,19 @@ producers: func (s *Stream) MarshalJSON() ([]byte, error) { if !s.mu.TryLock() { log.Warn().Msgf("[streams] json locked") - return []byte(`null`), nil + return json.Marshal(nil) } - var v []interface{} - for _, prod := range s.producers { - if prod.element != nil { - v = append(v, prod.element) - } - } - for _, cons := range s.consumers { - // cons.element always not nil - v = append(v, cons.element) + var info struct { + Producers []*Producer `json:"producers"` + Consumers []*Consumer `json:"consumers"` } + info.Producers = s.producers + info.Consumers = s.consumers + s.mu.Unlock() - if len(v) == 0 { - v = nil - } - return json.Marshal(v) + + return json.Marshal(info) } func (s *Stream) removeConsumer(i int) { diff --git a/cmd/streams/streams.go b/cmd/streams/streams.go index aeda7a7e..5132744b 100644 --- a/cmd/streams/streams.go +++ b/cmd/streams/streams.go @@ -1,9 +1,12 @@ package streams import ( + "encoding/json" + "github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/app" "github.com/AlexxIT/go2rtc/cmd/app/store" "github.com/rs/zerolog" + "net/http" ) func Init() { @@ -22,6 +25,8 @@ func Init() { for name, item := range store.GetDict("streams") { streams[name] = NewStream(item) } + + api.HandleFunc("api/streams", streamsHandler) } func Get(name string) *Stream { @@ -48,19 +53,29 @@ func GetOrNew(src string) *Stream { return New(src, src) } -func Delete(name string) { - delete(streams, name) -} +func streamsHandler(w http.ResponseWriter, r *http.Request) { + src := r.URL.Query().Get("src") -func All() map[string]interface{} { - all := map[string]interface{}{} - for name, stream := range streams { - all[name] = stream - //if stream.Active() { - // all[name] = stream - //} + switch r.Method { + case "PUT": + name := r.URL.Query().Get("name") + if name == "" { + name = src + } + New(name, src) + return + case "DELETE": + delete(streams, src) + return + } + + if src != "" { + e := json.NewEncoder(w) + e.SetIndent("", " ") + _ = e.Encode(streams[src]) + } else { + _ = json.NewEncoder(w).Encode(streams) } - return all } var log zerolog.Logger diff --git a/pkg/homekit/client.go b/pkg/homekit/client.go index ce532698..5100865f 100644 --- a/pkg/homekit/client.go +++ b/pkg/homekit/client.go @@ -1,6 +1,7 @@ package homekit import ( + "encoding/json" "errors" "fmt" "github.com/AlexxIT/go2rtc/pkg/hap" @@ -11,6 +12,7 @@ import ( "github.com/brutella/hap/rtp" "net" "net/url" + "sync/atomic" ) type Client struct { @@ -263,3 +265,19 @@ func (c *Client) getMedias() []*streamer.Media { return medias } + +func (c *Client) MarshalJSON() ([]byte, error) { + var recv uint32 + for _, session := range c.sessions { + recv += atomic.LoadUint32(&session.Recv) + } + + info := &streamer.Info{ + Type: "HomeKit source", + URL: c.conn.URL(), + Medias: c.medias, + Tracks: c.tracks, + Recv: recv, + } + return json.Marshal(info) +} diff --git a/pkg/ivideon/client.go b/pkg/ivideon/client.go index c09a7fb1..96121fba 100644 --- a/pkg/ivideon/client.go +++ b/pkg/ivideon/client.go @@ -15,6 +15,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" ) @@ -41,6 +42,8 @@ type Client struct { buffer chan []byte state State mu sync.Mutex + + recv uint32 } func NewClient(id string) *Client { @@ -109,6 +112,7 @@ func (c *Client) Handle() error { c.mu.Lock() if c.state == StateHandle { c.buffer <- data + atomic.AddUint32(&c.recv, uint32(len(data))) } c.mu.Unlock() } @@ -140,6 +144,7 @@ func (c *Client) Handle() error { c.mu.Lock() if c.state == StateHandle { c.buffer <- data + atomic.AddUint32(&c.recv, uint32(len(data))) } c.mu.Unlock() } diff --git a/pkg/ivideon/streamer.go b/pkg/ivideon/streamer.go index ea87a73f..bd40514a 100644 --- a/pkg/ivideon/streamer.go +++ b/pkg/ivideon/streamer.go @@ -1,8 +1,10 @@ package ivideon import ( + "encoding/json" "fmt" "github.com/AlexxIT/go2rtc/pkg/streamer" + "sync/atomic" ) func (c *Client) GetMedias() []*streamer.Media { @@ -29,3 +31,19 @@ func (c *Client) Start() error { func (c *Client) Stop() error { return c.Close() } + +func (c *Client) MarshalJSON() ([]byte, error) { + var tracks []*streamer.Track + for _, track := range c.tracks { + tracks = append(tracks, track) + } + + info := &streamer.Info{ + Type: "Ivideon source", + URL: c.ID, + Medias: c.medias, + Tracks: tracks, + Recv: atomic.LoadUint32(&c.recv), + } + return json.Marshal(info) +} diff --git a/pkg/mjpeg/client.go b/pkg/mjpeg/client.go index 12dbcbd7..02130d05 100644 --- a/pkg/mjpeg/client.go +++ b/pkg/mjpeg/client.go @@ -2,6 +2,7 @@ package mjpeg import ( "bufio" + "encoding/json" "errors" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/tcp" @@ -11,6 +12,7 @@ import ( "net/textproto" "strconv" "strings" + "sync/atomic" "time" ) @@ -24,6 +26,7 @@ type Client struct { res *http.Response track *streamer.Track + recv uint32 } func NewClient(res *http.Response) *Client { @@ -70,6 +73,17 @@ func (c *Client) Stop() error { return nil } +func (c *Client) MarshalJSON() ([]byte, error) { + info := &streamer.Info{ + Type: "MJPEG source", + URL: c.res.Request.URL.String(), + RemoteAddr: c.RemoteAddr, + UserAgent: c.UserAgent, + Recv: atomic.LoadUint32(&c.recv), + } + return json.Marshal(info) +} + func (c *Client) startJPEG() error { buf, err := io.ReadAll(c.res.Body) if err != nil { @@ -79,6 +93,8 @@ func (c *Client) startJPEG() error { packet := &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf} _ = c.track.WriteRTP(packet) + atomic.AddUint32(&c.recv, uint32(len(buf))) + req := c.res.Request for !c.closed { @@ -98,6 +114,8 @@ func (c *Client) startJPEG() error { packet = &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf} _ = c.track.WriteRTP(packet) + + atomic.AddUint32(&c.recv, uint32(len(buf))) } return nil @@ -141,6 +159,8 @@ func (c *Client) startMJPEG(boundary string) error { packet := &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf} _ = c.track.WriteRTP(packet) + atomic.AddUint32(&c.recv, uint32(len(buf))) + if _, err = r.Discard(2); err != nil { return err } diff --git a/pkg/mjpeg/consumer.go b/pkg/mjpeg/consumer.go index 206136ab..df4aa680 100644 --- a/pkg/mjpeg/consumer.go +++ b/pkg/mjpeg/consumer.go @@ -1,8 +1,10 @@ package mjpeg import ( + "encoding/json" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/pion/rtp" + "sync/atomic" ) type Consumer struct { @@ -14,7 +16,7 @@ type Consumer struct { codecs []*streamer.Codec start bool - send int + send uint32 } func (c *Consumer) GetMedias() []*streamer.Media { @@ -28,6 +30,7 @@ func (c *Consumer) GetMedias() []*streamer.Media { func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { push := func(packet *rtp.Packet) error { c.Fire(packet.Payload) + atomic.AddUint32(&c.send, uint32(len(packet.Payload))) return nil } @@ -38,3 +41,13 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea return track.Bind(push) } + +func (c *Consumer) MarshalJSON() ([]byte, error) { + info := &streamer.Info{ + Type: "MJPEG client", + RemoteAddr: c.RemoteAddr, + UserAgent: c.UserAgent, + Send: atomic.LoadUint32(&c.send), + } + return json.Marshal(info) +} diff --git a/pkg/mp4/consumer.go b/pkg/mp4/consumer.go index 342f4dca..3df6e355 100644 --- a/pkg/mp4/consumer.go +++ b/pkg/mp4/consumer.go @@ -7,6 +7,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/pion/rtp" + "sync/atomic" ) type Consumer struct { @@ -20,7 +21,7 @@ type Consumer struct { codecs []*streamer.Codec wait byte - send int + send uint32 } const ( @@ -76,7 +77,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea } buf := c.muxer.Marshal(trackID, packet) - c.send += len(buf) + atomic.AddUint32(&c.send, uint32(len(buf))) c.Fire(buf) return nil @@ -108,7 +109,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea } buf := c.muxer.Marshal(trackID, packet) - c.send += len(buf) + atomic.AddUint32(&c.send, uint32(len(buf))) c.Fire(buf) return nil @@ -128,7 +129,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea } buf := c.muxer.Marshal(trackID, packet) - c.send += len(buf) + atomic.AddUint32(&c.send, uint32(len(buf))) c.Fire(buf) return nil @@ -163,12 +164,11 @@ func (c *Consumer) Start() { // func (c *Consumer) MarshalJSON() ([]byte, error) { - v := map[string]interface{}{ - "type": "MP4 server consumer", - "send": c.send, - "remote_addr": c.RemoteAddr, - "user_agent": c.UserAgent, + info := &streamer.Info{ + Type: "MP4 client", + RemoteAddr: c.RemoteAddr, + UserAgent: c.UserAgent, + Send: atomic.LoadUint32(&c.send), } - - return json.Marshal(v) + return json.Marshal(info) } diff --git a/pkg/mp4f/consumer.go b/pkg/mp4f/consumer.go index 465c0ea9..09b8757a 100644 --- a/pkg/mp4f/consumer.go +++ b/pkg/mp4f/consumer.go @@ -1,7 +1,6 @@ package mp4f import ( - "encoding/json" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/deepch/vdk/av" @@ -149,16 +148,3 @@ func (c *Consumer) Init() ([]byte, error) { func (c *Consumer) Start() { c.start = true } - -// - -func (c *Consumer) MarshalJSON() ([]byte, error) { - v := map[string]interface{}{ - "type": "MSE server consumer", - "send": c.send, - "remote_addr": c.RemoteAddr, - "user_agent": c.UserAgent, - } - - return json.Marshal(v) -} diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index 19f414cc..8d0eb85f 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -12,6 +12,7 @@ import ( "github.com/deepch/vdk/format/rtmp" "github.com/pion/rtp" "net/http" + "sync/atomic" "time" ) @@ -33,7 +34,7 @@ type Client struct { conn Conn closed bool - receive int + recv uint32 } func NewClient(uri string) *Client { @@ -138,7 +139,7 @@ func (c *Client) Handle() (err error) { return } - c.receive += len(pkt.Data) + atomic.AddUint32(&c.recv, uint32(len(pkt.Data))) track := c.tracks[int(pkt.Idx)] diff --git a/pkg/rtmp/streamer.go b/pkg/rtmp/streamer.go index b5be3472..2dc38238 100644 --- a/pkg/rtmp/streamer.go +++ b/pkg/rtmp/streamer.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" "github.com/AlexxIT/go2rtc/pkg/streamer" - "strconv" + "sync/atomic" ) func (c *Client) GetMedias() []*streamer.Media { @@ -29,19 +29,12 @@ func (c *Client) Stop() error { } func (c *Client) MarshalJSON() ([]byte, error) { - v := map[string]interface{}{ - streamer.JSONReceive: c.receive, - streamer.JSONType: "RTMP client producer", - //streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(), - "url": c.URI, + info := &streamer.Info{ + Type: "RTMP source", + URL: c.URI, + Medias: c.medias, + Tracks: c.tracks, + Recv: atomic.LoadUint32(&c.recv), } - for i, media := range c.medias { - k := "media:" + strconv.Itoa(i) - v[k] = media.String() - } - for i, track := range c.tracks { - k := "track:" + strconv.Itoa(i) - v[k] = track.String() - } - return json.Marshal(v) + return json.Marshal(info) } diff --git a/pkg/rtsp/streamer.go b/pkg/rtsp/streamer.go index fe4b4fb4..442be3ce 100644 --- a/pkg/rtsp/streamer.go +++ b/pkg/rtsp/streamer.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "github.com/AlexxIT/go2rtc/pkg/streamer" - "strconv" ) // Element Producer @@ -88,40 +87,30 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer. // func (c *Conn) MarshalJSON() ([]byte, error) { - v := map[string]interface{}{ - streamer.JSONReceive: c.receive, - streamer.JSONSend: c.send, + info := &streamer.Info{ + UserAgent: c.UserAgent, + Medias: c.Medias, + Tracks: c.tracks, + Recv: uint32(c.receive), + Send: uint32(c.send), } + switch c.mode { case ModeUnknown: - v[streamer.JSONType] = "RTSP unknown" - case ModeClientProducer: - v[streamer.JSONType] = "RTSP client producer" - case ModeServerProducer: - v[streamer.JSONType] = "RTSP server producer" + info.Type = "RTSP unknown" + case ModeClientProducer, ModeServerProducer: + info.Type = "RTSP source" case ModeServerConsumer: - v[streamer.JSONType] = "RTSP server consumer" + info.Type = "RTSP client" } - //if c.URI != "" { - // v["uri"] = c.URI - //} + if c.URL != nil { - v["url"] = c.URL.String() + info.URL = c.URL.String() } if c.conn != nil { - v[streamer.JSONRemoteAddr] = c.conn.RemoteAddr().String() - } - if c.UserAgent != "" { - v[streamer.JSONUserAgent] = c.UserAgent - } - for i, media := range c.Medias { - k := "media:" + strconv.Itoa(i) - v[k] = media.String() - } - for i, track := range c.tracks { - k := "track:" + strconv.Itoa(i) - v[k] = track.String() + info.RemoteAddr = c.conn.RemoteAddr().String() } + //for i, track := range c.tracks { // k := "track:" + strconv.Itoa(i+1) // if track.MimeType() == streamer.MimeTypeH264 { @@ -130,5 +119,6 @@ func (c *Conn) MarshalJSON() ([]byte, error) { // v[k] = track.MimeType() // } //} - return json.Marshal(v) + + return json.Marshal(info) } diff --git a/pkg/srtp/server.go b/pkg/srtp/server.go index 264946a7..19a75d0a 100644 --- a/pkg/srtp/server.go +++ b/pkg/srtp/server.go @@ -3,6 +3,7 @@ package srtp import ( "encoding/binary" "net" + "sync/atomic" ) // Server using same UDP port for SRTP and for SRTCP as the iPhone does @@ -55,6 +56,8 @@ func (s *Server) Serve(conn net.PacketConn) error { } } + atomic.AddUint32(&session.Recv, uint32(n)) + if err = session.HandleRTP(buf[:n]); err != nil { return err } diff --git a/pkg/srtp/session.go b/pkg/srtp/session.go index 07321396..54795560 100644 --- a/pkg/srtp/session.go +++ b/pkg/srtp/session.go @@ -17,6 +17,7 @@ type Session struct { Write func(b []byte) (int, error) Track *streamer.Track + Recv uint32 lastSequence uint32 lastTimestamp uint32 diff --git a/pkg/streamer/helpers.go b/pkg/streamer/helpers.go index e1cf3e70..d376f605 100644 --- a/pkg/streamer/helpers.go +++ b/pkg/streamer/helpers.go @@ -4,13 +4,16 @@ import ( "strings" ) -const ( - JSONType = "type" - JSONRemoteAddr = "remote_addr" - JSONUserAgent = "user_agent" - JSONReceive = "receive" - JSONSend = "send" -) +type Info struct { + Type string `json:"type,omitempty"` + URL string `json:"url,omitempty"` + RemoteAddr string `json:"remote_addr,omitempty"` + UserAgent string `json:"user_agent,omitempty"` + Medias []*Media `json:"medias,omitempty"` + Tracks []*Track `json:"tracks,omitempty"` + Recv uint32 `json:"recv,omitempty"` + Send uint32 `json:"send,omitempty"` +} func Between(s, sub1, sub2 string) string { i := strings.Index(s, sub1) diff --git a/pkg/streamer/media.go b/pkg/streamer/media.go index d295fb53..4b2b783f 100644 --- a/pkg/streamer/media.go +++ b/pkg/streamer/media.go @@ -1,6 +1,7 @@ package streamer import ( + "encoding/json" "fmt" "github.com/pion/sdp/v3" "strconv" @@ -70,6 +71,10 @@ func (m *Media) String() string { return s } +func (m *Media) MarshalJSON() ([]byte, error) { + return json.Marshal(m.String()) +} + func (m *Media) Clone() *Media { clone := *m return &clone diff --git a/pkg/streamer/track.go b/pkg/streamer/track.go index 17e2c92a..1e71dc84 100644 --- a/pkg/streamer/track.go +++ b/pkg/streamer/track.go @@ -1,6 +1,7 @@ package streamer import ( + "encoding/json" "fmt" "github.com/pion/rtp" "sync" @@ -22,12 +23,19 @@ func NewTrack(codec *Codec, direction string) *Track { func (t *Track) String() string { s := t.Codec.String() - t.sinkMu.RLock() - s += fmt.Sprintf(", sinks=%d", len(t.sink)) - t.sinkMu.RUnlock() + if t.sinkMu.TryRLock() { + s += fmt.Sprintf(", sinks=%d", len(t.sink)) + t.sinkMu.RUnlock() + } else { + s += fmt.Sprintf(", sinks=?") + } return s } +func (t *Track) MarshalJSON() ([]byte, error) { + return json.Marshal(t.String()) +} + func (t *Track) WriteRTP(p *rtp.Packet) error { t.sinkMu.RLock() for _, f := range t.sink { diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index cea4c89f..a9019af9 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -113,20 +113,12 @@ func (c *Conn) AddCandidate(candidate string) { } func (c *Conn) MarshalJSON() ([]byte, error) { - v := map[string]interface{}{ - streamer.JSONType: "WebRTC server consumer", - streamer.JSONRemoteAddr: c.remote(), + info := &streamer.Info{ + Type: "WebRTC client", + RemoteAddr: c.remote(), + UserAgent: c.UserAgent, + Recv: uint32(c.receive), + Send: uint32(c.send), } - - if c.receive > 0 { - v[streamer.JSONReceive] = c.receive - } - if c.send > 0 { - v[streamer.JSONSend] = c.send - } - if c.UserAgent != "" { - v[streamer.JSONUserAgent] = c.UserAgent - } - - return json.Marshal(v) + return json.Marshal(info) } diff --git a/www/index.html b/www/index.html index f5f18e93..9bdc00c2 100644 --- a/www/index.html +++ b/www/index.html @@ -137,7 +137,7 @@ tbody.innerHTML = ""; for (const [name, value] of Object.entries(data)) { - const online = value ? value.length : 0; + const online = value && value.consumers ? value.consumers.length : 0; const links = templates.map(link => { return link.replace("{name}", encodeURIComponent(name)); }).join(" ");