From b554886d8d8b8929beedfd155b0d5ccc77f172e7 Mon Sep 17 00:00:00 2001 From: sujit Date: Fri, 11 Oct 2024 21:14:51 +0545 Subject: [PATCH] feat: resize --- broker.go | 4 +- consumer.go | 6 +- ctx.go | 8 +- dag/dag.go | 168 --------------------------------------- dag/ui.go | 203 ++++++++++++++++++++++++++++++++++++++++++++++++ examples/dag.go | 7 +- options.go | 24 +++--- pool.go | 14 ++-- queue.go | 2 +- 9 files changed, 236 insertions(+), 200 deletions(-) create mode 100644 dag/ui.go diff --git a/broker.go b/broker.go index 1e59c64..9349e59 100644 --- a/broker.go +++ b/broker.go @@ -24,14 +24,14 @@ type QueuedTask struct { } type consumer struct { + conn net.Conn id string state consts.ConsumerState - conn net.Conn } type publisher struct { - id string conn net.Conn + id string } type Broker struct { diff --git a/consumer.go b/consumer.go index e0180a2..5a0fb6a 100644 --- a/consumer.go +++ b/consumer.go @@ -21,12 +21,12 @@ type Processor interface { } type Consumer struct { - id string - handler Handler conn net.Conn + handler Handler + pool *Pool + id string queue string opts Options - pool *Pool } func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer { diff --git a/ctx.go b/ctx.go index 0e0bb01..f739a44 100644 --- a/ctx.go +++ b/ctx.go @@ -17,13 +17,13 @@ import ( ) type Task struct { - ID string `json:"id"` - Topic string `json:"topic"` - Payload json.RawMessage `json:"payload"` CreatedAt time.Time `json:"created_at"` ProcessedAt time.Time `json:"processed_at"` - Status string `json:"status"` Error error `json:"error"` + ID string `json:"id"` + Topic string `json:"topic"` + Status string `json:"status"` + Payload json.RawMessage `json:"payload"` } type Handler func(context.Context, *Task) Result diff --git a/dag/dag.go b/dag/dag.go index e914484..d0cbac7 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -6,7 +6,6 @@ import ( "fmt" "log" "net/http" - "strings" "sync" "time" @@ -82,173 +81,6 @@ func NewDAG(name string, opts ...mq.Option) *DAG { return d } -func (tm *DAG) PrintGraph() { - tm.mu.RLock() - defer tm.mu.RUnlock() - - fmt.Println("DAG Graph structure:") - for _, node := range tm.nodes { - fmt.Printf("Node: %s (%s) -> ", node.Name, node.Key) - if conditions, ok := tm.conditions[FromNode(node.Key)]; ok { - var c []string - for when, then := range conditions { - if target, ok := tm.nodes[string(then)]; ok { - c = append(c, fmt.Sprintf("If [%s] Then %s (%s)", when, target.Name, target.Key)) - } - } - fmt.Println(strings.Join(c, ", ")) - } - var c []string - for _, edge := range node.Edges { - for _, target := range edge.To { - c = append(c, fmt.Sprintf("%s (%s)", target.Name, target.Key)) - } - } - fmt.Println(strings.Join(c, ", ")) - } -} - -func (tm *DAG) ClassifyEdges(startNodes ...string) { - startNode := tm.GetStartNode() - tm.mu.RLock() - defer tm.mu.RUnlock() - if len(startNodes) > 0 && startNodes[0] != "" { - startNode = startNodes[0] - } - visited := make(map[string]bool) - discoveryTime := make(map[string]int) - finishedTime := make(map[string]int) - timeVal := 0 - if startNode == "" { - firstNode := tm.findStartNode() - if firstNode != nil { - startNode = firstNode.Key - } - } - if startNode != "" { - tm.dfs(startNode, visited, discoveryTime, finishedTime, &timeVal) - } -} - -func (tm *DAG) dfs(v string, visited map[string]bool, discoveryTime, finishedTime map[string]int, timeVal *int) { - visited[v] = true - *timeVal++ - discoveryTime[v] = *timeVal - node := tm.nodes[v] - for _, edge := range node.Edges { - for _, adj := range edge.To { - switch edge.Type { - case Simple: - if !visited[adj.Key] { - fmt.Printf("Simple Edge: %s -> %s\n", v, adj.Key) - tm.dfs(adj.Key, visited, discoveryTime, finishedTime, timeVal) - } - case Iterator: - if !visited[adj.Key] { - fmt.Printf("Iterator Edge: %s -> %s\n", v, adj.Key) - tm.dfs(adj.Key, visited, discoveryTime, finishedTime, timeVal) - } - } - - } - } - tm.handleConditionalEdges(v, visited, discoveryTime, finishedTime, timeVal) - *timeVal++ - finishedTime[v] = *timeVal -} - -func (tm *DAG) handleConditionalEdges(v string, visited map[string]bool, discoveryTime, finishedTime map[string]int, time *int) { - node := tm.nodes[v] - for when, then := range tm.conditions[FromNode(node.Key)] { - if targetNodeKey, ok := tm.nodes[string(then)]; ok { - if !visited[targetNodeKey.Key] { - fmt.Printf("Conditional Edge [%s]: %s -> %s\n", when, v, targetNodeKey.Key) - tm.dfs(targetNodeKey.Key, visited, discoveryTime, finishedTime, time) - } else { - if discoveryTime[v] > discoveryTime[targetNodeKey.Key] { - fmt.Printf("Conditional Loop Edge [%s]: %s -> %s\n", when, v, targetNodeKey.Key) - } - } - } - } -} - -func (tm *DAG) ExportDOT() string { - var sb strings.Builder - sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name)) - sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=lightgray, fontname=\"Helvetica\"];\n") - sortedNodes := tm.TopologicalSort() - for _, nodeKey := range sortedNodes { - node := tm.nodes[nodeKey] - nodeColor := "lightblue" - sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor)) - } - for _, nodeKey := range sortedNodes { - node := tm.nodes[nodeKey] - for _, edge := range node.Edges { - var edgeStyle string - switch edge.Type { - case Iterator: - edgeStyle = "dashed" - default: - edgeStyle = "solid" - } - for _, to := range edge.To { - edgeColor := "black" - if edge.Label == "Iterate" { - edgeColor = "blue" - } else if edge.Label == "PASS" { - edgeColor = "green" - } else if edge.Label == "FAIL" { - edgeColor = "red" - } - sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle)) - } - } - } - for fromNodeKey, conditions := range tm.conditions { - for when, then := range conditions { - if toNode, ok := tm.nodes[string(then)]; ok { - - sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted];\n", fromNodeKey, toNode.Key, when)) - } - } - } - sb.WriteString("}\n") - return sb.String() -} - -func (tm *DAG) TopologicalSort() []string { - visited := make(map[string]bool) - stack := []string{} - for _, node := range tm.nodes { - if !visited[node.Key] { - tm.topologicalSortUtil(node.Key, visited, &stack) - } - } - - for i, j := 0, len(stack)-1; i < j; i, j = i+1, j-1 { - stack[i], stack[j] = stack[j], stack[i] - } - - return stack -} - -func (tm *DAG) topologicalSortUtil(v string, visited map[string]bool, stack *[]string) { - visited[v] = true - node := tm.nodes[v] - - for _, edge := range node.Edges { - for _, to := range edge.To { - if !visited[to.Key] { - tm.topologicalSortUtil(to.Key, visited, stack) - } - } - } - - *stack = append(*stack, v) -} - func (tm *DAG) onTaskCallback(ctx context.Context, result mq.Result) mq.Result { if taskContext, ok := tm.taskContext[result.TaskID]; ok && result.Topic != "" { return taskContext.handleCallback(ctx, result) diff --git a/dag/ui.go b/dag/ui.go new file mode 100644 index 0000000..18e41dc --- /dev/null +++ b/dag/ui.go @@ -0,0 +1,203 @@ +package dag + +import ( + "fmt" + "os" + "os/exec" + "strings" +) + +func (tm *DAG) PrintGraph() { + tm.mu.RLock() + defer tm.mu.RUnlock() + + fmt.Println("DAG Graph structure:") + for _, node := range tm.nodes { + fmt.Printf("Node: %s (%s) -> ", node.Name, node.Key) + if conditions, ok := tm.conditions[FromNode(node.Key)]; ok { + var c []string + for when, then := range conditions { + if target, ok := tm.nodes[string(then)]; ok { + c = append(c, fmt.Sprintf("If [%s] Then %s (%s)", when, target.Name, target.Key)) + } + } + fmt.Println(strings.Join(c, ", ")) + } + var c []string + for _, edge := range node.Edges { + for _, target := range edge.To { + c = append(c, fmt.Sprintf("%s (%s)", target.Name, target.Key)) + } + } + fmt.Println(strings.Join(c, ", ")) + } +} + +func (tm *DAG) ClassifyEdges(startNodes ...string) { + startNode := tm.GetStartNode() + tm.mu.RLock() + defer tm.mu.RUnlock() + if len(startNodes) > 0 && startNodes[0] != "" { + startNode = startNodes[0] + } + visited := make(map[string]bool) + discoveryTime := make(map[string]int) + finishedTime := make(map[string]int) + timeVal := 0 + if startNode == "" { + firstNode := tm.findStartNode() + if firstNode != nil { + startNode = firstNode.Key + } + } + if startNode != "" { + tm.dfs(startNode, visited, discoveryTime, finishedTime, &timeVal) + } +} + +func (tm *DAG) dfs(v string, visited map[string]bool, discoveryTime, finishedTime map[string]int, timeVal *int) { + visited[v] = true + *timeVal++ + discoveryTime[v] = *timeVal + node := tm.nodes[v] + for _, edge := range node.Edges { + for _, adj := range edge.To { + switch edge.Type { + case Simple: + if !visited[adj.Key] { + fmt.Printf("Simple Edge: %s -> %s\n", v, adj.Key) + tm.dfs(adj.Key, visited, discoveryTime, finishedTime, timeVal) + } + case Iterator: + if !visited[adj.Key] { + fmt.Printf("Iterator Edge: %s -> %s\n", v, adj.Key) + tm.dfs(adj.Key, visited, discoveryTime, finishedTime, timeVal) + } + } + + } + } + tm.handleConditionalEdges(v, visited, discoveryTime, finishedTime, timeVal) + *timeVal++ + finishedTime[v] = *timeVal +} + +func (tm *DAG) handleConditionalEdges(v string, visited map[string]bool, discoveryTime, finishedTime map[string]int, time *int) { + node := tm.nodes[v] + for when, then := range tm.conditions[FromNode(node.Key)] { + if targetNodeKey, ok := tm.nodes[string(then)]; ok { + if !visited[targetNodeKey.Key] { + fmt.Printf("Conditional Edge [%s]: %s -> %s\n", when, v, targetNodeKey.Key) + tm.dfs(targetNodeKey.Key, visited, discoveryTime, finishedTime, time) + } else { + if discoveryTime[v] > discoveryTime[targetNodeKey.Key] { + fmt.Printf("Conditional Loop Edge [%s]: %s -> %s\n", when, v, targetNodeKey.Key) + } + } + } + } +} + +func (tm *DAG) SaveDOTFile(filename string) error { + dotContent := tm.ExportDOT() + return os.WriteFile(filename, []byte(dotContent), 0644) +} + +func (tm *DAG) SaveSVG(svgFile string) error { + return tm.saveImage(svgFile, "-Tsvg") +} + +func (tm *DAG) SavePNG(pngFile string) error { + return tm.saveImage(pngFile, "-Tpng") +} + +func (tm *DAG) saveImage(fileName string, arg string) error { + dotFile := fileName[:len(fileName)-4] + ".dot" + if err := tm.SaveDOTFile(dotFile); err != nil { + return err + } + defer func() { + os.Remove(dotFile) + }() + cmd := exec.Command("dot", arg, dotFile, "-o", fileName) + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to convert image: %w", err) + } + return nil +} + +func (tm *DAG) ExportDOT() string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name)) + sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=lightgray, fontname=\"Helvetica\"];\n") + sortedNodes := tm.TopologicalSort() + for _, nodeKey := range sortedNodes { + node := tm.nodes[nodeKey] + nodeColor := "lightblue" + sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor)) + } + for _, nodeKey := range sortedNodes { + node := tm.nodes[nodeKey] + for _, edge := range node.Edges { + var edgeStyle string + switch edge.Type { + case Iterator: + edgeStyle = "dashed" + default: + edgeStyle = "solid" + } + for _, to := range edge.To { + edgeColor := "black" + if edge.Label == "Iterate" { + edgeColor = "blue" + } else if edge.Label == "PASS" { + edgeColor = "green" + } else if edge.Label == "FAIL" { + edgeColor = "red" + } + sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle)) + } + } + } + for fromNodeKey, conditions := range tm.conditions { + for when, then := range conditions { + if toNode, ok := tm.nodes[string(then)]; ok { + + sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted];\n", fromNodeKey, toNode.Key, when)) + } + } + } + sb.WriteString("}\n") + return sb.String() +} + +func (tm *DAG) TopologicalSort() []string { + visited := make(map[string]bool) + stack := []string{} + for _, node := range tm.nodes { + if !visited[node.Key] { + tm.topologicalSortUtil(node.Key, visited, &stack) + } + } + + for i, j := 0, len(stack)-1; i < j; i, j = i+1, j-1 { + stack[i], stack[j] = stack[j], stack[i] + } + + return stack +} + +func (tm *DAG) topologicalSortUtil(v string, visited map[string]bool, stack *[]string) { + visited[v] = true + node := tm.nodes[v] + + for _, edge := range node.Edges { + for _, to := range edge.To { + if !visited[to.Key] { + tm.topologicalSortUtil(to.Key, visited, stack) + } + } + } + + *stack = append(*stack, v) +} diff --git a/examples/dag.go b/examples/dag.go index 57a81b2..9cc604b 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "fmt" "io" @@ -42,9 +43,9 @@ func main() { // Classify edges // d.ClassifyEdges() - fmt.Println(d.ExportDOT()) + // fmt.Println(d.ExportDOT()) - /*http.HandleFunc("POST /publish", requestHandler("publish")) + http.HandleFunc("POST /publish", requestHandler("publish")) http.HandleFunc("POST /request", requestHandler("request")) http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) { id := request.PathValue("id") @@ -67,7 +68,7 @@ func main() { err := d.Start(context.TODO(), ":8083") if err != nil { panic(err) - }*/ + } } func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) { diff --git a/options.go b/options.go index 92de05f..f56e53a 100644 --- a/options.go +++ b/options.go @@ -9,13 +9,13 @@ import ( ) type Result struct { - Payload json.RawMessage `json:"payload"` - Topic string `json:"topic"` CreatedAt time.Time `json:"created_at"` ProcessedAt time.Time `json:"processed_at,omitempty"` - TaskID string `json:"task_id"` Error error `json:"error,omitempty"` + Topic string `json:"topic"` + TaskID string `json:"task_id"` Status string `json:"status"` + Payload json.RawMessage `json:"payload"` } func (r Result) Unmarshal(data any) error { @@ -51,30 +51,30 @@ func (r Result) WithData(status string, data []byte) Result { } type TLSConfig struct { - UseTLS bool CertPath string KeyPath string CAPath string + UseTLS bool } type Options struct { - syncMode bool - brokerAddr string - callback []func(context.Context, Result) Result - maxRetries int consumerOnSubscribe func(ctx context.Context, topic, consumerName string) consumerOnClose func(ctx context.Context, topic, consumerName string) notifyResponse func(context.Context, Result) + tlsConfig TLSConfig + brokerAddr string + callback []func(context.Context, Result) Result + aesKey json.RawMessage + hmacKey json.RawMessage + maxRetries int initialDelay time.Duration maxBackoff time.Duration jitterPercent float64 - tlsConfig TLSConfig - aesKey json.RawMessage - hmacKey json.RawMessage - enableEncryption bool queueSize int numOfWorkers int maxMemoryLoad int64 + syncMode bool + enableEncryption bool enableWorkerPool bool } diff --git a/pool.go b/pool.go index 5c03dd4..7f617d8 100644 --- a/pool.go +++ b/pool.go @@ -18,19 +18,19 @@ type QueueTask struct { type Callback func(ctx context.Context, result Result) error type Pool struct { + conn net.Conn + taskQueue chan QueueTask + stop chan struct{} + handler Handler + callback Callback + workerAdjust chan int // Channel for adjusting workers dynamically + wg sync.WaitGroup totalMemoryUsed int64 completedTasks int errorCount, maxMemoryLoad int64 totalTasks int numOfWorkers int32 // Change to int32 for atomic operations - taskQueue chan QueueTask - wg sync.WaitGroup paused bool - stop chan struct{} - handler Handler - callback Callback - conn net.Conn - workerAdjust chan int // Channel for adjusting workers dynamically } func NewPool( diff --git a/queue.go b/queue.go index bde4805..2a190de 100644 --- a/queue.go +++ b/queue.go @@ -5,9 +5,9 @@ import ( ) type Queue struct { - name string consumers xsync.IMap[string, *consumer] tasks chan *QueuedTask // channel to hold tasks + name string } func newQueue(name string, queueSize int) *Queue {