Workflow Configuration
To create a workflow, create a new class and use the hatchet.workflow and hatchet.step decorators to define its structure. For example, a simple 2-step workflow looks like this:
from hatchet_sdk import Hatchet

hatchet = Hatchet()

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context):
        print("executed step1")

    @hatchet.step(parents=["step1"])
    def step2(self, context):
        print("executed step2")
You'll notice that the workflow defines a workflow trigger (in this case, on_events) and the workflow definition. The workflow definition is a series of steps, each declared with the hatchet.step decorator. Each step must be a method on the class and must accept a context argument. The context argument is a Context object, which contains information about the workflow, such as the input data and the output data of previous steps.
To create multi-step workflows, use parents to list the steps that the current step depends on.
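To make the role of parents concrete, the sketch below models a workflow's dependency graph using only the Python standard library. It illustrates how parent declarations imply an execution order; it is not Hatchet's scheduler. The step3 entry and the execution_order helper are hypothetical names added for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each key is a step name, each value lists its parents.
# step1 and step2 mirror the example above; step3 is added for illustration.
steps = {
    "step1": [],
    "step2": ["step1"],
    "step3": ["step1", "step2"],
}


def execution_order(step_parents):
    """Return one valid order in which every step runs after all of its parents."""
    return list(TopologicalSorter(step_parents).static_order())


order = execution_order(steps)
print(order)
```

A step with no parents can start as soon as the workflow is triggered, while a step with parents has to wait until every listed parent has finished.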
Workflow Triggers
You can define the following automatic triggers for workflows:
- on_events: Trigger the workflow when a specific event is sent to the Hatchet API. See the documentation for running workflows via events for more information.
- on_crons: Trigger the workflow on a cron schedule. See the documentation for running workflows via cron schedules for more information.
Retrieving Workflow Input Data
You can access the workflow's input data, such as the event data or other specified input data, by calling the context.workflow_input() method. For example, given the following event:
{
  "name": "test"
}
You can get access to the event data by doing the following:
from hatchet_sdk import Context

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context: Context):
        # workflow_input() returns the event payload, e.g. {"name": "test"}
        print("executed step1", context.workflow_input())
Step Outputs
Step outputs are optional. When a step does return a value, it should be a dict. For example:
@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context: Context):
        return {
            "step-1-output": "test"
        }
Future steps can access this output by calling context.step_output("<step>"). In this example, a future step could access this data via context.step_output("step1").
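Because steps only interact with the workflow through context.workflow_input() and context.step_output(), their logic can be exercised without a running worker. The following is a minimal sketch under that assumption: StubContext is a hypothetical test double, not part of the SDK, and the decorators are left out so the methods can be called directly.

```python
class StubContext:
    """Hypothetical test double that mimics only the two Context methods used here."""

    def __init__(self, workflow_input, step_outputs=None):
        self._workflow_input = workflow_input
        self._step_outputs = step_outputs or {}

    def workflow_input(self):
        return self._workflow_input

    def step_output(self, step_name):
        return self._step_outputs[step_name]


class MyWorkflow:
    # Decorators are omitted so the step logic can be called as plain methods.
    def step1(self, context):
        name = context.workflow_input()["name"]
        return {"greeting": f"hello {name}"}

    def step2(self, context):
        # Reads the dict returned by step1.
        greeting = context.step_output("step1")["greeting"]
        return {"shout": greeting.upper()}


workflow = MyWorkflow()
event = {"name": "test"}
step1_result = workflow.step1(StubContext(event))
step2_result = workflow.step2(StubContext(event, {"step1": step1_result}))
print(step2_result)
```

Returning plain dicts keeps each step's output easy to inspect and to feed into the next step in a test like this one.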
Sync vs Async Steps
You can define async steps by using the async keyword on the step method. For example:
import asyncio

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    async def step1(self, context: Context):
        print("executed step1", context.workflow_input())
        # non-blocking sleep: yields control back to the event loop
        await asyncio.sleep(1)
When steps are defined with async, they are executed in the main thread using the default asyncio event loop. If an asyncio event loop is not set when the worker is started with worker.start, one will be created. If you are using async steps, make sure you are not blocking the event loop. If you are running blocking code, declare the method with plain def so that it executes in a separate thread. You can also call asyncio.run within a def method if you need async/await there, but make sure you are not using a third-party dependency that expects a shared event loop.
As a quick rule of thumb:
- If you are using async/await, use async def and ensure you are not blocking the event loop.
- If you are running blocking code, use def and ensure you are not using third-party dependencies that expect a shared event loop.
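The effect of blocking the event loop can be seen with plain asyncio, independent of Hatchet. In this sketch a heartbeat task shares the loop with a step-like coroutine; heartbeat, blocking_step, cooperative_step and count_ticks are hypothetical names used only for illustration.

```python
import asyncio
import time


async def heartbeat(log, stop):
    # Stands in for any other coroutine sharing the event loop.
    while not stop.is_set():
        log.append("tick")
        await asyncio.sleep(0.01)


async def blocking_step():
    time.sleep(0.2)  # blocks the whole event loop; nothing else can run


async def cooperative_step():
    await asyncio.sleep(0.2)  # yields to the loop while waiting


async def count_ticks(step):
    """Run `step` next to the heartbeat and count how often the heartbeat ran."""
    log = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(log, stop))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    await step()
    stop.set()
    await task
    return len(log)


blocked_ticks = asyncio.run(count_ticks(blocking_step))
cooperative_ticks = asyncio.run(count_ticks(cooperative_step))
print(blocked_ticks, cooperative_ticks)
```

With time.sleep the heartbeat manages only its first tick before the loop is held up, whereas with await asyncio.sleep it keeps ticking for the whole duration of the step.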
Timeouts
The default timeout on Hatchet is 60 seconds per step run.
You can declare a timeout for a step by passing timeout to the hatchet.step decorator. Timeouts are strings in the format of 1h, 1m, 1s, etc. For example, to time out a step after 5 minutes, you can do the following:
@hatchet.step(timeout="5m")
def step1(self, context):
    print("executed step1")
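To show how such strings map to durations, here is a small sketch that converts the single-unit forms shown above (1h, 1m, 1s) into seconds. The timeout_to_seconds helper is hypothetical and is not Hatchet's parser; it deliberately rejects anything other than a whole number followed by h, m or s, and says nothing about which other forms Hatchet itself accepts.

```python
import re

# Seconds per unit for the three suffixes shown in this guide.
_UNIT_SECONDS = {"h": 3600, "m": 60, "s": 1}


def timeout_to_seconds(timeout):
    """Convert a string such as "5m" into a number of seconds."""
    match = re.fullmatch(r"(\d+)([hms])", timeout)
    if match is None:
        raise ValueError(f"unsupported timeout string: {timeout!r}")
    value, unit = match.groups()
    return int(value) * _UNIT_SECONDS[unit]


five_minutes = timeout_to_seconds("5m")
print(five_minutes)
```

A helper like this can be handy for validating timeout values in your own configuration before they are passed to the decorator.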
Termination, Sleeps and Threads
Hatchet spawns a new thread per step, which means that there is a risk of thread leakage if your code is busy outside of the Python interpreter. For example, this can happen if you call time.sleep within a step. Consider using an async method with await asyncio.sleep instead.
You can also determine whether to exit the thread by calling context.done() within a step, which returns True if the step has been cancelled. For example:
@hatchet.step(timeout="2s")
def step1(self, context):
    while True:
        # this step will gracefully exit after 2 seconds
        if context.done():
            break
If you need control over cancellation, you can also use context.cancel() to cancel the current step, though this is not recommended.
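The context.done() polling pattern above is a form of cooperative cancellation: the thread is never killed from outside, it checks a flag and returns on its own. The sketch below reproduces that idea with the standard library only. StubContext and its mark_cancelled method are hypothetical stand-ins for whatever marks a real step as cancelled (for example, a timeout); they are not part of the SDK.

```python
import threading


class StubContext:
    """Hypothetical stand-in for Context; done() reports whether cancellation was requested."""

    def __init__(self):
        self._cancelled = threading.Event()

    def done(self):
        return self._cancelled.is_set()

    def mark_cancelled(self):
        # Simulates the step being cancelled from outside, e.g. by a timeout.
        self._cancelled.set()


def step1(context, started, results):
    iterations = 0
    started.set()
    # Check the flag between small units of work so the thread can exit promptly.
    while not context.done():
        iterations += 1
    results["iterations"] = iterations


context = StubContext()
started = threading.Event()
results = {}
worker = threading.Thread(target=step1, args=(context, started, results))
worker.start()
started.wait()            # make sure the step is running
context.mark_cancelled()  # request cancellation
worker.join(timeout=5)    # the step notices the flag and returns
print(worker.is_alive())
```

The shorter each unit of work between checks, the sooner the thread is released after cancellation, which is what keeps step threads from leaking.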