Concurrency controls

In this example, we'll explore different approaches to controlling concurrency in Dagster. When your pipelines interact with rate-limited APIs, shared databases, or other resource-constrained systems, you need to limit how many operations execute simultaneously. Dagster provides several mechanisms at different levels of granularity.

Problem: Preventing resource overload

Imagine you have assets that query a database with limited connection pools, call rate-limited APIs, or consume significant memory. Running too many of these operations simultaneously can cause failures, throttling, or performance degradation.

The key question is: At what level should you limit concurrency—across all runs, within a single run, or for specific resources?

Solution 1: Concurrency pools (cross-run limits)

Concurrency pools limit how many assets or ops with the same pool can execute simultaneously across all runs. This is ideal for protecting shared resources like databases or APIs.

src/project_mini/defs/concurrency_controls/pool_concurrency.py
import time

import dagster as dg


@dg.asset(pool="database")
def query_customers(context: dg.AssetExecutionContext):
    """Asset assigned to the 'database' pool."""
    context.log.info("Querying customers table...")
    time.sleep(5)  # Simulate a database query
    return {"count": 1000}


@dg.asset(pool="database")
def query_orders(context: dg.AssetExecutionContext):
    """Asset assigned to the 'database' pool."""
    context.log.info("Querying orders table...")
    time.sleep(5)  # Simulate a database query
    return {"count": 5000}


@dg.asset(pool="api")
def fetch_external_data(context: dg.AssetExecutionContext):
    """Asset assigned to the 'api' pool."""
    context.log.info("Fetching from external API...")
    time.sleep(3)  # Simulate an API call
    return {"status": "success"}

Configure pool limits in your deployment settings (dagster.yaml or Dagster+ settings):

concurrency:
  pools:
    default_limit: 5

Or set the limit for a specific pool via the CLI: dagster instance concurrency set database 2

Concurrency pools

  • Scope: Across all runs in the deployment
  • Granularity: Per asset/op with the same pool name
  • Configuration: Pool name on the asset plus a limit in deployment config
  • Best for: Database connections, API rate limits

Solution 2: Executor concurrency (within-run limits)

Executor concurrency limits how many ops execute simultaneously within a single run. This controls parallelism without affecting other runs.

src/project_mini/defs/concurrency_controls/executor_concurrency.py
import time

import dagster as dg


@dg.asset
def asset_a(context: dg.AssetExecutionContext):
    context.log.info("Processing asset A...")
    time.sleep(2)


@dg.asset
def asset_b(context: dg.AssetExecutionContext):
    context.log.info("Processing asset B...")
    time.sleep(2)


@dg.asset
def asset_c(context: dg.AssetExecutionContext):
    context.log.info("Processing asset C...")
    time.sleep(2)


@dg.asset
def asset_d(context: dg.AssetExecutionContext):
    context.log.info("Processing asset D...")
    time.sleep(2)


# Limit concurrent ops within a single run to 2
limited_job = dg.define_asset_job(
    name="limited_concurrency_job",
    selection=[asset_a, asset_b, asset_c, asset_d],
    executor_def=dg.multiprocess_executor.configured({"max_concurrent": 2}),
)

Executor concurrency

  • Scope: Within a single run
  • Granularity: All ops in the run
  • Configuration: max_concurrent on the executor
  • Best for: Memory constraints, CPU limits
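
To launch limited_concurrency_job, it needs to be included in your definitions. A minimal wiring sketch, assuming the assets and job above live in the same module (depending on your project scaffolding, definitions may also be discovered automatically):

import dagster as dg

defs = dg.Definitions(
    assets=[asset_a, asset_b, asset_c, asset_d],  # assets defined above
    jobs=[limited_job],
)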

Solution 3: Tag-based concurrency (grouped limits)

Tag-based concurrency limits ops with specific tags, giving fine-grained control within a run: untagged ops can run in parallel freely, while at most N ops carrying a given tag key/value pair execute at once.

src/project_mini/defs/concurrency_controls/tag_concurrency.py
import time

import dagster as dg


@dg.asset(op_tags={"database": "warehouse"})
def warehouse_sync(context: dg.AssetExecutionContext):
    """Tagged asset for tag-based concurrency control."""
    context.log.info("Syncing to warehouse...")
    time.sleep(10)


@dg.asset(op_tags={"database": "warehouse"})
def warehouse_aggregate(context: dg.AssetExecutionContext):
    """Another asset with the same tag for concurrency grouping."""
    context.log.info("Aggregating warehouse data...")
    time.sleep(10)


# Job with tag concurrency limits
warehouse_job = dg.define_asset_job(
    name="warehouse_job",
    selection=[warehouse_sync, warehouse_aggregate],
    executor_def=dg.multiprocess_executor.configured(
        {
            "max_concurrent": 4,
            "tag_concurrency_limits": [
                {"key": "database", "value": "warehouse", "limit": 1}
            ],
        }
    ),
)

Tag-based concurrency

  • Scope: Within a single run
  • Granularity: Ops with a matching tag key/value
  • Configuration: tag_concurrency_limits on the executor
  • Best for: Mixed workloads with different resource needs
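
The limit applies per key/value pair, so an asset tagged with a different value for the same key is not throttled by the warehouse limit. A brief sketch (the reporting tag value and reporting_sync asset are illustrative, not part of the project):

import dagster as dg


@dg.asset(op_tags={"database": "reporting"})
def reporting_sync(context: dg.AssetExecutionContext):
    # Tagged database=reporting, so this op does not count against the
    # database=warehouse limit and can run alongside warehouse ops.
    context.log.info("Syncing reporting tables...")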

Solution 4: Run queue limits (deployment-wide)

Run queue limits control how many runs can be in progress simultaneously across your entire deployment. Configure them in your deployment settings:

concurrency:
  runs:
    max_concurrent_runs: 10
    tag_concurrency_limits:
      - key: 'dagster/backfill'
        limit: 3

Run queue limits

  • Scope: Entire deployment
  • Granularity: Whole runs
  • Configuration: Deployment settings (dagster.yaml)
  • Best for: Overall system capacity, backfill management
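
Run queue tag limits match tags on the run itself; backfill runs carry the dagster/backfill tag automatically. For custom grouping, you can attach run tags to a job yourself. A minimal sketch, assuming a hypothetical team tag:

import dagster as dg


@dg.asset
def analytics_report(context: dg.AssetExecutionContext):
    context.log.info("Building analytics report...")


# Runs launched from this job carry the team=analytics tag, which a
# deployment-level tag_concurrency_limits entry with key 'team' would match.
analytics_job = dg.define_asset_job(
    name="analytics_job",
    selection=[analytics_report],
    tags={"team": "analytics"},
)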

When to use each approach

| Control type | Scope | Protects against | Example use case |
| --- | --- | --- | --- |
| Concurrency pools | Cross-run | Overloading shared resources | Database connection limits |
| Executor limits | Single run | Memory/CPU exhaustion | Large data transformations |
| Tag concurrency | Single run | Resource contention by category | Mixed DB + API workloads |
| Run queue limits | Deployment | Too many simultaneous runs | Backfill throttling |

Use concurrency pools when:

  • You need to protect a shared resource (database, API) across all runs
  • Multiple runs might simultaneously access the same external system
  • You want ops to queue until the resource is available

Use executor limits when:

  • You want to control parallelism within a single run
  • Memory or CPU constraints limit how many ops can run at once
  • You don't need cross-run coordination

Use tag-based limits when:

  • Different ops in the same run have different resource requirements
  • You want some ops to run in parallel while limiting others
  • You need category-based throttling within a run

Use run queue limits when:

  • You need to limit total runs across your deployment
  • Backfills or sensors might launch many runs at once
  • Your infrastructure has fixed capacity for concurrent runs

Combining approaches

These mechanisms can be combined. For example:

  • Pools + executor limits: Protect external resources while also limiting local parallelism (see the sketch at the end of this section)
  • Run queue + pools: Limit total runs AND protect specific resources within those runs
  • Tag limits + pools: Fine-grained control within runs plus cross-run protection

For example, run queue limits and pools can be set together in deployment settings:

concurrency:
  runs:
    max_concurrent_runs: 10
  pools:
    default_limit: 3

This configuration allows up to 10 concurrent runs, with at most 3 ops per pool executing across all runs.
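
At the code level, the pools + executor limits combination amounts to assigning a pool to an asset and configuring the job's executor. A short sketch reusing the 'database' pool from Solution 1 (the asset and job names here are illustrative):

import dagster as dg


@dg.asset(pool="database")
def pooled_query(context: dg.AssetExecutionContext):
    context.log.info("Querying via the shared 'database' pool...")


# The pool caps concurrent 'database' ops across all runs, while
# max_concurrent caps parallelism inside each run of this job.
combined_job = dg.define_asset_job(
    name="combined_job",
    selection=[pooled_query],
    executor_def=dg.multiprocess_executor.configured({"max_concurrent": 2}),
)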