# Dynamic fanout
In this example, we'll explore how to implement dynamic fanout patterns in Dagster. Dynamic fanout is useful when you need to process a variable number of items in parallel, where the exact count isn't known until runtime. This is particularly valuable for data processing pipelines that need to handle varying workloads efficiently.
## Problem: Variable workload processing
Imagine you have a data processing pipeline that receives multiple related records, where each record contains 10-30 processing units that require expensive computation. Without dynamic fanout, you'd have to do one of the following:
- Process everything sequentially (slow)
- Pre-define a fixed number of parallel processes (inflexible)
- Process all units in a single large operation (difficult to debug and monitor)
The challenge is creating a pipeline that can:
- Dynamically spawn sub-pipelines based on input data
- Process each record's units in parallel
- Collect and aggregate results efficiently
- Maintain proper lineage and observability
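For concreteness, the code below assumes input records shaped roughly like the following. This is a hypothetical payload: the op code only relies on each record having an `id`, and the `units` field is an illustrative stand-in for whatever the extraction step reads.

```python
# Hypothetical input shape: each record carries an id plus a variable number
# of processing units (10-30 per record in this scenario).
sample_records = [
    {"id": "record_001", "units": [{"unit_id": f"u_{i}", "payload": i} for i in range(12)]},
    {"id": "record_002", "units": [{"unit_id": f"u_{i}", "payload": i} for i in range(27)]},
]
```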
## Solution 1: Sequential processing within sub-pipelines
The first approach uses dynamic fanout to create parallel sub-pipelines for each record, but processes units within each sub-pipeline sequentially. This provides the first layer of parallelization.
```python
from typing import Any

import dagster as dg


@dg.op
def sub_pipeline_process_record_option_a(
    context: dg.OpExecutionContext, record: dict[str, Any]
) -> dict[str, Any]:
    """SUB-PIPELINE: Complete processing workflow for a single data record.

    1. Extract processing units from the record (10-30 units per record)
    2. Each unit goes through processing (second layer of parallelization,
       currently sequential)
    3. Results are aggregated to create the final record output
    """
    context.log.info(f"Sub-pipeline processing record: {record['id']}")

    # Step 1: Extract processing units from the record (10-30 units)
    processing_units = extract_processing_units(record)
    context.log.info(f"Extracted {len(processing_units)} units from {record['id']}")

    # Step 2: Process each unit sequentially (the second layer of
    # parallelization is added in solution 2)
    unit_results = []
    for i, unit in enumerate(processing_units):
        context.log.info(f"Processing unit {i + 1}/{len(processing_units)} for {record['id']}")
        result = process_single_unit(unit)
        unit_results.append(result)

    # Step 3: Aggregate results to create the final record output
    aggregated_output = aggregate_unit_results(record, unit_results)
    context.log.info(
        f"Sub-pipeline completed for {record['id']}: aggregated {len(unit_results)} unit results"
    )
    return {
        "record_id": record["id"],
        "sub_pipeline_result": aggregated_output,
        "units_processed": len(unit_results),
        "original_record": record,
    }
```
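The op above relies on three helpers that are defined elsewhere and not shown in this section. A minimal sketch of what they might look like, assuming the hypothetical record shape from above (the real implementations would contain the expensive computation):

```python
from typing import Any


def extract_processing_units(record: dict[str, Any]) -> list[dict[str, Any]]:
    # Hypothetical: assumes each record stores its units under a "units" key.
    return record.get("units", [])


def process_single_unit(unit: dict[str, Any]) -> dict[str, Any]:
    # Placeholder for the expensive per-unit computation.
    return {"unit_id": unit.get("unit_id"), "status": "processed"}


def aggregate_unit_results(
    record: dict[str, Any], unit_results: list[dict[str, Any]]
) -> dict[str, Any]:
    # Combine the per-unit results into a single summary for the record.
    return {"record_id": record["id"], "unit_count": len(unit_results), "results": unit_results}
```

Keeping `process_single_unit` at module level also matters for solution 2: `multiprocessing` has to pickle the worker function to send it to child processes, which fails for functions defined inside an op.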
This approach creates a sub-pipeline for each input record:
- Dynamic trigger: Uses `DynamicOut` to create one sub-pipeline per input record
- Sequential unit processing: Processes 10-30 units within each sub-pipeline sequentially
- Result aggregation: Combines unit results into a final record output
| Processing layer | Parallelization | Units per record |
| --- | --- | --- |
| Record-level | Parallel | 1 sub-pipeline per record |
| Unit-level | Sequential | 10-30 units processed in order |
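The dynamic trigger itself is not shown in this section. A minimal sketch of what `trigger_sub_pipelines` might look like, using `DynamicOut` to emit one output per record (a hypothetical implementation; the mapping key just needs to be a unique, identifier-safe string):

```python
from typing import Any

import dagster as dg


@dg.op(out=dg.DynamicOut())
def trigger_sub_pipelines(input_records: list[dict[str, Any]]):
    # Emit one DynamicOutput per record; each one becomes its own mapped
    # sub-pipeline step downstream via .map().
    for index, record in enumerate(input_records):
        yield dg.DynamicOutput(record, mapping_key=f"record_{index}")
```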
## Solution 2: Parallel processing within sub-pipelines
The second approach adds a second layer of parallelization: within each sub-pipeline, units are processed in parallel using a multiprocessing pool.
```python
import multiprocessing
from typing import Any

import dagster as dg


@dg.op
def sub_pipeline_process_record_option_b(
    context: dg.OpExecutionContext, record: dict[str, Any]
) -> dict[str, Any]:
    """SUB-PIPELINE: Complete processing workflow for a single data record.

    1. Extract processing units from the record (10-30 units per record)
    2. Each unit goes through processing (second layer of parallelization,
       implemented with a multiprocessing pool)
    3. Results are aggregated to create the final record output
    """
    context.log.info(f"Sub-pipeline processing record: {record['id']}")

    # Step 1: Extract processing units from the record (10-30 units)
    processing_units = extract_processing_units(record)
    context.log.info(f"Extracted {len(processing_units)} units from {record['id']}")

    # Step 2: Process each unit in parallel (second layer of parallelization).
    # process_single_unit must be a module-level function so that it can be
    # pickled and sent to the worker processes.
    if len(processing_units) > 1:
        num_processes = max(1, min(multiprocessing.cpu_count() - 1, len(processing_units)))
        with multiprocessing.Pool(processes=num_processes) as pool:
            unit_results = pool.map(process_single_unit, processing_units)
    else:
        unit_results = [process_single_unit(processing_units[0])] if processing_units else []

    # Step 3: Aggregate results to create the final record output
    aggregated_output = aggregate_unit_results(record, unit_results)
    context.log.info(
        f"Sub-pipeline completed for {record['id']}: aggregated {len(unit_results)} unit results"
    )
    return {
        "record_id": record["id"],
        "sub_pipeline_result": aggregated_output,
        "units_processed": len(unit_results),
        "original_record": record,
    }
```
This enhanced approach provides two layers of parallelization:
- Record-level parallelization: Multiple sub-pipelines run simultaneously
- Unit-level parallelization: Within each sub-pipeline, units are processed using a multiprocessing pool
- Automatic scaling: Number of processes adapts to available CPU cores and workload size
| Processing layer | Parallelization | Performance benefit |
| --- | --- | --- |
| Record-level | Parallel | Scales with number of records |
| Unit-level | Parallel | Scales with CPU cores |
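How many sub-pipelines actually run at once at the record level is decided by the run's executor, not by the op code. A hedged sketch of capping concurrent mapped steps with Dagster's multiprocess executor (the limit of 4 is arbitrary):

```python
import dagster as dg

# Cap how many mapped sub-pipeline steps execute concurrently; unit-level
# parallelism inside each step is still handled by that op's own pool.
capped_executor = dg.multiprocess_executor.configured({"max_concurrent": 4})
```

A cap like this keeps the record-level fanout from oversubscribing the machine when each sub-pipeline also spawns its own process pool.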
## Complete pipeline implementation
The complete pipeline uses a graph-backed asset to orchestrate the dynamic fanout pattern:
```python
@dg.graph_asset
def main_pipeline_results(input_records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """COMPLETE MAIN PIPELINE.

    1. ✅ Receives multiple related data records
    2. ✅ A sub-pipeline processes each record individually (first layer of parallelization)
    3. ✅ Individual results are collected for additional processing
    4. ✅ Final results are generated (used by downstream assets for the final report)

    The graph-backed asset ensures:
    - Sub-pipelines are triggered based on the number of inputs
    - The main pipeline cannot complete before ALL sub-pipelines return results
    - Full visibility into parallel execution
    - Proper asset lineage and dependencies
    """
    # Launch sub-pipelines (one per input record)
    sub_pipeline_triggers = trigger_sub_pipelines(input_records)

    # Execute sub-pipelines in parallel (first layer of parallelization)
    sub_pipeline_results = sub_pipeline_triggers.map(sub_pipeline_process_record_option_a)

    # Collect ALL results before proceeding (fan-in / synchronization barrier)
    collected_results = sub_pipeline_results.collect()  # waits for all sub-pipelines to complete

    # Perform additional processing and return the final results
    return collect_sub_pipeline_results(collected_results)


@dg.asset
def final_report(main_pipeline_results: list[dict[str, Any]]) -> dict[str, Any]:
    """MAIN PIPELINE: Generate a comprehensive final report.

    This asset depends on main_pipeline_results, ensuring the entire pipeline
    completes before the final report is generated.
    """
    total_units = sum(result["units_processed"] for result in main_pipeline_results)
    return {
        "pipeline_summary": {
            "total_records_processed": len(main_pipeline_results),
            "total_units_processed": total_units,
            "average_units_per_record": total_units / len(main_pipeline_results)
            if main_pipeline_results
            else 0,
        },
        "detailed_results": main_pipeline_results,
        "report_generated_at": "2025-08-25T10:00:00Z",
        "pipeline_status": "completed_successfully",
    }
```
Key features of this implementation:
- Dynamic triggering: `trigger_sub_pipelines()` creates sub-pipelines based on the input data
- Parallel execution: `.map()` processes each record in its own sub-pipeline
- Synchronization barrier: `.collect()` ensures all sub-pipelines complete before proceeding
- Result aggregation: `collect_sub_pipeline_results()` processes all results together
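`collect_sub_pipeline_results` is the fan-in op. A minimal sketch of what it might look like (hypothetical implementation; any cross-record post-processing would live here):

```python
from typing import Any

import dagster as dg


@dg.op
def collect_sub_pipeline_results(
    context: dg.OpExecutionContext, sub_results: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    # Fan-in: receives the collected outputs of every mapped sub-pipeline.
    context.log.info(f"Collected {len(sub_results)} sub-pipeline results")
    return sub_results
```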
The graph-backed asset approach provides:
- Full observability: Each sub-pipeline execution is tracked individually
- Proper lineage: Clear dependency relationships between operations
- Fault tolerance: Failed sub-pipelines can be retried independently
- Scalability: Automatically adapts to varying input sizes
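To run the example end to end, the graph-backed asset needs an upstream `input_records` asset, and the assets need to be registered in a `Definitions` object. A minimal sketch under the assumptions above (the `input_records` body returns the hypothetical `sample_records` from the start of this example):

```python
from typing import Any

import dagster as dg


@dg.asset
def input_records() -> list[dict[str, Any]]:
    # Hypothetical source asset; in practice this would load records from storage.
    return sample_records


defs = dg.Definitions(assets=[input_records, main_pipeline_results, final_report])
```

From there, `dagster dev` exposes the assets in the UI, or `dg.materialize([input_records, main_pipeline_results, final_report])` runs the whole pipeline from a script or test; the executor cap sketched earlier can be attached through the `executor` argument of `Definitions`.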