User Guide
Features
Streaming

Streaming in Hatchet

Hatchet offers real-time result streaming of data from a background worker allowing you to subscribe to workflow progress, relay individual step results, and send data from an inprogress step run. This feature enables you to provide real-time updates and progress to users as the workflow runs, enhancing the user experience and interactivity of your application.

How It Works

When a workflow is triggered, Hatchet generates events for each step of the workflow, including the step's status, output, and relevant metadata. These events are generated by the background workers executing the workflow steps. By subscribing to this event stream, you can capture these events in real-time and, optionally, forward them to the frontend client.

Here's a high-level overview of the real-time progress streaming process:

  1. The client triggers a workflow by sending a request to the backend API.
  2. The backend API initiates the workflow using the Hatchet SDK and obtains a unique workflowRunId.
  3. The backend API returns the workflowRunId to the client as a reference.
  4. The client establishes a connection to a dedicated endpoint on the backend API, passing the workflowRunId as a parameter.
  5. The backend API subscribes to the Hatchet event stream for the specified workflowRunId.
  6. As the background workers execute the workflow steps, Hatchet generates events for each step, which are captured by the backend API through the event stream.
  7. The backend API processes the events, extracts relevant information, and sends the data to the client in real-time using a streaming response.
  8. The client receives the streamed data and updates the user interface accordingly, providing real-time progress updates.

If the connection is lost (e.g., page reload or transient network failure), the client can reconnect to the same endpoint and resume receiving new real-time updates by re-establishing the stream at step 4.

Listeners

Listeners are used to subscribe to the event stream for a specific workflow run. They are asynchronous generators that yield events as they are received from the event stream. You can filter and transform the event data before sending it to the client. Listeners generate events for Workflow run events, Step run events, and Step stream events.

Here's an example of how to create a listener:

async def listen_for_files():
    workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", {"test": "test"})
    listener = hatchet.listener.stream(workflowRunId)
 
    async for event in listener:
        # Filter and transform event data here
        data = json.dumps({
            "type": event.type,
            "messageId": workflowRunId
        })
        print("data: " + data + "\n\n")
 

Streaming from a Step Context

You can also stream events from a specific step context, enabling you to stream arbitrary events, progress, intermediate inference, or debugging information from a step.

@hatchet.step()
def step1(self, context: Context):
    # Stream some data from the step context
    context.put_stream('hello from step1')
    # continue with the step run...
    return {"step1": "results"}

Streaming Files

Hatchet supports streaming base64 encoded files as part of the event payload, allowing you to transfer small to medium-sized files (under 4 MB) between the backend and frontend without waiting for a step result. For large files, consider using a file storage service and streaming the file URLs instead.

To stream a file from a step context, encode the file data as base64 and stream it as a payload:

@hatchet.step()
def step1(self, context: Context):
    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))
    
    # Construct the path to the image file relative to the script's directory
    image_path = os.path.join(script_dir, "image.jpeg")
    
    # Load the image file
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()
    
    # Encode the image data as base64
    base64_image = base64.b64encode(image_data).decode('utf-8')
    
    # Stream the base64-encoded image data
    context.put_stream(base64_image)
 
    # continue with the step run...
    return {"step1": "results"}
 

Consuming Streams on Frontend

To consume a stream from the backend, create a Streaming Response endpoint to "proxy" the stream from the Hatchet workflow run.

First, write a generator to filter and transform the event data before sending it to the client:

def event_stream_generator(workflowRunId):
    ''' This helper function is a generator that yields events from the Hatchet event stream. '''
    stream = hatchet.client.listener.stream(workflowRunId)
    for event in stream:
        ''' you can filter and transform event data here that will be sent to the client'''
        if event.type == "step_completed":
            data = json.dumps({
                "type": event.type,
                "payload": event.payload,
                "messageId": workflowRunId
            })
            yield "data: " + data + "\n\n"

Next, create a streaming GET endpoint that the client connects to in order to receive real-time progress updates:

@app.get("/message/{messageId}")
async def stream(messageId: str):
    ''' in a normal application you might use the message id to look up a workflowRunId
    for this simple case, we have no persistence and just use the message id as the workflowRunId
    you might also consider looking up the workflowRunId in a database and returning the results
    if the message has already been processed '''
    workflowRunId = messageId
    return StreamingResponse(event_stream_generator(workflowRunId), media_type='text/event-stream')

Related Tutorials