Child Workflows
While workflows in Hatchet implement a DAG, there are many cases where the structure of a workflow isn't known ahead of time. For example, you may have a batch processing workflow where the number of tasks is determined by the input, such as running a workflow per page in a PDF. In these cases, you can use child workflows to dynamically create new workflows as needed.
Note that spawning a child workflow is a durable operation: it persists the state of the parent workflow, so if the parent workflow is interrupted, it will pick up the child workflow in the exact same state when it resumes. The index of the child workflow is used as its default key, but custom keys can be specified if the order in which child workflows are spawned can vary.
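For instance, a key derived from the input rather than the loop index stays stable even when the spawn order changes between runs. Here is a minimal sketch under that assumption (the "pdf:process" workflow, the ProcessPage child, and the pages input field are hypothetical, but the calls mirror the examples below):

    @hatchet.workflow("pdf:process")
    class PdfParent:
        @hatchet.step()
        def spawn(self, context: Context):
            # Hypothetical input: a list of pages, each with a stable id.
            # Keying on the id (not the loop index) keeps replays consistent
            # even if the iteration order varies between runs.
            for page in context.workflow_input()["pages"]:
                context.spawn_workflow(
                    "ProcessPage", {"page": page}, key=f"page-{page['id']}"
                )
            return {}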
Looping Workflows
To loop through a list and create a new workflow per element in the list, you can simply call spawn_workflow in a loop. For example:
@hatchet.workflow("fanout:create")
class Parent:
@hatchet.step()
def spawn(self, context: Context):
for i in range(10):
context.spawn_workflow(
"Child", {"a": str(i)}, key=f"child{i}"
)
return {}
Note that calling spawn_workflow will return immediately, and the child workflows will run in parallel. If you want to wait for child workflows, you can await the result of each workflow (see below).
Scatter/Gather Workflows
To run all child workflows in parallel and then wait for all of them to complete, you can call the result method on each returned workflow reference and await them together.
    import asyncio

    @hatchet.workflow()
    class Parent:
        @hatchet.step(timeout="5m")
        async def spawn(self, context: Context):
            results = []
            for i in range(10):
                results.append(
                    (
                        await context.aio.spawn_workflow(
                            "Child", {"a": str(i)}, key=f"child{i}"
                        )
                    ).result()
                )
            # Wait for all child workflows to complete
            result = await asyncio.gather(*results)
            return {"results": result}
    @hatchet.workflow()
    class Child:
        @hatchet.step()
        async def process(self, context: Context):
            a = context.workflow_input()["a"]
            return {"status": "success " + a}
Error Handling
If a child workflow fails, an error is raised in the parent, where it can be caught and handled as needed. For example, to spawn a recovery workflow if a child workflow fails:
@hatchet.workflow("fanout:create")
class Parent:
@hatchet.step()
async def spawn(self, context: Context):
try:
(
await context.aio.spawn_workflow(
"Child", {"a": str(i)}, key=f"child{i}"
)
).result()
except Exception as e:
# Spawn a recovery workflow
context.spawn_workflow("recovery-workflow", { "error": str(e) });
return {}