Concurrency Limits and Fairness
By default, there are no concurrency limits for Hatchet workflows. Workflow runs are immediately executed as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a concurrency limit by setting a concurrency limit on the workflow.
For example, the following workflow will only allow 5 concurrent executions for any workflow execution of ConcurrencyDemoWorkflow
:
from hatchet_sdk import ConcurrencyLimitStrategy
@hatchet.workflow(
on_events=["concurrency-test"],
concurrency=Concurrency(
max_runs=5,
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
expression="'default'",
),
)
class ConcurrencyDemoWorkflow:
@hatchet.step()
def step1(self, context):
print("executed step1")
pass
Use-Case: Enforcing Per-User Concurrency Limits
You can use the custom CEL expression to enforce per-user concurrency limits. CEL expressions are evaluated on the Hatchet engine (not in your worker) and can be used to extract values from the workflow input or additional metadata.
You can distribute workflows fairly between tenants using the GROUP_ROUND_ROBIN
option for limit_strategy
. This will ensure that each distinct group gets a fair share of the concurrency limit.
For example, let's say 5 workflows got queued in quick succession for keys A
, B
, and C
:
A, A, A, A, A, B, B, B, B, B, C, C, C, C, C
If there is a maximum of 2 concurrent executions, the execution order will be:
A, B, C, A, B, C, A, B, C, A, B, C, A, B, C
This can be set in the concurrency
configuration as follows:
@hatchet.workflow(
on_events=["concurrency-test"],
concurrency=Concurrency(
max_runs=1,
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
expression="input.user_id",
),
)
class ConcurrencyDemoWorkflow:
@hatchet.step()
def step1(self, context):
print("executed step1")
pass
This same approach can be used for:
- Setting concurrency for a specific user session by
session_id
(i.e. multiple chat messages sent) - Limiting data or document ingestion by setting an input hash or on-file key.
- Rudimentary fairness rules by limiting groups per tenant to a certain number of concurrent executions.
For more advanced use cases, you can set a key function that runs on your worker which can access external data sources or perform more complex transformations.
Use-Case: Cancelling In-Progress Workflows
You can use the custom concurrency function to cancel in-progress workflows. For example, the following workflow will cancel any in-progress workflows for a user if a new event is triggered:
@hatchet.workflow(
on_events=["concurrency-test"],
concurrency=Concurrency(
max_runs=1,
limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
),
)
class ConcurrencyDemoWorkflow:
@hatchet.step()
def step1(self, context):
print("executed step1")
pass