Concurrency Limits and Fairness

By default, Hatchet workflows have no concurrency limits: a workflow run starts executing as soon as it is triggered (by an event, cron, or schedule). To enforce a limit, decorate a custom method on the workflow class with hatchet.concurrency. This method returns a concurrency group key, a string used to group concurrent executions. Do not also register this method as a hatchet.step.

For example, the following workflow allows at most 5 concurrent executions of ConcurrencyDemoWorkflow. Because the key is statically set to concurrency-key, every run falls into the same group:

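from hatchet_sdk import ConcurrencyLimitStrategy, Hatchet

# Hatchet client that provides the decorators used below
hatchet = Hatchet()

@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
    # Not a step: returns the concurrency group key for each run
    @hatchet.concurrency(max_runs=5, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN)
    def concurrency(self, context) -> str:
        # Static key, so all runs share one group
        return "concurrency-key"

    @hatchet.step()
    def step1(self, context):
        print("executed step1")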
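
To make the role of the group key concrete, here is a minimal pure-Python model of a per-key limit. It is a sketch for illustration only, not Hatchet's scheduler: the GroupLimiter class and its try_start and finish methods are hypothetical names, and the model assumes that max_runs applies to each distinct key, as the examples in this section describe.

```python
from collections import defaultdict


class GroupLimiter:
    """Toy model (hypothetical): at most max_runs active runs per group key."""

    def __init__(self, max_runs: int) -> None:
        self.max_runs = max_runs
        self.active = defaultdict(int)  # group key -> number of active runs

    def try_start(self, key: str) -> bool:
        """Return True if a run in this group may start now, else False."""
        if self.active[key] >= self.max_runs:
            return False
        self.active[key] += 1
        return True

    def finish(self, key: str) -> None:
        """Release the slot held by a finished run in this group."""
        if self.active[key] > 0:
            self.active[key] -= 1


limiter = GroupLimiter(max_runs=5)
# A static key puts every run in one group, so only the first 5 are admitted.
results = [limiter.try_start("concurrency-key") for _ in range(6)]
print(results)  # [True, True, True, True, True, False]

# Once a run finishes, its slot becomes available to the next run.
limiter.finish("concurrency-key")
readmitted = limiter.try_start("concurrency-key")
print(readmitted)  # True
```

In this model a rejected run would simply wait; how Hatchet queues and later releases such runs depends on the chosen limit_strategy.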

Use-Case: Enforcing Per-User Concurrency Limits

You can use the custom concurrency function to enforce per-user concurrency limits. For example, the following workflow will only allow 1 concurrent execution per user:

@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
    @hatchet.concurrency(max_runs=1, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN)
    def concurrency(self, context) -> str:
        # One group per user; the workflow input must contain "user_id"
        return context.workflow_input()["user_id"]

    @hatchet.step()
    def step1(self, context):
        print("executed step1")
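
The strategy name GROUP_ROUND_ROBIN suggests that queued runs are drained by taking turns across groups, so one busy user cannot starve the others. The sketch below illustrates that general idea in plain Python. It is a simplified model under that assumption, not Hatchet's implementation, and round_robin_order is a hypothetical helper.

```python
from collections import deque


def round_robin_order(queued: dict[str, list[str]]) -> list[str]:
    """Interleave queued run ids by taking one run per group on each pass."""
    queues = {key: deque(runs) for key, runs in queued.items() if runs}
    order = []
    while queues:
        # Iterate over a snapshot of the keys so drained groups can be removed.
        for key in list(queues):
            order.append(queues[key].popleft())
            if not queues[key]:
                del queues[key]
    return order


# user-a has queued three runs, but the other users still get their turn.
queued = {
    "user-a": ["a1", "a2", "a3"],
    "user-b": ["b1"],
    "user-c": ["c1", "c2"],
}
order = round_robin_order(queued)
print(order)  # ['a1', 'b1', 'c1', 'a2', 'c2', 'a3']
```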

This same approach can be used for:

  • Setting concurrency for a specific user session by session_id (e.g. multiple chat messages sent in the same session).
  • Limiting data or document ingestion by keying on an input hash or on-file key.
  • Enforcing rudimentary fairness rules by limiting each tenant's group to a certain number of concurrent executions.
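
Each of the cases above comes down to choosing which string the concurrency method returns. The helpers below are a hedged sketch of such key functions in plain Python. The function names, the key prefixes, and the tenant_id field are assumptions made for illustration; only session_id is named in the text above.

```python
import hashlib
import json


def session_key(workflow_input: dict) -> str:
    """Group runs by chat session, so each session has its own limit."""
    return f"session:{workflow_input['session_id']}"


def document_key(workflow_input: dict) -> str:
    """Group runs by a hash of the input, so identical payloads share a key."""
    # Sorted keys make the hash independent of dictionary ordering.
    payload = json.dumps(workflow_input, sort_keys=True).encode("utf-8")
    return "doc:" + hashlib.sha256(payload).hexdigest()


def tenant_key(workflow_input: dict) -> str:
    """Group runs by tenant (tenant_id is a hypothetical input field)."""
    return f"tenant:{workflow_input['tenant_id']}"


print(session_key({"session_id": "s-42"}))  # session:s-42
print(tenant_key({"tenant_id": "acme"}))    # tenant:acme
```

Inside a workflow, the method decorated with hatchet.concurrency would return the result of one of these helpers applied to context.workflow_input().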

Use-Case: Cancelling In-Progress Workflows

You can also use the custom concurrency function to cancel in-progress workflows. For example, the following workflow cancels any in-progress run for a user when a new event for that user triggers the workflow:

@hatchet.workflow(on_events=["concurrency-test"])
class Concurrency:
    @hatchet.concurrency(max_runs=1, limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS)
    def concurrency(self, context) -> str:
        # One group per user; a new run for the same user cancels the running one
        return context.workflow_input()["user_id"]

    @hatchet.step()
    def step1(self, context):
        print("executed step1")
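
The effect described above, with max_runs=1, can be modelled in a few lines of plain Python. This is a sketch of the behavior as this section describes it, not Hatchet's implementation; CancelInProgressModel and its start method are hypothetical names.

```python
from typing import Optional


class CancelInProgressModel:
    """Toy model (hypothetical): a new run replaces the active run in its group."""

    def __init__(self) -> None:
        self.active: dict[str, str] = {}  # group key -> id of the run in progress
        self.cancelled: list[str] = []    # ids of runs that were cancelled

    def start(self, key: str, run_id: str) -> Optional[str]:
        """Start run_id and return the id of the run it cancelled, if any."""
        previous = self.active.get(key)
        if previous is not None:
            self.cancelled.append(previous)
        self.active[key] = run_id
        return previous


model = CancelInProgressModel()
model.start("user-1", "run-1")             # nothing to cancel
model.start("user-2", "run-2")             # different user, different group
replaced = model.start("user-1", "run-3")  # same user, so run-1 is cancelled
print(replaced)         # run-1
print(model.active)     # {'user-1': 'run-3', 'user-2': 'run-2'}
print(model.cancelled)  # ['run-1']
```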