Concurrency Limits and Fairness

By default, Hatchet workflows have no concurrency limits: workflow runs execute as soon as they are triggered (by an event, cron, or schedule). You can enforce a limit by setting the concurrency option on the workflow.

For example, the following workflow allows at most 5 concurrent runs of ConcurrencyDemoWorkflow:

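from hatchet_sdk import ConcurrencyLimitStrategy, Hatchet

# Assumption: Concurrency is importable from your installed SDK version;
# adjust this import if your version exposes it under a different name or path.
from hatchet_sdk import Concurrency

# Client used by the decorators below.
hatchet = Hatchet()

@hatchet.workflow(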
    on_events=["concurrency-test"],
    concurrency=Concurrency(
        max_runs=5,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
        expression="'default'",
    ),
)
class ConcurrencyDemoWorkflow:
    @hatchet.step()
    def step1(self, context):
        print("executed step1")
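
As a rough local analogy (not Hatchet's implementation, and not using the SDK), the effect of max_runs can be pictured as a semaphore with five slots: a run starts only when a slot is free. The sketch below uses only the Python standard library; run_with_limit and its parameters are hypothetical names chosen for illustration.

```python
import asyncio


async def run_with_limit(total_runs: int, max_runs: int) -> int:
    """Run `total_runs` dummy jobs with at most `max_runs` in flight.

    Returns the highest number of jobs that were running at the same time.
    """
    slots = asyncio.Semaphore(max_runs)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with slots:  # wait here until one of the slots is free
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for real work
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(total_runs)))
    return peak


# 12 runs are triggered at once, but no more than 5 execute concurrently.
peak_concurrency = asyncio.run(run_with_limit(total_runs=12, max_runs=5))
print(peak_concurrency)
```

In Hatchet the limit is enforced by the engine across all workers, so no semaphore is needed in your own code.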

Use-Case: Enforcing Per-User Concurrency Limits

You can use a custom CEL expression to enforce per-user concurrency limits. CEL expressions are evaluated on the Hatchet engine (not in your worker) and can extract values from the workflow input or additional metadata.

You can distribute workflows fairly between tenants using the GROUP_ROUND_ROBIN option for limit_strategy. This will ensure that each distinct group gets a fair share of the concurrency limit.

For example, suppose 5 workflow runs are queued in quick succession for each of the keys A, B, and C:

A, A, A, A, A, B, B, B, B, B, C, C, C, C, C

If there is a maximum of 2 concurrent executions, the execution order will be:

A, B, C, A, B, C, A, B, C, A, B, C, A, B, C
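
The interleaving above can be reproduced with a small standard-library simulation. This is only a sketch of the ordering idea, assuming each group keeps its own FIFO queue and groups take turns in first-seen order. It does not model the concurrency limit itself or the engine's actual scheduler, and round_robin_order is a hypothetical helper name.

```python
from collections import deque


def round_robin_order(queued_keys):
    """Return the order in which queued runs start when groups take turns."""
    groups = {}  # group key -> runs still waiting, in arrival order
    for key in queued_keys:
        groups.setdefault(key, deque()).append(key)

    order = []
    while groups:
        # Visit each group once per pass, taking its oldest waiting run.
        for key in list(groups):
            order.append(groups[key].popleft())
            if not groups[key]:
                del groups[key]  # group has nothing left to run
    return order


queued = ["A"] * 5 + ["B"] * 5 + ["C"] * 5
print(", ".join(round_robin_order(queued)))
# prints: A, B, C, A, B, C, A, B, C, A, B, C, A, B, C
```

In Hatchet this ordering is selected with the GROUP_ROUND_ROBIN limit strategy rather than implemented by hand.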

Round-robin scheduling is set in the concurrency configuration as follows, here using the user_id field of the workflow input as the group key:

@hatchet.workflow(
    on_events=["concurrency-test"],
    concurrency=Concurrency(
        max_runs=1,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
        expression="input.user_id",
    ),
)
class ConcurrencyDemoWorkflow:
    @hatchet.step()
    def step1(self, context):
        print("executed step1")

This same approach can be used for:

  • Setting concurrency for a specific user session by session_id (e.g. when multiple chat messages are sent)
  • Limiting data or document ingestion by setting an input hash or on-file key.
  • Rudimentary fairness rules by limiting groups per tenant to a certain number of concurrent executions.
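
For the ingestion case in the list above, one way to obtain a stable key is to hash the document before triggering the workflow and include the hash in the input. The sketch below uses only the standard library. The field name content_hash and the helper build_ingest_payload are hypothetical. The workflow would then presumably use expression="input.content_hash", following the input.user_id pattern shown earlier.

```python
import hashlib
import json


def content_hash(document: bytes) -> str:
    """Return a stable hex digest identifying the document's content."""
    return hashlib.sha256(document).hexdigest()


def build_ingest_payload(user_id: str, document: bytes) -> dict:
    """Build a workflow input that carries the hash as a grouping field."""
    return {
        "user_id": user_id,
        "content_hash": content_hash(document),  # hypothetical field name
    }


payload = build_ingest_payload("user-123", b"quarterly report v1")
print(json.dumps(payload, indent=2))
```

Because identical content always produces the same hash, repeated uploads of the same document fall into the same concurrency group regardless of which user sent them.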

For more advanced use cases, you can set a key function that runs on your worker and can access external data sources or perform more complex transformations.
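
As an illustration of what such a key function might compute, the sketch below derives a group key from the workflow input plus a lookup table that stands in for an external data source. Everything here is hypothetical (PLAN_BY_USER, concurrency_key, the key format), and how the function is registered with Hatchet depends on the SDK version and is not shown.

```python
# Hypothetical lookup table standing in for an external data source
# such as a database or billing service.
PLAN_BY_USER = {"user-1": "enterprise", "user-2": "free"}


def concurrency_key(workflow_input: dict) -> str:
    """Return the concurrency group key for a run, given its input."""
    user_id = workflow_input.get("user_id", "anonymous")
    plan = PLAN_BY_USER.get(user_id, "free")
    if plan == "enterprise":
        # Enterprise users each get their own group.
        return f"enterprise:{user_id}"
    # All other users share one group per plan.
    return f"shared:{plan}"


print(concurrency_key({"user_id": "user-1"}))
print(concurrency_key({"user_id": "user-2"}))
```

A plain CEL expression cannot perform this kind of lookup, since it only sees the workflow input and additional metadata.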

Use-Case: Cancelling In-Progress Workflows

You can use the CANCEL_IN_PROGRESS limit strategy to cancel in-progress workflow runs. For example, the following workflow cancels any in-progress run in the same concurrency group when a new event triggers another run:

@hatchet.workflow(
    on_events=["concurrency-test"],
    concurrency=Concurrency(
        max_runs=1,
        limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
    ),
)
class ConcurrencyDemoWorkflow:
    @hatchet.step()
    def step1(self, context):
        print("executed step1")
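
The cancel-in-progress behavior can be pictured with a small asyncio simulation. This is a local sketch under stated assumptions, not Hatchet's implementation: each arrival carries a hypothetical group key, at most one run per key is allowed, and a newer run for the same key cancels the older one. The name simulate_cancel_in_progress and the timings are made up for illustration.

```python
import asyncio


async def simulate_cancel_in_progress(arrivals):
    """Start one run per arrival; a new run for a key cancels the running one."""
    running = {}    # group key -> task currently in progress
    completed = []  # run ids that finished their work
    cancelled = []  # run ids that were cancelled mid-run

    async def run(run_id):
        try:
            await asyncio.sleep(0.1)  # stand-in for real work
            completed.append(run_id)
        except asyncio.CancelledError:
            cancelled.append(run_id)
            raise

    tasks = []
    for run_id, key in arrivals:
        previous = running.get(key)
        if previous is not None and not previous.done():
            previous.cancel()  # the newest run for this key wins
        task = asyncio.create_task(run(run_id))
        running[key] = task
        tasks.append(task)
        await asyncio.sleep(0.01)  # the next event arrives before work finishes

    # Collect results; cancelled tasks are returned as exceptions, not raised.
    await asyncio.gather(*tasks, return_exceptions=True)
    return completed, cancelled


arrivals = [("run-1", "alice"), ("run-2", "alice"), ("run-3", "bob"), ("run-4", "alice")]
completed, cancelled = asyncio.run(simulate_cancel_in_progress(arrivals))
print("completed:", completed)
print("cancelled:", cancelled)
```

Here run-1 and run-2 are cancelled by later arrivals for the same key, while run-3 and run-4 run to completion.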