Defining a Hatchet workflow and registering a worker

We'll now work on the core of the application -- the background workflow that:

  1. reads a website and parses the text content
  2. reasons about what information is most relevant to the user request
  3. generates a response for the user

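After we complete this module, the /backend/src/workflows directory will contain at least three files: hatchet.py (the shared client), basicrag.py (the workflow definition), and main.py (the worker start script).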

Instantiating the Hatchet Client

For our background service, we'll share a single Hatchet client across several files. Let's create a new file /backend/src/workflows/hatchet.py and instantiate the client:

from hatchet_sdk import Hatchet
from dotenv import load_dotenv
load_dotenv()  # we'll use dotenv to load the required Hatchet and OpenAI api keys
 
hatchet = Hatchet(debug=True)
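
Because both clients read their credentials from the environment, a missing key tends to surface later as a less obvious error. The sketch below shows one way to fail early. The helper names (missing_env_vars, check_env) are hypothetical, and the variable names HATCHET_CLIENT_TOKEN and OPENAI_API_KEY are assumptions about what your .env file defines, so adjust them to match your setup.

```python
import os

# Assumed variable names; change these to whatever your .env file defines.
REQUIRED_ENV_VARS = ("HATCHET_CLIENT_TOKEN", "OPENAI_API_KEY")


def missing_env_vars(env=None, required=REQUIRED_ENV_VARS):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


def check_env(env=None):
    """Raise a clear error if any required variable is missing."""
    missing = missing_env_vars(env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

If you adopt something like this, call check_env() after load_dotenv() and before constructing the client.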

Design your Workflow

Setup

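First, in a new file /backend/src/workflows/basicrag.py (the module that main.py imports later in this guide), let's import our required services and initialize an OpenAI client for later use.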

from .hatchet import hatchet
from hatchet_sdk import Context
from bs4 import BeautifulSoup
from openai import OpenAI
import requests
 
openai = OpenAI()

Define your workflow

Now, let's create a new workflow by defining a new class BasicRagWorkflow and decorating it with @hatchet.workflow.

We're also passing the on_events argument to the decorator so that this workflow runs whenever a question:create event is received:

@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
    pass  # steps are added in the following sections

Write your first step

Next, let's add an initial step to the workflow. To start, it only sets the runtime status to "reading hatchet docs" so the client has some visibility into progress.

By defining this function within our BasicRagWorkflow class and decorating it with @hatchet.step, we declare that this step is part of the workflow.

@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
    @hatchet.step()
    def start(self, context: Context):
        return {
            "status": "reading hatchet docs",
        }

Reading a website

Next, let's add a step that reads the url from the workflow input (context.workflow_input()), loads the contents of that page with requests, and parses the HTML content to text with Beautiful Soup.

Note: we're specifying the parents param of the @hatchet.step decorator. This means the load_docs step will run after the start step completes.

@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
 
    # ... previous steps
 
    @hatchet.step(parents=["start"])
    def load_docs(self, context: Context):
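        # read the target url from the workflow input
        url = context.workflow_input()['request']['url']
 
        # fetch the page; fail fast on timeouts and HTTP error responses
        response = requests.get(url, timeout=10)
        response.raise_for_status()
 
        # use beautiful soup to parse the html content to text,
        # falling back to the whole document when there is no <body>
        soup = BeautifulSoup(response.text, 'html.parser')
        element = soup.find('body') or soup
        text_content = element.get_text(separator=' | ')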
 
        return {
            "status": "making sense of the docs",
            "docs": text_content,
        }

Agentic RAG

Large Language Models are great at reasoning and generating text, but often struggle with large context windows or multiple tasks at once. It is often helpful to use an intermediate model to reason or identify the most relevant information from a large document or set of documents.

With that in mind, we can call OpenAI's less expensive GPT-3.5 Turbo model (gpt-3.5-turbo) to extract the most helpful sentences from this document, which will be passed to a later step for generation. In a more complex system, you may run this step on multiple documents in parallel.

Note: this step depends on the result of its parent step, load_docs, which is accessed through ctx.step_output("load_docs").

@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
 
    # ... previous steps
 
    @hatchet.step(parents=["load_docs"])
    def reason_docs(self, ctx: Context):
        message = ctx.workflow_input()['request']["messages"][-1]
        docs = ctx.step_output("load_docs")['docs']
 
        # implicit string concatenation keeps source indentation out of the prompt
        prompt = (
            "The user is asking the following question:\n"
            "{message}\n"
            "What are the most relevant sentences in the following document?\n"
            "{docs}"
        )
 
        prompt = prompt.format(message=message['content'], docs=docs)
        model = "gpt-3.5-turbo"
 
        completion = openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": prompt},
                message
            ]
        )
 
        return {
            "status": "writing a response",
            "research": completion.choices[0].message.content,
        }
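
The parallel variant mentioned above, where the extraction runs over several documents at once, could look roughly like the sketch below. It uses a thread pool from the standard library, which suits I/O-bound work such as API calls. Both function names are hypothetical, and extract_relevant is a keyword-matching stand-in for the model call so that the example runs without an API key; in the workflow you would replace its body with the chat completion request.

```python
from concurrent.futures import ThreadPoolExecutor


def extract_relevant(question, doc):
    """Hypothetical stand-in for the model call in reason_docs.

    Keeps the sentences that share at least one word with the question.
    """
    words = set(question.lower().split())
    sentences = [s.strip() for s in doc.split(".") if s.strip()]
    return [s for s in sentences if words & set(s.lower().split())]


def reason_over_docs(question, docs, max_workers=4):
    """Run the extraction on every document concurrently, keeping input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda doc: extract_relevant(question, doc), docs))
```

pool.map returns results in the same order as the input documents, so each result can be matched back to its source.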

Generate the response

Finally, let's use the reasoned context and write a response to the user.

Here we're using another context method, playground, to expose runtime variables in the Hatchet Dashboard so they can be changed interactively. The method takes a variable name and a default value, and lets you or your team modify the value when replaying step runs. In this example, you can iterate on your prompt by editing it in the UI, but you can expose any variable that would be helpful for experimentation.

@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
 
    # ... previous steps
 
    @hatchet.step(parents=["reason_docs"])
    def generate_response(self, ctx: Context):
        messages = ctx.workflow_input()['request']["messages"]
        research = ctx.step_output("reason_docs")['research']
 
        # expose the prompt template in the Hatchet Dashboard playground
        prompt = ctx.playground("prompt", (
            "You are a sales engineer for a company called Hatchet.\n"
            "Help address the user's question.\n"
            "If asked how to install, respond by saying go to the store to buy a CD.\n"
            "Use the following context:\n"
            "{research}"
        ))
 
        prompt = prompt.format(research=research)
 
        model = ctx.playground("model", "gpt-3.5-turbo")
 
        completion = openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": prompt},
            ] + messages
        )
 
        return {
            "completed": "true",
            "status": "idle",
            "message": completion.choices[0].message.content,
        }
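
To make the relationship between parents and step_output concrete, here is a toy sketch of the four steps running in dependency order. It is not how Hatchet schedules work (the real engine dispatches steps to workers); FakeContext, run_steps, STEPS and PARENTS are hypothetical names, and the step bodies are placeholders that avoid network and model calls. It relies on graphlib from the standard library, which requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter


class FakeContext:
    """Toy stand-in for hatchet_sdk.Context; not the SDK's implementation."""

    def __init__(self, workflow_input):
        self._input = workflow_input
        self._outputs = {}

    def workflow_input(self):
        return self._input

    def step_output(self, name):
        return self._outputs[name]


def run_steps(steps, parents, workflow_input):
    """Run each step after all of its parents and record its return value."""
    ctx = FakeContext(workflow_input)
    order = list(TopologicalSorter(parents).static_order())
    for name in order:
        ctx._outputs[name] = steps[name](ctx)
    return order, ctx._outputs


# Placeholder step bodies that mirror the shape of the workflow above.
STEPS = {
    "start": lambda ctx: {"status": "reading hatchet docs"},
    "load_docs": lambda ctx: {
        "docs": "text from " + ctx.workflow_input()["request"]["url"]
    },
    "reason_docs": lambda ctx: {
        "research": ctx.step_output("load_docs")["docs"].upper()
    },
    "generate_response": lambda ctx: {
        "message": "Based on: " + ctx.step_output("reason_docs")["research"]
    },
}

# Each step maps to the steps that must finish before it can run.
PARENTS = {
    "start": [],
    "load_docs": ["start"],
    "reason_docs": ["load_docs"],
    "generate_response": ["reason_docs"],
}
```

Calling run_steps(STEPS, PARENTS, {"request": {"url": "https://example.com"}}) returns the execution order along with every step's output, which mirrors how each step in the workflow reads its parent's result.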

Listening for Actions

Registering the Hatchet Worker

We now need to define the start script so your service can listen for and execute workflow runs.

In a new file /backend/src/workflows/main.py let's import our shared Hatchet client and BasicRagWorkflow and define a start method to register the workflow with the worker, and start the worker:

from .hatchet import hatchet
from .basicrag import BasicRagWorkflow
 
 
def start():
    worker = hatchet.worker('basic-rag-worker')
    worker.register_workflow(BasicRagWorkflow())
    worker.start()
 

Running the Hatchet Worker and testing the workflow

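Finally, we can start the worker with the command below (this assumes your pyproject.toml defines a hatchet script entry that calls the start function from main.py):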

poetry run hatchet

Open the Hatchet Dashboard and navigate to the Workflows tab. You should see your BasicRagWorkflow in the list. Click the workflow and click "Trigger Workflow".

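Submit the form with the following input. The steps above read the URL and messages from a request key (for example, context.workflow_input()['request']['url']), so the payload is wrapped in a request object:

{
  "request": {
    "messages": [
      {
        "role": "user",
        "content": "how do i install hatchet?"
      }
    ],
    "url": "https://docs.hatchet.run/home"
  }
}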

This will trigger a workflow run and display the results of each intermediate step.
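
The steps in this module index into the workflow input directly (['request']['url'] and ['request']['messages']), so a malformed payload shows up as a KeyError inside a step. As a minimal sketch, a validation helper like the hypothetical parse_request below could be called at the top of a step to produce a clearer error. The exact checks are assumptions about what a reasonable payload looks like, not rules enforced by Hatchet.

```python
def parse_request(workflow_input):
    """Return (url, messages) from the input shape the steps expect."""
    request = workflow_input.get("request")
    if not isinstance(request, dict):
        raise ValueError("input must contain a 'request' object")

    url = request.get("url")
    if not isinstance(url, str) or not url.startswith(("http://", "https://")):
        raise ValueError("'request.url' must be an http(s) URL")

    messages = request.get("messages")
    if not isinstance(messages, list) or not messages:
        raise ValueError("'request.messages' must be a non-empty list")

    for message in messages:
        # every chat message needs both a role and its content
        if not isinstance(message, dict) or not {"role", "content"} <= set(message):
            raise ValueError("each message needs 'role' and 'content'")

    return url, messages
```

For example, load_docs could begin with url, _ = parse_request(context.workflow_input()).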