The CANCEL_IN_PROGRESS Concurrency Limit Strategy in Hatchet
Hatchet's CANCEL_IN_PROGRESS
concurrency limit strategy is a powerful tool for managing resource contention in your workflows. This strategy allows you to cancel currently running workflow instances to free up slots for new instances when the concurrency limit is reached.
How it works
When a workflow run finishes and the concurrency limit is reached for the workflow version and group key, the CANCEL_IN_PROGRESS strategy will:
- Fetch all currently running workflow runs for the given workflow version and concurrency key.
- Fetch the queued workflow runs for the same workflow version and group key, up to the maximum number of concurrent runs allowed.
- Cancel the oldest running workflow runs to make room for the queued runs, ensuring that there is space for all the queued runs to start executing.
- Start executing the queued workflow runs in the freed-up slots.
This strategy ensures that queued workflow runs can start executing as soon as possible, even if the concurrency limit is reached, by canceling older running instances.
When to use CANCEL_IN_PROGRESS
The CANCEL_IN_PROGRESS
strategy is particularly useful in scenarios where:
- You have long-running workflow instances that may become stale or irrelevant if newer instances are triggered.
- You want to prioritize processing the most recent data or events, even if it means canceling older workflow instances.
- You have resource-intensive workflows where it's more efficient to cancel an in-progress instance and start a new one than to wait for the old instance to complete.
- Your user UI allows for multiple inputs, but only the most recent is relevant (i.e. chat messages, form submissions, etc.).
However, it's important to note that canceling a workflow instance may leave your system in an inconsistent state if the canceled instance was in the middle of updating a resource. Make sure to design your workflows to handle cancellation gracefully and ensure data consistency.
How to use CANCEL_IN_PROGRESS
To use the CANCEL_IN_PROGRESS
concurrency limit strategy, define a concurrency
configuration in your workflow definition:
from hatchet_sdk import ConcurrencyLimitStrategy
@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
@hatchet.concurrency(max_runs=10, limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS)
def concurrency(self, context) -> str:
return context.workflow_input()["user_id"]
@hatchet.step()
def step1(self, context):
print("executed step1")
pass
In this example:
maxRuns
sets the maximum number of concurrent instances allowed for this workflow.limitStrategy
is set toCANCEL_IN_PROGRESS
, indicating that when the concurrency limit is reached, currently running instances should be canceled to make room for new ones.key
is a function that takes the workflow context and returns a string key. This key is used to group workflow instances for the purpose of concurrency limiting and determining which instances to cancel. In this example, instances are grouped byuserId
, so when a new instance is triggered for a given user, any currently running instances for that same user will be canceled.
With this configuration, Hatchet will automatically manage your workflow's concurrency, canceling in-progress instances as needed to ensure that new instances can start executing immediately.
Remember to design your workflows to handle cancellation gracefully, and consider using checkpointing or idempotency techniques to ensure data consistency in the face of canceled instances.