Declarative Workflow Design (DAGs)
Hatchet workflows are designed in a Directed Acyclic Graph (DAG) format, where each task is a node in the graph and the dependencies between tasks are the edges. This structure keeps workflows organized, predictable, and free from circular dependencies. Because the sequence and dependencies of tasks are defined upfront, you can easily compare the actual runtime state against the expected state when debugging or troubleshooting.
Defining a Workflow
Start by declaring a workflow with a name. The workflow object can declare additional workflow-level configuration options which we’ll cover later.
The returned object is an instance of the Workflow class, which is the primary interface for interacting with the workflow (i.e. running, enqueuing, scheduling, etc.).
from hatchet_sdk import Context, EmptyModel, Hatchet
hatchet = Hatchet(debug=True)
simple = hatchet.workflow(name="SimpleWorkflow")
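We'll cover the workflow-level configuration options in detail later, but as a minimal sketch, assuming the input_validator option and a hypothetical SimpleInput Pydantic model, a workflow with typed input might look like this:

from pydantic import BaseModel

class SimpleInput(BaseModel):
    message: str

typed = hatchet.workflow(name="TypedWorkflow", input_validator=SimpleInput)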
Defining a Task
Now that we have a workflow, we can define a task to be executed as part of the workflow. Tasks are defined by calling the task method on the workflow object.
The task method takes a name and a function that defines the task's behavior. The function will receive the workflow's input and return the task's output. Tasks also accept a number of other configuration options, which are covered elsewhere in our documentation.
In Python, the task method is a decorator, which is used like this to wrap a function:
@simple.task()
def task_1(input: EmptyModel, ctx: Context) -> None:
    print("executed task_1")
The function takes two arguments: input, which is a Pydantic model, and ctx, which is the Hatchet Context object. We'll discuss both of these more later.
In the internals of Hatchet, the task is called using positional arguments, meaning that you can name input and ctx whatever you like. For instance, def task_1(foo: EmptyModel, bar: Context) -> None: is perfectly valid.
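Continuing the hypothetical TypedWorkflow sketch from above, a task on that workflow receives the validated input model as its first argument:

@typed.task()
def print_message(input: SimpleInput, ctx: Context) -> None:
    # input is an instance of the workflow's input_validator model
    print(input.message)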
Building a DAG with Task Dependencies
The power of Hatchet’s workflow design comes from connecting tasks into a DAG structure. Tasks can specify dependencies (parents) which must complete successfully before the task can start.
@simple.task()
def first_task(input: EmptyModel, ctx: Context) -> dict:
    return {"result": "Hello World"}

@simple.task(parents=[first_task])
def second_task(input: EmptyModel, ctx: Context) -> dict:
    # Access output from parent task
    first_result = ctx.get_parent_output(first_task)
    print(f"First task said: {first_result['result']}")
    return {"final_result": "Completed"}
Accessing Parent Task Outputs
As shown in the examples above, tasks can access outputs from their parent tasks using the context object:
# Inside a task with parent dependencies, pass the parent task object
parent_output = ctx.get_parent_output(parent_task)
Running a Workflow
You can run workflows directly or enqueue them for asynchronous execution. All the same methods for running a task are available for workflows!
# Construct the workflow input (EmptyModel, since no input_validator was set)
input_data = EmptyModel()

# Run the workflow and wait for the result
result = simple.run(input_data)

# Enqueue the workflow to be executed asynchronously and get a reference to the run
run_ref = simple.run_no_wait(input_data)
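The async and scheduling variants follow the same pattern. A sketch, assuming the aio_run and schedule methods mirror the task-running API (check the SDK reference for exact signatures):

import asyncio
from datetime import datetime, timedelta, timezone

async def main() -> None:
    # Await the run from async code (aio_run assumed as the async variant of run)
    result = await simple.aio_run(input_data)
    print(result)

asyncio.run(main())

# Schedule the workflow to run at a future time (schedule method assumed)
simple.schedule(datetime.now(timezone.utc) + timedelta(hours=1), input=input_data)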