This commit is contained in:
sujit
2025-08-08 10:50:24 +05:45
parent d1197b5c63
commit 97f45165a3
14 changed files with 828 additions and 41 deletions

15
handlers/init.go Normal file
View File

@@ -0,0 +1,15 @@
package handlers
import (
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
func Init() {
dag.AddHandler("start", func(id string) mq.Processor { return NewStartHandler(id) })
dag.AddHandler("loop", func(id string) mq.Processor { return NewLoop(id) })
dag.AddHandler("condition", func(id string) mq.Processor { return NewCondition(id) })
dag.AddHandler("print", func(id string) mq.Processor { return NewPrintHandler(id) })
dag.AddHandler("render", func(id string) mq.Processor { return NewRenderHTMLNode(id) })
dag.AddHandler("log", func(id string) mq.Processor { return NewLogHandler(id) })
}

View File

@@ -1,38 +0,0 @@
package main
import (
"os"
"github.com/oarkflow/cli"
"github.com/oarkflow/cli/console"
"github.com/oarkflow/cli/contracts"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/handlers"
"github.com/oarkflow/mq/services"
dagConsole "github.com/oarkflow/mq/services/console"
)
func main() {
brokerAddr := ":5051"
loader := services.NewLoader("examples/config")
loader.Load()
cli.SetName("DAG CLI")
cli.SetVersion("v0.0.1")
app := cli.New()
client := app.Instance.Client()
client.Register([]contracts.Command{
console.NewListCommand(client),
dagConsole.NewRunHandler(loader.UserConfig, loader.ParsedPath, brokerAddr),
})
client.Run(os.Args, true)
}
func init() {
dag.AddHandler("start", func(id string) mq.Processor { return handlers.NewStartHandler(id) })
dag.AddHandler("loop", func(id string) mq.Processor { return handlers.NewLoop(id) })
dag.AddHandler("condition", func(id string) mq.Processor { return handlers.NewCondition(id) })
dag.AddHandler("print", func(id string) mq.Processor { return handlers.NewPrintHandler(id) })
dag.AddHandler("render", func(id string) mq.Processor { return handlers.NewRenderHTMLNode(id) })
dag.AddHandler("log", func(id string) mq.Processor { return handlers.NewLogHandler(id) })
}

14
services/cmd/setup.go Normal file
View File

@@ -0,0 +1,14 @@
package cmd
import (
"github.com/gofiber/fiber/v2"
"github.com/oarkflow/mq/services"
)
func Setup(loader *services.Loader, serverApp *fiber.App, brokerAddr string) *fiber.App {
if loader.UserConfig == nil {
return nil
}
services.SetupServices(loader.Prefix(), serverApp, brokerAddr)
return serverApp
}

View File

@@ -0,0 +1,46 @@
package console
import (
"errors"
"github.com/gofiber/fiber/v2"
"github.com/oarkflow/cli/contracts"
)
type RunApiHandler struct {
server *fiber.App
addr string
}
func NewRunApiHandler(server *fiber.App, addr string) *RunApiHandler {
return &RunApiHandler{
server: server,
addr: addr,
}
}
// Signature The name and signature of the console command.
func (receiver *RunApiHandler) Signature() string {
return "run:api-server"
}
// Description The console command description.
func (receiver *RunApiHandler) Description() string {
return "Run API Server"
}
// Extend The console command extend.
func (receiver *RunApiHandler) Extend() contracts.Extend {
return contracts.Extend{}
}
// Handle Execute the console command.
func (receiver *RunApiHandler) Handle(ctx contracts.Context) error {
if receiver.server == nil {
return errors.New("API server is not configured")
}
if err := receiver.server.Listen(receiver.addr); err != nil {
return errors.New("Failed to start API server: " + err.Error())
}
return nil
}

32
services/contracts.go Normal file
View File

@@ -0,0 +1,32 @@
package services
import "github.com/gofiber/fiber/v2"
type Option func(map[string]any)
type Validation interface {
Make(ctx *fiber.Ctx, data any, rules map[string]string, options ...Option) (Validator, error)
AddRules([]Rule) error
Rules() []Rule
}
type Validator interface {
Bind(ptr any) error
Errors() Errors
Fails() bool
}
type Errors interface {
One(key ...string) string
Get(key string) map[string]string
All() map[string]map[string]string
Has(key string) bool
}
type ValidationData interface {
Get(key string) (val any, exist bool)
Set(key string, val any) error
}
type Rule interface {
Signature() string
Passes(ctx *fiber.Ctx, data ValidationData, val any, options ...any) bool
Message() string
}

View File

@@ -2,11 +2,37 @@ package main
import (
"fmt"
"os"
"github.com/gofiber/fiber/v2"
"github.com/oarkflow/cli"
"github.com/oarkflow/cli/console"
"github.com/oarkflow/cli/contracts"
"github.com/oarkflow/mq/handlers"
"github.com/oarkflow/mq/services"
"github.com/oarkflow/mq/services/cmd"
dagConsole "github.com/oarkflow/mq/services/console"
)
func main() {
handlers.Init()
brokerAddr := ":5051"
serverAddr := ":3000"
loader := services.NewLoader("config")
loader.Load()
serverApp := fiber.New()
cmd.Setup(loader, serverApp, brokerAddr)
app := cli.New()
client := app.Instance.Client()
client.Register([]contracts.Command{
console.NewListCommand(client),
dagConsole.NewRunHandler(loader.UserConfig, loader.ParsedPath, brokerAddr),
dagConsole.NewRunApiHandler(serverApp, serverAddr),
})
client.Run(os.Args, true)
}
func mai1n() {
loader := services.NewLoader("config")
loader.Load()
fmt.Println(loader.UserConfig)

View File

@@ -2,6 +2,8 @@ module github.com/oarkflow/mq/services
go 1.24.2
replace github.com/oarkflow/mq => ../
require (
github.com/oarkflow/cli v0.0.0-20250313133305-8d14a63c1883
github.com/oarkflow/errors v0.0.6
@@ -17,6 +19,7 @@ require (
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/andeya/goutil v1.1.2
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/bytedance/gopkg v0.1.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect
@@ -24,7 +27,7 @@ require (
github.com/goccy/go-json v0.10.5 // indirect
github.com/goccy/go-reflect v1.2.0 // indirect
github.com/goccy/go-yaml v1.18.0 // indirect
github.com/gofiber/fiber/v2 v2.52.6 // indirect
github.com/gofiber/fiber/v2 v2.52.9 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/google/uuid v1.6.0 // indirect
@@ -54,9 +57,11 @@ require (
github.com/oarkflow/render v0.0.1 // indirect
github.com/oarkflow/squealx v0.0.36 // indirect
github.com/oarkflow/xid v1.2.8 // indirect
github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/tinylib/msgp v1.2.5 // indirect
github.com/toorop/go-dkim v0.0.0-20240103092955-90b7d1423f92 // indirect
github.com/urfave/cli/v2 v2.27.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect

View File

@@ -12,6 +12,8 @@ github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.0.0 h1:D3occ
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.0.0/go.mod h1:bTSOgj05NGRuHHhQwAdPnYr9TOdNmKlZTgGLL6nyAdI=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/andeya/goutil v1.1.2 h1:RiFWFkL/9yXh2SjQkNWOHqErU1x+RauHmeR23eNUzSg=
github.com/andeya/goutil v1.1.2/go.mod h1:jEG5/QnnhG7yGxwFUX6Q+JGMif7sjdHmmNVjn7nhJDo=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/bytedance/gopkg v0.1.1 h1:3azzgSkiaw79u24a+w9arfH8OfnQQ4MHUt9lJFREEaE=
@@ -34,6 +36,8 @@ github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI=
github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw=
github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
@@ -106,8 +110,6 @@ github.com/oarkflow/log v1.0.83 h1:T/38wvjuNeVJ9PDo0wJDTnTUQZ5XeqlcvpbCItuFFJo=
github.com/oarkflow/log v1.0.83/go.mod h1:dMn57z9uq11Y264cx9c9Ac7ska9qM+EBhn4qf9CNlsM=
github.com/oarkflow/metadata v0.0.78 h1:ciKbtzQGXYvSlxaFYtDX1CocCkchHskreAldVIkHIMg=
github.com/oarkflow/metadata v0.0.78/go.mod h1:T6Bcsq2FVjrJYMJpMluQTw+/xkqUwax7m/qGHTDCyaw=
github.com/oarkflow/mq v0.0.17 h1:krNZW4Gi3CO90HYhAhsskVhNoObWhGjmsMLqcTuNjLQ=
github.com/oarkflow/mq v0.0.17/go.mod h1:nD3C1f4qniuGKl6pmp+BrzKcjYOZ8d+gmEUkDSOrG0Y=
github.com/oarkflow/protocol v0.0.16 h1:3qNn9gwoJOpdz+owyAmW4fNMpQplqHVIjzsWM4r0pcA=
github.com/oarkflow/protocol v0.0.16/go.mod h1:iKP/I+3/FIWlZ6OphAo8c60JO2qgwethOMR+NMsMI28=
github.com/oarkflow/render v0.0.1 h1:Caw74Yu8OE/tjCjurhbUkS0Fi9zE/mzVvQa1Cw7m7R4=
@@ -118,6 +120,8 @@ github.com/oarkflow/xid v1.2.8 h1:uCIX61Binq2RPMsqImZM6pPGzoZTmRyD6jguxF9aAA0=
github.com/oarkflow/xid v1.2.8/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c h1:dAMKvw0MlJT1GshSTtih8C2gDs04w8dReiOGXrGLNoY=
github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -134,6 +138,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po=
github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
github.com/toorop/go-dkim v0.0.0-20201103131630-e1cd1a0a5208/go.mod h1:BzWtXXrXzZUvMacR0oF/fbDDgUPO8L36tDMmRAf14ns=
github.com/toorop/go-dkim v0.0.0-20240103092955-90b7d1423f92 h1:flbMkdl6HxQkLs6DDhH1UkcnFpNBOu70391STjMS0O4=
github.com/toorop/go-dkim v0.0.0-20240103092955-90b7d1423f92/go.mod h1:BzWtXXrXzZUvMacR0oF/fbDDgUPO8L36tDMmRAf14ns=

View File

@@ -0,0 +1,60 @@
package responses
import (
"os"
"strings"
"github.com/andeya/goutil"
"github.com/gofiber/fiber/v2"
)
type Response struct {
Additional any `json:"additional,omitempty"`
Data any `json:"data"`
Message string `json:"message,omitempty"`
StackTrace string `json:"stack_trace,omitempty"`
Code int `json:"code"`
Success bool `json:"success"`
}
func getResponse(code int, message string, additional any, stackTrace ...string) Response {
var trace string
response := Response{
Code: code,
Message: message,
Success: false,
Additional: additional,
}
if len(stackTrace) > 0 {
dir, _ := os.Getwd()
trace = stackTrace[0]
trace = strings.ReplaceAll(trace, dir, "/root")
for _, t := range goutil.GetGopaths() {
trace = strings.ReplaceAll(trace, t+"pkg/mod/", "/root/")
trace = strings.ReplaceAll(trace, t, "/root/")
}
response.StackTrace = trace
}
return response
}
func Abort(ctx *fiber.Ctx, code int, message string, additional any, stackTrace ...string) error {
return ctx.Status(fiber.StatusOK).JSON(getResponse(code, message, additional, stackTrace...))
}
func Failed(ctx *fiber.Ctx, code int, message string, additional any, stackTrace ...string) error {
return ctx.Status(fiber.StatusOK).JSON(getResponse(code, message, additional, stackTrace...))
}
func Success(ctx *fiber.Ctx, code int, data any, message ...string) error {
response := Response{
Code: code,
Data: data,
Success: true,
}
if len(message) > 0 {
response.Message = message[0]
}
return ctx.Status(fiber.StatusOK).JSON(response)
}

View File

@@ -14,6 +14,8 @@ import (
"gopkg.in/yaml.v3"
)
var userConfig *UserConfig
type Loader struct {
path string
configFile string
@@ -43,6 +45,7 @@ func (l *Loader) Load() {
panic(err)
}
l.UserConfig = cfg
userConfig = cfg // Set the global userConfig variable
}
func (l *Loader) prepareConfigPath() string {

View File

@@ -0,0 +1,108 @@
package middlewares
import (
"github.com/gofiber/fiber/v2"
"github.com/oarkflow/json"
"github.com/oarkflow/jsonschema/request"
"github.com/oarkflow/mq/services/utils"
)
var ServerApp *fiber.App
func MatchRoute(pattern, path string) (bool, map[string]string) {
params := make(map[string]string)
pi, ti := 0, 0
pLen, tLen := len(pattern), len(path)
skipSlash := func(s string, i int) int {
for i < len(s) && s[i] == '/' {
i++
}
return i
}
pi = skipSlash(pattern, pi)
ti = skipSlash(path, ti)
for pi < pLen && ti < tLen {
switch pattern[pi] {
case ':':
startName := pi + 1
for pi < pLen && pattern[pi] != '/' {
pi++
}
paramName := pattern[startName:pi]
startVal := ti
for ti < tLen && path[ti] != '/' {
ti++
}
paramVal := path[startVal:ti]
params[paramName] = paramVal
case '*':
pi++
if pi < pLen && pattern[pi] == '/' {
pi++
}
paramName := pattern[pi:]
paramVal := path[ti:]
params[paramName] = paramVal
ti = tLen
pi = pLen
break
default:
for pi < pLen && ti < tLen && pattern[pi] != '/' && path[ti] != '/' {
if pattern[pi] != path[ti] {
return false, nil
}
pi++
ti++
}
}
pi = skipSlash(pattern, pi)
ti = skipSlash(path, ti)
}
if pi == pLen && ti == tLen {
return true, params
}
return false, nil
}
func MatchRouterPath(method, path string) (fiber.Route, bool, map[string]string) {
if ServerApp == nil {
return fiber.Route{}, false, nil
}
for _, route := range ServerApp.GetRoutes() {
if route.Method == method {
matched, params := MatchRoute(route.Path, path)
if matched {
return route, matched, params
}
}
}
return fiber.Route{}, false, nil
}
// ValidateRequestBySchema - validates each request that has schema validation
func ValidateRequestBySchema(c *fiber.Ctx) error {
route, matched, _ := MatchRouterPath(c.Method(), c.Path())
if !matched {
return c.Next()
}
key := route.Method + ":" + route.Path
schema, exists := utils.GetSchema(key)
if !exists {
return c.Next()
}
body := c.Body()
if len(body) == 0 {
return c.Next()
}
var intermediate any
if err := request.UnmarshalFiberCtx(schema, c, &intermediate); err != nil {
return err
}
mergedBytes, err := json.Marshal(intermediate)
if err != nil {
return err
}
c.Request().SetBody(mergedBytes)
return c.Next()
}

View File

@@ -1,15 +1,33 @@
package services
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"slices"
"strings"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/basicauth"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/limiter"
"github.com/oarkflow/filters"
"github.com/oarkflow/json"
v2 "github.com/oarkflow/jsonschema"
"github.com/oarkflow/log"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/services/http/responses"
"github.com/oarkflow/mq/services/middlewares"
"github.com/oarkflow/mq/services/utils"
"github.com/oarkflow/protocol/utils/str"
)
var ValidationInstance Validation
func SetupHandler(handler Handler, brokerAddr string, async ...bool) *dag.DAG {
syncMode := true
if len(async) > 0 {
@@ -116,3 +134,411 @@ func mapProviders(dataProviders interface{}) []dag.Provider {
}
return providers
}
func SetupServices(prefix string, router fiber.Router, brokerAddr string) {
if router == nil {
return
}
SetupAPI(prefix, router, brokerAddr)
}
func SetupAPI(prefix string, router fiber.Router, brokerAddr string) {
if prefix != "" {
prefix = "/" + prefix
}
api := router.Group(prefix)
for _, configRoute := range userConfig.Policy.Web.Apis {
routeGroup := api.Group(configRoute.Prefix)
mws := setupMiddlewares(configRoute.Middlewares)
if len(mws) > 0 {
routeGroup.Use(mws...)
}
for _, route := range configRoute.Routes {
switch route.Operation {
case "custom":
flow := setupFlow(route, routeGroup, brokerAddr)
routeMiddlewares := setupMiddlewares(route.Middlewares)
if len(routeMiddlewares) > 0 {
routeGroup.Use(routeMiddlewares...)
}
routeGroup.Add("GET", CleanAndMergePaths(route.Uri, "/metadata"), func(ctx *fiber.Ctx) error {
return getDAGPage(ctx, flow, route.Handler)
})
routeGroup.Add(strings.ToUpper(route.Method), route.Uri,
requestMiddleware(CleanAndMergePaths(prefix, configRoute.Prefix), route),
ruleMiddleware(route.Rules),
customRuleMiddleware(route, route.CustomRules),
customHandler(flow),
)
}
}
}
}
// GetRulesFromKeys returns the custom rules from the provided keys.
// It is used by the CustomRuleMiddleware to get the custom rules from the provided keys.
func GetRulesFromKeys(ruleKeys []string) (rulesArray []*filters.RuleRequest) {
for _, ruleKey := range ruleKeys {
appRules := userConfig.GetApplicationRule(ruleKey)
if appRules == nil {
panic(fmt.Sprintf("Rule %v not found", ruleKey))
}
if appRules.Rule != nil {
rulesArray = append(rulesArray, appRules.Rule)
}
}
return
}
// customRuleMiddleware validates the request body with the provided custom rules.
// It is passed after the ruleMiddleware to validate the request body with the custom rules.
func customRuleMiddleware(route *Route, ruleKeys []string) fiber.Handler {
rules := GetRulesFromKeys(ruleKeys)
return func(ctx *fiber.Ctx) error {
c, requestData, err := getLessRequestData(ctx, route)
ctx.SetUserContext(c)
if err != nil {
return responses.Abort(ctx, 400, "invalid request", err.Error())
}
if len(requestData) > 0 {
header, ok := ctx.Context().Value("header").(map[string]any)
if ok {
requestData["header"] = header
}
data := map[string]any{
"data": requestData,
}
for _, r := range rules {
_, err := r.Validate(data)
if err != nil {
var errResponse *filters.ErrorResponse
errors.As(err, &errResponse)
if slices.Contains([]string{"DENY", "DENY_WITH_WARNING"}, errResponse.ErrorAction) {
return responses.Abort(ctx, 400, "Invalid data for the request", err.Error())
} else {
ctx.Set("error_msg", errResponse.ErrorMsg)
}
}
}
}
return ctx.Next()
}
}
// getLessRequestData returns request data with param, query, body, enums, consts except
// restricted_field, scopes and queues
func getLessRequestData(ctx *fiber.Ctx, route *Route) (context.Context, map[string]any, error) {
request, header, err := prepareHeader(ctx, route)
if header != nil {
header["route_model"] = route.Model
}
ctx.Set("route_model", route.Model)
if err != nil {
return ctx.UserContext(), nil, err
}
c := context.WithValue(ctx.UserContext(), "header", header)
return c, request, nil
}
func prepareHeader(ctx *fiber.Ctx, route *Route) (map[string]any, map[string]any, error) {
var request map[string]any
bodyRaw := ctx.BodyRaw()
if str.FromByte(bodyRaw) != "" {
err := json.Unmarshal(bodyRaw, &request)
if err != nil {
form, err := ctx.MultipartForm()
if err == nil || form != nil {
return nil, nil, errors.New("invalid json request")
}
}
}
if request == nil {
request = make(map[string]any)
}
requiredBody := make(map[string]bool)
header := make(map[string]any)
param := make(map[string]any)
query := make(map[string]any)
if route.Schema != nil {
schema := route.GetSchema()
if schema != nil {
if schema.Properties != nil {
for key, property := range *schema.Properties {
if property.In != nil {
for _, in := range property.In {
switch in {
case "param":
param[key] = ctx.Params(key)
case "query":
query[key] = ctx.Query(key)
case "body":
requiredBody[key] = true
}
}
}
}
}
}
}
header["param"] = param
header["query"] = query
header["route_model"] = route.Model
ctx.Set("route_model", route.Model)
for k := range requiredBody {
if _, ok := request[k]; !ok {
delete(request, k)
}
}
header["request_id"] = ctx.Get("X-Schema-Id")
// add consts and enums to request
header["consts"] = userConfig.Core.Consts
header["enums"] = userConfig.Core.Enums
return request, header, nil
}
func customHandler(flow *dag.DAG) fiber.Handler {
return func(ctx *fiber.Ctx) error {
result := flow.Process(ctx.UserContext(), ctx.BodyRaw())
if result.Error != nil {
return result.Error
}
contentType := result.Ctx.Value(consts.ContentType)
if contentType == nil {
return ctx.JSON(result)
}
if contentType == fiber.MIMEApplicationJSON || contentType == fiber.MIMEApplicationJSONCharsetUTF8 {
return ctx.JSON(result)
}
ctx.Set(consts.ContentType, contentType.(string))
return ctx.Send(result.Payload)
}
}
func getDAGPage(ctx *fiber.Ctx, flow *dag.DAG, handler Handler) error {
// Save the SVG to a temporary file
image := fmt.Sprintf("%s.svg", mq.NewID())
if err := flow.SaveSVG(image); err != nil {
return err
}
// Ensure the file is removed after reading its content
defer func() {
_ = os.Remove(image)
}()
// Read the SVG file bytes
svgBytes, err := os.ReadFile(image)
if err != nil {
return err
}
// Marshal the handler details into pretty printed JSON
handlerData, err := json.MarshalIndent(handler, "", " ")
if err != nil {
return err
}
// Build an HTML page with a two-column layout:
// Left column: SVG, Right column: Handler details (displayed as preformatted text)
html := fmt.Sprintf(`<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>DAG Visualization and Handler Details</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
background: #f4f4f4;
}
.header {
text-align: center;
margin-bottom: 20px;
}
.container {
display: flex;
flex-wrap: wrap;
gap: 20px;
}
.column {
flex: 1;
min-width: 300px;
background: #fff;
padding: 15px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.svg-container {
text-align: center;
}
h1, h2 {
color: #333;
}
pre {
background: #f7f7f7;
padding: 10px;
border-radius: 4px;
overflow-x: auto;
}
a {
color: #007BFF;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
</style>
</head>
<body>
<div class="header">
<h1>DAG Visualization and Handler Details</h1>
<p>URI: <a href="%s">%s</a></p>
</div>
<div class="container">
<div class="column">
<div class="svg-container">
%s
</div>
</div>
<div class="column">
<h2>Handler Details</h2>
<pre>%s</pre>
</div>
</div>
</body>
</html>`, flow.BaseURI(), flow.BaseURI(), svgBytes, string(handlerData))
// Set the content type as HTML and send the response
ctx.Set("Content-Type", "text/html")
return ctx.SendString(html)
}
// ruleMiddleware validates the request body with the provided rules.
// It is passed after the requestMiddleware to ensure that the request body is valid.
func ruleMiddleware(rules map[string]string) fiber.Handler {
return func(ctx *fiber.Ctx) error {
body := ctx.Body()
if len(body) == 0 {
return ctx.Next()
}
var requestData map[string]any
err := ctx.BodyParser(&requestData)
if err != nil && body != nil {
return responses.Abort(ctx, 400, "Invalid request bind", nil)
}
if len(rules) > 0 && ValidationInstance != nil {
validator, err := ValidationInstance.Make(ctx, requestData, rules)
if err != nil {
return responses.Abort(ctx, 400, "Validation Error", err.Error())
}
if validator.Fails() {
return responses.Abort(ctx, 400, "Validation Error", validator.Errors().All())
}
}
return ctx.Next()
}
}
// requestMiddleware validates the request body in the original form of byte array
// against the provided request JSON schema to ensure that the request body is valid.
func requestMiddleware(prefix string, route *Route) fiber.Handler {
path := CleanAndMergePaths(prefix, route.Uri)
var schema *v2.Schema
var err error
if route.Schema != nil {
schema, err = utils.CompileSchema(path, strings.ToUpper(route.Method), route.Schema)
if err != nil {
panic(err)
}
route.SetSchema(schema)
}
return func(ctx *fiber.Ctx) error {
if route.Schema == nil {
return ctx.Next()
}
requestSchema := ctx.Query("request-schema")
if requestSchema != "" {
return ctx.JSON(fiber.Map{
"success": true,
"code": 200,
"data": fiber.Map{
"schema": schema,
"rules": route.Rules,
},
})
}
for _, r := range userConfig.Policy.Models {
if r.Name == route.Model {
db := r.Database
source := route.Model
ctx.Locals("database_connection", db)
ctx.Locals("database_source", source)
break
}
}
form, _ := ctx.MultipartForm()
if form != nil {
return ctx.Next()
}
return middlewares.ValidateRequestBySchema(ctx)
}
}
func setupMiddlewares(middlewares []Middleware) (mid []any) {
for _, middleware := range middlewares {
switch middleware.Name {
case "cors":
mid = append(mid, cors.New(cors.Config{ExposeHeaders: "frame-session"}))
case "basic-auth":
options := struct {
Users map[string]string `json:"users"`
}{}
err := json.Unmarshal(middleware.Options, &options)
if err != nil {
panic(err)
}
mid = append(mid, basicauth.New(basicauth.Config{Users: options.Users}))
case "rate-limit":
options := struct {
Max int `json:"max"`
Expiration string `json:"expiration"`
}{}
err := json.Unmarshal(middleware.Options, &options)
if err != nil {
panic(err)
}
expiration, err := utils.ParseDuration(options.Expiration)
if err != nil {
panic(err)
}
throttle := limiter.New(limiter.Config{Max: options.Max, Expiration: expiration})
mid = append(mid, throttle)
}
}
return
}
func setupFlow(route *Route, group fiber.Router, brokerAddr string) *dag.DAG {
if route.Handler.Key == "" && route.HandlerKey != "" {
handler := userConfig.GetHandler(route.HandlerKey)
if handler == nil {
panic(fmt.Sprintf("Handler not found %s", route.HandlerKey))
}
route.Handler = *handler
}
flow := SetupHandler(route.Handler, brokerAddr)
if flow.Error != nil {
panic(flow.Error)
}
return flow
}
func CleanAndMergePaths(uri ...string) string {
paths := make([]string, 0)
for _, u := range uri {
if u != "" {
paths = append(paths, strings.TrimPrefix(u, "/"))
}
}
return "/" + filepath.Clean(strings.Join(paths, "/"))
}

48
services/utils/schema.go Normal file
View File

@@ -0,0 +1,48 @@
package utils
import (
"log"
"sync"
"github.com/oarkflow/json"
v2 "github.com/oarkflow/jsonschema"
)
type Schema struct {
m sync.RWMutex
items map[string]*v2.Schema
}
var (
CompiledSchemas *Schema
compiler *v2.Compiler
)
func init() {
compiler = v2.NewCompiler()
CompiledSchemas = &Schema{items: make(map[string]*v2.Schema)}
}
func AddSchema(key string, schema *v2.Schema) {
CompiledSchemas.m.Lock()
defer CompiledSchemas.m.Unlock()
CompiledSchemas.items[key] = schema
}
func GetSchema(key string) (*v2.Schema, bool) {
CompiledSchemas.m.Lock()
defer CompiledSchemas.m.Unlock()
schema, ok := CompiledSchemas.items[key]
return schema, ok
}
func CompileSchema(uri, method string, schema json.RawMessage) (*v2.Schema, error) {
s, err := compiler.Compile(schema)
if err != nil {
log.Printf("Error compiling schema for %s %s: %v", method, uri, err)
return nil, err
}
key := method + ":" + uri
AddSchema(key, s)
return s, nil
}

36
services/utils/time.go Normal file
View File

@@ -0,0 +1,36 @@
package utils
import (
"errors"
"fmt"
"strconv"
"time"
)
func IsEmptyJSON(data []byte) bool {
return len(data) == 0
}
func ParseDuration(input string) (time.Duration, error) {
if len(input) < 2 {
return 0, errors.New("input string is too short")
}
numberPart := input[:len(input)-1]
unitPart := input[len(input)-1]
number, err := strconv.Atoi(numberPart)
if err != nil {
return 0, fmt.Errorf("invalid number part: %v", err)
}
var duration time.Duration
switch unitPart {
case 's':
duration = time.Duration(number) * time.Second
case 'm':
duration = time.Duration(number) * time.Minute
case 'h':
duration = time.Duration(number) * time.Hour
default:
return 0, errors.New("invalid unit part; use 's', 'm', or 'h'")
}
return duration, nil
}