Concurrency Limits and Fairness
By default, there are no concurrency limits for Hatchet workflows. Workflow runs are immediately executed as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a concurrency limit by decorating a custom function with hatchet.concurrency
. This function returns a concurrency group key, which is a string that is used to group concurrent executions. Note that this function should not also be used as a hatchet.step
. For example, the following workflow will only allow 5 concurrent executions for any workflow execution of ConcurrencyDemoWorkflow
, since the key is statically set to concurrency-key
:
from hatchet_sdk import ConcurrencyLimitStrategy
@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
@hatchet.concurrency(max_runs=5, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN)
def concurrency(self, context) -> str:
return "concurrency-key"
@hatchet.step()
def step1(self, context):
print("executed step1")
pass
Use-Case: Enforcing Per-User Concurrency Limits
You can use the custom concurrency function to enforce per-user concurrency limits. For example, the following workflow will only allow 1 concurrent execution per user:
@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
@hatchet.concurrency(max_runs=1, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN)
def concurrency(self, context) -> str:
return context.workflow_input()["user_id"]
@hatchet.step()
def step1(self, context):
print("executed step1")
pass
This same approach can be used for:
- Setting concurrency for a specific user session by
session_id
(i.e. multiple chat messages sent) - Limiting data or document ingestion by setting an input hash or on-file key.
- Rudimentary fairness rules by limiting groups per tenant to a certain number of concurrent executions.
Use-Case: Cancelling In-Progress Workflows
You can use the custom concurrency function to cancel in-progress workflows. For example, the following workflow will cancel any in-progress workflows for a user if a new event is triggered:
@hatchet.workflow(on_events=["concurrency-test"])
class Concurrency
@hatchet.concurrency(max_runs=1, limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS)
def concurrency(self, context) -> str:
return context.workflow_input()["user_id"]
@hatchet.step()
def step1(self, context):
print("executed step1")
pass