User Guide
Tutorials
Fullstack - FastAPI/React
Server Result Streaming

Implementing Real-time Progress Streaming with Hatchet and FastAPI

Create an event stream generator

Now, let's create an event stream generator function in /backend/src/api/main.py that subscribes to the Hatchet event stream for a specific workflow run and yields events:

def event_stream_generator(workflowRunId):
    ''' This helper function is a generator that yields events from the Hatchet event stream. '''
    stream = hatchet.client.listener.stream(workflowRunId)
    for event in stream:
        ''' you can filter and transform event data here that will be sent to the client'''
        if event.type == "step_completed":
            data = json.dumps({
                "type": event.type,
                "payload": event.payload,
                "messageId": workflowRunId
            })
            yield "data: " + data + "\n\n"

In this step, we create an event stream generator function that subscribes to the Hatchet event stream for a given workflowRunId. It yields events from the stream, allowing you to filter and transform the event data before sending it to the client. In this example, we filter the events to only include "step_completed" events. You can further filter for your needs, for example, you can specify which step results to stream or exclude certain data from the payload.

Create a streaming endpoint

Next, let's create a streaming GET endpoint that the client connects to in order to receive real-time progress updates:

@app.get("/message/{messageId}")
async def stream(messageId: str):
    ''' in a normal application you might use the message id to look up a workflowRunId
    for this simple case, we have no persistence and just use the message id as the workflowRunId
    you might also consider looking up the workflowRunId in a database and returning the results
    if the message has already been processed '''
    workflowRunId = messageId
    return StreamingResponse(event_stream_generator(workflowRunId), media_type='text/event-stream')

In this step, we create a streaming endpoint that uses the messageId (which is the workflowRunId) to subscribe to the event stream and returns a StreamingResponse with the event data.

View Complete File on GitHub (opens in a new tab)

Next Steps: Consuming Streams in React →