Concurrency Limits and Fairness
By default, there are no concurrency limits for Hatchet workflows: runs are executed as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a limit by adding a `concurrency` configuration to the workflow.

For example, the following workflow will only allow 1 concurrent run of the `concurrency-example` workflow at a time:
const workflow: Workflow = {
id: "concurrency-example",
description: "test",
on: {
event: "concurrency:create",
},
concurrency: {
expression: "'default'",
maxRuns: 1,
limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
},
steps: [...],
};
The `limitStrategy` argument of the `concurrency` configuration can be set to either `CANCEL_IN_PROGRESS` (the default) or `GROUP_ROUND_ROBIN`. Both strategies are documented in the use cases below.
Use-Case: Group Round Robin
You can distribute workflow runs fairly between tenants using the `GROUP_ROUND_ROBIN` option for `limitStrategy`. This ensures that each distinct group gets a fair share of the concurrency limit. For example, let's say 5 runs each were queued in quick succession for keys `A`, `B`, and `C`:
A, A, A, A, A, B, B, B, B, B, C, C, C, C, C
If there is a maximum of 2 concurrent executions, the execution order will be:
A, B, C, A, B, C, A, B, C, A, B, C, A, B, C
This can be set in the `concurrency` configuration as follows:
const workflow: Workflow = {
id: 'concurrency-example-rr',
description: 'test',
on: {
event: 'concurrency:create',
},
concurrency: {
expression: "input.group",
maxRuns: 2,
limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
},
steps: [...],
};
This pattern can be used for:
- Setting concurrency for a specific user session by `session_id` (e.g. multiple chat messages sent)
- Limiting data or document ingestion by setting an input hash or on-file key
- Rudimentary fairness rules by limiting groups per tenant to a certain number of concurrent executions
For more advanced use cases, you can set a key function that runs on your worker, which can access external data sources or perform more complex transformations when computing the group key.
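As a rough sketch (assuming your SDK version supports a worker-side `key` callback on the `concurrency` configuration; the `tenant_id` and `user_id` input fields and the grouping logic are hypothetical, and steps are elided as in the examples above):

```typescript
const workflow: Workflow = {
  id: "concurrency-example-custom-key",
  description: "test",
  on: {
    event: "concurrency:create",
  },
  concurrency: {
    // The key function runs on your worker, so it can combine input fields
    // or consult external data (caches, config stores, etc.) before
    // returning the group key used by the round-robin strategy.
    key: (ctx) => {
      const { tenant_id, user_id } = ctx.workflowInput();
      return `${tenant_id}:${user_id}`;
    },
    maxRuns: 2,
    limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
  },
  steps: [...],
};
```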
Use-Case: Cancelling In-Progress Workflows
You can use a concurrency key to prioritize recent workflow runs over older ones with the `CANCEL_IN_PROGRESS` strategy: when a new run is triggered for a group that is already at its `maxRuns` limit, the in-progress runs for that group are cancelled in favor of the new one.
For example, the following workflow will only allow 1 concurrent execution per user:
const workflow: Workflow = {
id: "concurrency-example",
description: "test",
on: {
event: "concurrency:create",
},
concurrency: {
maxRuns: 1,
limitStrategy: ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS,
expression: "input.user_id",
},
steps: [
{
name: "step1",
run: async (ctx) => {
const { data } = ctx.workflowInput();
const { signal } = ctx.controller;
if (signal.aborted) throw new Error("step1 was aborted");
console.log("starting step1 and waiting 5 seconds...", data);
await sleep(5000);
if (signal.aborted) throw new Error("step1 was aborted");
// NOTE: the AbortController signal can be passed to many http libraries to cancel active requests
// fetch(url, { signal })
// axios.get(url, { signal })
console.log("executed step1!");
return { step1: `step1 results for ${data}!` };
},
},
{
name: "step2",
parents: ["step1"],
run: (ctx) => {
console.log(
"executed step2 after step1 returned ",
ctx.stepOutput("step1"),
);
return { step2: "step2 results!" };
},
},
],
};
NOTE: You must handle the abort signal in your step code, as shown above, to ensure that in-progress work actually stops when a run is cancelled.
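For example, a step can hand the same signal to `fetch` so that an in-flight HTTP request is torn down when the run is cancelled (a minimal sketch that could stand in for `step1` above; the URL and response handling are placeholders):

```typescript
{
  name: "fetch-step",
  run: async (ctx) => {
    const { signal } = ctx.controller;
    // Passing the abort signal to fetch cancels the request if this run
    // is cancelled by the CANCEL_IN_PROGRESS strategy while it is in flight.
    const res = await fetch("https://example.com/api/data", { signal });
    if (signal.aborted) throw new Error("fetch-step was aborted");
    return { fetched: await res.json() };
  },
},
```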