SDK Reference
Python SDK
Working with AsyncIO

Working with AsyncIO

Hatchet's Python SDK makes heavy use of AsyncIO under the hood to manage persistent connections, listeners, the runners for each workflow, and many other things! AsyncIO is a form of concurrency that makes use of cooperative multitasking (opens in a new tab). This means that when blocking non-async-safe functions are added to the mix, it can cause issues and degrade the performance of the SDK. While using async methods is optional with Hatchet, using async methods incorrectly can cause issues and degrade the performance of the SDK.

Using the sync_to_async Decorator

To help out with mitigating any issues with blocking that may cause slow downs, you can make use of the sync_to_async decorator available through the Hatchet SDK.

  • Can be utilized in two ways:
    • as a decorator to wrap simple or complex functions you've written
    • as an inline wrapper to wrap external library calls that you're unsure about
  • Accepts both synchronous and asynchronous functions
  • Accepts a loop and an executor keyword argument that lets you specify your own loop or executor

Example:

import time
 
from hatchet_sdk import Context, sync_to_async
from hatchet_sdk.v2.hatchet import Hatchet
 
hatchet = Hatchet(debug=True)
 
 
@sync_to_async  # This will now be async safe!
def blocking_function():
    time.sleep(5)
    return {"type": "sync_blocking"}
 
 
@sync_to_async  # This will now be async safe!
async def async_blocking_function():
    time.sleep(5)
    return {"type": "async_blocking"}
 
 
@hatchet.function()
async def my_func(context: Context) -> dict:
    data = [
        await blocking_function(),
        await async_blocking_function(),
    ]
    return {
        "status": "success",
        "data": data,
    }
 
 
worker = hatchet.worker("worker", max_runs=5)
 
worker.start()

Inline Wrapped Example:

...
def blocking_function():
    time.sleep(5)
    return {"type": "sync_blocking"}
 
 
@hatchet.function()
async def my_func(context: Context) -> dict:
    data = await sync_to_async(blocking_function)()
    return {
        "status": "success",
        "data": data,
    }
...

Keyword Arguments Example:

import asyncio
import concurrent
...
 
executor = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
 
 
def blocking_function():
    time.sleep(5)
    return {"type": "sync_blocking"}
 
 
@hatchet.function()
async def my_func(context: Context) -> dict:
 
    data = await sync_to_async(blocking_function)(executor=executor, loop=loop)
    return {
        "status": "success",
        "data": data,
    }
...

More Resources for AsyncIO Development

If you're looking for more info on developing with AsyncIO more broadly, we highly recommend the following resources: