Procedural Child Task Spawning
Hatchet supports the dynamic creation of child tasks during a parent task’s execution. This powerful feature enables:
- Complex, reusable task hierarchies - Break down complex tasks into simpler, reusable components
- Fan-out parallelism - Scale out to multiple parallel tasks dynamically
- Dynamic task behavior - Create loops and conditional branches at runtime
- Agent-based tasks - Support AI agents that can create new tasks based on analysis results or loop until a condition is met
Creating Parent and Child Tasks
To implement child task spawning, you first need to create both parent and child task definitions.
First, we’ll declare a couple of tasks for the parent and child:
We also created a step on the parent task that spawns the child tasks. Now, we’ll add a couple of steps to the child task:
And that’s it! The fanout parent will run and spawn the child, and then will collect the results from its steps.
Running Child Tasks
To spawn and run a child task from a parent task, use the appropriate method for your language:
# Inside a parent task
child_result = child_task.run(child_input)
Parallel Child Task Execution
As shown in the examples above, you can spawn multiple child tasks in parallel:
# Run multiple child workflows concurrently with asyncio
import asyncio
async def run_child_workflows(n: int) -> list[dict[str, Any]]:
return await child.aio_run_many([
child.create_bulk_run_item(
options=TriggerWorkflowOptions(
input=ChildInput(n=i),
)
)
for i in range(n)
])
# In your parent task
child_results = await run_child_workflows(input.n)
Use Cases for Child Workflows
Child workflows are ideal for:
- Dynamic fan-out processing - When the number of parallel tasks is determined at runtime
- Reusable workflow components - Create modular workflows that can be reused across different parent workflows
- Resource-intensive operations - Spread computation across multiple workers
- Agent-based systems - Allow AI agents to spawn new workflows based on their reasoning
- Long-running operations - Break down long operations into smaller, trackable units of work
Error Handling with Child Workflows
When working with child workflows, it’s important to properly handle errors. Here are patterns for different languages:
try:
child_result = child.run(ChildInput(a="foobar"))
except Exception as e:
# Handle error from child workflow
print(f"Child workflow failed: {e}")
# Decide how to proceed - retry, skip, or fail the parent