diff --git a/handlers/init.go b/handlers/init.go new file mode 100644 index 0000000..ea5954a --- /dev/null +++ b/handlers/init.go @@ -0,0 +1,15 @@ +package handlers + +import ( + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +func Init() { + dag.AddHandler("start", func(id string) mq.Processor { return NewStartHandler(id) }) + dag.AddHandler("loop", func(id string) mq.Processor { return NewLoop(id) }) + dag.AddHandler("condition", func(id string) mq.Processor { return NewCondition(id) }) + dag.AddHandler("print", func(id string) mq.Processor { return NewPrintHandler(id) }) + dag.AddHandler("render", func(id string) mq.Processor { return NewRenderHTMLNode(id) }) + dag.AddHandler("log", func(id string) mq.Processor { return NewLogHandler(id) }) +} diff --git a/services/cmd/main.go b/services/cmd/main.go deleted file mode 100644 index 576932a..0000000 --- a/services/cmd/main.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "os" - - "github.com/oarkflow/cli" - "github.com/oarkflow/cli/console" - "github.com/oarkflow/cli/contracts" - "github.com/oarkflow/mq" - "github.com/oarkflow/mq/dag" - "github.com/oarkflow/mq/handlers" - "github.com/oarkflow/mq/services" - dagConsole "github.com/oarkflow/mq/services/console" -) - -func main() { - brokerAddr := ":5051" - loader := services.NewLoader("examples/config") - loader.Load() - cli.SetName("DAG CLI") - cli.SetVersion("v0.0.1") - app := cli.New() - client := app.Instance.Client() - client.Register([]contracts.Command{ - console.NewListCommand(client), - dagConsole.NewRunHandler(loader.UserConfig, loader.ParsedPath, brokerAddr), - }) - client.Run(os.Args, true) -} - -func init() { - dag.AddHandler("start", func(id string) mq.Processor { return handlers.NewStartHandler(id) }) - dag.AddHandler("loop", func(id string) mq.Processor { return handlers.NewLoop(id) }) - dag.AddHandler("condition", func(id string) mq.Processor { return handlers.NewCondition(id) }) - dag.AddHandler("print", func(id string) mq.Processor { return handlers.NewPrintHandler(id) }) - dag.AddHandler("render", func(id string) mq.Processor { return handlers.NewRenderHTMLNode(id) }) - dag.AddHandler("log", func(id string) mq.Processor { return handlers.NewLogHandler(id) }) -} diff --git a/services/cmd/setup.go b/services/cmd/setup.go new file mode 100644 index 0000000..5a4dcdd --- /dev/null +++ b/services/cmd/setup.go @@ -0,0 +1,14 @@ +package cmd + +import ( + "github.com/gofiber/fiber/v2" + "github.com/oarkflow/mq/services" +) + +func Setup(loader *services.Loader, serverApp *fiber.App, brokerAddr string) *fiber.App { + if loader.UserConfig == nil { + return nil + } + services.SetupServices(loader.Prefix(), serverApp, brokerAddr) + return serverApp +} diff --git a/services/console/run_api.go b/services/console/run_api.go new file mode 100644 index 0000000..7ef5fcc --- /dev/null +++ b/services/console/run_api.go @@ -0,0 +1,46 @@ +package console + +import ( + "errors" + + "github.com/gofiber/fiber/v2" + "github.com/oarkflow/cli/contracts" +) + +type RunApiHandler struct { + server *fiber.App + addr string +} + +func NewRunApiHandler(server *fiber.App, addr string) *RunApiHandler { + return &RunApiHandler{ + server: server, + addr: addr, + } +} + +// Signature The name and signature of the console command. +func (receiver *RunApiHandler) Signature() string { + return "run:api-server" +} + +// Description The console command description. +func (receiver *RunApiHandler) Description() string { + return "Run API Server" +} + +// Extend The console command extend. +func (receiver *RunApiHandler) Extend() contracts.Extend { + return contracts.Extend{} +} + +// Handle Execute the console command. +func (receiver *RunApiHandler) Handle(ctx contracts.Context) error { + if receiver.server == nil { + return errors.New("API server is not configured") + } + if err := receiver.server.Listen(receiver.addr); err != nil { + return errors.New("Failed to start API server: " + err.Error()) + } + return nil +} diff --git a/services/contracts.go b/services/contracts.go new file mode 100644 index 0000000..cfe5128 --- /dev/null +++ b/services/contracts.go @@ -0,0 +1,32 @@ +package services + +import "github.com/gofiber/fiber/v2" + +type Option func(map[string]any) +type Validation interface { + Make(ctx *fiber.Ctx, data any, rules map[string]string, options ...Option) (Validator, error) + AddRules([]Rule) error + Rules() []Rule +} +type Validator interface { + Bind(ptr any) error + Errors() Errors + Fails() bool +} +type Errors interface { + One(key ...string) string + Get(key string) map[string]string + All() map[string]map[string]string + Has(key string) bool +} + +type ValidationData interface { + Get(key string) (val any, exist bool) + Set(key string, val any) error +} + +type Rule interface { + Signature() string + Passes(ctx *fiber.Ctx, data ValidationData, val any, options ...any) bool + Message() string +} diff --git a/services/examples/main.go b/services/examples/main.go index e521084..96ad8de 100644 --- a/services/examples/main.go +++ b/services/examples/main.go @@ -2,11 +2,37 @@ package main import ( "fmt" + "os" + "github.com/gofiber/fiber/v2" + "github.com/oarkflow/cli" + "github.com/oarkflow/cli/console" + "github.com/oarkflow/cli/contracts" + "github.com/oarkflow/mq/handlers" "github.com/oarkflow/mq/services" + "github.com/oarkflow/mq/services/cmd" + dagConsole "github.com/oarkflow/mq/services/console" ) func main() { + handlers.Init() + brokerAddr := ":5051" + serverAddr := ":3000" + loader := services.NewLoader("config") + loader.Load() + serverApp := fiber.New() + cmd.Setup(loader, serverApp, brokerAddr) + app := cli.New() + client := app.Instance.Client() + client.Register([]contracts.Command{ + console.NewListCommand(client), + dagConsole.NewRunHandler(loader.UserConfig, loader.ParsedPath, brokerAddr), + dagConsole.NewRunApiHandler(serverApp, serverAddr), + }) + client.Run(os.Args, true) +} + +func mai1n() { loader := services.NewLoader("config") loader.Load() fmt.Println(loader.UserConfig) diff --git a/services/go.mod b/services/go.mod index 52d3258..d542e18 100644 --- a/services/go.mod +++ b/services/go.mod @@ -2,6 +2,8 @@ module github.com/oarkflow/mq/services go 1.24.2 +replace github.com/oarkflow/mq => ../ + require ( github.com/oarkflow/cli v0.0.0-20250313133305-8d14a63c1883 github.com/oarkflow/errors v0.0.6 @@ -17,6 +19,7 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect + github.com/andeya/goutil v1.1.2 github.com/andybalholm/brotli v1.1.1 // indirect github.com/bytedance/gopkg v0.1.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect @@ -24,7 +27,7 @@ require ( github.com/goccy/go-json v0.10.5 // indirect github.com/goccy/go-reflect v1.2.0 // indirect github.com/goccy/go-yaml v1.18.0 // indirect - github.com/gofiber/fiber/v2 v2.52.6 // indirect + github.com/gofiber/fiber/v2 v2.52.9 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/google/uuid v1.6.0 // indirect @@ -54,9 +57,11 @@ require ( github.com/oarkflow/render v0.0.1 // indirect github.com/oarkflow/squealx v0.0.36 // indirect github.com/oarkflow/xid v1.2.8 // indirect + github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/tinylib/msgp v1.2.5 // indirect github.com/toorop/go-dkim v0.0.0-20240103092955-90b7d1423f92 // indirect github.com/urfave/cli/v2 v2.27.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect diff --git a/services/go.sum b/services/go.sum index f2858a8..7a92761 100644 --- a/services/go.sum +++ b/services/go.sum @@ -12,6 +12,8 @@ github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.0.0 h1:D3occ github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.0.0/go.mod h1:bTSOgj05NGRuHHhQwAdPnYr9TOdNmKlZTgGLL6nyAdI= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/andeya/goutil v1.1.2 h1:RiFWFkL/9yXh2SjQkNWOHqErU1x+RauHmeR23eNUzSg= +github.com/andeya/goutil v1.1.2/go.mod h1:jEG5/QnnhG7yGxwFUX6Q+JGMif7sjdHmmNVjn7nhJDo= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/bytedance/gopkg v0.1.1 h1:3azzgSkiaw79u24a+w9arfH8OfnQQ4MHUt9lJFREEaE= @@ -34,6 +36,8 @@ github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw= github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI= github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw= +github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= @@ -106,8 +110,6 @@ github.com/oarkflow/log v1.0.83 h1:T/38wvjuNeVJ9PDo0wJDTnTUQZ5XeqlcvpbCItuFFJo= github.com/oarkflow/log v1.0.83/go.mod h1:dMn57z9uq11Y264cx9c9Ac7ska9qM+EBhn4qf9CNlsM= github.com/oarkflow/metadata v0.0.78 h1:ciKbtzQGXYvSlxaFYtDX1CocCkchHskreAldVIkHIMg= github.com/oarkflow/metadata v0.0.78/go.mod h1:T6Bcsq2FVjrJYMJpMluQTw+/xkqUwax7m/qGHTDCyaw= -github.com/oarkflow/mq v0.0.17 h1:krNZW4Gi3CO90HYhAhsskVhNoObWhGjmsMLqcTuNjLQ= -github.com/oarkflow/mq v0.0.17/go.mod h1:nD3C1f4qniuGKl6pmp+BrzKcjYOZ8d+gmEUkDSOrG0Y= github.com/oarkflow/protocol v0.0.16 h1:3qNn9gwoJOpdz+owyAmW4fNMpQplqHVIjzsWM4r0pcA= github.com/oarkflow/protocol v0.0.16/go.mod h1:iKP/I+3/FIWlZ6OphAo8c60JO2qgwethOMR+NMsMI28= github.com/oarkflow/render v0.0.1 h1:Caw74Yu8OE/tjCjurhbUkS0Fi9zE/mzVvQa1Cw7m7R4= @@ -118,6 +120,8 @@ github.com/oarkflow/xid v1.2.8 h1:uCIX61Binq2RPMsqImZM6pPGzoZTmRyD6jguxF9aAA0= github.com/oarkflow/xid v1.2.8/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c h1:dAMKvw0MlJT1GshSTtih8C2gDs04w8dReiOGXrGLNoY= +github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -134,6 +138,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= +github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= github.com/toorop/go-dkim v0.0.0-20201103131630-e1cd1a0a5208/go.mod h1:BzWtXXrXzZUvMacR0oF/fbDDgUPO8L36tDMmRAf14ns= github.com/toorop/go-dkim v0.0.0-20240103092955-90b7d1423f92 h1:flbMkdl6HxQkLs6DDhH1UkcnFpNBOu70391STjMS0O4= github.com/toorop/go-dkim v0.0.0-20240103092955-90b7d1423f92/go.mod h1:BzWtXXrXzZUvMacR0oF/fbDDgUPO8L36tDMmRAf14ns= diff --git a/services/http/responses/responses.go b/services/http/responses/responses.go new file mode 100644 index 0000000..cb05dd2 --- /dev/null +++ b/services/http/responses/responses.go @@ -0,0 +1,60 @@ +package responses + +import ( + "os" + "strings" + + "github.com/andeya/goutil" + "github.com/gofiber/fiber/v2" +) + +type Response struct { + Additional any `json:"additional,omitempty"` + Data any `json:"data"` + Message string `json:"message,omitempty"` + StackTrace string `json:"stack_trace,omitempty"` + Code int `json:"code"` + Success bool `json:"success"` +} + +func getResponse(code int, message string, additional any, stackTrace ...string) Response { + var trace string + response := Response{ + Code: code, + Message: message, + Success: false, + Additional: additional, + } + + if len(stackTrace) > 0 { + dir, _ := os.Getwd() + trace = stackTrace[0] + trace = strings.ReplaceAll(trace, dir, "/root") + for _, t := range goutil.GetGopaths() { + trace = strings.ReplaceAll(trace, t+"pkg/mod/", "/root/") + trace = strings.ReplaceAll(trace, t, "/root/") + } + response.StackTrace = trace + } + return response +} + +func Abort(ctx *fiber.Ctx, code int, message string, additional any, stackTrace ...string) error { + return ctx.Status(fiber.StatusOK).JSON(getResponse(code, message, additional, stackTrace...)) +} + +func Failed(ctx *fiber.Ctx, code int, message string, additional any, stackTrace ...string) error { + return ctx.Status(fiber.StatusOK).JSON(getResponse(code, message, additional, stackTrace...)) +} + +func Success(ctx *fiber.Ctx, code int, data any, message ...string) error { + response := Response{ + Code: code, + Data: data, + Success: true, + } + if len(message) > 0 { + response.Message = message[0] + } + return ctx.Status(fiber.StatusOK).JSON(response) +} diff --git a/services/loader.go b/services/loader.go index 49919b8..c7c2483 100644 --- a/services/loader.go +++ b/services/loader.go @@ -14,6 +14,8 @@ import ( "gopkg.in/yaml.v3" ) +var userConfig *UserConfig + type Loader struct { path string configFile string @@ -43,6 +45,7 @@ func (l *Loader) Load() { panic(err) } l.UserConfig = cfg + userConfig = cfg // Set the global userConfig variable } func (l *Loader) prepareConfigPath() string { diff --git a/services/middlewares/schema.go b/services/middlewares/schema.go new file mode 100644 index 0000000..a5f0d29 --- /dev/null +++ b/services/middlewares/schema.go @@ -0,0 +1,108 @@ +package middlewares + +import ( + "github.com/gofiber/fiber/v2" + "github.com/oarkflow/json" + "github.com/oarkflow/jsonschema/request" + "github.com/oarkflow/mq/services/utils" +) + +var ServerApp *fiber.App + +func MatchRoute(pattern, path string) (bool, map[string]string) { + params := make(map[string]string) + pi, ti := 0, 0 + pLen, tLen := len(pattern), len(path) + skipSlash := func(s string, i int) int { + for i < len(s) && s[i] == '/' { + i++ + } + return i + } + pi = skipSlash(pattern, pi) + ti = skipSlash(path, ti) + for pi < pLen && ti < tLen { + switch pattern[pi] { + case ':': + startName := pi + 1 + for pi < pLen && pattern[pi] != '/' { + pi++ + } + paramName := pattern[startName:pi] + startVal := ti + for ti < tLen && path[ti] != '/' { + ti++ + } + paramVal := path[startVal:ti] + params[paramName] = paramVal + case '*': + pi++ + if pi < pLen && pattern[pi] == '/' { + pi++ + } + paramName := pattern[pi:] + paramVal := path[ti:] + params[paramName] = paramVal + ti = tLen + pi = pLen + break + default: + for pi < pLen && ti < tLen && pattern[pi] != '/' && path[ti] != '/' { + if pattern[pi] != path[ti] { + return false, nil + } + pi++ + ti++ + } + } + pi = skipSlash(pattern, pi) + ti = skipSlash(path, ti) + } + if pi == pLen && ti == tLen { + return true, params + } + return false, nil +} + +func MatchRouterPath(method, path string) (fiber.Route, bool, map[string]string) { + if ServerApp == nil { + return fiber.Route{}, false, nil + } + for _, route := range ServerApp.GetRoutes() { + if route.Method == method { + matched, params := MatchRoute(route.Path, path) + if matched { + return route, matched, params + } + } + } + return fiber.Route{}, false, nil +} + +// ValidateRequestBySchema - validates each request that has schema validation +func ValidateRequestBySchema(c *fiber.Ctx) error { + route, matched, _ := MatchRouterPath(c.Method(), c.Path()) + if !matched { + return c.Next() + } + + key := route.Method + ":" + route.Path + schema, exists := utils.GetSchema(key) + if !exists { + return c.Next() + } + body := c.Body() + if len(body) == 0 { + return c.Next() + } + var intermediate any + if err := request.UnmarshalFiberCtx(schema, c, &intermediate); err != nil { + return err + } + mergedBytes, err := json.Marshal(intermediate) + if err != nil { + return err + } + c.Request().SetBody(mergedBytes) + return c.Next() +} diff --git a/services/setup.go b/services/setup.go index 35e995b..c9c6fa9 100644 --- a/services/setup.go +++ b/services/setup.go @@ -1,15 +1,33 @@ package services import ( + "context" "errors" "fmt" + "os" + "path/filepath" + "slices" + "strings" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/basicauth" + "github.com/gofiber/fiber/v2/middleware/cors" + "github.com/gofiber/fiber/v2/middleware/limiter" "github.com/oarkflow/filters" + "github.com/oarkflow/json" + v2 "github.com/oarkflow/jsonschema" "github.com/oarkflow/log" "github.com/oarkflow/mq" + "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/services/http/responses" + "github.com/oarkflow/mq/services/middlewares" + "github.com/oarkflow/mq/services/utils" + "github.com/oarkflow/protocol/utils/str" ) +var ValidationInstance Validation + func SetupHandler(handler Handler, brokerAddr string, async ...bool) *dag.DAG { syncMode := true if len(async) > 0 { @@ -116,3 +134,411 @@ func mapProviders(dataProviders interface{}) []dag.Provider { } return providers } + +func SetupServices(prefix string, router fiber.Router, brokerAddr string) { + if router == nil { + return + } + SetupAPI(prefix, router, brokerAddr) +} + +func SetupAPI(prefix string, router fiber.Router, brokerAddr string) { + if prefix != "" { + prefix = "/" + prefix + } + api := router.Group(prefix) + for _, configRoute := range userConfig.Policy.Web.Apis { + routeGroup := api.Group(configRoute.Prefix) + mws := setupMiddlewares(configRoute.Middlewares) + if len(mws) > 0 { + routeGroup.Use(mws...) + } + for _, route := range configRoute.Routes { + switch route.Operation { + case "custom": + flow := setupFlow(route, routeGroup, brokerAddr) + routeMiddlewares := setupMiddlewares(route.Middlewares) + if len(routeMiddlewares) > 0 { + routeGroup.Use(routeMiddlewares...) + } + routeGroup.Add("GET", CleanAndMergePaths(route.Uri, "/metadata"), func(ctx *fiber.Ctx) error { + return getDAGPage(ctx, flow, route.Handler) + }) + routeGroup.Add(strings.ToUpper(route.Method), route.Uri, + requestMiddleware(CleanAndMergePaths(prefix, configRoute.Prefix), route), + ruleMiddleware(route.Rules), + customRuleMiddleware(route, route.CustomRules), + customHandler(flow), + ) + } + } + } +} + +// GetRulesFromKeys returns the custom rules from the provided keys. +// It is used by the CustomRuleMiddleware to get the custom rules from the provided keys. +func GetRulesFromKeys(ruleKeys []string) (rulesArray []*filters.RuleRequest) { + for _, ruleKey := range ruleKeys { + appRules := userConfig.GetApplicationRule(ruleKey) + if appRules == nil { + panic(fmt.Sprintf("Rule %v not found", ruleKey)) + } + if appRules.Rule != nil { + rulesArray = append(rulesArray, appRules.Rule) + } + } + return +} + +// customRuleMiddleware validates the request body with the provided custom rules. +// It is passed after the ruleMiddleware to validate the request body with the custom rules. +func customRuleMiddleware(route *Route, ruleKeys []string) fiber.Handler { + rules := GetRulesFromKeys(ruleKeys) + return func(ctx *fiber.Ctx) error { + c, requestData, err := getLessRequestData(ctx, route) + ctx.SetUserContext(c) + if err != nil { + return responses.Abort(ctx, 400, "invalid request", err.Error()) + } + if len(requestData) > 0 { + header, ok := ctx.Context().Value("header").(map[string]any) + if ok { + requestData["header"] = header + } + data := map[string]any{ + "data": requestData, + } + for _, r := range rules { + _, err := r.Validate(data) + if err != nil { + var errResponse *filters.ErrorResponse + errors.As(err, &errResponse) + if slices.Contains([]string{"DENY", "DENY_WITH_WARNING"}, errResponse.ErrorAction) { + return responses.Abort(ctx, 400, "Invalid data for the request", err.Error()) + } else { + ctx.Set("error_msg", errResponse.ErrorMsg) + } + } + } + } + return ctx.Next() + } +} + +// getLessRequestData returns request data with param, query, body, enums, consts except +// restricted_field, scopes and queues +func getLessRequestData(ctx *fiber.Ctx, route *Route) (context.Context, map[string]any, error) { + request, header, err := prepareHeader(ctx, route) + if header != nil { + header["route_model"] = route.Model + } + ctx.Set("route_model", route.Model) + if err != nil { + return ctx.UserContext(), nil, err + } + c := context.WithValue(ctx.UserContext(), "header", header) + return c, request, nil +} + +func prepareHeader(ctx *fiber.Ctx, route *Route) (map[string]any, map[string]any, error) { + var request map[string]any + bodyRaw := ctx.BodyRaw() + if str.FromByte(bodyRaw) != "" { + err := json.Unmarshal(bodyRaw, &request) + if err != nil { + form, err := ctx.MultipartForm() + if err == nil || form != nil { + return nil, nil, errors.New("invalid json request") + } + } + } + if request == nil { + request = make(map[string]any) + } + requiredBody := make(map[string]bool) + header := make(map[string]any) + param := make(map[string]any) + query := make(map[string]any) + if route.Schema != nil { + schema := route.GetSchema() + if schema != nil { + if schema.Properties != nil { + for key, property := range *schema.Properties { + if property.In != nil { + for _, in := range property.In { + switch in { + case "param": + param[key] = ctx.Params(key) + case "query": + query[key] = ctx.Query(key) + case "body": + requiredBody[key] = true + } + } + } + } + } + } + } + header["param"] = param + header["query"] = query + header["route_model"] = route.Model + ctx.Set("route_model", route.Model) + for k := range requiredBody { + if _, ok := request[k]; !ok { + delete(request, k) + } + } + header["request_id"] = ctx.Get("X-Schema-Id") + // add consts and enums to request + header["consts"] = userConfig.Core.Consts + header["enums"] = userConfig.Core.Enums + return request, header, nil +} + +func customHandler(flow *dag.DAG) fiber.Handler { + return func(ctx *fiber.Ctx) error { + result := flow.Process(ctx.UserContext(), ctx.BodyRaw()) + if result.Error != nil { + return result.Error + } + contentType := result.Ctx.Value(consts.ContentType) + if contentType == nil { + return ctx.JSON(result) + } + if contentType == fiber.MIMEApplicationJSON || contentType == fiber.MIMEApplicationJSONCharsetUTF8 { + return ctx.JSON(result) + } + ctx.Set(consts.ContentType, contentType.(string)) + return ctx.Send(result.Payload) + } +} + +func getDAGPage(ctx *fiber.Ctx, flow *dag.DAG, handler Handler) error { + // Save the SVG to a temporary file + image := fmt.Sprintf("%s.svg", mq.NewID()) + if err := flow.SaveSVG(image); err != nil { + return err + } + // Ensure the file is removed after reading its content + defer func() { + _ = os.Remove(image) + }() + + // Read the SVG file bytes + svgBytes, err := os.ReadFile(image) + if err != nil { + return err + } + + // Marshal the handler details into pretty printed JSON + handlerData, err := json.MarshalIndent(handler, "", " ") + if err != nil { + return err + } + + // Build an HTML page with a two-column layout: + // Left column: SVG, Right column: Handler details (displayed as preformatted text) + html := fmt.Sprintf(` + + + + DAG Visualization and Handler Details + + + +
+

