Move WS API to separate module

This commit is contained in:
Alexey Khit
2023-05-23 14:21:39 +03:00
parent 82a8e07b66
commit 59555cfe1d
9 changed files with 97 additions and 62 deletions

View File

@@ -38,12 +38,10 @@ func Init() {
log = app.GetLogger("api") log = app.GetLogger("api")
initStatic(cfg.Mod.StaticDir) initStatic(cfg.Mod.StaticDir)
initWS(cfg.Mod.Origin)
HandleFunc("api", apiHandler) HandleFunc("api", apiHandler)
HandleFunc("api/config", configHandler) HandleFunc("api/config", configHandler)
HandleFunc("api/exit", exitHandler) HandleFunc("api/exit", exitHandler)
HandleFunc("api/ws", apiWS)
// ensure we can listen without errors // ensure we can listen without errors
listener, err := net.Listen("tcp", cfg.Mod.Listen) listener, err := net.Listen("tcp", cfg.Mod.Listen)

View File

@@ -1,7 +1,9 @@
package api package ws
import ( import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
@@ -9,6 +11,18 @@ import (
"time" "time"
) )
func Init() {
var cfg struct {
Mod struct {
Origin string `yaml:"origin"`
} `yaml:"api"`
}
initWS(cfg.Mod.Origin)
api.HandleFunc("api/ws", apiWS)
}
// Message - struct for data exchange in Web API // Message - struct for data exchange in Web API
type Message struct { type Message struct {
Type string `json:"type"` Type string `json:"type"`
@@ -33,7 +47,7 @@ func (m *Message) GetString(key string) string {
type WSHandler func(tr *Transport, msg *Message) error type WSHandler func(tr *Transport, msg *Message) error
func HandleWS(msgType string, handler WSHandler) { func HandleFunc(msgType string, handler WSHandler) {
wsHandlers[msgType] = handler wsHandlers[msgType] = handler
} }

View File

@@ -3,6 +3,7 @@ package mjpeg
import ( import (
"errors" "errors"
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/ffmpeg" "github.com/AlexxIT/go2rtc/internal/ffmpeg"
"github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
@@ -20,7 +21,7 @@ func Init() {
api.HandleFunc("api/frame.jpeg", handlerKeyframe) api.HandleFunc("api/frame.jpeg", handlerKeyframe)
api.HandleFunc("api/stream.mjpeg", handlerStream) api.HandleFunc("api/stream.mjpeg", handlerStream)
api.HandleWS("mjpeg", handlerWS) ws.HandleFunc("mjpeg", handlerWS)
} }
func handlerKeyframe(w http.ResponseWriter, r *http.Request) { func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
@@ -156,7 +157,7 @@ func inputMjpeg(w http.ResponseWriter, r *http.Request) {
stream.RemoveProducer(client) stream.RemoveProducer(client)
} }
func handlerWS(tr *api.Transport, _ *api.Message) error { func handlerWS(tr *ws.Transport, _ *ws.Message) error {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
@@ -178,7 +179,7 @@ func handlerWS(tr *api.Transport, _ *api.Message) error {
return err return err
} }
tr.Write(&api.Message{Type: "mjpeg"}) tr.Write(&ws.Message{Type: "mjpeg"})
tr.OnClose(func() { tr.OnClose(func() {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)

View File

@@ -1,6 +1,7 @@
package mp4 package mp4
import ( import (
"github.com/AlexxIT/go2rtc/internal/api/ws"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
@@ -17,8 +18,8 @@ import (
func Init() { func Init() {
log = app.GetLogger("mp4") log = app.GetLogger("mp4")
api.HandleWS("mse", handlerWSMSE) ws.HandleFunc("mse", handlerWSMSE)
api.HandleWS("mp4", handlerWSMP4) ws.HandleFunc("mp4", handlerWSMP4)
api.HandleFunc("api/frame.mp4", handlerKeyframe) api.HandleFunc("api/frame.mp4", handlerKeyframe)
api.HandleFunc("api/stream.mp4", handlerMP4) api.HandleFunc("api/stream.mp4", handlerMP4)

View File

@@ -3,6 +3,7 @@ package mp4
import ( import (
"errors" "errors"
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4" "github.com/AlexxIT/go2rtc/pkg/mp4"
@@ -10,7 +11,7 @@ import (
"strings" "strings"
) )
func handlerWSMSE(tr *api.Transport, msg *api.Message) error { func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
@@ -42,7 +43,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
}) })
tr.Write(&api.Message{Type: "mse", Value: cons.MimeType()}) tr.Write(&ws.Message{Type: "mse", Value: cons.MimeType()})
data, err := cons.Init() data, err := cons.Init()
if err != nil { if err != nil {
@@ -57,7 +58,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
return nil return nil
} }
func handlerWSMP4(tr *api.Transport, msg *api.Message) error { func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
@@ -86,7 +87,7 @@ func handlerWSMP4(tr *api.Transport, msg *api.Message) error {
return err return err
} }
tr.Write(&api.Message{Type: "mp4", Value: cons.MimeType}) tr.Write(&ws.Message{Type: "mp4", Value: cons.MimeType})
tr.OnClose(func() { tr.OnClose(func() {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)

View File

@@ -1,7 +1,7 @@
package webrtc package webrtc
import ( import (
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/pion/sdp/v3" "github.com/pion/sdp/v3"
"strconv" "strconv"
@@ -56,7 +56,7 @@ func GetCandidates() (candidates []string) {
return return
} }
func asyncCandidates(tr *api.Transport, cons *webrtc.Conn) { func asyncCandidates(tr *ws.Transport, cons *webrtc.Conn) {
tr.WithContext(func(ctx map[any]any) { tr.WithContext(func(ctx map[any]any) {
if candidates, ok := ctx["candidate"].([]string); ok { if candidates, ok := ctx["candidate"].([]string); ok {
// process candidates that receive before this moment // process candidates that receive before this moment
@@ -74,7 +74,7 @@ func asyncCandidates(tr *api.Transport, cons *webrtc.Conn) {
for _, candidate := range GetCandidates() { for _, candidate := range GetCandidates() {
log.Trace().Str("candidate", candidate).Msg("[webrtc] config") log.Trace().Str("candidate", candidate).Msg("[webrtc] config")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: candidate}) tr.Write(&ws.Message{Type: "webrtc/candidate", Value: candidate})
} }
} }
@@ -102,7 +102,7 @@ func syncCanditates(answer string) (string, error) {
return string(data), nil return string(data), nil
} }
func candidateHandler(tr *api.Transport, msg *api.Message) error { func candidateHandler(tr *ws.Transport, msg *ws.Message) error {
// process incoming candidate in sync function // process incoming candidate in sync function
tr.WithContext(func(ctx map[any]any) { tr.WithContext(func(ctx map[any]any) {
candidate := msg.String() candidate := msg.String()

View File

@@ -2,7 +2,7 @@ package webrtc
import ( import (
"errors" "errors"
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@@ -30,13 +30,13 @@ func streamsHandler(url string) (core.Producer, error) {
// ex: ws://localhost:1984/api/ws?src=camera1 // ex: ws://localhost:1984/api/ws?src=camera1
func asyncClient(url string) (core.Producer, error) { func asyncClient(url string) (core.Producer, error) {
// 1. Connect to signalign server // 1. Connect to signalign server
ws, _, err := websocket.DefaultDialer.Dial(url, nil) conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer func() { defer func() {
if err != nil { if err != nil {
_ = ws.Close() _ = conn.Close()
} }
}() }()
@@ -55,14 +55,14 @@ func asyncClient(url string) (core.Producer, error) {
prod.Listen(func(msg any) { prod.Listen(func(msg any) {
switch msg := msg.(type) { switch msg := msg.(type) {
case pion.PeerConnectionState: case pion.PeerConnectionState:
_ = ws.Close() _ = conn.Close()
case *pion.ICECandidate: case *pion.ICECandidate:
sendOffer.Wait() sendOffer.Wait()
s := msg.ToJSON().Candidate s := msg.ToJSON().Candidate
log.Trace().Str("candidate", s).Msg("[webrtc] local") log.Trace().Str("candidate", s).Msg("[webrtc] local")
_ = ws.WriteJSON(&api.Message{Type: "webrtc/candidate", Value: s}) _ = conn.WriteJSON(&ws.Message{Type: "webrtc/candidate", Value: s})
} }
}) })
@@ -79,15 +79,15 @@ func asyncClient(url string) (core.Producer, error) {
} }
// 4. Send offer // 4. Send offer
msg := &api.Message{Type: "webrtc/offer", Value: offer} msg := &ws.Message{Type: "webrtc/offer", Value: offer}
if err = ws.WriteJSON(msg); err != nil { if err = conn.WriteJSON(msg); err != nil {
return nil, err return nil, err
} }
sendOffer.Done() sendOffer.Done()
// 5. Get answer // 5. Get answer
if err = ws.ReadJSON(msg); err != nil { if err = conn.ReadJSON(msg); err != nil {
return nil, err return nil, err
} }
@@ -104,8 +104,8 @@ func asyncClient(url string) (core.Producer, error) {
go func() { go func() {
for { for {
// receive data from remote // receive data from remote
msg := new(api.Message) msg := new(ws.Message)
if err = ws.ReadJSON(msg); err != nil { if err = conn.ReadJSON(msg); err != nil {
if cerr, ok := err.(*websocket.CloseError); ok { if cerr, ok := err.(*websocket.CloseError); ok {
log.Trace().Err(err).Caller().Msgf("[webrtc] ws code=%d", cerr) log.Trace().Err(err).Caller().Msgf("[webrtc] ws code=%d", cerr)
} }
@@ -120,7 +120,7 @@ func asyncClient(url string) (core.Producer, error) {
} }
} }
_ = ws.Close() _ = conn.Close()
}() }()
return prod, nil return prod, nil

View File

@@ -3,6 +3,7 @@ package webrtc
import ( import (
"errors" "errors"
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
@@ -68,9 +69,9 @@ func Init() {
} }
// async WebRTC server (two API versions) // async WebRTC server (two API versions)
api.HandleWS("webrtc", asyncHandler) ws.HandleFunc("webrtc", asyncHandler)
api.HandleWS("webrtc/offer", asyncHandler) ws.HandleFunc("webrtc/offer", asyncHandler)
api.HandleWS("webrtc/candidate", candidateHandler) ws.HandleFunc("webrtc/candidate", candidateHandler)
// sync WebRTC server (two API versions) // sync WebRTC server (two API versions)
api.HandleFunc("api/webrtc", syncHandler) api.HandleFunc("api/webrtc", syncHandler)
@@ -84,7 +85,7 @@ var log zerolog.Logger
var PeerConnection func(active bool) (*pion.PeerConnection, error) var PeerConnection func(active bool) (*pion.PeerConnection, error)
func asyncHandler(tr *api.Transport, msg *api.Message) error { func asyncHandler(tr *ws.Transport, msg *ws.Message) error {
var stream *streams.Stream var stream *streams.Stream
var mode core.Mode var mode core.Mode
@@ -134,7 +135,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
s := msg.ToJSON().Candidate s := msg.ToJSON().Candidate
log.Trace().Str("candidate", s).Msg("[webrtc] local") log.Trace().Str("candidate", s).Msg("[webrtc] local")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: s}) tr.Write(&ws.Message{Type: "webrtc/candidate", Value: s})
} }
}) })
@@ -179,9 +180,9 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
if apiV2 { if apiV2 {
desc := pion.SessionDescription{Type: pion.SDPTypeAnswer, SDP: answer} desc := pion.SessionDescription{Type: pion.SDPTypeAnswer, SDP: answer}
tr.Write(&api.Message{Type: "webrtc", Value: desc}) tr.Write(&ws.Message{Type: "webrtc", Value: desc})
} else { } else {
tr.Write(&api.Message{Type: "webrtc/answer", Value: answer}) tr.Write(&ws.Message{Type: "webrtc/answer", Value: answer})
} }
sendAnswer.Done() sendAnswer.Done()

73
main.go
View File

@@ -2,6 +2,7 @@ package main
import ( import (
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/debug" "github.com/AlexxIT/go2rtc/internal/debug"
"github.com/AlexxIT/go2rtc/internal/dvrip" "github.com/AlexxIT/go2rtc/internal/dvrip"
@@ -32,37 +33,55 @@ import (
) )
func main() { func main() {
app.Init() // init config and logs // 1. Core modules: app, api/ws, streams
api.Init() // init HTTP API server
streams.Init() // load streams list
onvif.Init()
rtsp.Init() // add support RTSP client and RTSP server app.Init() // init config and logs
rtmp.Init() // add support RTMP client
exec.Init() // add support exec scheme (depends on RTSP server)
ffmpeg.Init() // add support ffmpeg scheme (depends on exec scheme)
hass.Init() // add support hass scheme
echo.Init()
ivideon.Init()
http.Init()
dvrip.Init()
tapo.Init()
isapi.Init()
mpegts.Init()
roborock.Init()
nest.Init()
srtp.Init() api.Init() // init API before all others
homekit.Init() ws.Init() // init WS API endpoint
webrtc.Init() streams.Init() // streams module
mp4.Init()
hls.Init()
mjpeg.Init()
webtorrent.Init() // 2. Main sources and servers
ngrok.Init()
debug.Init() rtsp.Init() // rtsp source, RTSP server
webrtc.Init() // webrtc source, WebRTC server
// 3. Main API
mp4.Init() // MP4 API
hls.Init() // HLS API
mjpeg.Init() // MJPEG API
// 4. Other sources and servers
hass.Init() // hass source, Hass API server
onvif.Init() // onvif source, ONVIF API server
webtorrent.Init() // webtorrent source, WebTorrent module
// 5. Other sources
rtmp.Init() // rtmp source
exec.Init() // exec source
ffmpeg.Init() // ffmpeg source
echo.Init() // echo source
ivideon.Init() // ivideon source
http.Init() // http/tcp source
dvrip.Init() // dvrip source
tapo.Init() // tapo source
isapi.Init() // isapi source
mpegts.Init() // mpegts passive source
roborock.Init() // roborock source
homekit.Init() // homekit source
nest.Init() // nest source
// 6. Helper modules
ngrok.Init() // Ngrok module
srtp.Init() // SRTP server
debug.Init() // debug API
// 7. Go
shell.RunUntilSignal() shell.RunUntilSignal()
} }