Parallel Execution
When workflow steps are independent, running them sequentially wastes time. Financial analysis can examine fundamentals, technicals, and market position simultaneously. Document processing can extract text, images, and metadata in parallel. This tutorial shows you how to use Fork and Join for concurrent execution.
What You Will Build
A comprehensive stock analysis workflow that runs three independent analyses in parallel:
- Financial Analysis - Revenue, margins, debt ratios
- Technical Analysis - Price trends, support/resistance levels
- Market Analysis - Sector outlook, competitive position
After all three complete, results are synthesized into a final report.
Step 1: Define the State
The state includes fields for each parallel analysis result:

    [WorkflowState]
    public record AnalysisState : IWorkflowState
    {
        public Guid WorkflowId { get; init; }
        public Company Company { get; init; } = null!;
        public MarketData? MarketData { get; init; }
        public FinancialAnalysis? FinancialAnalysis { get; init; }
        public TechnicalAnalysis? TechnicalAnalysis { get; init; }
        public MarketAnalysis? MarketAnalysis { get; init; }
        public SynthesizedReport? Report { get; init; }
        public string? FinalReport { get; init; }
    }

    public record Company(string Ticker, string Name, string Sector);

    public record MarketData(
        decimal CurrentPrice,
        decimal Volume,
        IReadOnlyList<decimal> HistoricalPrices);

    public record FinancialAnalysis(
        decimal RevenueGrowth,
        decimal ProfitMargin,
        decimal DebtToEquity,
        string Outlook);

    public record TechnicalAnalysis(
        string Trend,
        decimal SupportLevel,
        decimal ResistanceLevel,
        IReadOnlyList<string> Signals);

    public record MarketAnalysis(
        string SectorOutlook,
        IReadOnlyList<string> Competitors,
        decimal MarketShare,
        string CompetitivePosition);

    public record SynthesizedReport(
        string Recommendation,
        decimal TargetPrice,
        string Rationale,
        IReadOnlyList<string> KeyRisks);

Step 2: Define the Workflow with Fork/Join
Use Fork to start parallel paths and Join to synchronize them:

    var workflow = Workflow<AnalysisState>
        .Create("comprehensive-analysis")
        .StartWith<GatherData>()
        .Fork(
            flow => flow.Then<FinancialAnalysisStep>(),
            flow => flow.Then<TechnicalAnalysisStep>(),
            flow => flow.Then<MarketAnalysisStep>())
        .Join<SynthesizeResults>()
        .Finally<GenerateReport>();

All three analysis steps execute concurrently. The Join<SynthesizeResults> step waits for all paths to complete, then executes with the merged state containing all analysis results.
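The fork-and-join shape can be illustrated outside the workflow library with plain .NET tasks. The following is a minimal sketch under stated assumptions, not the library's implementation: AnalyzeAsync is a hypothetical stand-in for an analysis step, and the 300 ms delays are arbitrary. It shows only the idea that all paths are started before any is awaited, and that the join continues once every path has finished.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical stand-in for an analysis step: waits, then returns a label.
static async Task<string> AnalyzeAsync(string name, int delayMs)
{
    await Task.Delay(delayMs);
    return $"{name} done";
}

var stopwatch = Stopwatch.StartNew();

// "Fork": start all three paths before awaiting any of them.
Task<string> financial = AnalyzeAsync("Financial", 300);
Task<string> technical = AnalyzeAsync("Technical", 300);
Task<string> market = AnalyzeAsync("Market", 300);

// "Join": continue only after every path has completed.
// Results come back in the order the tasks were passed in.
string[] results = await Task.WhenAll(financial, technical, market);

stopwatch.Stop();
Console.WriteLine(string.Join(", ", results));
Console.WriteLine($"Elapsed: {stopwatch.ElapsedMilliseconds} ms");
```

Because the three delays overlap, the elapsed time should be close to the slowest path rather than the sum of all three, which is the benefit Fork and Join aim for.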
Step 3: Implement the Steps
GatherData
Fetches the market data needed by all analyses:

    public class GatherData : IWorkflowStep<AnalysisState>
    {
        private readonly IMarketDataService _marketData;

        public GatherData(IMarketDataService marketData)
        {
            _marketData = marketData;
        }

        public async Task<StepResult<AnalysisState>> ExecuteAsync(
            AnalysisState state,
            StepContext context,
            CancellationToken ct)
        {
            var data = await _marketData.GetDataAsync(state.Company.Ticker, ct);

            return state
                .With(s => s.MarketData, data)
                .AsResult();
        }
    }

FinancialAnalysisStep

    public class FinancialAnalysisStep : IWorkflowStep<AnalysisState>
    {
        private readonly IFinancialAnalyzer _analyzer;

        public FinancialAnalysisStep(IFinancialAnalyzer analyzer)
        {
            _analyzer = analyzer;
        }

        public async Task<StepResult<AnalysisState>> ExecuteAsync(
            AnalysisState state,
            StepContext context,
            CancellationToken ct)
        {
            var analysis = await _analyzer.AnalyzeAsync(
                state.Company,
                state.MarketData!,
                ct);

            return state
                .With(s => s.FinancialAnalysis, analysis)
                .AsResult();
        }
    }

TechnicalAnalysisStep

    public class TechnicalAnalysisStep : IWorkflowStep<AnalysisState>
    {
        private readonly ITechnicalAnalyzer _analyzer;

        public TechnicalAnalysisStep(ITechnicalAnalyzer analyzer)
        {
            _analyzer = analyzer;
        }

        public async Task<StepResult<AnalysisState>> ExecuteAsync(
            AnalysisState state,
            StepContext context,
            CancellationToken ct)
        {
            var analysis = await _analyzer.AnalyzeAsync(
                state.Company.Ticker,
                state.MarketData!.HistoricalPrices,
                ct);

            return state
                .With(s => s.TechnicalAnalysis, analysis)
                .AsResult();
        }
    }

MarketAnalysisStep

    public class MarketAnalysisStep : IWorkflowStep<AnalysisState>
    {
        private readonly IMarketAnalyzer _analyzer;

        public MarketAnalysisStep(IMarketAnalyzer analyzer)
        {
            _analyzer = analyzer;
        }

        public async Task<StepResult<AnalysisState>> ExecuteAsync(
            AnalysisState state,
            StepContext context,
            CancellationToken ct)
        {
            var analysis = await _analyzer.AnalyzeAsync(
                state.Company.Sector,
                state.Company.Ticker,
                ct);

            return state
                .With(s => s.MarketAnalysis, analysis)
                .AsResult();
        }
    }

SynthesizeResults
The join step receives state with all analysis results populated:

    public class SynthesizeResults : IWorkflowStep<AnalysisState>
    {
        private readonly IReportSynthesizer _synthesizer;

        public SynthesizeResults(IReportSynthesizer synthesizer)
        {
            _synthesizer = synthesizer;
        }

        public async Task<StepResult<AnalysisState>> ExecuteAsync(
            AnalysisState state,
            StepContext context,
            CancellationToken ct)
        {
            // All three analyses are available here after Join
            var report = await _synthesizer.SynthesizeAsync(
                state.FinancialAnalysis!,
                state.TechnicalAnalysis!,
                state.MarketAnalysis!,
                ct);

            return state
                .With(s => s.Report, report)
                .AsResult();
        }
    }

GenerateReport

    public class GenerateReport : IWorkflowStep<AnalysisState>
    {
        private readonly IReportGenerator _generator;

        public GenerateReport(IReportGenerator generator)
        {
            _generator = generator;
        }

        public async Task<StepResult<AnalysisState>> ExecuteAsync(
            AnalysisState state,
            StepContext context,
            CancellationToken ct)
        {
            var markdown = await _generator.GenerateMarkdownAsync(
                state.Company,
                state.Report!,
                ct);

            return state
                .With(s => s.FinalReport, markdown)
                .AsResult();
        }
    }

Understanding State Merging
When fork paths complete, their states are merged using reducer semantics.
Default Behavior (Overwrite)
By default, the last value wins for scalar properties:

    // Each fork path sets a different property - no conflicts
    public FinancialAnalysis? FinancialAnalysis { get; init; }
    public TechnicalAnalysis? TechnicalAnalysis { get; init; }
    public MarketAnalysis? MarketAnalysis { get; init; }

Since each path sets a unique property, merging is straightforward.
Collection Accumulation
Use [Append] to accumulate values across paths:

    [WorkflowState]
    public record AnalysisState : IWorkflowState
    {
        // Scalar properties use overwrite semantics
        public FinancialAnalysis? FinancialAnalysis { get; init; }

        // Collection properties can accumulate with [Append]
        [Append]
        public ImmutableList<AnalysisWarning> Warnings { get; init; } = [];
    }

Each fork path can add warnings, and all warnings appear in the merged state.
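The two merge behaviors can be sketched in isolation with plain .NET types. This is an illustration of the semantics described above, not the library's generated reducer: Overwrite and Append are hypothetical helpers, and the sketch assumes that a path which leaves a scalar untouched contributes no value for it.

```csharp
using System;
using System.Collections.Immutable;

// Hypothetical helper: overwrite semantics, where the latest non-null value wins.
static T? Overwrite<T>(T? current, T? update) where T : class
    => update ?? current;

// Hypothetical helper: append semantics, where values from every path accumulate.
static ImmutableList<T> Append<T>(ImmutableList<T> current, ImmutableList<T> update)
    => current.AddRange(update);

// State before any fork path reports back.
string? outlook = null;
var warnings = ImmutableList<string>.Empty;

// Path A sets the scalar and contributes one warning.
outlook = Overwrite(outlook, "Strong revenue growth");
warnings = Append(warnings, ImmutableList.Create("Debt ratio above sector median"));

// Path B leaves the scalar untouched and contributes another warning.
outlook = Overwrite(outlook, null);
warnings = Append(warnings, ImmutableList.Create("Low trading volume"));

Console.WriteLine(outlook);
Console.WriteLine(string.Join("; ", warnings));
```

The scalar keeps the single value written by path A, while the list ends up holding the warnings from both paths. If two paths wrote different values to the same scalar, the result under overwrite semantics would depend on which update is applied last, which is why giving each path its own property avoids conflicts.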
Advanced Fork Patterns
Instance Names for Duplicate Steps
If you need the same step type in multiple fork paths, use instance names:

    .Fork(
        flow => flow.Then<AnalyzeStep>("Technical"),
        flow => flow.Then<AnalyzeStep>("Fundamental"))
    .Join<SynthesizeStep>()

This generates distinct phases (Technical, Fundamental) but shares the step implementation.
Multi-Step Fork Paths
Each fork path can contain multiple sequential steps:

    .Fork(
        flow => flow
            .Then<FetchFinancials>()
            .Then<AnalyzeFinancials>(),
        flow => flow
            .Then<FetchTechnicals>()
            .Then<AnalyzeTechnicals>())
    .Join<Synthesize>()

Error Handling in Fork Paths
By default, fork paths continue independently even if one fails:

    // Default: continue-on-error
    .Fork(
        flow => flow.Then<Analysis1>(),
        flow => flow.Then<Analysis2>(),
        flow => flow.Then<Analysis3>())
    .Join<Synthesize>()

Enable fail-fast to stop all paths when one fails:

    // Fail-fast: cancel other paths on first failure
    .Fork(
        options => options.FailFast(),
        flow => flow.Then<Analysis1>(),
        flow => flow.Then<Analysis2>(),
        flow => flow.Then<Analysis3>())
    .Join<Synthesize>()

Generated Artifacts
Phase Enum

    public enum ComprehensiveAnalysisPhase
    {
        NotStarted,
        GatherData,
        FinancialAnalysisStep,
        TechnicalAnalysisStep,
        MarketAnalysisStep,
        SynthesizeResults,
        GenerateReport,
        Completed,
        Failed
    }

Fork Handler
The generated saga cascades to all fork paths:

    // After GatherData completes, start all fork paths
    public async Task<object[]> Handle(
        ExecuteGatherDataCommand command,
        GatherData step,
        IDocumentSession session,
        TimeProvider time,
        CancellationToken ct)
    {
        var result = await step.ExecuteAsync(State, ct);
        State = AnalysisStateReducer.Reduce(State, result.StateUpdate);

        // Return commands for all fork paths (executed in parallel)
        return
        [
            new ExecuteFinancialAnalysisStepCommand(WorkflowId),
            new ExecuteTechnicalAnalysisStepCommand(WorkflowId),
            new ExecuteMarketAnalysisStepCommand(WorkflowId)
        ];
    }

Key Points
- Fork paths execute concurrently for faster total completion time
- Join waits for all paths before continuing to the next step
- State from all paths is merged using reducer semantics
- Use instance names when the same step type appears in multiple paths
- Each fork path can have multiple steps chained sequentially
- Configure error handling for fail-fast or continue-on-error behavior
Next Steps
You have learned how to run steps in parallel. Related tutorials cover other workflow patterns:
- Loops - Repeat steps until quality thresholds are achieved
- Approvals - Pause workflows for human review
- Agent Selection - Choose the best agent for each task