DAG Visualization and Handler Details

+

URI: %s

+
+
+
+
+ %s +
+
+
+

Handler Details

+
%s
+
+
+ +`, flow.BaseURI(), flow.BaseURI(), svgBytes, string(handlerData)) + + // Set the content type as HTML and send the response + ctx.Set("Content-Type", "text/html") + return ctx.SendString(html) +} + +// ruleMiddleware validates the request body with the provided rules. +// It is passed after the requestMiddleware to ensure that the request body is valid. +func ruleMiddleware(rules map[string]string) fiber.Handler { + return func(ctx *fiber.Ctx) error { + body := ctx.Body() + if len(body) == 0 { + return ctx.Next() + } + var requestData map[string]any + err := ctx.BodyParser(&requestData) + if err != nil && body != nil { + return responses.Abort(ctx, 400, "Invalid request bind", nil) + } + if len(rules) > 0 && ValidationInstance != nil { + validator, err := ValidationInstance.Make(ctx, requestData, rules) + if err != nil { + return responses.Abort(ctx, 400, "Validation Error", err.Error()) + } + if validator.Fails() { + return responses.Abort(ctx, 400, "Validation Error", validator.Errors().All()) + } + } + return ctx.Next() + } +} + +// requestMiddleware validates the request body in the original form of byte array +// against the provided request JSON schema to ensure that the request body is valid. +func requestMiddleware(prefix string, route *Route) fiber.Handler { + path := CleanAndMergePaths(prefix, route.Uri) + var schema *v2.Schema + var err error + if route.Schema != nil { + schema, err = utils.CompileSchema(path, strings.ToUpper(route.Method), route.Schema) + if err != nil { + panic(err) + } + route.SetSchema(schema) + } + return func(ctx *fiber.Ctx) error { + if route.Schema == nil { + return ctx.Next() + } + requestSchema := ctx.Query("request-schema") + if requestSchema != "" { + return ctx.JSON(fiber.Map{ + "success": true, + "code": 200, + "data": fiber.Map{ + "schema": schema, + "rules": route.Rules, + }, + }) + } + for _, r := range userConfig.Policy.Models { + if r.Name == route.Model { + db := r.Database + source := route.Model + ctx.Locals("database_connection", db) + ctx.Locals("database_source", source) + break + } + } + form, _ := ctx.MultipartForm() + if form != nil { + return ctx.Next() + } + return middlewares.ValidateRequestBySchema(ctx) + } +} + +func setupMiddlewares(middlewares []Middleware) (mid []any) { + for _, middleware := range middlewares { + switch middleware.Name { + case "cors": + mid = append(mid, cors.New(cors.Config{ExposeHeaders: "frame-session"})) + case "basic-auth": + options := struct { + Users map[string]string `json:"users"` + }{} + err := json.Unmarshal(middleware.Options, &options) + if err != nil { + panic(err) + } + mid = append(mid, basicauth.New(basicauth.Config{Users: options.Users})) + case "rate-limit": + options := struct { + Max int `json:"max"` + Expiration string `json:"expiration"` + }{} + err := json.Unmarshal(middleware.Options, &options) + if err != nil { + panic(err) + } + + expiration, err := utils.ParseDuration(options.Expiration) + if err != nil { + panic(err) + } + throttle := limiter.New(limiter.Config{Max: options.Max, Expiration: expiration}) + mid = append(mid, throttle) + } + } + return +} + +func setupFlow(route *Route, group fiber.Router, brokerAddr string) *dag.DAG { + if route.Handler.Key == "" && route.HandlerKey != "" { + handler := userConfig.GetHandler(route.HandlerKey) + if handler == nil { + panic(fmt.Sprintf("Handler not found %s", route.HandlerKey)) + } + route.Handler = *handler + } + flow := SetupHandler(route.Handler, brokerAddr) + if flow.Error != nil { + panic(flow.Error) + } + return flow +} + +func CleanAndMergePaths(uri ...string) string { + paths := make([]string, 0) + for _, u := range uri { + if u != "" { + paths = append(paths, strings.TrimPrefix(u, "/")) + } + } + return "/" + filepath.Clean(strings.Join(paths, "/")) +} diff --git a/services/utils/schema.go b/services/utils/schema.go new file mode 100644 index 0000000..d87d355 --- /dev/null +++ b/services/utils/schema.go @@ -0,0 +1,48 @@ +package utils + +import ( + "log" + "sync" + + "github.com/oarkflow/json" + v2 "github.com/oarkflow/jsonschema" +) + +type Schema struct { + m sync.RWMutex + items map[string]*v2.Schema +} + +var ( + CompiledSchemas *Schema + compiler *v2.Compiler +) + +func init() { + compiler = v2.NewCompiler() + CompiledSchemas = &Schema{items: make(map[string]*v2.Schema)} +} + +func AddSchema(key string, schema *v2.Schema) { + CompiledSchemas.m.Lock() + defer CompiledSchemas.m.Unlock() + CompiledSchemas.items[key] = schema +} + +func GetSchema(key string) (*v2.Schema, bool) { + CompiledSchemas.m.Lock() + defer CompiledSchemas.m.Unlock() + schema, ok := CompiledSchemas.items[key] + return schema, ok +} + +func CompileSchema(uri, method string, schema json.RawMessage) (*v2.Schema, error) { + s, err := compiler.Compile(schema) + if err != nil { + log.Printf("Error compiling schema for %s %s: %v", method, uri, err) + return nil, err + } + key := method + ":" + uri + AddSchema(key, s) + return s, nil +} diff --git a/services/utils/time.go b/services/utils/time.go new file mode 100644 index 0000000..067df1e --- /dev/null +++ b/services/utils/time.go @@ -0,0 +1,36 @@ +package utils + +import ( + "errors" + "fmt" + "strconv" + "time" +) + +func IsEmptyJSON(data []byte) bool { + return len(data) == 0 +} + +func ParseDuration(input string) (time.Duration, error) { + if len(input) < 2 { + return 0, errors.New("input string is too short") + } + numberPart := input[:len(input)-1] + unitPart := input[len(input)-1] + number, err := strconv.Atoi(numberPart) + if err != nil { + return 0, fmt.Errorf("invalid number part: %v", err) + } + var duration time.Duration + switch unitPart { + case 's': + duration = time.Duration(number) * time.Second + case 'm': + duration = time.Duration(number) * time.Minute + case 'h': + duration = time.Duration(number) * time.Hour + default: + return 0, errors.New("invalid unit part; use 's', 'm', or 'h'") + } + return duration, nil +}