Concurrency controls
In this example, we'll explore different approaches to controlling concurrency in Dagster. When your pipelines interact with rate-limited APIs, shared databases, or resource-constrained systems, you need to limit how many operations execute simultaneously. Dagster provides several mechanisms at different levels of granularity.
Problem: Preventing resource overload
Imagine you have assets that query a database with limited connection pools, call rate-limited APIs, or consume significant memory. Running too many of these operations simultaneously can cause failures, throttling, or performance degradation.
The key question is at what level to limit concurrency: across all runs, within a single run, or for specific resources.
Solution 1: Concurrency pools (cross-run limits)
Concurrency pools limit how many assets or ops with the same pool can execute simultaneously across all runs. This is ideal for protecting shared resources like databases or APIs.
```python
import time

import dagster as dg


@dg.asset(pool="database")
def query_customers(context: dg.AssetExecutionContext):
    """Asset assigned to the 'database' pool."""
    context.log.info("Querying customers table...")
    time.sleep(5)  # Simulate database query
    return {"count": 1000}


@dg.asset(pool="database")
def query_orders(context: dg.AssetExecutionContext):
    """Asset assigned to the 'database' pool."""
    context.log.info("Querying orders table...")
    time.sleep(5)  # Simulate database query
    return {"count": 5000}


@dg.asset(pool="api")
def fetch_external_data(context: dg.AssetExecutionContext):
    """Asset assigned to the 'api' pool."""
    context.log.info("Fetching from external API...")
    time.sleep(3)  # Simulate API call
    return {"status": "success"}
```
Configure pool limits in your deployment settings (dagster.yaml or Dagster+ settings):
```yaml
concurrency:
  pools:
    default_limit: 5
```

Or set a limit for a specific pool via the CLI:

```shell
dagster instance concurrency set database 2
```
| Concurrency pools | |
|---|---|
| Scope | Across all runs in the deployment |
| Granularity | Per asset/op with same pool name |
| Configuration | Pool name on asset + limit in deployment config |
| Best for | Database connections, API rate limits |
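To make the cross-run behavior concrete, here is a plain-Python sketch that models a pool as a shared semaphore. It only illustrates the semantics described above and is not how Dagster implements pools: threads stand in for separate runs, and the names `simulate_runs`, `run_op`, and `POOL_LIMIT` are made up for this sketch.

```python
import threading
import time

POOL_LIMIT = 2  # hypothetical limit for the "database" pool


def simulate_runs(num_runs: int, pool_limit: int = POOL_LIMIT) -> int:
    """Start one pooled op per simulated run and return the peak number active at once."""
    pool = threading.BoundedSemaphore(pool_limit)  # one slot per allowed op
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def run_op() -> None:
        with pool:  # block here until a pool slot is free
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(0.05)  # simulate the database query
            with lock:
                state["active"] -= 1

    # Each thread plays the role of an op in a different run.
    threads = [threading.Thread(target=run_op) for _ in range(num_runs)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return state["peak"]


if __name__ == "__main__":
    # Six "runs" compete for the pool; the peak never exceeds POOL_LIMIT.
    print(simulate_runs(num_runs=6))
```

However many simulated runs start at once, the ops beyond the limit wait for a slot instead of failing, which is the queueing behavior a pool is meant to provide.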
Solution 2: Executor concurrency (within-run limits)
Executor concurrency limits how many ops execute simultaneously within a single run. This controls parallelism without affecting other runs.
```python
import time

import dagster as dg


@dg.asset
def asset_a(context: dg.AssetExecutionContext):
    context.log.info("Processing asset A...")
    time.sleep(2)


@dg.asset
def asset_b(context: dg.AssetExecutionContext):
    context.log.info("Processing asset B...")
    time.sleep(2)


@dg.asset
def asset_c(context: dg.AssetExecutionContext):
    context.log.info("Processing asset C...")
    time.sleep(2)


@dg.asset
def asset_d(context: dg.AssetExecutionContext):
    context.log.info("Processing asset D...")
    time.sleep(2)


# Limit concurrent ops within a single run to 2
limited_job = dg.define_asset_job(
    name="limited_concurrency_job",
    selection=[asset_a, asset_b, asset_c, asset_d],
    executor_def=dg.multiprocess_executor.configured({"max_concurrent": 2}),
)
```
| Executor concurrency | |
|---|---|
| Scope | Within a single run |
| Granularity | All ops in the run |
| Configuration | max_concurrent on executor |
| Best for | Memory constraints, CPU limits |
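The same limit can also be expressed as run config rather than a configured executor. The sketch below builds that dictionary. It assumes the job keeps Dagster's default executor, whose run config nests multiprocess settings under `execution`, `config`, `multiprocess`. `build_execution_config` is a hypothetical helper written for this example, not a Dagster function.

```python
from typing import Any, Optional


def build_execution_config(
    max_concurrent: int,
    tag_concurrency_limits: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Build run config that caps parallel ops within one run (hypothetical helper)."""
    if max_concurrent < 1:
        # This sketch only accepts explicit positive limits.
        raise ValueError("max_concurrent must be at least 1")

    multiprocess: dict[str, Any] = {"max_concurrent": max_concurrent}
    if tag_concurrency_limits:
        multiprocess["tag_concurrency_limits"] = tag_concurrency_limits

    # Assumed nesting for the default executor: execution -> config -> multiprocess
    return {"execution": {"config": {"multiprocess": multiprocess}}}


if __name__ == "__main__":
    print(build_execution_config(2))
```

The resulting dictionary would be passed as `config=` to `dg.define_asset_job` in place of `executor_def`, which keeps the limit adjustable per run without redefining the executor.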
Solution 3: Tag-based concurrency (grouped limits)
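Tag-based concurrency limits ops with specific tags, allowing fine-grained control within a run. Multiple ops can still run in parallel, but at most N ops with a given tag run at the same time. In the example below, up to 4 ops may run at once, but only 1 op tagged database=warehouse, so the two warehouse assets run one after the other.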
```python
import time

import dagster as dg


@dg.asset(op_tags={"database": "warehouse"})
def warehouse_sync(context: dg.AssetExecutionContext):
    """Tagged asset for tag-based concurrency control."""
    context.log.info("Syncing to warehouse...")
    time.sleep(10)


@dg.asset(op_tags={"database": "warehouse"})
def warehouse_aggregate(context: dg.AssetExecutionContext):
    """Another asset with same tag for concurrency grouping."""
    context.log.info("Aggregating warehouse data...")
    time.sleep(10)


# Job with tag concurrency limits
warehouse_job = dg.define_asset_job(
    name="warehouse_job",
    selection=[warehouse_sync, warehouse_aggregate],
    executor_def=dg.multiprocess_executor.configured(
        {
            "max_concurrent": 4,
            "tag_concurrency_limits": [{"key": "database", "value": "warehouse", "limit": 1}],
        }
    ),
)
```
| Tag-based concurrency | |
|---|---|
| Scope | Within a single run |
| Granularity | Ops with matching tag key/value |
| Configuration | tag_concurrency_limits on executor |
| Best for | Mixed workloads with different resource needs |
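The interaction between the overall executor limit and a tag limit can be sketched as a simple admission check. This is a conceptual model in plain Python, not Dagster's scheduler. `can_start` and `_matches` are hypothetical names, and the rule dictionaries reuse the key/value/limit shape from the executor config above.

```python
from typing import Optional


def _matches(tags: dict[str, str], key: str, value: Optional[str]) -> bool:
    """True if the tags carry the key (and the value, when one is given)."""
    return key in tags and (value is None or tags[key] == value)


def can_start(
    op_tags: dict[str, str],
    running: list[dict[str, str]],
    max_concurrent: int,
    tag_limits: list[dict],
) -> bool:
    """Decide whether an op may start, given the tags of ops already running."""
    # Overall cap: applies to every op in the run.
    if len(running) >= max_concurrent:
        return False

    # Tag caps: only apply to ops that carry the matching tag.
    for rule in tag_limits:
        key, value, limit = rule["key"], rule.get("value"), rule["limit"]
        if _matches(op_tags, key, value):
            in_use = sum(1 for tags in running if _matches(tags, key, value))
            if in_use >= limit:
                return False
    return True


if __name__ == "__main__":
    limits = [{"key": "database", "value": "warehouse", "limit": 1}]
    busy = [{"database": "warehouse"}]  # one warehouse op already running
    print(can_start({"database": "warehouse"}, busy, 4, limits))  # second warehouse op waits
    print(can_start({}, busy, 4, limits))  # an untagged op may still start
```

An untagged op is only held back by the overall limit, which is why tag limits suit runs that mix constrained and unconstrained work.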
Solution 4: Run queue limits (deployment-wide)
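Run queue limits control how many runs can be in progress simultaneously across your entire deployment. Configure them in your deployment settings. The configuration below allows at most 10 runs in progress and additionally caps runs carrying the dagster/backfill tag at 3: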
```yaml
concurrency:
  runs:
    max_concurrent_runs: 10
    tag_concurrency_limits:
      - key: 'dagster/backfill'
        limit: 3
```
| Run queue limits | |
|---|---|
| Scope | Entire deployment |
| Granularity | Whole runs |
| Configuration | Deployment settings (dagster.yaml) |
| Best for | Overall system capacity, backfill management |
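As a rough mental model of these settings, the sketch below picks which queued runs may launch. It is plain Python, not Dagster's run coordinator, and it assumes that a run blocked by a tag limit is skipped rather than holding up the runs queued behind it. `select_runs_to_launch` is a hypothetical name, and tag rules here match on the key only, as in the configuration above.

```python
def select_runs_to_launch(
    queued: list[tuple[str, dict[str, str]]],
    in_progress: list[dict[str, str]],
    max_concurrent_runs: int,
    tag_limits: list[dict],
) -> list[str]:
    """Return the IDs of queued runs that may start, in queue order."""
    active = list(in_progress)  # tags of runs counted against the limits
    launched: list[str] = []

    for run_id, tags in queued:
        # Deployment-wide cap: stop once every slot is taken.
        if len(active) >= max_concurrent_runs:
            break

        # Tag cap: a run is blocked if any tag key it carries is at its limit.
        blocked = any(
            rule["key"] in tags
            and sum(1 for other in active if rule["key"] in other) >= rule["limit"]
            for rule in tag_limits
        )
        if blocked:
            continue  # assumption: skip it and consider the next queued run

        active.append(tags)
        launched.append(run_id)

    return launched


if __name__ == "__main__":
    limits = [{"key": "dagster/backfill", "limit": 3}]
    running = [{"dagster/backfill": "b1"}] * 3 + [{}] * 5  # 8 runs in progress
    queue = [("r1", {"dagster/backfill": "b1"}), ("r2", {}), ("r3", {}), ("r4", {})]
    print(select_runs_to_launch(queue, running, 10, limits))
```

In this scenario the backfill run waits because three backfill runs are already in progress, two ordinary runs fill the remaining slots, and the last run stays queued.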
When to use each approach
| Control type | Scope | Protects against | Example use case |
|---|---|---|---|
| Concurrency pools | Cross-run | Overloading shared resources | Database connection limits |
| Executor limits | Single run | Memory/CPU exhaustion | Large data transformations |
| Tag concurrency | Single run | Resource contention by category | Mixed DB + API workloads |
| Run queue limits | Deployment | Too many simultaneous runs | Backfill throttling |
Use concurrency pools when:
- You need to protect a shared resource (database, API) across all runs
- Multiple runs might simultaneously access the same external system
- You want ops to queue until the resource is available
Use executor limits when:
- You want to control parallelism within a single run
- Memory or CPU constraints limit how many ops can run at once
- You don't need cross-run coordination
Use tag-based limits when:
- Different ops in the same run have different resource requirements
- You want some ops to run in parallel while limiting others
- You need category-based throttling within a run
Use run queue limits when:
- You need to limit total runs across your deployment
- Backfills or sensors might launch many runs at once
- Your infrastructure has fixed capacity for concurrent runs
Combining approaches
These mechanisms can be combined. For example:
- Pools + executor limits: Protect external resources while also limiting local parallelism
- Run queue + pools: Limit total runs AND protect specific resources within those runs
- Tag limits + pools: Fine-grained control within runs plus cross-run protection
```yaml
concurrency:
  runs:
    max_concurrent_runs: 10
  pools:
    default_limit: 3
```
This configuration allows up to 10 concurrent runs. Across all of those runs, at most 3 ops from any one pool execute at the same time, unless a different limit has been set for that specific pool.
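When limits are layered, the tightest one wins for any given op. The following back-of-the-envelope helper works through that arithmetic for ops in a single pool. `max_pooled_ops` is a hypothetical name, and the per-run executor limit is an extra assumption that you would supply from your own job definition.

```python
def max_pooled_ops(
    max_concurrent_runs: int,
    per_run_max_concurrent: int,
    pool_limit: int,
) -> int:
    """Upper bound on ops from one pool running at once across the deployment."""
    # Without a pool, every run could fill its executor slots.
    unpooled_bound = max_concurrent_runs * per_run_max_concurrent
    # The pool limit applies across all runs, so it caps that bound.
    return min(pool_limit, unpooled_bound)


if __name__ == "__main__":
    # 10 runs, 2 ops per run, pool limit 3: the pool is the binding limit.
    print(max_pooled_ops(10, 2, 3))
    # 1 run, 2 ops per run, pool limit 3: the executor is the binding limit.
    print(max_pooled_ops(1, 2, 3))
```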