diff --git a/.gitignore b/.gitignore index c139adf..4ecdac9 100644 --- a/.gitignore +++ b/.gitignore @@ -13,13 +13,15 @@ bin *.flv pullcf.yaml *.zip +*.mp4 !plugin/hls/hls.js.zip __debug* .cursorrules example/default/* !example/default/main.go !example/default/config.yaml +!example/default/test.flv +!example/default/test.mp4 shutdown.sh !example/test/test.db -*.mp4 shutdown.bat \ No newline at end of file diff --git a/example/default/config.yaml b/example/default/config.yaml index 33184a8..b66b0c4 100755 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -1,7 +1,7 @@ global: location: "^/hdl/(.*)": "/flv/$1" # 兼容 v4 - "^/stress/(.*)": "/test/$1" # 5.0.x + "^/stress/api/(.*)": "/test/api/stress/$1" # 5.0.x "^/monitor/(.*)": "/debug/$1" # 5.0.x loglevel: debug admin: diff --git a/example/default/main.go b/example/default/main.go index 344d3aa..cd71320 100644 --- a/example/default/main.go +++ b/example/default/main.go @@ -16,6 +16,7 @@ import ( _ "m7s.live/v5/plugin/onvif" _ "m7s.live/v5/plugin/preview" _ "m7s.live/v5/plugin/rtmp" + _ "m7s.live/v5/plugin/rtp" _ "m7s.live/v5/plugin/rtsp" _ "m7s.live/v5/plugin/sei" _ "m7s.live/v5/plugin/snap" diff --git a/example/default/test.flv b/example/default/test.flv new file mode 100644 index 0000000..d898168 Binary files /dev/null and b/example/default/test.flv differ diff --git a/example/default/test.mp4 b/example/default/test.mp4 new file mode 100644 index 0000000..cb0f777 Binary files /dev/null and b/example/default/test.mp4 differ diff --git a/pkg/config/http.go b/pkg/config/http.go index 68c5ca3..a20f581 100644 --- a/pkg/config/http.go +++ b/pkg/config/http.go @@ -1,6 +1,7 @@ package config import ( + "log/slog" "net/http" "m7s.live/v5/pkg/util" @@ -10,8 +11,6 @@ import ( "time" ) -var _ HTTPConfig = (*HTTP)(nil) - type Middleware func(string, http.Handler) http.Handler type HTTP struct { ListenAddr string `desc:"监听地址"` @@ -28,16 +27,27 @@ type HTTP struct { grpcMux *runtime.ServeMux middlewares []Middleware } -type HTTPConfig interface { - GetHTTPConfig() *HTTP - // Handle(string, http.Handler) - // Handler(*http.Request) (http.Handler, string) - // AddMiddleware(Middleware) + +func (config *HTTP) logHandler(logger *slog.Logger, handler http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + logger.Debug("visit", "path", r.URL.String(), "remote", r.RemoteAddr) + handler.ServeHTTP(rw, r) + }) } -func (config *HTTP) GetHandler() http.Handler { +func (config *HTTP) GetHandler(logger *slog.Logger) (h http.Handler) { if config.grpcMux != nil { - return config.grpcMux + h = config.grpcMux + if logger != nil { + h = config.logHandler(logger, h) + } + if config.CORS { + h = util.CORS(h) + } + if config.UserName != "" && config.Password != "" { + h = util.BasicAuth(config.UserName, config.Password, h) + } + return } return config.mux } @@ -79,11 +89,3 @@ func (config *HTTP) Handle(path string, f http.Handler, last bool) { } config.mux.Handle(path, f) } - -func (config *HTTP) GetHTTPConfig() *HTTP { - return config -} - -// func (config *HTTP) Handler(r *http.Request) (h http.Handler, pattern string) { -// return config.mux.Handler(r) -// } diff --git a/pkg/http_server_std.go b/pkg/http_server_std.go index f098b66..58b8f4b 100644 --- a/pkg/http_server_std.go +++ b/pkg/http_server_std.go @@ -35,7 +35,7 @@ func (task *ListenHTTPWork) Start() (err error) { ReadTimeout: task.HTTP.ReadTimeout, WriteTimeout: task.HTTP.WriteTimeout, IdleTimeout: task.HTTP.IdleTimeout, - Handler: task.GetHandler(), + Handler: task.GetHandler(task.Logger), } return } @@ -61,7 +61,7 @@ func (task *ListenHTTPSWork) Start() (err error) { ReadTimeout: task.HTTP.ReadTimeout, WriteTimeout: task.HTTP.WriteTimeout, IdleTimeout: task.HTTP.IdleTimeout, - Handler: task.HTTP.GetHandler(), + Handler: task.HTTP.GetHandler(task.Logger), TLSConfig: &tls.Config{ Certificates: []tls.Certificate{cer}, CipherSuites: []uint16{ diff --git a/pkg/util/net.go b/pkg/util/net.go index 76f7dba..9b192e5 100644 --- a/pkg/util/net.go +++ b/pkg/util/net.go @@ -220,6 +220,7 @@ func CORS(next http.Handler) http.Handler { header.Set("Access-Control-Allow-Credentials", "true") header.Set("Cross-Origin-Resource-Policy", "cross-origin") header.Set("Access-Control-Allow-Headers", "Content-Type,Access-Token,Authorization") + header.Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS") header.Set("Access-Control-Allow-Private-Network", "true") origin := r.Header["Origin"] if len(origin) == 0 { diff --git a/plugin/cascade/client.go b/plugin/cascade/client.go index 9a897c1..9e3c3ce 100644 --- a/plugin/cascade/client.go +++ b/plugin/cascade/client.go @@ -72,7 +72,7 @@ func (task *CascadeClient) Run() (err error) { if s, err = task.AcceptStream(task.Task.Context); err == nil { task.AddTask(&cascade.ReceiveRequestTask{ Stream: s, - Handler: task.cfg.GetGlobalCommonConf().GetHandler(), + Handler: task.cfg.GetGlobalCommonConf().GetHandler(task.Logger), Connection: task.Connection, Plugin: &task.cfg.Plugin, }) diff --git a/plugin/cascade/server.go b/plugin/cascade/server.go index d201802..bf5d14b 100644 --- a/plugin/cascade/server.go +++ b/plugin/cascade/server.go @@ -125,7 +125,7 @@ func (task *CascadeServer) Go() (err error) { var receiveRequestTask cascade.ReceiveRequestTask receiveRequestTask.Connection = task.Connection receiveRequestTask.Plugin = &task.conf.Plugin - receiveRequestTask.Handler = task.conf.GetGlobalCommonConf().GetHandler() + receiveRequestTask.Handler = task.conf.GetGlobalCommonConf().GetHandler(task.Logger) if receiveRequestTask.Stream, err = task.AcceptStream(task); err == nil { task.AddTask(&receiveRequestTask) } diff --git a/plugin/rtp/pkg/reader.go b/plugin/rtp/pkg/reader.go index 337dd87..589a466 100644 --- a/plugin/rtp/pkg/reader.go +++ b/plugin/rtp/pkg/reader.go @@ -29,7 +29,7 @@ func (r *RTPUDPReader) Read(packet *rtp.Packet) error { if ordered != nil { break } - var buf [MTUSize]byte + var buf [ReceiveMTU]byte var pack rtp.Packet n, err := r.Reader.Read(buf[:]) if err != nil { diff --git a/plugin/rtp/pkg/video.go b/plugin/rtp/pkg/video.go index e20bc09..1033829 100644 --- a/plugin/rtp/pkg/video.go +++ b/plugin/rtp/pkg/video.go @@ -59,6 +59,7 @@ const ( startBit = 1 << 7 endBit = 1 << 6 MTUSize = 1460 + ReceiveMTU = 1500 ) func (r *VideoFrame) Recycle() { diff --git a/plugin/webrtc/pkg/connection.go b/plugin/webrtc/pkg/connection.go index 112096e..10588ce 100644 --- a/plugin/webrtc/pkg/connection.go +++ b/plugin/webrtc/pkg/connection.go @@ -137,7 +137,7 @@ func (IO *MultipleConnection) Receive() { } packet := frame.Packets.GetNextPointer() for { - buf := mem.Malloc(mrtp.MTUSize) + buf := mem.Malloc(mrtp.ReceiveMTU) if n, _, err = track.Read(buf); err == nil { mem.FreeRest(&buf, n) err = packet.Unmarshal(buf) @@ -200,7 +200,7 @@ func (IO *MultipleConnection) Receive() { lastPLISent = time.Now() } - buf := mem.Malloc(mrtp.MTUSize) + buf := mem.Malloc(mrtp.ReceiveMTU) if n, _, err = track.Read(buf); err == nil { mem.FreeRest(&buf, n) err = packet.Unmarshal(buf) @@ -212,6 +212,7 @@ func (IO *MultipleConnection) Receive() { mem.Free(buf) continue } + if packet.Timestamp == writer.VideoFrame.Packets[0].Timestamp { writer.VideoFrame.AddRecycleBytes(buf) packet = writer.VideoFrame.Packets.GetNextPointer() diff --git a/server.go b/server.go index f3f1229..f7849bb 100644 --- a/server.go +++ b/server.go @@ -17,7 +17,6 @@ import ( "gopkg.in/yaml.v3" "github.com/shirou/gopsutil/v4/cpu" - "google.golang.org/protobuf/proto" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" @@ -234,16 +233,6 @@ func (s *Server) Start() (err error) { var httpMux http.Handler = httpConf.CreateHttpMux() mux := runtime.NewServeMux( runtime.WithMarshalerOption("text/plain", &pb.TextPlain{}), - runtime.WithForwardResponseOption(func(ctx context.Context, w http.ResponseWriter, m proto.Message) error { - header := w.Header() - header.Set("Access-Control-Allow-Credentials", "true") - header.Set("Cross-Origin-Resource-Policy", "cross-origin") - header.Set("Access-Control-Allow-Headers", "Content-Type,Access-Token,Authorization") - header.Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS") - header.Set("Access-Control-Allow-Private-Network", "true") - header.Set("Access-Control-Allow-Origin", "*") - return nil - }), runtime.WithRoutingErrorHandler(func(_ context.Context, _ *runtime.ServeMux, _ runtime.Marshaler, w http.ResponseWriter, r *http.Request, _ int) { httpMux.ServeHTTP(w, r) }), @@ -658,7 +647,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Rewrite the URL path and handle locally r.URL.Path = pattern.ReplaceAllString(r.URL.Path, target) // Forward to local handler - s.config.HTTP.GetHandler().ServeHTTP(w, r) + s.config.HTTP.GetHandler(s.Logger).ServeHTTP(w, r) return } }