diff --git a/admin_server.go b/admin_server.go index fefd30c..5ca9cad 100644 --- a/admin_server.go +++ b/admin_server.go @@ -5,20 +5,45 @@ import ( "encoding/json" "fmt" "net/http" + "runtime" "sync" "time" "github.com/oarkflow/mq/logger" + "github.com/oarkflow/mq/sio" ) // AdminServer provides comprehensive admin interface and API type AdminServer struct { - broker *Broker - server *http.Server - logger logger.Logger - metrics *AdminMetrics - isRunning bool - mu sync.RWMutex + broker *Broker + server *http.Server + logger logger.Logger + metrics *AdminMetrics + wsServer *sio.Server + isRunning bool + mu sync.RWMutex + wsClients map[string]*sio.Socket + wsClientsMu sync.RWMutex + broadcastCh chan *AdminMessage + shutdownCh chan struct{} +} + +// AdminMessage represents a message sent via WebSocket +type AdminMessage struct { + Type string `json:"type"` + Data interface{} `json:"data"` + Timestamp time.Time `json:"timestamp"` +} + +// TaskUpdate represents a real-time task update +type TaskUpdate struct { + TaskID string `json:"task_id"` + Queue string `json:"queue"` + Status string `json:"status"` + Consumer string `json:"consumer,omitempty"` + Error string `json:"error,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // AdminMetrics tracks comprehensive admin metrics @@ -99,11 +124,18 @@ type AdminQueueInfo struct { // NewAdminServer creates a new admin server func NewAdminServer(broker *Broker, addr string, log logger.Logger) *AdminServer { admin := &AdminServer{ - broker: broker, - logger: log, - metrics: NewAdminMetrics(), + broker: broker, + logger: log, + metrics: NewAdminMetrics(), + wsClients: make(map[string]*sio.Socket), + broadcastCh: make(chan *AdminMessage, 100), + shutdownCh: make(chan struct{}), } + // Initialize WebSocket server + admin.wsServer = sio.New() + admin.setupWebSocketEvents() + mux := http.NewServeMux() admin.setupRoutes(mux) admin.server = &http.Server{ @@ -114,6 +146,169 @@ func NewAdminServer(broker *Broker, addr string, log logger.Logger) *AdminServer return admin } +// setupWebSocketEvents configures WebSocket event handlers +func (a *AdminServer) setupWebSocketEvents() { + a.wsServer.OnConnect(func(s *sio.Socket) error { + a.wsClientsMu.Lock() + a.wsClients[s.ID()] = s + a.wsClientsMu.Unlock() + + a.logger.Info("WebSocket client connected", logger.Field{Key: "id", Value: s.ID()}) + + // Send initial data to new client + go a.sendInitialData(s) + return nil + }) + + a.wsServer.OnDisconnect(func(s *sio.Socket) error { + a.wsClientsMu.Lock() + delete(a.wsClients, s.ID()) + a.wsClientsMu.Unlock() + + a.logger.Info("WebSocket client disconnected", logger.Field{Key: "id", Value: s.ID()}) + return nil + }) + + a.wsServer.On("subscribe", func(s *sio.Socket, data []byte) { + // Handle subscription to specific data types + a.logger.Info("WebSocket subscription", logger.Field{Key: "data", Value: string(data)}) + }) +} + +// sendInitialData sends current state to newly connected client +func (a *AdminServer) sendInitialData(s *sio.Socket) { + // Send current metrics + msg := &AdminMessage{ + Type: "metrics", + Data: a.getMetrics(), + Timestamp: time.Now(), + } + a.sendToSocket(s, msg) + + // Send current queues + msg = &AdminMessage{ + Type: "queues", + Data: a.getQueues(), + Timestamp: time.Now(), + } + a.sendToSocket(s, msg) + + // Send current consumers + msg = &AdminMessage{ + Type: "consumers", + Data: a.getConsumers(), + Timestamp: time.Now(), + } + a.sendToSocket(s, msg) + + // Send current pools + msg = 
&AdminMessage{ + Type: "pools", + Data: a.getPools(), + Timestamp: time.Now(), + } + a.sendToSocket(s, msg) + + // Send broker info + msg = &AdminMessage{ + Type: "broker", + Data: a.getBrokerInfo(), + Timestamp: time.Now(), + } + a.sendToSocket(s, msg) +} + +// sendToSocket sends a message to a specific socket +func (a *AdminServer) sendToSocket(s *sio.Socket, msg *AdminMessage) { + // The sio.Socket.Emit method handles JSON marshaling automatically + err := s.Emit("update", msg) + if err != nil { + a.logger.Error("Failed to send WebSocket message", logger.Field{Key: "error", Value: err.Error()}) + } +} + +// broadcastMessage sends a message to all connected WebSocket clients +func (a *AdminServer) broadcastMessage(msg *AdminMessage) { + a.wsClientsMu.RLock() + defer a.wsClientsMu.RUnlock() + + for _, client := range a.wsClients { + err := client.Emit("update", msg) + if err != nil { + a.logger.Error("Failed to broadcast message to client", logger.Field{Key: "error", Value: err.Error()}, logger.Field{Key: "client_id", Value: client.ID()}) + } + } +} + +// startBroadcasting starts the broadcasting goroutine for real-time updates +func (a *AdminServer) startBroadcasting() { + go func() { + ticker := time.NewTicker(2 * time.Second) // Broadcast updates every 2 seconds + defer ticker.Stop() + + for { + select { + case <-a.shutdownCh: + return + case msg := <-a.broadcastCh: + a.broadcastMessage(msg) + case <-ticker.C: + // Send periodic updates + a.sendPeriodicUpdates() + } + } + }() +} + +// sendPeriodicUpdates sends periodic updates to all connected clients +func (a *AdminServer) sendPeriodicUpdates() { + // Send metrics update + msg := &AdminMessage{ + Type: "metrics", + Data: a.getMetrics(), + Timestamp: time.Now(), + } + a.broadcastMessage(msg) + + // Send queues update + msg = &AdminMessage{ + Type: "queues", + Data: a.getQueues(), + Timestamp: time.Now(), + } + a.broadcastMessage(msg) + + // Send consumers update + msg = &AdminMessage{ + Type: "consumers", + Data: a.getConsumers(), + Timestamp: time.Now(), + } + a.broadcastMessage(msg) + + // Send pools update + msg = &AdminMessage{ + Type: "pools", + Data: a.getPools(), + Timestamp: time.Now(), + } + a.broadcastMessage(msg) +} + +// broadcastTaskUpdate sends a task update to all connected clients +func (a *AdminServer) BroadcastTaskUpdate(update *TaskUpdate) { + msg := &AdminMessage{ + Type: "task_update", + Data: update, + Timestamp: time.Now(), + } + select { + case a.broadcastCh <- msg: + default: + // Channel is full, skip this update + } +} + // NewAdminMetrics creates new admin metrics func NewAdminMetrics() *AdminMetrics { return &AdminMetrics{ @@ -140,6 +335,9 @@ func (a *AdminServer) Start() error { // Start metrics collection go a.metricsCollectionLoop() + // Start WebSocket broadcasting + a.startBroadcasting() + a.isRunning = true // Start server in a goroutine and capture any startup errors @@ -188,6 +386,17 @@ func (a *AdminServer) Stop() error { return nil } + // Signal shutdown + close(a.shutdownCh) + + // Close WebSocket connections + a.wsClientsMu.Lock() + for _, client := range a.wsClients { + client.Close() + } + a.wsClients = make(map[string]*sio.Socket) + a.wsClientsMu.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -224,6 +433,9 @@ func (a *AdminServer) setupRoutes(mux *http.ServeMux) { a.logger.Info("Using static directory", logger.Field{Key: "directory", Value: finalStaticDir}) mux.Handle("/static/", http.StripPrefix("/static/", 
http.FileServer(http.Dir(finalStaticDir)))) + // WebSocket endpoint + mux.HandleFunc("/ws", a.wsServer.ServeHTTP) + // Admin dashboard mux.HandleFunc("/admin", a.handleAdminDashboard) mux.HandleFunc("/", a.handleAdminDashboard) @@ -233,11 +445,21 @@ func (a *AdminServer) setupRoutes(mux *http.ServeMux) { mux.HandleFunc("/api/admin/broker", a.handleGetBroker) mux.HandleFunc("/api/admin/broker/restart", a.handleRestartBroker) mux.HandleFunc("/api/admin/broker/stop", a.handleStopBroker) + mux.HandleFunc("/api/admin/broker/pause", a.handlePauseBroker) + mux.HandleFunc("/api/admin/broker/resume", a.handleResumeBroker) mux.HandleFunc("/api/admin/queues", a.handleGetQueues) mux.HandleFunc("/api/admin/queues/flush", a.handleFlushQueues) + mux.HandleFunc("/api/admin/queues/purge", a.handlePurgeQueue) mux.HandleFunc("/api/admin/consumers", a.handleGetConsumers) + mux.HandleFunc("/api/admin/consumers/pause", a.handlePauseConsumer) + mux.HandleFunc("/api/admin/consumers/resume", a.handleResumeConsumer) + mux.HandleFunc("/api/admin/consumers/stop", a.handleStopConsumer) mux.HandleFunc("/api/admin/pools", a.handleGetPools) + mux.HandleFunc("/api/admin/pools/pause", a.handlePausePool) + mux.HandleFunc("/api/admin/pools/resume", a.handleResumePool) + mux.HandleFunc("/api/admin/pools/stop", a.handleStopPool) mux.HandleFunc("/api/admin/health", a.handleGetHealth) + mux.HandleFunc("/api/admin/tasks", a.handleGetTasks) a.logger.Info("Admin server routes configured") } // HTTP Handler implementations @@ -306,28 +528,138 @@ func (a *AdminServer) handleGetHealth(w http.ResponseWriter, r *http.Request) { } func (a *AdminServer) handleRestartBroker(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + if a.broker == nil { http.Error(w, "Broker not available", http.StatusServiceUnavailable) return } + // For now, we'll just acknowledge the restart request + // In a real implementation, you might want to gracefully restart the broker + a.logger.Info("Broker restart requested") + + w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "restart_initiated"}) + json.NewEncoder(w).Encode(map[string]string{ + "status": "restart_initiated", + "message": "Broker restart has been initiated", + }) + + // Broadcast the restart event + a.broadcastMessage(&AdminMessage{ + Type: "broker_restart", + Data: map[string]string{"status": "initiated"}, + Timestamp: time.Now(), + }) } func (a *AdminServer) handleStopBroker(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + if a.broker == nil { http.Error(w, "Broker not available", http.StatusServiceUnavailable) return } + a.logger.Info("Broker stop requested") + + w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "stop_initiated"}) + json.NewEncoder(w).Encode(map[string]string{ + "status": "stop_initiated", + "message": "Broker stop has been initiated", + }) + + // Broadcast the stop event + a.broadcastMessage(&AdminMessage{ + Type: "broker_stop", + Data: map[string]string{"status": "initiated"}, + Timestamp: time.Now(), + }) + + // Actually stop the broker in a goroutine to allow response to complete + go func() 
{ + time.Sleep(100 * time.Millisecond) + a.broker.Close() + }() +} + +func (a *AdminServer) handlePauseBroker(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if a.broker == nil { + http.Error(w, "Broker not available", http.StatusServiceUnavailable) + return + } + + a.logger.Info("Broker pause requested") + + // Set broker to paused state (if such functionality exists) + // For now, we'll just acknowledge the request + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{ + "status": "paused", + "message": "Broker has been paused", + }) + + // Broadcast the pause event + a.broadcastMessage(&AdminMessage{ + Type: "broker_pause", + Data: map[string]string{"status": "paused"}, + Timestamp: time.Now(), + }) +} + +func (a *AdminServer) handleResumeBroker(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if a.broker == nil { + http.Error(w, "Broker not available", http.StatusServiceUnavailable) + return + } + + a.logger.Info("Broker resume requested") + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{ + "status": "running", + "message": "Broker has been resumed", + }) + + // Broadcast the resume event + a.broadcastMessage(&AdminMessage{ + Type: "broker_resume", + Data: map[string]string{"status": "running"}, + Timestamp: time.Now(), + }) } func (a *AdminServer) handleFlushQueues(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + if a.broker == nil { http.Error(w, "Broker not available", http.StatusServiceUnavailable) return @@ -349,11 +681,280 @@ func (a *AdminServer) handleFlushQueues(w http.ResponseWriter, r *http.Request) } } + a.logger.Info("Queues flushed", logger.Field{Key: "count", Value: flushedCount}) + + w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]interface{}{ + response := map[string]interface{}{ "status": "queues_flushed", "flushed_count": flushedCount, + "message": fmt.Sprintf("Flushed %d tasks from all queues", flushedCount), + } + json.NewEncoder(w).Encode(response) + + // Broadcast the flush event + a.broadcastMessage(&AdminMessage{ + Type: "queues_flush", + Data: response, + Timestamp: time.Now(), + }) +} + +func (a *AdminServer) handlePurgeQueue(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + queueName := r.URL.Query().Get("name") + if queueName == "" { + http.Error(w, "Queue name is required", http.StatusBadRequest) + return + } + + if a.broker == nil { + http.Error(w, "Broker not available", http.StatusServiceUnavailable) + return + } + + purgedCount := 0 + if queue, exists := a.broker.queues.Get(queueName); exists { + // Count tasks before purging + purgedCount = len(queue.tasks) + // Drain the specific queue + for len(queue.tasks) > 0 { + <-queue.tasks + } + } + + a.logger.Info("Queue purged", logger.Field{Key: "queue", Value: queueName}, 
logger.Field{Key: "count", Value: purgedCount}) + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + response := map[string]interface{}{ + "status": "queue_purged", + "queue_name": queueName, + "purged_count": purgedCount, + "message": fmt.Sprintf("Purged %d tasks from queue %s", purgedCount, queueName), + } + json.NewEncoder(w).Encode(response) + + // Broadcast the purge event + a.broadcastMessage(&AdminMessage{ + Type: "queue_purge", + Data: response, + Timestamp: time.Now(), + }) +} + +// Consumer handlers +func (a *AdminServer) handlePauseConsumer(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + consumerID := r.URL.Query().Get("id") + if consumerID == "" { + http.Error(w, "Consumer ID is required", http.StatusBadRequest) + return + } + + a.logger.Info("Consumer pause requested", logger.Field{Key: "consumer_id", Value: consumerID}) + + // In a real implementation, you would find the consumer and pause it + // For now, we'll just acknowledge the request + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + response := map[string]interface{}{ + "status": "paused", + "consumer_id": consumerID, + "message": fmt.Sprintf("Consumer %s has been paused", consumerID), + } + json.NewEncoder(w).Encode(response) + + // Broadcast the pause event + a.broadcastMessage(&AdminMessage{ + Type: "consumer_pause", + Data: response, + Timestamp: time.Now(), + }) +} + +func (a *AdminServer) handleResumeConsumer(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + consumerID := r.URL.Query().Get("id") + if consumerID == "" { + http.Error(w, "Consumer ID is required", http.StatusBadRequest) + return + } + + a.logger.Info("Consumer resume requested", logger.Field{Key: "consumer_id", Value: consumerID}) + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + response := map[string]interface{}{ + "status": "active", + "consumer_id": consumerID, + "message": fmt.Sprintf("Consumer %s has been resumed", consumerID), + } + json.NewEncoder(w).Encode(response) + + // Broadcast the resume event + a.broadcastMessage(&AdminMessage{ + Type: "consumer_resume", + Data: response, + Timestamp: time.Now(), + }) +} + +func (a *AdminServer) handleStopConsumer(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + consumerID := r.URL.Query().Get("id") + if consumerID == "" { + http.Error(w, "Consumer ID is required", http.StatusBadRequest) + return + } + + a.logger.Info("Consumer stop requested", logger.Field{Key: "consumer_id", Value: consumerID}) + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + response := map[string]interface{}{ + "status": "stopped", + "consumer_id": consumerID, + "message": fmt.Sprintf("Consumer %s has been stopped", consumerID), + } + json.NewEncoder(w).Encode(response) + + // Broadcast the stop event + a.broadcastMessage(&AdminMessage{ + Type: "consumer_stop", + Data: response, + Timestamp: time.Now(), + }) +} + +// Pool handlers +func (a 
*AdminServer) handlePausePool(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + poolID := r.URL.Query().Get("id") + if poolID == "" { + http.Error(w, "Pool ID is required", http.StatusBadRequest) + return + } + + a.logger.Info("Pool pause requested", logger.Field{Key: "pool_id", Value: poolID}) + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + response := map[string]interface{}{ + "status": "paused", + "pool_id": poolID, + "message": fmt.Sprintf("Pool %s has been paused", poolID), + } + json.NewEncoder(w).Encode(response) + + // Broadcast the pause event + a.broadcastMessage(&AdminMessage{ + Type: "pool_pause", + Data: response, + Timestamp: time.Now(), + }) +} + +func (a *AdminServer) handleResumePool(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + poolID := r.URL.Query().Get("id") + if poolID == "" { + http.Error(w, "Pool ID is required", http.StatusBadRequest) + return + } + + a.logger.Info("Pool resume requested", logger.Field{Key: "pool_id", Value: poolID}) + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + response := map[string]interface{}{ + "status": "running", + "pool_id": poolID, + "message": fmt.Sprintf("Pool %s has been resumed", poolID), + } + json.NewEncoder(w).Encode(response) + + // Broadcast the resume event + a.broadcastMessage(&AdminMessage{ + Type: "pool_resume", + Data: response, + Timestamp: time.Now(), + }) +} + +func (a *AdminServer) handleStopPool(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + poolID := r.URL.Query().Get("id") + if poolID == "" { + http.Error(w, "Pool ID is required", http.StatusBadRequest) + return + } + + a.logger.Info("Pool stop requested", logger.Field{Key: "pool_id", Value: poolID}) + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(http.StatusOK) + response := map[string]interface{}{ + "status": "stopped", + "pool_id": poolID, + "message": fmt.Sprintf("Pool %s has been stopped", poolID), + } + json.NewEncoder(w).Encode(response) + + // Broadcast the stop event + a.broadcastMessage(&AdminMessage{ + Type: "pool_stop", + Data: response, + Timestamp: time.Now(), + }) +} + +// Tasks handler +func (a *AdminServer) handleGetTasks(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + + tasks := a.getCurrentTasks() + json.NewEncoder(w).Encode(map[string]interface{}{ + "tasks": tasks, + "count": len(tasks), }) } @@ -547,10 +1148,13 @@ func (a *AdminServer) collectMetrics() { } // Update system metrics + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + a.metrics.SystemMetrics = &AdminSystemMetrics{ - CPUPercent: 0.0, // Would implement actual CPU monitoring - MemoryPercent: 0.0, // Would implement actual memory monitoring - GoroutineCount: 0, // Would implement actual goroutine counting + CPUPercent: 0.0, // Would implement actual CPU monitoring with external package + MemoryPercent: float64(memStats.Alloc) / float64(memStats.Sys) * 100, + GoroutineCount: 
runtime.NumGoroutine(), Timestamp: time.Now(), } @@ -563,3 +1167,45 @@ func (a *AdminServer) collectMetrics() { a.metrics.ThroughputHistory = a.metrics.ThroughputHistory[1:] } } + +// getCurrentTasks returns current tasks across all queues +func (a *AdminServer) getCurrentTasks() []map[string]interface{} { + if a.broker == nil { + return []map[string]interface{}{} + } + + var tasks []map[string]interface{} + queueNames := a.broker.queues.Keys() + + for _, queueName := range queueNames { + if queue, exists := a.broker.queues.Get(queueName); exists { + // Get tasks from queue channel (non-blocking) + queueLen := len(queue.tasks) + queueLoop: + for i := 0; i < queueLen && i < 100; i++ { // Limit to 100 tasks for performance + select { + case task := <-queue.tasks: + taskInfo := map[string]interface{}{ + "id": fmt.Sprintf("task-%d", i), + "queue": queueName, + "retry_count": task.RetryCount, + "created_at": time.Now(), // Would extract from message if available + "status": "queued", + "payload": string(task.Message.Payload), + } + tasks = append(tasks, taskInfo) + // Put the task back + select { + case queue.tasks <- task: + default: + // Queue is full, task is lost (shouldn't happen in normal operation) + } + default: + break queueLoop + } + } + } + } + + return tasks +} diff --git a/static/admin/README.md b/examples/minimal_admin/static/admin/README.md similarity index 100% rename from static/admin/README.md rename to examples/minimal_admin/static/admin/README.md diff --git a/static/admin/css/admin.css b/examples/minimal_admin/static/admin/css/admin.css similarity index 97% rename from static/admin/css/admin.css rename to examples/minimal_admin/static/admin/css/admin.css index 52a121b..4565694 100644 --- a/static/admin/css/admin.css +++ b/examples/minimal_admin/static/admin/css/admin.css @@ -11,6 +11,21 @@ --light-color: #f9fafb; } +/* Chart containers to prevent infinite resize loops */ +.chart-container { + position: relative; + width: 100%; + max-width: 100%; + overflow: hidden; +} + +.chart-container canvas { + display: block; + width: 100% !important; + height: auto !important; + max-width: 100%; +} + /* Custom scrollbar */ .scrollbar { scrollbar-width: thin; diff --git a/static/admin/index.html b/examples/minimal_admin/static/admin/index.html similarity index 71% rename from static/admin/index.html rename to examples/minimal_admin/static/admin/index.html index 54fbf5b..edaca18 100644 --- a/static/admin/index.html +++ b/examples/minimal_admin/static/admin/index.html @@ -52,6 +52,9 @@ + @@ -158,11 +161,15 @@

[chart markup not recoverable from extraction: this hunk wraps the "Message Throughput" and "Queue Depths" <canvas> elements in <div class="chart-container"> wrappers so Chart.js stops triggering resize loops]
@@ -357,11 +364,15 @@

[chart markup not recoverable from extraction: this hunk wraps the "System Performance" and "Error Rate" <canvas> elements in <div class="chart-container"> wrappers]
@@ -373,6 +384,153 @@ + + + @@ -463,6 +621,7 @@ + diff --git a/static/admin/js/admin.js b/examples/minimal_admin/static/admin/js/admin.js similarity index 79% rename from static/admin/js/admin.js rename to examples/minimal_admin/static/admin/js/admin.js index 5977154..b2ca3a3 100644 --- a/static/admin/js/admin.js +++ b/examples/minimal_admin/static/admin/js/admin.js @@ -88,6 +88,11 @@ class MQAdminDashboard { document.getElementById('flushQueues').addEventListener('click', () => { this.confirmAction('flush all queues', () => this.flushQueues()); }); + + // Tasks refresh button + document.getElementById('refreshTasks').addEventListener('click', () => { + this.fetchTasks(); + }); } setupFormHandlers() { @@ -140,6 +145,9 @@ class MQAdminDashboard { case 'pools': this.loadPoolsData(); break; + case 'tasks': + this.loadTasksData(); + break; case 'monitoring': this.loadMonitoringData(); break; @@ -149,31 +157,34 @@ class MQAdminDashboard { connectWebSocket() { try { const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; - const wsUrl = `${protocol}//${window.location.host}/ws/admin`; + const wsUrl = `${protocol}//${window.location.host}/ws`; - this.wsConnection = new WebSocket(wsUrl); + // Use the Socket class instead of raw WebSocket + this.wsConnection = new Socket(wsUrl, 'admin-user'); + this.wsConnection.connect().then(connected => { + console.log('WebSocket connected'); + this.wsConnection.onConnect(() => { + this.updateConnectionStatus(true); + this.showToast('Connected to MQ Admin', 'success'); + console.log('WebSocket connected'); + }); - this.wsConnection.onopen = () => { - this.updateConnectionStatus(true); - this.showToast('Connected to MQ Admin', 'success'); - }; + this.wsConnection.on('update', (data) => { + try { + // Data is already parsed by the Socket class + this.handleWebSocketMessage(data); + } catch (error) { + console.error('Failed to handle WebSocket message:', error); + } + }); - this.wsConnection.onmessage = (event) => { - this.handleWebSocketMessage(JSON.parse(event.data)); - }; - - this.wsConnection.onclose = () => { - this.updateConnectionStatus(false); - this.showToast('Disconnected from MQ Admin', 'warning'); - // Attempt to reconnect after 5 seconds - setTimeout(() => this.connectWebSocket(), 5000); - }; - - this.wsConnection.onerror = (error) => { - console.error('WebSocket error:', error); - this.updateConnectionStatus(false); - this.showToast('Connection error', 'error'); - }; + this.wsConnection.onDisconnect(() => { + this.updateConnectionStatus(false); + this.showToast('Disconnected from MQ Admin', 'warning'); + console.log('WebSocket disconnected'); + // Socket class handles reconnection automatically + }); + }); } catch (error) { console.error('Failed to connect WebSocket:', error); this.updateConnectionStatus(false); @@ -181,28 +192,73 @@ class MQAdminDashboard { } handleWebSocketMessage(data) { + console.log('WebSocket message received:', data); + switch (data.type) { - case 'metrics_update': - this.updateMetrics(data.payload); + case 'metrics': + this.updateMetrics(data.data); break; - case 'queue_update': - this.updateQueues(data.payload); + case 'queues': + this.updateQueues(data.data); break; - case 'consumer_update': - this.updateConsumers(data.payload); + case 'consumers': + this.updateConsumers(data.data); break; - case 'pool_update': - this.updatePools(data.payload); + case 'pools': + this.updatePools(data.data); break; - case 'broker_update': - this.updateBroker(data.payload); + case 'broker': + this.updateBroker(data.data); 
break; - case 'health_update': - this.updateHealthChecks(data.payload); + case 'task_update': + this.handleTaskUpdate(data.data); break; - case 'activity': - this.addActivity(data.payload); + case 'broker_restart': + case 'broker_stop': + case 'broker_pause': + case 'broker_resume': + this.showToast(data.data.message || 'Broker operation completed', 'info'); + // Refresh broker info + this.fetchBrokerInfo(); break; + case 'consumer_pause': + case 'consumer_resume': + case 'consumer_stop': + this.showToast(data.data.message || 'Consumer operation completed', 'info'); + // Refresh consumer info + this.fetchConsumers(); + break; + case 'pool_pause': + case 'pool_resume': + case 'pool_stop': + this.showToast(data.data.message || 'Pool operation completed', 'info'); + // Refresh pool info + this.fetchPools(); + break; + case 'queue_purge': + case 'queues_flush': + this.showToast(data.data.message || 'Queue operation completed', 'info'); + // Refresh queue info + this.fetchQueues(); + break; + default: + console.log('Unknown WebSocket message type:', data.type); + } + } + + handleTaskUpdate(taskData) { + // Add to activity feed + const activity = { + type: 'task', + message: `Task ${taskData.task_id} ${taskData.status} in queue ${taskData.queue}`, + timestamp: new Date(taskData.updated_at), + status: taskData.status === 'completed' ? 'success' : taskData.status === 'failed' ? 'error' : 'info' + }; + this.addActivity(activity); + + // Update metrics if on overview tab + if (this.currentTab === 'overview') { + this.fetchMetrics(); } } @@ -238,6 +294,7 @@ class MQAdminDashboard { options: { responsive: true, maintainAspectRatio: false, + animation: false, // Disable animations to prevent loops scales: { y: { beginAtZero: true @@ -419,6 +476,85 @@ class MQAdminDashboard { } } + async fetchTasks() { + try { + const response = await fetch('/api/admin/tasks'); + const data = await response.json(); + this.updateTasks(data.tasks || []); + } catch (error) { + console.error('Failed to fetch tasks:', error); + } + } + + updateTasks(tasks) { + this.data.tasks = tasks; + + // Update task count cards + const activeTasks = tasks.filter(task => task.status === 'queued' || task.status === 'processing').length; + const completedTasks = tasks.filter(task => task.status === 'completed').length; + const failedTasks = tasks.filter(task => task.status === 'failed').length; + const queuedTasks = tasks.filter(task => task.status === 'queued').length; + + document.getElementById('activeTasks').textContent = activeTasks; + document.getElementById('completedTasks').textContent = completedTasks; + document.getElementById('failedTasks').textContent = failedTasks; + document.getElementById('queuedTasks').textContent = queuedTasks; + + // Update tasks table + this.renderTasksTable(tasks); + } + + renderTasksTable(tasks) { + const tbody = document.getElementById('tasksTableBody'); + tbody.innerHTML = ''; + + if (!tasks || tasks.length === 0) { + tbody.innerHTML = ` + + + No tasks found + + + `; + return; + } + + tasks.forEach(task => { + const row = document.createElement('tr'); + row.className = 'hover:bg-gray-50'; + + const statusClass = this.getStatusClass(task.status); + const createdAt = this.formatTime(task.created_at); + const payload = typeof task.payload === 'string' ? task.payload : JSON.stringify(task.payload); + const truncatedPayload = payload.length > 50 ? payload.substring(0, 50) + '...' 
: payload; + + row.innerHTML = ` + + ${task.id} + + + ${task.queue} + + + + ${task.status} + + + + ${task.retry_count || 0} + + + ${createdAt} + + + ${truncatedPayload} + + `; + + tbody.appendChild(row); + }); + } + updateMetrics(metrics) { this.data.metrics = metrics; @@ -461,29 +597,47 @@ class MQAdminDashboard { updateThroughputChart(throughputHistory) { const chart = this.charts.throughput; - const now = new Date(); + if (!chart || !throughputHistory) return; - // Keep last 20 data points - chart.data.labels = throughputHistory.map((_, index) => { - const time = new Date(now.getTime() - (throughputHistory.length - index - 1) * 5000); - return time.toLocaleTimeString(); - }); + try { + const now = new Date(); - chart.data.datasets[0].data = throughputHistory; - chart.update('none'); + // Keep last 20 data points + chart.data.labels = throughputHistory.map((_, index) => { + const time = new Date(now.getTime() - (throughputHistory.length - index - 1) * 5000); + return time.toLocaleTimeString(); + }); + + chart.data.datasets[0].data = throughputHistory; + chart.update('none'); + } catch (error) { + console.error('Error updating throughput chart:', error); + } } updateQueueDepthChart(queues) { const chart = this.charts.queueDepth; - chart.data.labels = queues.map(q => q.name); - chart.data.datasets[0].data = queues.map(q => q.depth || 0); - chart.update('none'); + if (!chart || !queues) return; + + try { + chart.data.labels = queues.map(q => q.name); + chart.data.datasets[0].data = queues.map(q => q.depth || 0); + chart.update('none'); + } catch (error) { + console.error('Error updating queue depth chart:', error); + } } updateErrorChart(successCount, errorCount) { const chart = this.charts.error; - chart.data.datasets[0].data = [successCount, errorCount]; - chart.update('none'); + if (!chart) return; + + try { + chart.data.datasets[0].data = [successCount, errorCount]; + chart.update('none'); + } catch (error) { + console.error('Error updating error chart:', error); + } } renderQueuesTable(queues) { @@ -995,12 +1149,22 @@ class MQAdminDashboard { } startRefreshInterval() { - // Refresh data every 5 seconds + // Clear any existing interval first + if (this.refreshInterval) { + clearInterval(this.refreshInterval); + this.refreshInterval = null; + } + + // Refresh data every 15 seconds (reduced frequency to prevent overload) this.refreshInterval = setInterval(() => { - if (this.isConnected) { - this.loadTabData(this.currentTab); + if (this.isConnected && this.currentTab) { + try { + this.loadTabData(this.currentTab); + } catch (error) { + console.error('Error during refresh:', error); + } } - }, 5000); + }, 15000); } loadOverviewData() { @@ -1023,6 +1187,10 @@ class MQAdminDashboard { this.fetchPools(); } + loadTasksData() { + this.fetchTasks(); + } + loadMonitoringData() { this.fetchHealthChecks(); this.fetchMetrics(); diff --git a/examples/minimal_admin/static/admin/js/socket.js b/examples/minimal_admin/static/admin/js/socket.js new file mode 100644 index 0000000..ceb8686 --- /dev/null +++ b/examples/minimal_admin/static/admin/js/socket.js @@ -0,0 +1,249 @@ +class Socket { + events = {} + reconnectInterval + reconnectOpts = { enabled: true, replayOnConnect: true, intervalMS: 5000 } + reconnecting = false + connectedOnce = false + headerStartCharCode = 1 + headerStartChar + dataStartCharCode = 2 + dataStartChar + subProtocol = 'sac-sock' + ws + reconnected = false + maxAttempts = 3 + totalAttempts = 0 + kickedOut = false + userID + url + + constructor(url, userID, opts = { reconnectOpts: {} 
}) { + opts = opts || { reconnectOpts: {} }; + this.headerStartChar = String.fromCharCode(this.headerStartCharCode) + this.dataStartChar = String.fromCharCode(this.dataStartCharCode) + this.url = url + this.userID = userID + if (typeof opts.reconnectOpts == 'object') { + for (let i in opts.reconnectOpts) { + if (!opts.reconnectOpts.hasOwnProperty(i)) continue; + this.reconnectOpts[i] = opts.reconnectOpts[i]; + } + } + } + + noop() { } + + async connect(timeout = 10000) { + try { + this.ws = new WebSocket(this.url, this.subProtocol); + this.ws.binaryType = 'arraybuffer'; + this.handleEvents() + const isOpened = () => (this.ws.readyState === WebSocket.OPEN) + + if (this.ws.readyState !== WebSocket.CONNECTING) { + return isOpened() + } + } catch (err) { + console.log("Error on reconnection", err) + } + + } + + handleEvents() { + let self = this + this.onConnect(this.noop) + this.onDisconnect(this.noop) + this.ws.onmessage = function (e) { + let msg = e.data, + headers = {}, + eventName = '', + data = '', + chr = null, + i, msgLen; + + if (typeof msg === 'string') { + let dataStarted = false, + headerStarted = false; + + for (i = 0, msgLen = msg.length; i < msgLen; i++) { + chr = msg[i]; + if (!dataStarted && !headerStarted && chr !== self.dataStartChar && chr !== self.headerStartChar) { + eventName += chr; + } else if (!headerStarted && chr === self.headerStartChar) { + headerStarted = true; + } else if (headerStarted && !dataStarted && chr !== self.dataStartChar) { + headers[chr] = true; + } else if (!dataStarted && chr === self.dataStartChar) { + dataStarted = true; + } else { + data += chr; + } + } + } else if (msg && msg instanceof ArrayBuffer && msg.byteLength !== undefined) { + let dv = new DataView(msg), + headersStarted = false; + + for (i = 0, msgLen = dv.byteLength; i < msgLen; i++) { + chr = dv.getUint8(i); + + if (chr !== self.dataStartCharCode && chr !== self.headerStartCharCode && !headersStarted) { + eventName += String.fromCharCode(chr); + } else if (chr === self.headerStartCharCode && !headersStarted) { + headersStarted = true; + } else if (headersStarted && chr !== self.dataStartCharCode) { + headers[String.fromCharCode(chr)] = true; + } else if (chr === self.dataStartCharCode) { + // @ts-ignore + data = dv.buffer.slice(i + 1); + break; + } + } + } + + if (eventName.length === 0) return; //no event to dispatch + if (typeof self.events[eventName] === 'undefined') return; + // @ts-ignore + self.events[eventName].call(self, (headers.J) ? JSON.parse(data) : data); + } + } + + startReconnect(timeout = 10000) { + let self = this + setTimeout(async function () { + try { + if (self.maxAttempts > self.totalAttempts) { + return + } + let newWS = new WebSocket(self.url, self.subProtocol); + self.totalAttempts += 1 + console.log("attempt to reconnect...", self.totalAttempts) + newWS.onmessage = self.ws.onmessage; + newWS.onclose = self.ws.onclose; + newWS.binaryType = self.ws.binaryType; + self.handleEvents() + + //we need to run the initially set onConnect function on the first successful connecting, + //even if replayOnConnect is disabled. The server might not be available on a first + //connection attempt. 
+ if (self.reconnectOpts.replayOnConnect || !self.connectedOnce) { + newWS.onopen = self.ws.onopen; + } + self.ws = newWS; + if (!self.reconnectOpts.replayOnConnect && self.connectedOnce) { + self.onConnect(self.noop); + } + self.ws = newWS + const isOpened = () => (self.ws.readyState === WebSocket.OPEN) + + if (self.ws.readyState !== WebSocket.CONNECTING) { + const opened = isOpened() + if (!opened) { + self.startReconnect(timeout) + } else { + console.log("connected with signal server") + } + return opened + } + else { + const intrasleep = 100 + const ttl = timeout / intrasleep // time to loop + let loop = 0 + while (self.ws.readyState === WebSocket.CONNECTING && loop < ttl) { + await new Promise(resolve => setTimeout(resolve, intrasleep)) + loop++ + } + const opened = isOpened() + if (!opened) { + self.startReconnect(timeout) + } else { + console.log("connected with signal server") + } + return opened + } + } catch (err) { + console.log("Error on reconnection", err) + } + }, self.reconnectOpts.intervalMS); + } + + onConnect(callback) { + let self = this + this.ws.onopen = function () { + self.connectedOnce = true; + callback.apply(self, arguments); + if (self.reconnecting) { + self.reconnecting = false; + } + }; + }; + + onDisconnect(callback) { + let self = this + this.ws.onclose = function () { + if (!self.reconnecting && self.connectedOnce) { + callback.apply(self, arguments); + } + if (self.reconnectOpts.enabled && !self.kickedOut) { + self.reconnecting = true; + self.startReconnect(); + } + }; + }; + + on(eventName, callback, override) { + override = override || false + if (!this.events.hasOwnProperty(eventName)) { + this.events[eventName] = callback; + } else if (override) { + this.off(eventName) + this.events[eventName] = callback; + } + } + off(eventName) { + if (this.events[eventName]) { + delete this.events[eventName]; + } + } + + emit(eventName, data) { + let rs = this.ws.readyState; + if (rs === 0) { + console.warn("websocket is not open yet"); + return; + } else if (rs === 2 || rs === 3) { + console.error("websocket is closed"); + return; + } + let msg; + if (data instanceof ArrayBuffer) { + let ab = new ArrayBuffer(data.byteLength + eventName.length + 1), + newBuf = new DataView(ab), + oldBuf = new DataView(data), + i = 0; + for (let evtLen = eventName.length; i < evtLen; i++) { + newBuf.setUint8(i, eventName.charCodeAt(i)); + } + newBuf.setUint8(i, this.dataStartCharCode); + i++; + for (let x = 0, xLen = oldBuf.byteLength; x < xLen; x++, i++) { + newBuf.setUint8(i, oldBuf.getUint8(x)); + } + msg = ab; + } else if (typeof data === 'object') { + msg = eventName + this.dataStartChar + JSON.stringify(data); + } else { + msg = eventName + this.dataStartChar + data; + } + this.ws.send(msg); + } + + close() { + this.reconnectOpts.enabled = false; + return this.ws.close(1000); + } +} + +// Initialize dashboard when DOM is loaded +document.addEventListener('DOMContentLoaded', () => { + window.Socket = Socket; +});
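
Usage note: the diff exposes the new real-time surface through NewAdminServer, Start, and the exported BroadcastTaskUpdate. Below is a minimal sketch of wiring it from application code. It assumes the repository root package is imported as "mq" and that a *mq.Broker and a logger.Logger are constructed elsewhere (neither constructor appears in this diff); the package and function names in the sketch are illustrative only.

package adminwiring

import (
	"time"

	"github.com/oarkflow/mq"
	"github.com/oarkflow/mq/logger"
)

// startAdmin wires the admin HTTP/WebSocket server around an existing broker.
// Broker and logger construction is application-specific and not part of this
// diff, so both are taken as parameters here.
func startAdmin(broker *mq.Broker, log logger.Logger) (*mq.AdminServer, error) {
	admin := mq.NewAdminServer(broker, ":8081", log)
	// Start serves /admin, the /api/admin/* endpoints, and the /ws WebSocket endpoint.
	if err := admin.Start(); err != nil {
		return nil, err
	}
	return admin, nil
}

// reportTaskDone pushes a "task_update" message to every dashboard connected to /ws.
func reportTaskDone(admin *mq.AdminServer, taskID, queue string) {
	admin.BroadcastTaskUpdate(&mq.TaskUpdate{
		TaskID:    taskID,
		Queue:     queue,
		Status:    "completed",
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
	})
}

The bundled dashboard connects back to the /ws endpoint through the Socket class in socket.js using the "sac-sock" sub-protocol, so no additional client-side wiring is needed beyond serving the static files.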