This commit is contained in:
Oarkflow
2025-09-19 16:24:45 +05:45
parent a5523fe030
commit b7ca2a8aeb
2 changed files with 1262 additions and 168 deletions

View File

@@ -5,10 +5,13 @@ import (
"fmt"
"regexp"
"strings"
"time"
"github.com/oarkflow/json"
"github.com/oarkflow/jet"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/dag"
)
@@ -28,75 +31,52 @@ func loginSubDAG() *dag.DAG {
return login
}
// phoneProcessingSubDAG creates a sub-DAG for processing phone numbers
func phoneProcessingSubDAG() *dag.DAG {
phone := dag.NewDAG("Phone Processing Sub DAG", "phone-processing-sub-dag", func(taskID string, result mq.Result) {
fmt.Printf("Phone Processing Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload))
}, mq.WithSyncMode(true))
phone.
AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{}).
AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{}).
AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{}).
AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{}).
AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{}).
AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{}).
AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop").
AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone").
AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"}).
AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid").
AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid").
AddEdge(dag.Simple, "Loop to Collect", "phone-loop", "collect-valid")
return phone
}
func main() {
flow := dag.NewDAG("Complex Phone Processing DAG with Pages", "complex-phone-dag", func(taskID string, result mq.Result) {
fmt.Printf("Complex DAG Final result for task %s: %s\n", taskID, string(result.Payload))
})
flow.ConfigureMemoryStorage()
// Main nodes
flow.AddNode(dag.Function, "Initialize", "init", &Initialize{}, true)
flow.AddDAGNode(dag.Function, "Login Process", "login", loginSubDAG())
flow.AddNode(dag.Function, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{})
flow.AddDAGNode(dag.Function, "Process Phones", "process-phones", phoneProcessingSubDAG())
// Main nodes - Login process as individual nodes (not sub-DAG) for proper page serving
flow.AddNode(dag.Page, "Initialize", "init", &Initialize{}, true)
flow.AddNode(dag.Page, "Login Page", "login-page", &LoginPage{})
flow.AddNode(dag.Function, "Verify Credentials", "verify-credentials", &VerifyCredentials{})
flow.AddNode(dag.Function, "Generate Token", "generate-token", &GenerateToken{})
flow.AddNode(dag.Page, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{})
flow.AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{})
flow.AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{})
flow.AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{})
flow.AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{})
flow.AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{})
flow.AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{})
flow.AddNode(dag.Function, "Generate Report", "generate-report", &GenerateReport{})
flow.AddNode(dag.Function, "Send Summary Email", "send-summary", &SendSummaryEmail{})
flow.AddNode(dag.Function, "Final Cleanup", "cleanup", &FinalCleanup{})
// Edges
flow.AddEdge(dag.Simple, "Init to Login", "init", "login")
flow.AddEdge(dag.Simple, "Login to Upload", "login", "upload-page")
flow.AddEdge(dag.Simple, "Upload to Process", "upload-page", "process-phones")
flow.AddEdge(dag.Simple, "Process to Report", "process-phones", "generate-report")
// Edges - Connect login flow individually
flow.AddEdge(dag.Simple, "Init to Login", "init", "login-page")
flow.AddEdge(dag.Simple, "Login to Verify", "login-page", "verify-credentials")
flow.AddEdge(dag.Simple, "Verify to Token", "verify-credentials", "generate-token")
flow.AddEdge(dag.Simple, "Token to Upload", "generate-token", "upload-page")
flow.AddEdge(dag.Simple, "Upload to Parse", "upload-page", "parse-phones")
flow.AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop")
flow.AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone")
flow.AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"})
flow.AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid")
flow.AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid")
flow.AddEdge(dag.Simple, "Loop to Report", "phone-loop", "generate-report")
flow.AddEdge(dag.Simple, "Report to Summary", "generate-report", "send-summary")
flow.AddEdge(dag.Simple, "Summary to Cleanup", "send-summary", "cleanup")
// Sample data for testing
data := map[string]interface{}{
"user_id": "user123",
"session_data": map[string]interface{}{
"authenticated": false,
},
"phone_data": map[string]interface{}{
"format": "csv",
"content": "name,phone\nJohn Doe,+1234567890\nJane Smith,+1987654321\nBob Johnson,invalid-phone\nAlice Brown,+1555123456",
},
}
// Check for DAG errors
// if flow.Error != nil {
// fmt.Printf("DAG Error: %v\n", flow.Error)
// panic(flow.Error)
// }
jsonData, _ := json.Marshal(data)
if flow.Error != nil {
panic(flow.Error)
}
rs := flow.Process(context.Background(), jsonData)
if rs.Error != nil {
panic(rs.Error)
}
fmt.Println("Complex Phone DAG Status:", rs.Status, "Topic:", rs.Topic)
fmt.Println("Final Payload:", string(rs.Payload))
fmt.Println("Starting Complex Phone Processing DAG server on http://0.0.0.0:8080")
fmt.Println("Navigate to the URL to access the login page")
flow.Start(context.Background(), ":8080")
}
// Task implementations
@@ -108,12 +88,151 @@ type Initialize struct {
func (p *Initialize) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]interface{}
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("Initialize Error: %s", err.Error()), Ctx: ctx}
data = make(map[string]interface{})
}
data["initialized"] = true
data["timestamp"] = "2025-09-19T12:00:00Z"
updatedPayload, _ := json.Marshal(data)
return mq.Result{Payload: updatedPayload, Ctx: ctx}
// Add sample phone data for testing
sampleCSV := `name,phone
John Doe,+1234567890
Jane Smith,0987654321
Bob Johnson,1555123456
Alice Brown,invalid-phone
Charlie Wilson,+441234567890`
data["phone_data"] = map[string]interface{}{
"content": sampleCSV,
"format": "csv",
"source": "sample_data",
"created_at": "2025-09-19T12:00:00Z",
}
// Generate a task ID for this workflow instance
taskID := "workflow-" + fmt.Sprintf("%d", time.Now().Unix())
// Since this is a page node, show a welcome page that auto-redirects to login
htmlContent := `
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta http-equiv="refresh" content="3;url=/process">
<title>Phone Processing System</title>
<style>
body {
font-family: Arial, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
text-align: center;
padding: 50px;
margin: 0;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}
.welcome {
background: rgba(255, 255, 255, 0.1);
padding: 40px;
border-radius: 15px;
backdrop-filter: blur(10px);
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
max-width: 500px;
width: 100%;
}
.welcome h1 {
margin-bottom: 20px;
font-size: 2.5em;
}
.welcome p {
margin-bottom: 30px;
font-size: 1.1em;
opacity: 0.9;
}
.features {
margin-top: 30px;
text-align: left;
opacity: 0.8;
}
.features h3 {
margin-bottom: 15px;
color: #fff;
}
.features ul {
list-style: none;
padding: 0;
}
.features li {
margin-bottom: 8px;
padding-left: 20px;
position: relative;
}
.features li:before {
content: "✓";
position: absolute;
left: 0;
color: #4CAF50;
}
.countdown {
margin-top: 20px;
font-size: 1.2em;
opacity: 0.9;
}
</style>
</head>
<body>
<div class="welcome">
<h1>📱 Phone Processing System</h1>
<p>Welcome to our advanced phone number processing workflow</p>
<div class="features">
<h3>Features:</h3>
<ul>
<li>CSV/JSON file upload support</li>
<li>Phone number validation and formatting</li>
<li>Automated welcome SMS sending</li>
<li>Invalid number filtering</li>
<li>Comprehensive processing reports</li>
</ul>
</div>
<div class="countdown">
<p>Initializing workflow...</p>
<p>Task ID: ` + taskID + `</p>
<p>Redirecting to login page in <span id="countdown">3</span> seconds...</p>
</div>
</div>
<script>
let countdown = 3;
const countdownElement = document.getElementById('countdown');
const interval = setInterval(() => {
countdown--;
countdownElement.textContent = countdown;
if (countdown <= 0) {
clearInterval(interval);
}
}, 1000);
</script>
</body>
</html>`
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
resultData := map[string]any{
"html_content": rs,
"step": "initialize",
"data": data,
}
resultPayload, _ := json.Marshal(resultData)
return mq.Result{Payload: resultPayload, Ctx: ctx}
}
type LoginPage struct {
@@ -121,21 +240,177 @@ type LoginPage struct {
}
func (p *LoginPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
// Check if this is a form submission
var inputData map[string]interface{}
if len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &inputData); err == nil {
// Check if we have form data (username/password)
if formData, ok := inputData["form"].(map[string]interface{}); ok {
// This is a form submission, pass it through for verification
credentials := map[string]interface{}{
"username": formData["username"],
"password": formData["password"],
}
inputData["credentials"] = credentials
updatedPayload, _ := json.Marshal(inputData)
return mq.Result{Payload: updatedPayload, Ctx: ctx}
}
}
}
// Otherwise, show the form
var data map[string]interface{}
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("LoginPage Error: %s", err.Error()), Ctx: ctx}
data = make(map[string]interface{})
}
// Simulate user input from page
data["credentials"] = map[string]interface{}{
"username": "admin",
"password": "password123",
}
data["login_attempted"] = true
// HTML content for login page
htmlContent := `
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Phone Processing System - Login</title>
<style>
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
margin: 0;
padding: 0;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}
.login-container {
background: white;
padding: 2rem;
border-radius: 10px;
box-shadow: 0 10px 25px rgba(0,0,0,0.2);
width: 100%;
max-width: 400px;
}
.login-header {
text-align: center;
margin-bottom: 2rem;
}
.login-header h1 {
color: #333;
margin: 0;
font-size: 1.8rem;
}
.login-header p {
color: #666;
margin: 0.5rem 0 0 0;
}
.form-group {
margin-bottom: 1.5rem;
}
.form-group label {
display: block;
margin-bottom: 0.5rem;
color: #333;
font-weight: 500;
}
.form-group input {
width: 100%;
padding: 0.75rem;
border: 2px solid #e1e5e9;
border-radius: 5px;
font-size: 1rem;
transition: border-color 0.3s;
box-sizing: border-box;
}
.form-group input:focus {
outline: none;
border-color: #667eea;
}
.login-btn {
width: 100%;
padding: 0.75rem;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 5px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: transform 0.2s;
}
.login-btn:hover {
transform: translateY(-2px);
}
.login-btn:active {
transform: scale(0.98);
}
.status-message {
margin-top: 1rem;
padding: 0.5rem;
border-radius: 5px;
text-align: center;
font-weight: 500;
}
.status-success {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.status-error {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
</style>
</head>
<body>
<div class="login-container">
<div class="login-header">
<h1>📱 Phone Processing System</h1>
<p>Please login to continue</p>
</div>
<form method="post" action="/process" id="loginForm">
<div class="form-group">
<label for="username">Username</label>
<input type="text" id="username" name="username" required placeholder="Enter your username">
</div>
<div class="form-group">
<label for="password">Password</label>
<input type="password" id="password" name="password" required placeholder="Enter your password">
</div>
<button type="submit" class="login-btn">Login</button>
</form>
<div id="statusMessage"></div>
</div>
updatedPayload, _ := json.Marshal(data)
<script>
// Form will submit naturally to the action URL
document.getElementById('loginForm').addEventListener('submit', function(e) {
// Optional: Add loading state
const btn = e.target.querySelector('.login-btn');
btn.textContent = 'Logging in...';
btn.disabled = true;
});
</script>
</body>
</html>`
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
resultData := map[string]any{
"html_content": rs,
"step": "login",
"data": data,
}
resultPayload, _ := json.Marshal(resultData)
return mq.Result{
Payload: updatedPayload,
Payload: resultPayload,
Ctx: ctx,
}
}
@@ -195,22 +470,293 @@ type UploadPhoneDataPage struct {
}
func (p *UploadPhoneDataPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
// Check if this is a form submission
var inputData map[string]interface{}
if len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &inputData); err == nil {
// Check if we have form data (phone_data)
if formData, ok := inputData["form"].(map[string]interface{}); ok {
// This is a form submission, pass it through for processing
if phoneData, exists := formData["phone_data"]; exists && phoneData != "" {
inputData["phone_data"] = map[string]interface{}{
"content": phoneData.(string),
"format": "csv",
"source": "user_input",
"created_at": "2025-09-19T12:00:00Z",
}
}
updatedPayload, _ := json.Marshal(inputData)
return mq.Result{Payload: updatedPayload, Ctx: ctx}
}
}
}
// Otherwise, show the form
var data map[string]interface{}
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("UploadPhoneDataPage Error: %s", err.Error()), Ctx: ctx}
data = make(map[string]interface{})
}
// Simulate user interaction - in a real scenario, this would be user input
// The phone data is already in the payload from initialization
data["upload_completed"] = true
data["uploaded_at"] = "2025-09-19T12:05:00Z"
data["user_interaction"] = map[string]interface{}{
"confirmed_upload": true,
"upload_method": "file_upload",
// HTML content for upload page
htmlContent := `
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Phone Processing System - Upload Data</title>
<style>
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #764ba2 0%, #667eea 100%);
margin: 0;
padding: 0;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}
.upload-container {
background: white;
padding: 2rem;
border-radius: 10px;
box-shadow: 0 10px 25px rgba(0,0,0,0.2);
width: 100%;
max-width: 500px;
}
.upload-header {
text-align: center;
margin-bottom: 2rem;
}
.upload-header h1 {
color: #333;
margin: 0;
font-size: 1.8rem;
}
.upload-header p {
color: #666;
margin: 0.5rem 0 0 0;
}
.upload-area {
border: 2px dashed #667eea;
border-radius: 8px;
padding: 2rem;
text-align: center;
margin-bottom: 1.5rem;
transition: border-color 0.3s;
cursor: pointer;
}
.upload-area:hover {
border-color: #764ba2;
}
.upload-area.dragover {
border-color: #28a745;
background: #f8fff9;
}
.upload-icon {
font-size: 3rem;
color: #667eea;
margin-bottom: 1rem;
}
.upload-text {
color: #666;
margin-bottom: 0.5rem;
}
.file-info {
background: #f8f9fa;
padding: 1rem;
border-radius: 5px;
margin-bottom: 1rem;
display: none;
}
.file-info.show {
display: block;
}
.file-name {
font-weight: bold;
color: #333;
}
.file-size {
color: #666;
font-size: 0.9rem;
}
.upload-btn {
width: 100%;
padding: 0.75rem;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 5px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: transform 0.2s;
}
.upload-btn:hover {
transform: translateY(-2px);
}
.upload-btn:active {
transform: scale(0.98);
}
.upload-btn:disabled {
background: #ccc;
cursor: not-allowed;
transform: none;
}
.progress-bar {
width: 100%;
height: 8px;
background: #e9ecef;
border-radius: 4px;
margin-top: 1rem;
overflow: hidden;
display: none;
}
.progress-bar.show {
display: block;
}
.progress-fill {
height: 100%;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
width: 0%;
transition: width 0.3s ease;
}
.status-message {
margin-top: 1rem;
padding: 0.5rem;
border-radius: 5px;
text-align: center;
font-weight: 500;
display: none;
}
.status-message.show {
display: block;
}
.status-success {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.status-error {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
</style>
</head>
<body>
<div class="upload-container">
<div class="upload-header">
<h1>📤 Upload Phone Data</h1>
<p>Upload your CSV file containing phone numbers for processing</p>
</div>
<form method="post" action="/process" id="uploadForm" enctype="multipart/form-data">
<div class="upload-area" id="uploadArea">
<div class="upload-icon">📁</div>
<div class="upload-text">Drag & drop your CSV file here or click to browse</div>
<div style="color: #999; font-size: 0.9rem; margin-top: 0.5rem;">Supported format: CSV with name,phone columns</div>
<input type="file" id="fileInput" name="file" accept=".csv,.json" style="display: none;">
</div>
<div style="margin: 20px 0; text-align: center; color: #666;">OR</div>
<div class="form-group">
<label for="phoneData" style="color: #333; font-weight: bold;">Paste CSV/JSON Data:</label>
<textarea id="phoneData" name="phone_data" rows="8" placeholder="name,phone&#10;John Doe,+1234567890&#10;Jane Smith,0987654321&#10;Or paste JSON array..." style="width: 100%; padding: 10px; border: 2px solid #e1e5e9; border-radius: 5px; font-family: monospace; resize: vertical;">name,phone
John Doe,+1234567890
Jane Smith,0987654321
Bob Johnson,1555123456
Alice Brown,invalid-phone
Charlie Wilson,+441234567890</textarea>
</div>
<button type="submit" class="upload-btn" id="uploadBtn">Upload & Process</button> <div class="progress-bar" id="progressBar">
<div class="progress-fill" id="progressFill"></div>
</div>
<div class="status-message" id="statusMessage"></div>
</div>
<script>
const uploadArea = document.getElementById('uploadArea');
const fileInput = document.getElementById('fileInput');
const phoneDataTextarea = document.getElementById('phoneData');
const uploadBtn = document.getElementById('uploadBtn');
const uploadForm = document.getElementById('uploadForm');
// Upload area click handler
uploadArea.addEventListener('click', () => {
fileInput.click();
});
// File input change handler
fileInput.addEventListener('change', (e) => {
const file = e.target.files[0];
if (file) {
// Clear textarea if file is selected
phoneDataTextarea.value = '';
phoneDataTextarea.disabled = true;
} else {
phoneDataTextarea.disabled = false;
}
});
// Textarea input handler
phoneDataTextarea.addEventListener('input', () => {
if (phoneDataTextarea.value.trim()) {
// Clear file input if textarea has content
fileInput.value = '';
}
});
// Form submission handler
uploadForm.addEventListener('submit', (e) => {
uploadBtn.textContent = 'Processing...';
uploadBtn.disabled = true;
});
// Drag and drop handlers
uploadArea.addEventListener('dragover', (e) => {
e.preventDefault();
uploadArea.classList.add('dragover');
});
uploadArea.addEventListener('dragleave', () => {
uploadArea.classList.remove('dragover');
});
uploadArea.addEventListener('drop', (e) => {
e.preventDefault();
uploadArea.classList.remove('dragover');
const files = e.dataTransfer.files;
if (files.length > 0) {
fileInput.files = files;
fileInput.dispatchEvent(new Event('change'));
}
});
</script>
</body>
</html>`
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
updatedPayload, _ := json.Marshal(data)
return mq.Result{Payload: updatedPayload, Ctx: ctx}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
resultData := map[string]any{
"html_content": rs,
"step": "upload",
"data": data,
}
resultPayload, _ := json.Marshal(resultData)
return mq.Result{
Payload: resultPayload,
Ctx: ctx,
}
}
type ParsePhoneNumbers struct {