Managed Compute in Python
This feature is currently in beta and may be subject to change.
Overview
Hatchet Managed Compute lets you define and manage compute resources directly in your Python code. This guide shows you how to configure compute instances, create workflows, and manage workers using the Hatchet Python SDK.
This guide assumes you are already familiar with the basics of Hatchet and have a local workflow running with Docker. If not, please see the Getting Started Guide and the Docker Guide first.
Basic Configuration
Compute Configuration
The Compute class allows you to define resource requirements for your workload directly in your Python code. You can define multiple compute configurations to use in your workflows on a step-by-step basis. This allows you to easily optimize your compute resources for different parts of your workflow.
from hatchet_sdk.compute import Compute

# Define a default compute configuration
default_compute = Compute(
    cpu_kind="shared",  # CPU type: "shared" or "performance"
    cpus=2,             # Number of CPU cores
    memory_mb=1024,     # Memory allocation in MB
    num_replicas=2,     # Number of instances
    regions=["ewr"],    # Deployment regions
)

# Define a basic compute configuration
basic = Compute(
    cpu_kind="shared",
    cpus=1,
    memory_mb=1024,
    num_replicas=1,
    regions=["ewr"],
)
For a full list of configuration options, see the Compute API documentation.
GPU Support
GPU compute has limited region support and additional constraints. See the GPU docs for more information.
Hatchet Managed Compute supports GPU instances. You can define GPU compute configurations in the same way as CPU configurations, with the addition of the gpu_kind and gpus parameters.
gpu_compute = Compute(
    cpu_kind="shared",
    gpu_kind="a100",  # GPU type
    gpus=1,           # Number of GPUs
    cpus=1,
    memory_mb=1024,
    num_replicas=1,
    regions=["ewr"],
)
For a full list of configuration options, see the Compute API documentation.
Defining Compute Requirements in Workflows
Workflow Definition
The compute configurations defined above can then be passed to the step decorator to set compute requirements for each step in your workflow.
import time

from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()

@hatchet.workflow(on_events=["user:create"])
class ManagedWorkflow:
    @hatchet.step(
        timeout="11s",
        retries=3,
        compute=default_compute,
    )
    def step1(self, context: Context):
        print("executed step1")
        time.sleep(10)
        return {
            "step1": "step1",
        }

    @hatchet.step(
        timeout="11s",
        retries=3,
        compute=basic,
    )
    def step2(self, context: Context):
        print("executed step2")
        time.sleep(10)
        return {
            "step2": "step2",
        }
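The GPU configuration from the earlier section can be attached to a step in exactly the same way. As a minimal sketch, a step like the following could be added to ManagedWorkflow, assuming the gpu_compute instance defined above is in scope (the step name and body here are illustrative only):

    @hatchet.step(
        timeout="60s",
        retries=1,
        compute=gpu_compute,
    )
    def gpu_step(self, context: Context):
        # Runs on an instance provisioned from the "a100" GPU configuration
        print("executed gpu_step")
        return {
            "gpu_step": "done",
        }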
Worker Management
Setting Up Workers
Configure and start workers to execute your workflows as you would normally do with any other Hatchet worker:
def main():
    # Create a workflow instance
    workflow = ManagedWorkflow()

    # Initialize a worker with a max runs limit
    worker = hatchet.worker("test-worker", max_runs=1)

    # Register the workflow with the worker
    worker.register_workflow(workflow)

    # Start the worker
    worker.start()
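To start the worker when the script is run directly, add the standard Python entrypoint guard (assuming the code above lives in a single module):

# Run the worker when this module is executed directly
if __name__ == "__main__":
    main()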
A complete example of a workflow that uses managed compute can be found here.
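Since ManagedWorkflow is triggered by the user:create event, you can start a run by pushing that event from any client. A minimal sketch, assuming the same client configuration as above; the payload contents are illustrative:

from hatchet_sdk import Hatchet

hatchet = Hatchet()

# Push a "user:create" event; any workflow registered with
# on_events=["user:create"] will be triggered with this payload
hatchet.event.push("user:create", {"userId": "1234"})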