Working with AsyncIO
Hatchet's Python SDK makes heavy use of AsyncIO under the hood to manage persistent connections, listeners, the runners for each workflow, and many other things! AsyncIO is a form of concurrency that makes use of cooperative multitasking (opens in a new tab). This means that when blocking non-async-safe functions are added to the mix, it can cause issues and degrade the performance of the SDK. While using async methods is optional with Hatchet, using async methods incorrectly can cause issues and degrade the performance of the SDK.
Using the sync_to_async
Decorator
To help out with mitigating any issues with blocking that may cause slow downs, you can make use of
the sync_to_async
decorator available through the Hatchet SDK.
- Can be utilized in two ways:
- as a decorator to wrap simple or complex functions you've written
- as an inline wrapper to wrap external library calls that you're unsure about
- Accepts both synchronous and asynchronous functions
- Accepts a
loop
and anexecutor
keyword argument that lets you specify your own loop or executor
Example:
import time
from hatchet_sdk import Context, sync_to_async
from hatchet_sdk.v2.hatchet import Hatchet
hatchet = Hatchet(debug=True)
@sync_to_async # This will now be async safe!
def blocking_function():
time.sleep(5)
return {"type": "sync_blocking"}
@sync_to_async # This will now be async safe!
async def async_blocking_function():
time.sleep(5)
return {"type": "async_blocking"}
@hatchet.function()
async def my_func(context: Context) -> dict:
data = [
await blocking_function(),
await async_blocking_function(),
]
return {
"status": "success",
"data": data,
}
worker = hatchet.worker("worker", max_runs=5)
worker.start()
Inline Wrapped Example:
...
def blocking_function():
time.sleep(5)
return {"type": "sync_blocking"}
@hatchet.function()
async def my_func(context: Context) -> dict:
data = await sync_to_async(blocking_function)()
return {
"status": "success",
"data": data,
}
...
Keyword Arguments Example:
import asyncio
import concurrent
...
executor = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
def blocking_function():
time.sleep(5)
return {"type": "sync_blocking"}
@hatchet.function()
async def my_func(context: Context) -> dict:
data = await sync_to_async(blocking_function)(executor=executor, loop=loop)
return {
"status": "success",
"data": data,
}
...
More Resources for AsyncIO Development
If you're looking for more info on developing with AsyncIO more broadly, we highly recommend the following resources:
- Python's Documentation on Developing with AsyncIO (opens in a new tab)
- Tusamma's Medium post about How AsyncIO works (opens in a new tab)
- Zac Hatfield-Dodds's PyCon 2023 talk on Async: scaling structured concurrency with static and dynamic analysis (opens in a new tab)