Dynamic fanout

In this example, we'll explore how to implement dynamic fanout patterns in Dagster. Dynamic fanout is useful when you need to process a variable number of items in parallel, where the exact count isn't known until runtime. This is particularly valuable for data processing pipelines that need to handle varying workloads efficiently.

Problem: Variable workload processing

Imagine you have a data processing pipeline that receives multiple related records, where each record contains 10-30 processing units that require expensive computation. Without dynamic fanout, you'd have to do one of the following:

  • Process everything sequentially (slow)
  • Pre-define a fixed number of parallel processes (inflexible)
  • Process all units in a single large operation (difficult to debug and monitor)

The challenge is creating a pipeline that can:

  • Dynamically spawn sub-pipelines based on input data
  • Process each record's units in parallel
  • Collect and aggregate results efficiently
  • Maintain proper lineage and observability

Solution 1: Sequential processing within sub-pipelines

The first approach uses dynamic fanout to create parallel sub-pipelines for each record, but processes units within each sub-pipeline sequentially. This provides the first layer of parallelization.
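
The listing below also calls three helpers that this excerpt doesn't show: extract_processing_units, process_single_unit, and aggregate_unit_results. The sketch that follows is a purely illustrative placeholder for them (field names like unit_count and score are assumptions), included only so the example is self-contained; the real implementations in dynamic_fanout.py will differ. Note that process_single_unit is defined at module level, which matters for Solution 2 below, where multiprocessing must pickle the callable it sends to worker processes.

import random
from typing import Any


def extract_processing_units(record: dict[str, Any]) -> list[dict[str, Any]]:
    # Hypothetical: derive 10-30 processing units from the record payload.
    return [
        {"record_id": record["id"], "unit_index": i}
        for i in range(record.get("unit_count", 10))
    ]


def process_single_unit(unit: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical stand-in for the expensive per-unit computation.
    return {**unit, "score": random.random()}


def aggregate_unit_results(
    record: dict[str, Any], unit_results: list[dict[str, Any]]
) -> dict[str, Any]:
    # Hypothetical aggregation: summarize the per-unit results for the record.
    return {"record_id": record["id"], "unit_count": len(unit_results)}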

src/project_mini/defs/dynamic_fanout/dynamic_fanout.py
@dg.op
def sub_pipeline_process_record_option_a(
    context: dg.OpExecutionContext, record: dict[str, Any]
) -> dict[str, Any]:
    """SUB-PIPELINE: Complete processing workflow for a single data record.

    1. "Extract processing units from the record (10-30 units per record)"
    2. "Each unit goes through processing (Second layer of parallelization)"
       [Currently sequential as specified]
    3. "Results are aggregated to create final record output"
    """
    context.log.info(f"Sub-pipeline processing record: {record['id']}")

    # Step 1: Extract processing units from record (10-30 units)
    processing_units = extract_processing_units(record)
    context.log.info(f"Extracted {len(processing_units)} units from {record['id']}")

    # Step 2: Process each unit (Second layer of parallelization)
    # Currently sequential as specified, but can be parallelized when ready
    unit_results = []

    # Sequential processing (current implementation)
    for i, unit in enumerate(processing_units):
        context.log.info(f"Processing unit {i + 1}/{len(processing_units)} for {record['id']}")
        result = process_single_unit(unit)
        unit_results.append(result)

    # Step 3: Aggregate results to create final record output
    aggregated_output = aggregate_unit_results(record, unit_results)

    context.log.info(
        f"Sub-pipeline completed for {record['id']}: aggregated {len(unit_results)} unit results"
    )

    return {
        "record_id": record["id"],
        "sub_pipeline_result": aggregated_output,
        "units_processed": len(unit_results),
        "original_record": record,
    }

This approach creates a sub-pipeline for each input record:

  1. Dynamic trigger: Uses DynamicOut to create one sub-pipeline per input record (a sketch of such a trigger op follows the table below)
  2. Sequential unit processing: Processes 10-30 units within each sub-pipeline sequentially
  3. Result aggregation: Combines unit results into a final record output

Processing layer | Parallelization | Units per record
Record-level | Parallel | 1 sub-pipeline per record
Unit-level | Sequential | 10-30 units processed in order
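
The trigger op referenced in step 1 is not shown in this excerpt. A minimal sketch of what a DynamicOut-based trigger_sub_pipelines op could look like (the signature and mapping-key scheme are assumptions):

import dagster as dg
from typing import Any


@dg.op(out=dg.DynamicOut())
def trigger_sub_pipelines(input_records: list[dict[str, Any]]):
    # Emit one DynamicOutput per record; each yielded value becomes its own
    # mapped downstream step (one sub-pipeline per record).
    for record in input_records:
        # Mapping keys may only contain letters, numbers, and underscores.
        yield dg.DynamicOutput(record, mapping_key=str(record["id"]).replace("-", "_"))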

Solution 2: Parallel processing within sub-pipelines

The second approach adds a second layer of parallelization by processing units within each sub-pipeline in parallel using multiprocessing.

src/project_mini/defs/dynamic_fanout/dynamic_fanout.py
@dg.op
def sub_pipeline_process_record_option_b(
    context: dg.OpExecutionContext, record: dict[str, Any]
) -> dict[str, Any]:
    """SUB-PIPELINE: Complete processing workflow for a single data record.

    1. "Extract processing units from the record (10-30 units per record)"
    2. "Each unit goes through processing (Second layer of parallelization)"
       [Implemented using multiprocessing pool]
    3. "Results are aggregated to create final record output"
    """
    context.log.info(f"Sub-pipeline processing record: {record['id']}")

    # Step 1: Extract processing units from record (10-30 units)
    processing_units = extract_processing_units(record)
    context.log.info(f"Extracted {len(processing_units)} units from {record['id']}")

    # Step 2: Process each unit in parallel (second layer of parallelization).
    # Pool.map must pickle the callable it sends to worker processes, so we map
    # the module-level process_single_unit directly rather than a locally
    # defined wrapper function (which cannot be pickled).
    if len(processing_units) > 1:
        # Leave one core free, but always use at least one process.
        num_processes = min(max(1, multiprocessing.cpu_count() - 1), len(processing_units))
        with multiprocessing.Pool(processes=num_processes) as pool:
            unit_results = pool.map(process_single_unit, processing_units)
    else:
        unit_results = [process_single_unit(processing_units[0])] if processing_units else []

    # Step 3: Aggregate results to create final record output
    aggregated_output = aggregate_unit_results(record, unit_results)

    context.log.info(
        f"Sub-pipeline completed for {record['id']}: aggregated {len(unit_results)} unit results"
    )

    return {
        "record_id": record["id"],
        "sub_pipeline_result": aggregated_output,
        "units_processed": len(unit_results),
        "original_record": record,
    }

This enhanced approach provides two layers of parallelization:

  1. Record-level parallelization: Multiple sub-pipelines run simultaneously (see the executor configuration sketch after the table below)
  2. Unit-level parallelization: Within each sub-pipeline, units are processed using a multiprocessing pool
  3. Automatic scaling: Number of processes adapts to available CPU cores and workload size

Processing layer | Parallelization | Performance benefit
Record-level | Parallel | Scales with number of records
Unit-level | Parallel | Scales with CPU cores
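
One practical caveat: under Dagster's default multiprocess executor, each mapped sub-pipeline already runs in its own process, so the per-record multiprocessing pools can oversubscribe the machine's CPUs. A hedged sketch of one way to keep this in check, capping concurrent sub-pipelines through executor configuration (main_pipeline_results and final_report are defined in the next section; the cap of 4 is arbitrary):

import dagster as dg

# Cap how many mapped sub-pipelines run at once so that their internal
# multiprocessing pools don't oversubscribe the machine's cores.
defs = dg.Definitions(
    assets=[main_pipeline_results, final_report],
    executor=dg.multiprocess_executor.configured({"max_concurrent": 4}),
)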

Complete pipeline implementation

The complete pipeline uses a graph-backed asset to orchestrate the dynamic fanout pattern:

src/project_mini/defs/dynamic_fanout/dynamic_fanout.py
@dg.graph_asset
def main_pipeline_results(input_records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """COMPLETE MAIN PIPELINE.

    1. ✅ Receives multiple related data records.
    2. ✅ sub_pipeline processes each record individually (First layer of parallelization)
    3. ✅ Individual results are collected for additional processing
    4. ✅ Generate final results (used by downstream assets for final report)

    The graph-backed asset ensures:
    - Sub-pipelines are triggered based on number of inputs
    - Main pipeline cannot complete before ALL sub-pipelines return results
    - Full visibility into parallel execution
    - Proper asset lineage and dependencies
    """
    # Launch sub-pipelines (one per input record)
    sub_pipeline_triggers = trigger_sub_pipelines(input_records)

    # Execute sub-pipelines in parallel (first layer of parallelization)
    sub_pipeline_results = sub_pipeline_triggers.map(sub_pipeline_process_record_option_a)

    # Collect ALL results before proceeding (fan-in / synchronization barrier)
    collected_results = sub_pipeline_results.collect()  # This waits for all to complete

    # Perform additional processing and return final results
    return collect_sub_pipeline_results(collected_results)


@dg.asset
def final_report(main_pipeline_results: list[dict[str, Any]]) -> dict[str, Any]:
    """MAIN PIPELINE: "generate a comprehensive final report".

    This asset depends on main_pipeline_results, ensuring the entire pipeline
    completes before the final report is generated.
    """
    total_units = sum(result["units_processed"] for result in main_pipeline_results)

    return {
        "pipeline_summary": {
            "total_records_processed": len(main_pipeline_results),
            "total_units_processed": total_units,
            "average_units_per_record": total_units / len(main_pipeline_results)
            if main_pipeline_results
            else 0,
        },
        "detailed_results": main_pipeline_results,
        "report_generated_at": "2025-08-25T10:00:00Z",
        "pipeline_status": "completed_successfully",
    }

Key features of this implementation:

  1. Dynamic triggering: trigger_sub_pipelines() creates sub-pipelines based on input data
  2. Parallel execution: .map() processes each record in its own sub-pipeline
  3. Synchronization barrier: .collect() ensures all sub-pipelines complete before proceeding
  4. Result aggregation: collect_sub_pipeline_results() processes all results together (sketched below)
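
Like the trigger op, collect_sub_pipeline_results is referenced but not shown in this excerpt. A minimal sketch of an op with the expected shape (the post-processing it performs here is an assumption):

import dagster as dg
from typing import Any


@dg.op
def collect_sub_pipeline_results(
    context: dg.OpExecutionContext, results: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    # Fan-in step: receives the list produced by .collect() and performs any
    # cross-record post-processing before returning the asset's final value.
    context.log.info(f"Collected results from {len(results)} sub-pipelines")
    return sorted(results, key=lambda r: r["record_id"])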

The graph-backed asset approach provides:

  • Full observability: Each sub-pipeline execution is tracked individually
  • Proper lineage: Clear dependency relationships between operations
  • Fault tolerance: Failed sub-pipelines can be retried independently
  • Scalability: Automatically adapts to varying input sizes
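
Finally, the input_records parameter of the graph-backed asset implies an upstream asset with that name, which this excerpt also omits. A minimal sketch of such an asset, plus one way to materialize the whole chain locally for a quick check, assuming it lives in the same module as the definitions above (the record contents are placeholders):

import dagster as dg
from typing import Any


@dg.asset
def input_records() -> list[dict[str, Any]]:
    # Placeholder upstream asset; in practice this would load the related
    # records from a source system.
    return [{"id": f"record_{i}", "unit_count": 12} for i in range(5)]


if __name__ == "__main__":
    # In-process materialization of the full chain for a quick local check.
    result = dg.materialize([input_records, main_pipeline_results, final_report])
    assert result.success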