Streaming and Cancellation
Process large rule sets efficiently with streaming execution and cancellation support.
Table of Contents
- ExecuteAsync (Streaming)
- ExecuteBufferedAsync (Chunked)
- CancellationToken
- Parallel Async with Cancellation
ExecuteAsync (Streaming)
Use ExecuteAsync when you have many rules and want to process results as they arrive, without waiting for all rules to complete.
Basic Streaming
var workflow = new Workflow
{
    Description = "Large validation set",
    Rules = GetRulesFromDatabase() // 100+ rules
};
workflow.Compile(parameters);
// Process results as they arrive
await foreach (var result in workflow.ExecuteAsync(parameters))
{
    if (!result.Success)
    {
        LogFailure(result);
        // You can break here to stop early: remaining rules are still
        // evaluated, but their results are no longer consumed
    }
}
Short-Circuit on First Failure
await foreach (var result in workflow.ExecuteAsync(parameters))
{
    if (!result.Success)
    {
        // Stop processing and return early
        return new ValidationFailure(result);
    }
}
// All rules passed
return ValidationSuccess();
Real-Time Progress Reporting
var processed = 0;
var total = workflow.Rules.Count;
await foreach (var result in workflow.ExecuteAsync(parameters))
{
    processed++;
    ReportProgress(new ProgressInfo
    {
        Processed = processed,
        Total = total,
        CurrentRule = result.RuleDescription,
        Success = result.Success
    });
}
ExecuteBufferedAsync (Chunked)
Process rules in fixed-size chunks for batch operations.
Batch Processing
var workflow = new Workflow
{
    Rules = GetRulesFromDatabase() // 1000+ rules
};
workflow.Compile(parameters);
// Process 50 rules at a time
await foreach (var chunk in workflow.ExecuteBufferedAsync(parameters, bufferSize: 50))
{
    // chunk is RuleResult[]
    await SaveResultsToDatabase(chunk);
    // Or inspect each chunk and report its failures together
    var failures = chunk.Where(r => !r.Success).ToList();
    if (failures.Count > 0)
    {
        await NotifyFailures(failures);
    }
}
Memory-Efficient Processing
// With 10,000 rules, loading all results into memory at once is expensive.
// Buffered execution keeps result memory bounded by the buffer size.
await foreach (var chunk in workflow.ExecuteBufferedAsync(parameters, bufferSize: 100))
{
    // At most 100 RuleResult objects are held per chunk
    foreach (var result in chunk)
    {
        ProcessResult(result);
    }
    // This chunk's results can be garbage collected after the iteration
}
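To make the memory claim concrete, here is a minimal sketch of how a stream of results can be grouped into fixed-size chunks. It uses only standard .NET types; `ChunkingSketch`, `ProduceAsync`, and `ChunkAsync` are hypothetical names for illustration and are not part of the library, whose actual implementation of ExecuteBufferedAsync may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ChunkingSketch
{
    // Hypothetical stand-in for a stream of rule results.
    public static async IAsyncEnumerable<int> ProduceAsync(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield(); // simulate asynchronous rule evaluation
            yield return i;
        }
    }

    // Groups a stream into arrays of at most bufferSize items.
    public static async IAsyncEnumerable<T[]> ChunkAsync<T>(
        IAsyncEnumerable<T> source, int bufferSize)
    {
        if (bufferSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(bufferSize));

        var buffer = new List<T>(bufferSize);
        await foreach (var item in source)
        {
            buffer.Add(item);
            if (buffer.Count == bufferSize)
            {
                yield return buffer.ToArray();
                buffer.Clear(); // reuse the list; the emitted array is a copy
            }
        }

        // Emit the final, possibly shorter, chunk.
        if (buffer.Count > 0)
            yield return buffer.ToArray();
    }

    public static async Task Main()
    {
        await foreach (var chunk in ChunkAsync(ProduceAsync(7), bufferSize: 3))
        {
            Console.WriteLine(string.Join(",", chunk));
        }
        // prints "0,1,2", then "3,4,5", then "6"
    }
}
```

Only one buffer and the chunk currently being processed are alive at a time, which is why the working set depends on the buffer size rather than on the total number of rules.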
Buffered with Dependencies
Buffered execution respects rule dependencies. Rules in a buffer execute only after their dependencies (which may be in previous buffers) complete.
var ruleA = new Rule { Description = "A", Expression = "true", IsActive = true };
var ruleB = new Rule
{
    Description = "B",
    DependsOnRuleId = ruleA.Id,
    Expression = "true",
    IsActive = true
};
// ... more rules
// If bufferSize = 1, A is in buffer 0, B is in buffer 1
// B executes after A completes
await foreach (var chunk in workflow.ExecuteBufferedAsync(parameters, bufferSize: 1))
{
    // With bufferSize = 1, chunk[0] is the single result in this buffer
}
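One way such an ordering guarantee could be achieved is to sort rules so that every rule follows the rule it depends on before splitting them into buffers. The sketch below illustrates that idea only; `RuleStub` and `OrderByDependency` are hypothetical, and the library's real mechanism is not shown in this guide.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DependencyOrderSketch
{
    // Hypothetical minimal rule shape: an id plus at most one dependency id.
    public sealed class RuleStub
    {
        public string Id { get; }
        public string DependsOnRuleId { get; } // null when there is no dependency

        public RuleStub(string id, string dependsOnRuleId = null)
        {
            Id = id;
            DependsOnRuleId = dependsOnRuleId;
        }
    }

    // Returns the rules ordered so that each rule follows the rule it depends on.
    public static List<RuleStub> OrderByDependency(IReadOnlyList<RuleStub> rules)
    {
        var byId = rules.ToDictionary(r => r.Id);
        var ordered = new List<RuleStub>(rules.Count);
        var done = new HashSet<string>();
        var inProgress = new HashSet<string>();

        void Visit(RuleStub rule)
        {
            if (done.Contains(rule.Id)) return;
            if (!inProgress.Add(rule.Id))
                throw new InvalidOperationException($"Dependency cycle at rule '{rule.Id}'.");

            // Place the dependency first, if it is part of this rule set.
            if (rule.DependsOnRuleId != null &&
                byId.TryGetValue(rule.DependsOnRuleId, out var dependency))
            {
                Visit(dependency);
            }

            inProgress.Remove(rule.Id);
            done.Add(rule.Id);
            ordered.Add(rule);
        }

        foreach (var rule in rules) Visit(rule);
        return ordered;
    }
}
```

Once the rules are in this order, cutting the list into consecutive buffers of any size keeps each dependency in the same buffer as its dependent or in an earlier one.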
CancellationToken
Cancel rule execution to stop processing early.
Timeout-Based Cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
    var results = await workflow.ExecuteParallelAsync(parameters, cts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Rule execution timed out after 5 seconds");
}
User-Initiated Cancellation
var cts = new CancellationTokenSource();
// Start long-running rule evaluation
var task = Task.Run(async () =>
{
    await foreach (var result in workflow.ExecuteAsync(parameters, cts.Token))
    {
        UpdateUI(result);
    }
});
// User clicks "Cancel" button
buttonCancel.Click += (s, e) => cts.Cancel();
Mid-Stream Cancellation
using var cts = new CancellationTokenSource();
await foreach (var result in workflow.ExecuteAsync(parameters, cts.Token))
{
    if (result.RuleDescription == "Critical check" && !result.Success)
    {
        // Critical rule failed: cancel remaining evaluation
        cts.Cancel();
        break;
    }
}
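For readers unfamiliar with how a token reaches an async stream, the following sketch shows the standard .NET pattern: the producer marks its token parameter with `[EnumeratorCancellation]` and checks it between items. `CancellationSketch`, `EvaluateAsync`, and `ConsumeUntilCancelledAsync` are hypothetical stand-ins, not the library's API. Unlike the example above, this consumer does not break after cancelling, so the loop ends with an `OperationCanceledException` instead.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical stand-in for a rule stream that honors a token.
    public static async IAsyncEnumerable<int> EvaluateAsync(
        int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; i < count; i++)
        {
            // Throws OperationCanceledException once cancellation is requested.
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Delay(10, cancellationToken); // simulated rule work
            yield return i;
        }
    }

    // Consumes results until the token fires; returns how many were seen.
    public static async Task<int> ConsumeUntilCancelledAsync(int count, int cancelAfter)
    {
        using var cts = new CancellationTokenSource();
        var seen = 0;
        try
        {
            await foreach (var result in EvaluateAsync(count, cts.Token))
            {
                seen++;
                if (seen == cancelAfter)
                    cts.Cancel(); // the producer observes this on its next check
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the stream stopped because we cancelled it.
        }
        return seen;
    }
}
```

Calling `cts.Cancel()` followed by `break`, as in the mid-stream example, avoids the exception path entirely because the consumer stops asking for further items.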
Parallel Async with Cancellation
Combine parallel execution with a cancellation token so that a slow or unresponsive rule cannot hold up the whole run indefinitely.
// Check 50 rules against external APIs
var workflow = new Workflow
{
    Rules = new List<Rule>
    {
        new Rule { Description = "Check API 1", Expression = "await Api1.IsHealthy()" },
        new Rule { Description = "Check API 2", Expression = "await Api2.IsHealthy()" },
        // ... 48 more
    }
};
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var results = await workflow.ExecuteParallelAsync(parameters, cts.Token);
// The timeout covers the whole run: if it takes longer than 10 seconds
// (for example, because one API is slow), OperationCanceledException is thrown
// Otherwise, all 50 results are available
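The timeout behavior described above can be reproduced with standard .NET primitives alone. The sketch below is an illustration under stated assumptions: `ParallelTimeoutSketch`, `CheckAsync`, and `RunAllAsync` are hypothetical names, and `Task.Delay` stands in for a real API call. It does not show how ExecuteParallelAsync is implemented.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelTimeoutSketch
{
    // Hypothetical health check: completes after delayMs unless cancelled.
    private static async Task<bool> CheckAsync(int delayMs, CancellationToken token)
    {
        await Task.Delay(delayMs, token);
        return true;
    }

    // Runs all checks concurrently under one shared timeout.
    // Completed is false when the timeout fired before every check finished.
    public static async Task<(bool Completed, bool[] Results)> RunAllAsync(
        int[] delaysMs, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            var tasks = delaysMs.Select(d => CheckAsync(d, cts.Token)).ToArray();
            var results = await Task.WhenAll(tasks);
            return (true, results);
        }
        catch (OperationCanceledException)
        {
            // At least one check outlived the shared timeout.
            return (false, Array.Empty<bool>());
        }
    }
}
```

Because every check shares one token, a single slow check cancels the whole batch. If partial results matter, a per-check timeout would be the design to consider instead.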
Combining with RuleBatch
var batch = new RuleBatch()
    .AddRules(GetHealthCheckRules());
batch.Compile(parameters);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var results = await batch.EvaluateParallelAsync(parameters, cts.Token);
var healthyCount = results.Count(r => r.Success);
Console.WriteLine($"{healthyCount}/{results.Length} services healthy");
Performance Considerations
| Mode | Memory | Latency | Use Case |
|---|---|---|---|
| Execute() | All results | Low | Few rules, need all results |
| ExecuteAsync() | One at a time | Low | Many rules, streaming |
| ExecuteBufferedAsync() | Fixed chunk | Low | Many rules, batch processing |
| ExecuteParallelAsync() | All results | Lowest | CPU-intensive, many rules |
Rule of thumb:
- < 10 rules: Use Execute()
- 10-100 rules: Use ExecuteParallel() or ExecuteParallelAsync()
- > 100 rules: Use ExecuteBufferedAsync() with appropriate buffer size
- UI updates: Use ExecuteAsync() for real-time progress
- External APIs: Use ExecuteParallelAsync() with cancellation timeout