Overview
This comprehensive example demonstrates MoFA’s graph-based workflow orchestration system. It includes:
Linear, conditional, and parallel workflow patterns
ETL data pipelines
LLM agent workflow integration (Dify-style)
Event listening and monitoring
ReAct agent workflow integration
Multi-agent parallel analysis
Intelligent data pipelines
What You’ll Learn
Using WorkflowBuilder for fluent workflow construction
Implementing conditional branching
Parallel execution and result aggregation
State management and data passing
LLM integration in workflows
Event monitoring and checkpoints
Prerequisites
Rust 1.75 or higher
OpenAI API key (for LLM examples)
Understanding of async/await and graph concepts
Architecture
Source Code Overview
The example contains 1061 lines covering 10 workflow scenarios. The linear workflow scenario looks like this:
use mofa_sdk::workflow::{WorkflowBuilder, WorkflowValue};
// Assumption: the executor types are exported from the same module.
use mofa_sdk::workflow::{ExecutorConfig, WorkflowExecutor};

// Build a three-step graph: fetch -> process -> save.
let graph = WorkflowBuilder::new("linear", "Linear Workflow")
    .description("Simple linear data processing")
    .start()
    .task("fetch", "Fetch Data", |_ctx, input| async move {
        info!("Fetching data...");
        let data = format!("Data: {}", input.as_str().unwrap_or("default"));
        Ok(WorkflowValue::String(data))
    })
    .task("process", "Process Data", |_ctx, input| async move {
        info!("Processing: {:?}", input);
        let processed = format!("Processed - {}", input.as_str().unwrap_or(""));
        Ok(WorkflowValue::String(processed))
    })
    .task("save", "Save Result", |_ctx, input| async move {
        info!("Saving: {:?}", input);
        Ok(WorkflowValue::String("Save successful".to_string()))
    })
    .end()
    .build();

// Execute the graph with "API" as the initial input.
let executor = WorkflowExecutor::new(ExecutorConfig::default());
let result = executor.execute(&graph, WorkflowValue::String("API".into())).await?;
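To make the data flow concrete, here is a minimal std-only sketch that models the same Fetch → Process → Save chain as a fold over stage functions. It does not use the MoFA API: `Value`, `Stage`, and `run_linear` are hypothetical names chosen for illustration, and the stages are synchronous for simplicity.

```rust
/// Simplified stand-in for a workflow value (hypothetical, not the MoFA type).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Text(String),
}

impl Value {
    /// Borrow the inner string, if this value holds one.
    fn as_str(&self) -> Option<&str> {
        match self {
            Value::Text(s) => Some(s.as_str()),
            Value::Null => None,
        }
    }
}

/// A stage receives the previous stage's output and returns its own.
type Stage = fn(Value) -> Result<Value, String>;

fn fetch(input: Value) -> Result<Value, String> {
    Ok(Value::Text(format!("Data: {}", input.as_str().unwrap_or("default"))))
}

fn process(input: Value) -> Result<Value, String> {
    Ok(Value::Text(format!("Processed - {}", input.as_str().unwrap_or(""))))
}

fn save(_input: Value) -> Result<Value, String> {
    Ok(Value::Text("Save successful".to_string()))
}

/// Run the stages in order, feeding each output into the next stage.
/// The first error stops the chain and is returned to the caller.
fn run_linear(stages: &[Stage], input: Value) -> Result<Value, String> {
    stages.iter().try_fold(input, |value, stage| stage(value))
}
```

Calling `run_linear(&[fetch, process], Value::Text("API".into()))` (with the array typed as `[Stage; 2]`) yields `Processed - Data: API`, which shows how each task sees only the output of the task before it.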
Running the Example
cd examples/workflow_orchestration
cargo run
Run with LLM Examples (6-10)
export OPENAI_API_KEY="your-api-key"
cargo run
Workflow Examples
1. Linear
Pattern: Start → Fetch → Process → Save → End
Simple sequential processing pipeline.
Use case: ETL, data transformation
2. Conditional
Pattern: Start → Check → Branch A / Branch B → End
Conditional routing based on data values.
Use case: Decision trees, rule engines
3. Parallel
Pattern: Start → Fork → [A, B, C] → Join → End
Concurrent task execution with aggregation.
Use case: Batch processing, fan-out/fan-in
4. Data Pipeline
Pattern: Extract → Transform → Load
ETL pipeline with state management.
Use case: Data warehousing, analytics
5. Event Monitoring
Pattern: Workflow with event listeners
Monitor execution with checkpoints.
Use case: Observability, debugging
6. ReAct Agent
Pattern: Context → ReAct Agent → LLM Synthesis
Integrate ReAct reasoning into workflows.
Use case: Complex problem solving
7. Multi-Agent
Pattern: Fork → [Tech, Business, User] → LLM Synthesis
Parallel expert analysis with LLM aggregation.
Use case: Multi-perspective analysis
8. Conditional LLM
Pattern: LLM Classify → Simple / Complex → Decision
LLM-driven conditional routing.
Use case: Dynamic task routing
9. Intelligent Pipeline
Pattern: Extract → Transform → LLM Analysis
ETL plus intelligent insights.
Use case: Business intelligence
10. Tool Chain
Pattern: [Calculator, DateTime] → LLM Summary
Parallel tools with LLM synthesis.
Use case: Multi-tool orchestration
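The fork/join shape used by the Parallel, Multi-Agent, and Tool Chain examples can be sketched with scoped threads from the standard library. This is an illustration of the fan-out/fan-in idea only, not MoFA's executor: `fork_join` and the three branch functions are hypothetical, and real branches would be async tasks rather than OS threads.

```rust
use std::collections::BTreeMap;
use std::thread;

/// Run every branch on its own thread (fork), then collect the results
/// keyed by branch name (join). All branches receive the same input.
fn fork_join(
    input: i64,
    branches: &[(&'static str, fn(i64) -> i64)],
) -> BTreeMap<&'static str, i64> {
    thread::scope(|scope| {
        // Fork: start all branches before waiting on any of them.
        let handles: Vec<_> = branches
            .iter()
            .map(|&(name, f)| (name, scope.spawn(move || f(input))))
            .collect();

        // Join: wait for each branch and gather its result.
        let results: BTreeMap<&'static str, i64> = handles
            .into_iter()
            .map(|(name, handle)| (name, handle.join().expect("branch panicked")))
            .collect();
        results
    })
}

// Hypothetical branch bodies.
fn double(x: i64) -> i64 {
    x * 2
}

fn square(x: i64) -> i64 {
    x * x
}

fn negate(x: i64) -> i64 {
    -x
}
```

Branches may finish in any order, as the interleaved log lines under Expected Output suggest, but the join step sees a complete map of results before anything downstream runs.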
Key Concepts
WorkflowBuilder API
WorkflowBuilder::new(id, name)
    .description(desc)                 // Workflow description
    .start()                           // Add start node
    .task(id, name, handler)           // Add task node
    .condition(id, name, predicate)    // Add conditional node
    .parallel(id, name)                // Start parallel section
    .branch(id, name, handler)         // Add parallel branch
    .join(id, name)                    // Join parallel branches
    .llm_agent(id, name, agent)        // Add LLM node
    .end()                             // Add end node
    .build()                           // Build graph
Node Types
Start node: entry point of the workflow.
WorkflowNode::start("start")
Task node: executes custom logic.
WorkflowNode::task("task_id", "Task Name", |ctx, input| async move {
    // Your logic
    Ok(WorkflowValue::String("result".to_string()))
})
Condition node: routes based on a predicate.
WorkflowNode::condition("check", "Check", |ctx, input| async move {
    input.as_i64().unwrap_or(0) > 50
})
LLM agent node: integrates an LLM agent.
WorkflowNode::llm_agent("llm", "LLM Node", agent)
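One detail of the condition predicate is worth spelling out: because it uses `unwrap_or(0)`, an input that is missing or not an integer is treated as 0 and therefore fails a `> 50` check. The std-only sketch below reproduces that decision in isolation. The `route` function and the `low_path` name are hypothetical; `high_path` is taken from the expected output of Example 2.

```rust
/// Decide which branch to take for a given input.
/// `None` models an input that is absent or not an integer; it falls back
/// to 0, so it takes the low path whenever the threshold is non-negative.
fn route(input: Option<i64>, threshold: i64) -> &'static str {
    if input.unwrap_or(0) > threshold {
        "high_path"
    } else {
        "low_path"
    }
}
```

With a threshold of 50, an input of 75 takes the high path, while 50 itself and a missing value both take the low path, since the comparison is strict.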
Context and State
// Access workflow context
.task("process", "Process", |ctx, input| async move {
    // Set variable
    ctx.set_variable("count", WorkflowValue::Int(10)).await;

    // Get variable
    let count = ctx.get_variable("count").await
        .and_then(|v| v.as_i64())
        .unwrap_or(0);

    Ok(WorkflowValue::Int(count + 1))
})
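The snippet above relies on a context whose variables are visible to every node. A minimal sketch of that idea, assuming a shared map behind a mutex, is shown below. The `Context` type here is a hypothetical synchronous stand-in and says nothing about how MoFA's context is actually implemented.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical shared context: clones point at the same variable map,
/// so a value written by one node is visible to the next.
#[derive(Clone, Default)]
struct Context {
    variables: Arc<Mutex<HashMap<String, i64>>>,
}

impl Context {
    /// Store or overwrite a variable.
    fn set_variable(&self, key: &str, value: i64) {
        self.variables
            .lock()
            .expect("context lock poisoned")
            .insert(key.to_string(), value);
    }

    /// Read a variable; returns `None` if it was never set.
    fn get_variable(&self, key: &str) -> Option<i64> {
        self.variables
            .lock()
            .expect("context lock poisoned")
            .get(key)
            .copied()
    }
}

/// Example node body: read the counter (defaulting to 0), add one, store it.
fn increment_counter(ctx: &Context) -> i64 {
    let count = ctx.get_variable("count").unwrap_or(0) + 1;
    ctx.set_variable("count", count);
    count
}
```

Handing each node a clone of the context keeps the handler signatures simple while still letting state accumulate across the workflow.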
Event Monitoring
use mofa_sdk::workflow::ExecutionEvent;
// Assumption: Tokio's async channel, since recv() is awaited below.
use tokio::sync::mpsc;

let (event_tx, mut event_rx) = mpsc::channel(100);
let executor = WorkflowExecutor::new(config)
    .with_event_sender(event_tx);

// Drain events on a background task until the sender is dropped.
tokio::spawn(async move {
    while let Some(event) = event_rx.recv().await {
        match event {
            ExecutionEvent::WorkflowStarted { workflow_id, .. } => {
                info!("Started: {}", workflow_id);
            }
            ExecutionEvent::NodeCompleted { node_id, result } => {
                info!("Completed: {} - {:?}", node_id, result.status);
            }
            _ => {}
        }
    }
});
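The same listener pattern can be tried without any dependencies by using the standard library's blocking channel and a thread. The sketch below is only a model of the idea: the `Event` enum is a hypothetical, trimmed-down stand-in for `ExecutionEvent`, and `run_with_events` simulates an executor that emits one event per node.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical, reduced event set for illustration.
#[derive(Debug)]
enum Event {
    WorkflowStarted { workflow_id: String },
    NodeCompleted { node_id: String },
    WorkflowCompleted,
}

/// Emit events for a pretend run and return what the listener recorded.
fn run_with_events(workflow_id: &str, nodes: &[&str]) -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    // Listener thread: drains events until every sender has been dropped.
    let listener = thread::spawn(move || {
        let mut log = Vec::new();
        for event in rx {
            match event {
                Event::WorkflowStarted { workflow_id } => {
                    log.push(format!("Started: {workflow_id}"));
                }
                Event::NodeCompleted { node_id } => {
                    log.push(format!("Completed: {node_id}"));
                }
                // Events the listener does not care about are ignored.
                _ => {}
            }
        }
        log
    });

    tx.send(Event::WorkflowStarted { workflow_id: workflow_id.to_string() })
        .expect("listener hung up");
    for node in nodes {
        tx.send(Event::NodeCompleted { node_id: node.to_string() })
            .expect("listener hung up");
    }
    tx.send(Event::WorkflowCompleted).expect("listener hung up");

    // Dropping the sender closes the channel, which ends the listener loop.
    drop(tx);
    listener.join().expect("listener panicked")
}
```

The catch-all arm matters in practice: it lets the listener keep compiling and running when new event variants are added.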
Expected Output
=== MoFA Workflow Orchestration Example ===
--- Example 1: Linear Workflow ---
[fetch_data] Fetching data...
[process] Processing data
[save] Saving result
Workflow status: Completed
Number of executed nodes: 5
--- Example 2: Conditional Branch Workflow ---
[check_value] Checking value: 75 (threshold: 50)
[high_path] Executing high-value path
Workflow status: Completed
--- Example 3: Parallel Execution Workflow ---
[task_a] Starting Task A...
[task_b] Starting Task B...
[task_c] Starting Task C...
[task_b] Task B complete
[task_c] Task C complete
[task_a] Task A complete
[join] Aggregating all results
Workflow status: Completed
=== Dify-style LLM/Agent Workflow Examples ===
--- Example 6: ReAct Agent Decision Workflow ---
[gather_context] Gathering context information...
[react_agent] Starting ReAct reasoning...
[react_agent] Reasoning complete, iterations: 3
[final_synthesis] LLM synthesizing results...
Workflow status: Completed
=== All examples have finished executing ===
Advanced Features
Checkpoints and Recovery
let executor = WorkflowExecutor::new(ExecutorConfig {
    enable_checkpoints: true,
    checkpoint_interval: 3, // Checkpoint every 3 nodes
    ..Default::default()
});

// Resume from checkpoint
let result = executor.resume_from_checkpoint(
    &checkpoint_data,
    &graph
).await?;
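To illustrate what "checkpoint every 3 nodes" and "resume" mean in terms of control flow, here is a std-only sketch over a linear list of nodes. It is a model under stated assumptions, not MoFA's checkpoint format: `Checkpoint`, `run_with_checkpoints`, and `add_one` are hypothetical, and the state is reduced to a single integer.

```rust
/// Hypothetical snapshot: how many nodes have completed and the state so far.
#[derive(Debug, Clone, PartialEq)]
struct Checkpoint {
    completed_nodes: usize,
    state: i64,
}

/// Run `nodes` starting from `start`, recording a checkpoint after every
/// `interval` completed nodes. An interval of 0 disables checkpointing.
fn run_with_checkpoints(
    nodes: &[fn(i64) -> i64],
    start: Checkpoint,
    interval: usize,
) -> (i64, Vec<Checkpoint>) {
    let mut state = start.state;
    let mut checkpoints = Vec::new();

    // Skip the nodes that the starting checkpoint already covers.
    for (index, node) in nodes.iter().enumerate().skip(start.completed_nodes) {
        state = node(state);
        let completed = index + 1;
        if interval > 0 && completed % interval == 0 {
            checkpoints.push(Checkpoint { completed_nodes: completed, state });
        }
    }
    (state, checkpoints)
}

/// Hypothetical node body used for demonstration.
fn add_one(x: i64) -> i64 {
    x + 1
}
```

Running seven `add_one` nodes from an empty checkpoint with an interval of 3 records snapshots after nodes 3 and 6. Resuming from the second snapshot executes only the seventh node and reaches the same final state.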
Timeout and Retries
use std::time::Duration;

let config = ExecutorConfig {
    max_execution_time: Duration::from_secs(300),
    retry_on_failure: true,
    max_retries: 3,
    ..Default::default()
};
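How a retry limit and a time budget interact can be sketched with a plain loop. The following is an assumption-laden illustration, not a description of MoFA's executor: `run_with_retries` and `RunError` are hypothetical, `max_retries` is interpreted as retries after the first attempt, and the deadline is only checked between attempts, so it cannot interrupt an attempt that is already running.

```rust
use std::time::{Duration, Instant};

/// Why a run gave up (hypothetical error type).
#[derive(Debug, PartialEq)]
enum RunError<E> {
    /// Every allowed attempt failed; holds the last error.
    Exhausted(E),
    /// The time budget ran out before another attempt could start.
    TimedOut,
}

/// Call `task` until it succeeds, the retries are used up, or the deadline
/// passes. `task` receives the zero-based attempt number.
fn run_with_retries<T, E>(
    max_retries: u32,
    max_execution_time: Duration,
    mut task: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, RunError<E>> {
    let deadline = Instant::now() + max_execution_time;
    let mut attempt = 0;
    loop {
        if Instant::now() >= deadline {
            return Err(RunError::TimedOut);
        }
        match task(attempt) {
            Ok(value) => return Ok(value),
            // No retries left: surface the last error.
            Err(error) if attempt >= max_retries => {
                return Err(RunError::Exhausted(error));
            }
            // Otherwise try again.
            Err(_) => attempt += 1,
        }
    }
}
```

With `max_retries` set to 3, a task that fails twice and then succeeds is called three times in total, and a task that always fails is called four times before `Exhausted` is returned.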
Custom Aggregation
.join_with_transform("join", "Aggregate", |results| async move {
    // Custom aggregation logic
    let sum: i64 = results.values()
        .filter_map(|v| v.as_i64())
        .sum();
    WorkflowValue::Int(sum)
})
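The aggregation above silently skips any branch result that is not an integer, because `filter_map` drops the `None` values returned by `as_i64`. The std-only sketch below shows that behaviour in isolation. `BranchValue` and `sum_branches` are hypothetical stand-ins, and the branch results are assumed to arrive as a map from branch id to value.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a branch result.
#[derive(Debug, Clone, PartialEq)]
enum BranchValue {
    Int(i64),
    Text(String),
}

impl BranchValue {
    /// Return the integer payload, or `None` for non-integer values.
    fn as_i64(&self) -> Option<i64> {
        match self {
            BranchValue::Int(n) => Some(*n),
            BranchValue::Text(_) => None,
        }
    }
}

/// Sum every integer result; non-integer results contribute nothing.
fn sum_branches(results: &HashMap<String, BranchValue>) -> BranchValue {
    let sum: i64 = results.values().filter_map(BranchValue::as_i64).sum();
    BranchValue::Int(sum)
}
```

If a non-integer branch result should count as a failure rather than be ignored, the transform would need to check for it explicitly instead of relying on `filter_map`.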
Common Use Cases
ETL Pipelines: Extract, transform, load data workflows
ML Pipelines: Training, validation, deployment flows
CI/CD: Build, test, deploy automation
Business Process: Multi-stage approval workflows
Troubleshooting
Problem: Node returns error
Solution: Add error handling:
.task("task", "Task", |ctx, input| async move {
    match process(input).await {
        Ok(result) => Ok(result),
        Err(e) => {
            // Log and return default
            warn!("Task failed: {}", e);
            Ok(WorkflowValue::String("default".into()))
        }
    }
})
Problem: Workflow hangs
Solution: Check for circular dependencies and add a timeout:
let config = ExecutorConfig {
    max_execution_time: Duration::from_secs(60),
    ..Default::default()
};
Problem: Large workflows consume too much memory
Solution: Use streaming or chunking:
.task("process", "Process", |ctx, input| async move {
    // Process in chunks
    for chunk in input.chunks(1000) {
        process_chunk(chunk).await?;
    }
    Ok(WorkflowValue::Null)
})
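The chunking advice can be tried on a plain slice with the standard library's `chunks` method. This sketch assumes the input is a slice of numbers and that each chunk is reduced to a sum. `process_in_chunks` and this synchronous `process_chunk` are hypothetical helpers, not part of MoFA.

```rust
/// Hypothetical per-chunk work: add up the records in one chunk.
fn process_chunk(chunk: &[u32]) -> Result<u64, String> {
    Ok(chunk.iter().map(|&x| u64::from(x)).sum())
}

/// Process `records` in chunks of at most `chunk_size` items, so only one
/// chunk's worth of intermediate work is live at a time.
fn process_in_chunks(records: &[u32], chunk_size: usize) -> Result<u64, String> {
    // `slice::chunks` panics on a chunk size of 0, so reject it up front.
    if chunk_size == 0 {
        return Err("chunk_size must be greater than zero".to_string());
    }

    let mut total = 0u64;
    for chunk in records.chunks(chunk_size) {
        // Stop at the first failing chunk and propagate its error.
        total += process_chunk(chunk)?;
    }
    Ok(total)
}
```

For 2,500 records and a chunk size of 1000, the loop runs three times, with the last chunk holding the remaining 500 records.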
Next Steps
ReAct Agent: Integrate reasoning agents
Multi-Agent: Multi-agent workflows
Workflow Guide: Advanced workflow patterns
Workflow API: Complete API reference