feat: resize

This commit is contained in:
sujit
2024-10-11 21:14:51 +05:45
parent 1a8a5547fa
commit b554886d8d
9 changed files with 236 additions and 200 deletions

View File

@@ -24,14 +24,14 @@ type QueuedTask struct {
} }
type consumer struct { type consumer struct {
conn net.Conn
id string id string
state consts.ConsumerState state consts.ConsumerState
conn net.Conn
} }
type publisher struct { type publisher struct {
id string
conn net.Conn conn net.Conn
id string
} }
type Broker struct { type Broker struct {

View File

@@ -21,12 +21,12 @@ type Processor interface {
} }
type Consumer struct { type Consumer struct {
id string
handler Handler
conn net.Conn conn net.Conn
handler Handler
pool *Pool
id string
queue string queue string
opts Options opts Options
pool *Pool
} }
func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer { func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer {

8
ctx.go
View File

@@ -17,13 +17,13 @@ import (
) )
type Task struct { type Task struct {
ID string `json:"id"`
Topic string `json:"topic"`
Payload json.RawMessage `json:"payload"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
ProcessedAt time.Time `json:"processed_at"` ProcessedAt time.Time `json:"processed_at"`
Status string `json:"status"`
Error error `json:"error"` Error error `json:"error"`
ID string `json:"id"`
Topic string `json:"topic"`
Status string `json:"status"`
Payload json.RawMessage `json:"payload"`
} }
type Handler func(context.Context, *Task) Result type Handler func(context.Context, *Task) Result

View File

@@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"strings"
"sync" "sync"
"time" "time"
@@ -82,173 +81,6 @@ func NewDAG(name string, opts ...mq.Option) *DAG {
return d return d
} }
func (tm *DAG) PrintGraph() {
tm.mu.RLock()
defer tm.mu.RUnlock()
fmt.Println("DAG Graph structure:")
for _, node := range tm.nodes {
fmt.Printf("Node: %s (%s) -> ", node.Name, node.Key)
if conditions, ok := tm.conditions[FromNode(node.Key)]; ok {
var c []string
for when, then := range conditions {
if target, ok := tm.nodes[string(then)]; ok {
c = append(c, fmt.Sprintf("If [%s] Then %s (%s)", when, target.Name, target.Key))
}
}
fmt.Println(strings.Join(c, ", "))
}
var c []string
for _, edge := range node.Edges {
for _, target := range edge.To {
c = append(c, fmt.Sprintf("%s (%s)", target.Name, target.Key))
}
}
fmt.Println(strings.Join(c, ", "))
}
}
func (tm *DAG) ClassifyEdges(startNodes ...string) {
startNode := tm.GetStartNode()
tm.mu.RLock()
defer tm.mu.RUnlock()
if len(startNodes) > 0 && startNodes[0] != "" {
startNode = startNodes[0]
}
visited := make(map[string]bool)
discoveryTime := make(map[string]int)
finishedTime := make(map[string]int)
timeVal := 0
if startNode == "" {
firstNode := tm.findStartNode()
if firstNode != nil {
startNode = firstNode.Key
}
}
if startNode != "" {
tm.dfs(startNode, visited, discoveryTime, finishedTime, &timeVal)
}
}
func (tm *DAG) dfs(v string, visited map[string]bool, discoveryTime, finishedTime map[string]int, timeVal *int) {
visited[v] = true
*timeVal++
discoveryTime[v] = *timeVal
node := tm.nodes[v]
for _, edge := range node.Edges {
for _, adj := range edge.To {
switch edge.Type {
case Simple:
if !visited[adj.Key] {
fmt.Printf("Simple Edge: %s -> %s\n", v, adj.Key)
tm.dfs(adj.Key, visited, discoveryTime, finishedTime, timeVal)
}
case Iterator:
if !visited[adj.Key] {
fmt.Printf("Iterator Edge: %s -> %s\n", v, adj.Key)
tm.dfs(adj.Key, visited, discoveryTime, finishedTime, timeVal)
}
}
}
}
tm.handleConditionalEdges(v, visited, discoveryTime, finishedTime, timeVal)
*timeVal++
finishedTime[v] = *timeVal
}
func (tm *DAG) handleConditionalEdges(v string, visited map[string]bool, discoveryTime, finishedTime map[string]int, time *int) {
node := tm.nodes[v]
for when, then := range tm.conditions[FromNode(node.Key)] {
if targetNodeKey, ok := tm.nodes[string(then)]; ok {
if !visited[targetNodeKey.Key] {
fmt.Printf("Conditional Edge [%s]: %s -> %s\n", when, v, targetNodeKey.Key)
tm.dfs(targetNodeKey.Key, visited, discoveryTime, finishedTime, time)
} else {
if discoveryTime[v] > discoveryTime[targetNodeKey.Key] {
fmt.Printf("Conditional Loop Edge [%s]: %s -> %s\n", when, v, targetNodeKey.Key)
}
}
}
}
}
func (tm *DAG) ExportDOT() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name))
sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=lightgray, fontname=\"Helvetica\"];\n")
sortedNodes := tm.TopologicalSort()
for _, nodeKey := range sortedNodes {
node := tm.nodes[nodeKey]
nodeColor := "lightblue"
sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor))
}
for _, nodeKey := range sortedNodes {
node := tm.nodes[nodeKey]
for _, edge := range node.Edges {
var edgeStyle string
switch edge.Type {
case Iterator:
edgeStyle = "dashed"
default:
edgeStyle = "solid"
}
for _, to := range edge.To {
edgeColor := "black"
if edge.Label == "Iterate" {
edgeColor = "blue"
} else if edge.Label == "PASS" {
edgeColor = "green"
} else if edge.Label == "FAIL" {
edgeColor = "red"
}
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle))
}
}
}
for fromNodeKey, conditions := range tm.conditions {
for when, then := range conditions {
if toNode, ok := tm.nodes[string(then)]; ok {
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted];\n", fromNodeKey, toNode.Key, when))
}
}
}
sb.WriteString("}\n")
return sb.String()
}
func (tm *DAG) TopologicalSort() []string {
visited := make(map[string]bool)
stack := []string{}
for _, node := range tm.nodes {
if !visited[node.Key] {
tm.topologicalSortUtil(node.Key, visited, &stack)
}
}
for i, j := 0, len(stack)-1; i < j; i, j = i+1, j-1 {
stack[i], stack[j] = stack[j], stack[i]
}
return stack
}
func (tm *DAG) topologicalSortUtil(v string, visited map[string]bool, stack *[]string) {
visited[v] = true
node := tm.nodes[v]
for _, edge := range node.Edges {
for _, to := range edge.To {
if !visited[to.Key] {
tm.topologicalSortUtil(to.Key, visited, stack)
}
}
}
*stack = append(*stack, v)
}
func (tm *DAG) onTaskCallback(ctx context.Context, result mq.Result) mq.Result { func (tm *DAG) onTaskCallback(ctx context.Context, result mq.Result) mq.Result {
if taskContext, ok := tm.taskContext[result.TaskID]; ok && result.Topic != "" { if taskContext, ok := tm.taskContext[result.TaskID]; ok && result.Topic != "" {
return taskContext.handleCallback(ctx, result) return taskContext.handleCallback(ctx, result)

203
dag/ui.go Normal file
View File

@@ -0,0 +1,203 @@
package dag
import (
"fmt"
"os"
"os/exec"
"strings"
)
func (tm *DAG) PrintGraph() {
tm.mu.RLock()
defer tm.mu.RUnlock()
fmt.Println("DAG Graph structure:")
for _, node := range tm.nodes {
fmt.Printf("Node: %s (%s) -> ", node.Name, node.Key)
if conditions, ok := tm.conditions[FromNode(node.Key)]; ok {
var c []string
for when, then := range conditions {
if target, ok := tm.nodes[string(then)]; ok {
c = append(c, fmt.Sprintf("If [%s] Then %s (%s)", when, target.Name, target.Key))
}
}
fmt.Println(strings.Join(c, ", "))
}
var c []string
for _, edge := range node.Edges {
for _, target := range edge.To {
c = append(c, fmt.Sprintf("%s (%s)", target.Name, target.Key))
}
}
fmt.Println(strings.Join(c, ", "))
}
}
func (tm *DAG) ClassifyEdges(startNodes ...string) {
startNode := tm.GetStartNode()
tm.mu.RLock()
defer tm.mu.RUnlock()
if len(startNodes) > 0 && startNodes[0] != "" {
startNode = startNodes[0]
}
visited := make(map[string]bool)
discoveryTime := make(map[string]int)
finishedTime := make(map[string]int)
timeVal := 0
if startNode == "" {
firstNode := tm.findStartNode()
if firstNode != nil {
startNode = firstNode.Key
}
}
if startNode != "" {
tm.dfs(startNode, visited, discoveryTime, finishedTime, &timeVal)
}
}
func (tm *DAG) dfs(v string, visited map[string]bool, discoveryTime, finishedTime map[string]int, timeVal *int) {
visited[v] = true
*timeVal++
discoveryTime[v] = *timeVal
node := tm.nodes[v]
for _, edge := range node.Edges {
for _, adj := range edge.To {
switch edge.Type {
case Simple:
if !visited[adj.Key] {
fmt.Printf("Simple Edge: %s -> %s\n", v, adj.Key)
tm.dfs(adj.Key, visited, discoveryTime, finishedTime, timeVal)
}
case Iterator:
if !visited[adj.Key] {
fmt.Printf("Iterator Edge: %s -> %s\n", v, adj.Key)
tm.dfs(adj.Key, visited, discoveryTime, finishedTime, timeVal)
}
}
}
}
tm.handleConditionalEdges(v, visited, discoveryTime, finishedTime, timeVal)
*timeVal++
finishedTime[v] = *timeVal
}
func (tm *DAG) handleConditionalEdges(v string, visited map[string]bool, discoveryTime, finishedTime map[string]int, time *int) {
node := tm.nodes[v]
for when, then := range tm.conditions[FromNode(node.Key)] {
if targetNodeKey, ok := tm.nodes[string(then)]; ok {
if !visited[targetNodeKey.Key] {
fmt.Printf("Conditional Edge [%s]: %s -> %s\n", when, v, targetNodeKey.Key)
tm.dfs(targetNodeKey.Key, visited, discoveryTime, finishedTime, time)
} else {
if discoveryTime[v] > discoveryTime[targetNodeKey.Key] {
fmt.Printf("Conditional Loop Edge [%s]: %s -> %s\n", when, v, targetNodeKey.Key)
}
}
}
}
}
func (tm *DAG) SaveDOTFile(filename string) error {
dotContent := tm.ExportDOT()
return os.WriteFile(filename, []byte(dotContent), 0644)
}
func (tm *DAG) SaveSVG(svgFile string) error {
return tm.saveImage(svgFile, "-Tsvg")
}
func (tm *DAG) SavePNG(pngFile string) error {
return tm.saveImage(pngFile, "-Tpng")
}
func (tm *DAG) saveImage(fileName string, arg string) error {
dotFile := fileName[:len(fileName)-4] + ".dot"
if err := tm.SaveDOTFile(dotFile); err != nil {
return err
}
defer func() {
os.Remove(dotFile)
}()
cmd := exec.Command("dot", arg, dotFile, "-o", fileName)
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to convert image: %w", err)
}
return nil
}
func (tm *DAG) ExportDOT() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name))
sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=lightgray, fontname=\"Helvetica\"];\n")
sortedNodes := tm.TopologicalSort()
for _, nodeKey := range sortedNodes {
node := tm.nodes[nodeKey]
nodeColor := "lightblue"
sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor))
}
for _, nodeKey := range sortedNodes {
node := tm.nodes[nodeKey]
for _, edge := range node.Edges {
var edgeStyle string
switch edge.Type {
case Iterator:
edgeStyle = "dashed"
default:
edgeStyle = "solid"
}
for _, to := range edge.To {
edgeColor := "black"
if edge.Label == "Iterate" {
edgeColor = "blue"
} else if edge.Label == "PASS" {
edgeColor = "green"
} else if edge.Label == "FAIL" {
edgeColor = "red"
}
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle))
}
}
}
for fromNodeKey, conditions := range tm.conditions {
for when, then := range conditions {
if toNode, ok := tm.nodes[string(then)]; ok {
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted];\n", fromNodeKey, toNode.Key, when))
}
}
}
sb.WriteString("}\n")
return sb.String()
}
func (tm *DAG) TopologicalSort() []string {
visited := make(map[string]bool)
stack := []string{}
for _, node := range tm.nodes {
if !visited[node.Key] {
tm.topologicalSortUtil(node.Key, visited, &stack)
}
}
for i, j := 0, len(stack)-1; i < j; i, j = i+1, j-1 {
stack[i], stack[j] = stack[j], stack[i]
}
return stack
}
func (tm *DAG) topologicalSortUtil(v string, visited map[string]bool, stack *[]string) {
visited[v] = true
node := tm.nodes[v]
for _, edge := range node.Edges {
for _, to := range edge.To {
if !visited[to.Key] {
tm.topologicalSortUtil(to.Key, visited, stack)
}
}
}
*stack = append(*stack, v)
}

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@@ -42,9 +43,9 @@ func main() {
// Classify edges // Classify edges
// d.ClassifyEdges() // d.ClassifyEdges()
fmt.Println(d.ExportDOT()) // fmt.Println(d.ExportDOT())
/*http.HandleFunc("POST /publish", requestHandler("publish")) http.HandleFunc("POST /publish", requestHandler("publish"))
http.HandleFunc("POST /request", requestHandler("request")) http.HandleFunc("POST /request", requestHandler("request"))
http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) { http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
id := request.PathValue("id") id := request.PathValue("id")
@@ -67,7 +68,7 @@ func main() {
err := d.Start(context.TODO(), ":8083") err := d.Start(context.TODO(), ":8083")
if err != nil { if err != nil {
panic(err) panic(err)
}*/ }
} }
func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) { func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) {

View File

@@ -9,13 +9,13 @@ import (
) )
type Result struct { type Result struct {
Payload json.RawMessage `json:"payload"`
Topic string `json:"topic"`
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
ProcessedAt time.Time `json:"processed_at,omitempty"` ProcessedAt time.Time `json:"processed_at,omitempty"`
TaskID string `json:"task_id"`
Error error `json:"error,omitempty"` Error error `json:"error,omitempty"`
Topic string `json:"topic"`
TaskID string `json:"task_id"`
Status string `json:"status"` Status string `json:"status"`
Payload json.RawMessage `json:"payload"`
} }
func (r Result) Unmarshal(data any) error { func (r Result) Unmarshal(data any) error {
@@ -51,30 +51,30 @@ func (r Result) WithData(status string, data []byte) Result {
} }
type TLSConfig struct { type TLSConfig struct {
UseTLS bool
CertPath string CertPath string
KeyPath string KeyPath string
CAPath string CAPath string
UseTLS bool
} }
type Options struct { type Options struct {
syncMode bool
brokerAddr string
callback []func(context.Context, Result) Result
maxRetries int
consumerOnSubscribe func(ctx context.Context, topic, consumerName string) consumerOnSubscribe func(ctx context.Context, topic, consumerName string)
consumerOnClose func(ctx context.Context, topic, consumerName string) consumerOnClose func(ctx context.Context, topic, consumerName string)
notifyResponse func(context.Context, Result) notifyResponse func(context.Context, Result)
tlsConfig TLSConfig
brokerAddr string
callback []func(context.Context, Result) Result
aesKey json.RawMessage
hmacKey json.RawMessage
maxRetries int
initialDelay time.Duration initialDelay time.Duration
maxBackoff time.Duration maxBackoff time.Duration
jitterPercent float64 jitterPercent float64
tlsConfig TLSConfig
aesKey json.RawMessage
hmacKey json.RawMessage
enableEncryption bool
queueSize int queueSize int
numOfWorkers int numOfWorkers int
maxMemoryLoad int64 maxMemoryLoad int64
syncMode bool
enableEncryption bool
enableWorkerPool bool enableWorkerPool bool
} }

14
pool.go
View File

@@ -18,19 +18,19 @@ type QueueTask struct {
type Callback func(ctx context.Context, result Result) error type Callback func(ctx context.Context, result Result) error
type Pool struct { type Pool struct {
conn net.Conn
taskQueue chan QueueTask
stop chan struct{}
handler Handler
callback Callback
workerAdjust chan int // Channel for adjusting workers dynamically
wg sync.WaitGroup
totalMemoryUsed int64 totalMemoryUsed int64
completedTasks int completedTasks int
errorCount, maxMemoryLoad int64 errorCount, maxMemoryLoad int64
totalTasks int totalTasks int
numOfWorkers int32 // Change to int32 for atomic operations numOfWorkers int32 // Change to int32 for atomic operations
taskQueue chan QueueTask
wg sync.WaitGroup
paused bool paused bool
stop chan struct{}
handler Handler
callback Callback
conn net.Conn
workerAdjust chan int // Channel for adjusting workers dynamically
} }
func NewPool( func NewPool(

View File

@@ -5,9 +5,9 @@ import (
) )
type Queue struct { type Queue struct {
name string
consumers xsync.IMap[string, *consumer] consumers xsync.IMap[string, *consumer]
tasks chan *QueuedTask // channel to hold tasks tasks chan *QueuedTask // channel to hold tasks
name string
} }
func newQueue(name string, queueSize int) *Queue { func newQueue(name string, queueSize int) *Queue {