
Declarative Workflow Design (DAGs)

Hatchet workflows are designed in a Directed Acyclic Graph (DAG) format, where each task is a node in the graph and the dependencies between tasks are the edges. This structure ensures that workflows are organized, predictable, and free from circular dependencies. Because the sequence and dependencies of tasks are defined upfront, you can easily compare the actual runtime state against the expected state when debugging or troubleshooting.
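The guarantee behind this structure can be illustrated with a short, Hatchet-independent sketch. The task names below are illustrative stand-ins; the point is that an acyclic dependency graph always admits a valid execution order, which the standard library can compute directly:

```python
# A DAG expressed as a mapping from each task to its parents.
# graphlib.TopologicalSorter raises CycleError if the dependencies are
# circular, and otherwise yields a valid execution order.
from graphlib import TopologicalSorter

dag = {
    "first_task": [],               # no parents: can run immediately
    "second_task": ["first_task"],  # runs only after first_task completes
}

order = list(TopologicalSorter(dag).static_order())
print(order)  # first_task is ordered before second_task
```

This ordering step is conceptually what any DAG-based workflow engine performs before dispatching tasks.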

Defining a Workflow

Start by declaring a workflow with a name. The workflow object can declare additional workflow-level configuration options which we’ll cover later.

The returned object is an instance of the Workflow class, which is the primary interface for interacting with the workflow (e.g. running, enqueuing, or scheduling it).

from hatchet_sdk import Context, EmptyModel, Hatchet
 
hatchet = Hatchet(debug=True)
 
simple = hatchet.workflow(name="SimpleWorkflow")

Defining a Task

Now that we have a workflow, we can define a task to be executed as part of the workflow. Tasks are defined by calling the task method on the workflow object.

The task method takes a name and a function that defines the task’s behavior. The function will receive the workflow’s input and return the task’s output. Tasks also accept a number of other configuration options, which are covered elsewhere in our documentation.

In Python, the task method is a decorator, which is used like this to wrap a function:

@simple.task()
def task_1(input: EmptyModel, ctx: Context) -> None:
    print("executed task_1")

The function takes two arguments: input, which is a Pydantic model, and ctx, which is the Hatchet Context object. We’ll discuss both of these more later.

Internally, Hatchet calls the task using positional arguments, meaning that you can name input and ctx whatever you like.

For instance, def task_1(foo: EmptyModel, bar: Context) -> None: is perfectly valid.
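Because only argument position matters, renaming the parameters changes nothing about how the function is invoked. A minimal stand-alone illustration (the EmptyModel and Context classes here are simple stand-ins for the real Hatchet types, defined only so the snippet runs on its own):

```python
# Stand-in classes, NOT the real Hatchet types, used to show that the
# task function is invoked positionally regardless of parameter names.
class EmptyModel: ...
class Context: ...

def task_1(foo: EmptyModel, bar: Context) -> str:
    # 'foo' receives the workflow input, 'bar' receives the Context,
    # purely by position.
    return type(bar).__name__

result = task_1(EmptyModel(), Context())  # positional call, as Hatchet makes it
```

The second positional argument lands in `bar` and is the Context, whatever the parameter is called.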

Building a DAG with Task Dependencies

The power of Hatchet’s workflow design comes from connecting tasks into a DAG structure. Tasks can specify dependencies (parents) which must complete successfully before the task can start.

@simple.task()
def first_task(input: EmptyModel, ctx: Context) -> dict:
    return {"result": "Hello World"}
 
@simple.task(parents=[first_task])
def second_task(input: EmptyModel, ctx: Context) -> dict:
    # Access output from parent task
    first_result = ctx.get_parent_output(first_task)
    print(f"First task said: {first_result['result']}")
    return {"final_result": "Completed"}

Accessing Parent Task Outputs

As shown in the examples above, tasks can access outputs from their parent tasks using the context object:

# Inside a task with parent dependencies, pass the parent task object
parent_output = ctx.get_parent_output(parent_task)

Running a Workflow

You can run workflows directly or enqueue them for asynchronous execution. All the same methods for running a task are available for workflows!

# Run workflow and wait for the result
result = simple.run(input_data)
 
# Enqueue workflow to be executed asynchronously
run_id = simple.run_no_wait(input_data)