SDK Reference
Python SDK
Workflow

Workflow Configuration

To create a workflow, simply create a new class and use the hatchet.workflow and hatchet.step decorators to define the structure of your workflow. For example, a simple 2-step workflow would look like:

from hatchet_sdk import Hatchet
 
hatchet = Hatchet()
 
@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context):
        print("executed step1")
        pass
 
    @hatchet.step(parents=["step1"])
    def step2(self, context):
        print("executed step2")
        pass

You'll notice that the workflow defines a workflow trigger (in this case, on_events), and the workflow definition. The workflow definition is a series of steps, which can be defined using the hatchet.step decorator. Each step must be a method on the class, and must accept a context argument. The context argument is a Context object, which contains information about the workflow, such as the input data and the output data of previous steps.

To create multi-step workflows, you can use parents to define the steps which the current step depends on.

Workflow Triggers

You can define the following automatic triggers for workflows:

Retrieving Workflow Input Data

You can get access to the workflow's input data, such as the event data or other specified input data, by using the context.workflow_input() method on the context. For example, given the following event:

{
  "name": "test"
}

You can get access to the event data by doing the following:

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context : Context):
        print("executed step1", context.workflow_input())
        pass

Step Outputs

Step outputs should be a dict and are optional. For example:

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context : Context):
        return {
            "step-1-output": "test"
        }

Future steps can access this output by calling context.step_output("<step>"). In this example, a future step could access this data via context.step_output("step1").

Sync vs Async Steps

You can define async steps by using the async keyword on the step method. For example:

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    async def step1(self, context : Context):
        print("executed step1", context.workflow_input())
        await asyncio.sleep(1)
        pass

When steps are run with async, they will be executed in the main thread using the default asyncio event loop. If an asyncio event loop is not set when the worker is started with worker.start, one will be created. If you are using async steps, make sure you are not blocking the event loop. If you are running blocking code, decorating the method with only def will execute it in a separate thread. You can also call asyncio.run within your def method if you're in need of using async/await within a def method, but please ensure that you are not using a third-party dependency which expects a shared event loop.

As a quick rule of thumb:

  • If you are using async/await, use async def and ensure you are not blocking the event loop.
  • If you are running blocking code, use def and ensure you are not using third-party dependencies that expect a shared event loop.

Timeouts

The default timeout on Hatchet is 60 seconds per step run.

You can declare a timeout for a step by passing timeout to the hatchet.step decorator. Timeouts are strings in the format of 1h, 1m, 1s, etc. For example, to timeout a step after 5 minutes, you can do the following:

@hatchet.step(timeout="5m")
def step1(self, context):
    print("executed step1")
    pass

Termination, Sleeps and Threads

Hatchet spawns a new thread per step, which means that there is a risk of thread leakage if your code is busy outside of the python interpreter. For example, this can happen if you call time.sleep within a step. Consider using an async method with await asyncio.sleep instead.

You can also determine whether to exit the thread by calling context.done() within a step, which returns true if the step has been cancelled. For example:

@hatchet.step(timeout="2s")
def step1(self, context):
    while True:
        # this step will gracefully exit after 2 seconds
        if context.done():
            break
        pass

If you need control over cancellation, you can also use context.cancel() to cancel the current step, though this is not recommended.