User Guide
Working With Hatchet
Understanding Workflows

Workflows in Hatchet

Now that you understand the foundational Step concept, you can combine these foundational building blocks into a structured sequence of tasks, known as a workflow.

Workflows can be as simple as a single step or as complex as a multi-step process with branching logic and parallel execution. Hatchet offers a declarative-first approach, providing a clear, organized blueprint for task execution.

Declarative Workflow Design (DAGs)

Hatchet workflows are designed in a Directed Acyclic Graph (DAG) format, where each step is a node in the graph, and the dependencies between steps are the edges. This structure ensures that workflows are organized, predictable, and free from circular dependencies. By defining the sequence and dependencies of steps upfront, you can easily understand the actual runtime state as compared to the expected state when debugging or troubleshooting.

Example: A Simple DAG Workflow

Here's an example of a simpleg graphical DAG workflow in Hatchet, consisting of four steps:

Simple DAG

Each Step is represented as a node in the graph, and the arrows between the nodes represent the dependencies between the steps (left to right). In this example, start must complete before load_docs can start, load_docs must complete before reason_docs can start, and so on.

These relationships are defined in code by specifying required parents in the parent array of that step.

@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
    @hatchet.step()
    def start(self, context: Context):
        return {
            "status": "starting...",
        }
    @hatchet.step()
    def load_docs(self, context: Context):
        # Load the relevant documents
        return {
            "status": "docs loaded",
            "docs": text_content,
        }
 
    @hatchet.step(parents=["load_docs"])
    def reason_docs(self, ctx: Context):
        docs = ctx.step_output("load_docs")['docs']
        # Reason about the relevant docs
 
        return {
            "status": "writing a response",
            "research": research,
        }
 
    @hatchet.step(parents=["reason_docs"])
    def generate_response(self, ctx: Context):
        docs = ctx.step_output("reason_docs")['research']
        # Reason about the relevant docs
 
        return {
            "status": "complete",
            "message": message,
        }
 

Accessing Workflow State Within a Step

In Hatchet, each step within a workflow has access to parent step outputs. This capability allows you to pass data between steps, enabling you to build complex workflows that respond dynamically to the outcomes of previous steps.

@hatchet.step(parents=["previous_step"])
def my_step(context: Context) -> dict:
    data = context.workflow_input()
    previous_step_data = ctx.step_output("previous_step")
    # Perform some operation
    return output

Next Steps: Understanding Workers

With a solid grasp of Hatchet's workflow capabilities, the next concept to explore is workers. Understanding workers is the last step to deploying and managing workflows with Hatchet.

Continue to Understanding Workers →