Migrating from Celery to Hatchet
Celery is a mature and widely used background task system. This guide assumes you have already decided to migrate a Python project from Celery to Hatchet and want to understand what code and configuration need to change. Each section starts with a common Celery pattern, then shows what replaces it in Hatchet and what to watch out for.
This guide covers common Celery patterns. It does not cover every Celery setting or attempt a full infrastructure migration. For a discussion of Celery’s operational trade-offs, see this blog post.
Migration pattern lookup
Use this table to find the Celery pattern you have in your project and jump to the section that shows the Hatchet replacement. The table is a lookup aid; the sections below explain the migration details, caveats, and code changes.
| Step | Celery | Hatchet replacement | Migration category |
|---|---|---|---|
| 1 | celery[...] + broker dependencies | hatchet-sdk | Dependency change |
| 1 | Celery config / env vars | HATCHET_CLIENT_TOKEN | Operational change |
| 1 | Worker / Beat / Flower processes | Hatchet worker + engine/cloud | Operational change |
| 2 | Celery("app", broker=..., backend=...) | Hatchet() + HATCHET_CLIENT_TOKEN | Operational change |
| 3 | @app.task / @shared_task | @hatchet.task() | Small rewrite |
| 3 | def my_task(arg1, arg2) | def my_task(input: MyInput, ctx: Context) | Small rewrite |
| 3 | task_serializer (pickle/msgpack) | JSON via Pydantic | Small rewrite |
| 4 | task.delay(...) / .apply_async(...) | .run(..., wait_for_result=False) / .aio_run(..., wait_for_result=False) | Small rewrite |
| 5 | celery -A app worker | hatchet worker dev or worker script | Small rewrite |
| 6 | autoretry_for / self.retry() | retries + backoff_factor + NonRetryableException | Small rewrite |
| 6 | time_limit / soft_time_limit | execution_timeout / schedule_timeout | Direct API swap |
| 7 | apply_async(countdown=...) / eta=... | task.aio_schedule(run_at, input) | Small rewrite |
| 7 | beat_schedule + celery beat process | on_crons=["..."] in task definition | Small rewrite |
| 8 | chain(a.s(), b.s()) | DAG workflow with parents=[a] | Conceptual redesign |
| 8 | group(a.s(), b.s()) | Parallel DAG tasks (no parents) | Conceptual redesign |
| 8 | chord(group, callback) | DAG task with multiple parents | Conceptual redesign |
| 9 | Result backend + AsyncResult + Flower | Hatchet run history + dashboard | Operational change |
| 10 | task_routes / queues / routing | Worker registration + worker affinity | Conceptual redesign |
| 10 | Celery signals (task_prerun, etc.) | on_failure_task / on_success_task / ctx.log() | Conceptual redesign |
| 10 | revoke / task cancellation | Cancellation API + dashboard + ctx.is_cancelled | Small rewrite |
| 10 | Task priority | Priority (three levels: LOW, MEDIUM, HIGH) | Operational change |
| 10 | task_always_eager / testing | .mock_run() / .aio_mock_run() | Small rewrite |
Step 1: Update dependencies and runtime configuration
Dependencies
Install hatchet-sdk alongside Celery using your chosen package manager. For example:
pip install hatchet-sdk
Celery and Hatchet can run side-by-side during a migration. This lets you move Celery tasks to Hatchet one at a time and validate behavior incrementally. Since Celery and Hatchet do not share a workflow runtime, each migrated task or workflow must have a clear boundary. Once a task or workflow is moved to Hatchet, update the application code that enqueues it to call Hatchet instead of Celery.
Configuration
Celery projects configure broker and result backend connections. Hatchet does not have equivalent broker or result-backend settings. Instead, configure the Hatchet SDK so it can connect to the Hatchet engine. For Hatchet Cloud, configure the SDK with an API token:
export HATCHET_CLIENT_TOKEN="your-token-here"
For self-hosted or local Hatchet deployments, you may need environment-specific client settings for the Hatchet engine endpoint and TLS configuration.
Celery’s default loader reads settings from a celeryconfig.py module on the Python path. Framework integrations may load the same settings from another source; for example, Django projects commonly load Celery settings from Django settings with a CELERY_ namespace. See the Celery configuration reference for the full list. If your project has an extensive Celery configuration, review each section of this guide to determine which settings need migration and which can be removed.
Process and deployment cleanup
Celery projects often run multiple long-lived processes, including workers, a task scheduler named Celery Beat, and optionally Flower for runtime monitoring. Depending on your environment, these may be managed through init scripts, systemd services, supervisor, launchd, Docker Compose, Kubernetes manifests, a Procfile, or CI/deployment scripts. Celery’s daemonization docs provide guidance on managing these processes and are a useful resource for understanding an existing deployment.
Replace Celery worker and Beat processes with Hatchet worker scripts. Hatchet cron runs are managed by the Hatchet engine, so there is no separate scheduler process to deploy. Hatchet’s dashboard covers the runtime visibility that a basic Flower deployment provides.
Broker and result backend services are a separate cleanup step. Do not remove Redis, RabbitMQ, SQS, SQL databases, Cassandra, or other infrastructure just because Celery is being removed. Remove or decommission those services only after confirming they were used exclusively for Celery and are not still used by your application.
Step 2: Replace the Celery app with a Hatchet client
A Celery project usually has a shared Celery app instance that defines the broker, result backend, and task registry. In Hatchet, replace that shared app object with a shared Hatchet client.
Celery:
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379", backend="redis://localhost:6379")
Hatchet:
from hatchet_sdk import Hatchet
hatchet = Hatchet()
The Hatchet() client reads its SDK configuration from HATCHET_CLIENT_* environment variables by default, including HATCHET_CLIENT_TOKEN. Create a Hatchet instance in a shared module and import it wherever your code needs to interact with Hatchet.
For self-hosted or local deployments, you may need additional client configuration such as the Hatchet API endpoint, gRPC host, or TLS settings. These can be supplied through environment variables, and the SDK also supports explicit client configuration when needed.
What changed:
- The shared app object changes. Replace the shared Celery app = Celery(...) instance with a shared hatchet = Hatchet() client.
- Broker and backend settings move out of the app constructor. Hatchet does not take Celery-style broker= or backend= arguments.
- Connection settings move to SDK configuration. Use HATCHET_CLIENT_* environment variables, or explicit client configuration when needed.
- The same client is reused across the migration. Import the shared hatchet client where you define tasks, workflows, workers, and code that triggers runs.
Step 3: Convert task definitions
Celery task functions take whatever positional and keyword arguments the task needs. Hatchet task functions instead receive exactly two arguments: one input object and a context object.
Celery:
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
@app.task
def process_image(image_url: str, filters: list[str]) -> dict:
    result = resize(image_url, filters)
    return {"processed_url": result}
To migrate the Celery task, define an input model for image_url and filters, change the decorator to @hatchet.task(...), add ctx: Context, and return a serializable task output.
Hatchet:
What changed:
- @app.task becomes @hatchet.task(...). Use input_validator=YourModel to validate and type the task input.
- Positional arguments move into an input model. Replace (image_url, filters) with input: ImageInput, where ImageInput is a Pydantic BaseModel with those fields.
- The task receives a context object. Add ctx: Context as the second argument for run metadata, retry information, parent task outputs, and logging.
- The return value becomes the task output. Return a value Pydantic can serialize, such as a Pydantic model or a dict.
- Tasks can be sync or async. Hatchet tasks can be def or async def; the SDK recommends async for I/O-bound work.
If your Celery tasks use positional arguments, *args, or **kwargs, you
will need to define a Pydantic model that captures the expected fields. For
tasks that take no meaningful input, use EmptyModel from hatchet_sdk. This
input-model conversion is the main mechanical cost of the migration.
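One way to inventory the fields an input model needs is to bind a legacy call’s arguments to the old function’s parameter names. The sketch below uses only the standard library and is not part of either SDK; legacy_process_image, its quality parameter, and to_input_fields are hypothetical names. The resulting dict is what you would pass as keyword arguments to the Pydantic model you define.

```python
import inspect
from typing import Any, Callable


def legacy_process_image(image_url: str, filters: list[str], quality: int = 80) -> dict:
    """Hypothetical stand-in for the undecorated Celery task body."""
    return {"processed_url": image_url}


def to_input_fields(func: Callable[..., Any], args: tuple, kwargs: dict) -> dict[str, Any]:
    """Map a legacy positional/keyword call onto named fields."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()  # include defaults so every field is listed
    return dict(bound.arguments)


# Arguments as they would have been passed to .delay(...) in Celery.
fields = to_input_fields(
    legacy_process_image,
    ("https://example.com/photo.png", ["thumbnail"]),
    {},
)
print(fields)
```

Running this over a sample of real call sites shows which parameters are actually used and which defaults matter before you commit to a model shape.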
Serialization
Celery supports serializers such as JSON, pickle, YAML, and msgpack through task_serializer. Hatchet task inputs and outputs are serialized through Pydantic. If your Celery project uses pickle or another non-JSON serializer, make sure your task payloads can be represented in Pydantic’s JSON serialization mode. Values that Pydantic cannot serialize directly should be converted to JSON-compatible fields or handled with custom Pydantic serializers.
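To find payloads that relied on pickle, it can help to normalize sample arguments into JSON-friendly values and confirm they round-trip. This is a minimal standard-library sketch, not a Pydantic feature; to_json_compatible and the sample payload are hypothetical, and the conversions chosen (ISO strings for datetimes, hex for bytes, sorted lists for sets) are assumptions you may want to replace with custom Pydantic serializers.

```python
import json
from datetime import datetime, timezone
from typing import Any


def to_json_compatible(value: Any) -> Any:
    """Recursively convert common pickle-only values into JSON-friendly ones."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, bytes):
        return value.hex()
    if isinstance(value, (set, frozenset)):
        # Assumes the set holds mutually comparable items (for example, all strings).
        return sorted(to_json_compatible(v) for v in value)
    if isinstance(value, (list, tuple)):
        return [to_json_compatible(v) for v in value]
    if isinstance(value, dict):
        return {str(k): to_json_compatible(v) for k, v in value.items()}
    return value


payload = {
    "filters": {"thumbnail", "blur"},
    "requested_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "size": (640, 480),
}
converted = to_json_compatible(payload)
# json.dumps raises TypeError if anything non-JSON slipped through.
encoded = json.dumps(converted)
```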
Step 4: Invoke tasks with input models
In Celery, a function decorated with @app.task is invoked through Celery’s task calling API. Calls such as .delay(...) pass arguments to the underlying task function, while apply_async(...) passes them through args= and kwargs=. In Hatchet, invoke the task with the input model you defined in Step 3. The same task input model is used whether you wait for the result or enqueue the task and continue.
Celery:
# Fire-and-forget
process_image.delay("https://example.com/photo.png", ["thumbnail"])
# With options
process_image.apply_async(
    args=["https://example.com/photo.png", ["thumbnail"]],
)
To migrate these calls, replace the task arguments with ImageInput(...) and call .run() or .aio_run() on the Hatchet task. By default, Hatchet waits for the task to finish and returns the typed result directly.
Hatchet:
What changed:
- Task arguments move into the input model. Replace task arguments such as ("https://example.com/photo.png", ["thumbnail"]) with ImageInput(image_url=..., filters=...).
- Invocation methods change. Replace .delay() / .apply_async() with .run() for sync code or .aio_run() for async code.
- The default call waits for the result. Celery’s .delay() enqueues work and returns an AsyncResult immediately. Hatchet’s .run() and .aio_run() wait until the task completes and return the typed result directly.
- Fire-and-forget uses wait_for_result=False. To enqueue without waiting, pass wait_for_result=False. This returns a TaskRunRef with the run ID and .result() / .aio_result() methods you can call later.
- Delayed execution uses a separate API. Replace Celery countdown or eta with scheduled runs, covered in Step 7.
Step 5: Run a Hatchet worker
Celery workers are typically started from the CLI or through a process manager:
Celery:
celery -A tasks worker --loglevel=info --concurrency=4
In Hatchet, the migration artifact is a Python worker script that explicitly registers the tasks and workflows it can execute:
Hatchet:
During development, start the worker with the Hatchet CLI, which handles authentication and hot reloads on code changes:
hatchet worker dev
In production, run the worker script directly with your HATCHET_CLIENT_TOKEN set:
python worker.py
What changed:
- The worker is defined in code. Replace Celery’s app-based task discovery with a worker script that registers executable tasks and workflows in the worker’s workflows=[...] list.
- Worker capacity moves to slots. Replace Celery’s --concurrency=4 flag with the slots parameter on the Hatchet worker.
- Startup differs by environment. Use hatchet worker dev during local development, then run the worker script through your process manager or container runtime in production.
- Celery pool settings do not migrate directly. Celery supports pool types such as prefork, eventlet, gevent, and threads. Hatchet Python workers use the SDK’s sync/async execution model with worker slots, so CPU-bound work may need threads, subprocesses, or separate workers.
Step 6: Migrate retries and timeouts
Retries
Celery supports automatic retries for specified exception types and manual retries inside the task body.
Celery:
# Automatic: retry only on RequestError, with exponential backoff
@app.task(bind=True, autoretry_for=(RequestError,),
          retry_backoff=True, max_retries=5, retry_backoff_max=60)
def call_api(self, order_id: str) -> dict:
    return external_api_call(order_id)
# Manual: explicit retry with custom logic
@app.task(bind=True, max_retries=3)
def call_api_manual(self, order_id: str) -> dict:
    try:
        return external_api_call(order_id)
    except RequestError as exc:
        raise self.retry(exc=exc, countdown=30)
To migrate automatic retries, move the retry policy onto the Hatchet task decorator. Hatchet retries task failures when retries are configured, so model retryable failures by raising normal exceptions.
Hatchet:
To prevent retries for known permanent failures, raise NonRetryableException from the task body:
from hatchet_sdk.exceptions import NonRetryableException
# Inside a task: skip retry for a permanent failure
if response.status_code == 400:
    raise NonRetryableException("Bad request: do not retry")
See retry policies for the full set of retry options and behavior.
What changed:
- Retry policy moves onto the task decorator. Replace Celery retry options such as max_retries, retry_backoff, and retry_backoff_max with Hatchet’s retries, backoff_factor, and backoff_max_seconds.
- Review retry exception rules. Celery’s autoretry_for=(...) lists the exceptions that should trigger retries. In Hatchet, task failures are retryable when retries > 0; raise NonRetryableException for failures that should not retry.
- Manual self.retry() logic needs redesign. Hatchet does not provide a direct self.retry() equivalent inside the task body; task-level retry behavior is managed by the Hatchet engine.
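Because Celery’s autoretry_for is an allow-list and Hatchet’s model is closer to a deny-list, one possible way to preserve the old behavior is to wrap the task body so that only the previously listed exceptions stay retryable. The sketch below is an illustration under assumptions: NonRetryable is a local stand-in for the SDK’s NonRetryableException so the example runs without Hatchet installed, and RequestError, RETRYABLE, and call_with_retry_rules are hypothetical names.

```python
class NonRetryable(Exception):
    """Local stand-in for hatchet_sdk's NonRetryableException (assumption)."""


class RequestError(Exception):
    """Hypothetical transient error, mirroring the Celery example."""


# Formerly autoretry_for=(RequestError, TimeoutError) on the Celery task.
RETRYABLE = (RequestError, TimeoutError)


def call_with_retry_rules(func, *args):
    """Let allow-listed errors propagate; mark every other failure permanent."""
    try:
        return func(*args)
    except RETRYABLE:
        raise  # a normal exception, so the engine may retry it
    except Exception as exc:
        raise NonRetryable(str(exc)) from exc


def flaky(order_id):
    raise RequestError("upstream returned 503")


def broken(order_id):
    raise ValueError("malformed order id")


def outcome(func) -> str:
    """Classify how a failure would be treated under these rules."""
    try:
        call_with_retry_rules(func, "order-1")
    except NonRetryable:
        return "permanent"
    except Exception:
        return "retryable"
    return "ok"


print(outcome(flaky), outcome(broken))
```

In a real task you would raise the SDK exception instead of the stand-in; the classification logic stays the same.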
Timeouts
Celery:
@app.task(time_limit=30, soft_time_limit=25)
def long_task():
    ...
Celery defaults to no task time limit unless you configure one. Hatchet uses an execution_timeout for how long a task may run and schedule_timeout for how long a task may wait in the queue before being cancelled. For migration purposes, Hatchet’s execution_timeout is the closest match for Celery’s time_limit. Celery’s soft_time_limit has no exact Hatchet equivalent. If your task depends on soft time limits for cleanup, move that cleanup into explicit task logic during migration.
Hatchet:
from datetime import timedelta
@hatchet.task(execution_timeout=timedelta(seconds=30))
async def long_task(input, ctx):
    ...
What changed:
- Hard task limits move to execution_timeout. Replace Celery time_limit=30 with execution_timeout=timedelta(seconds=30).
- Queue wait time is controlled separately. Hatchet’s schedule_timeout controls how long a task may wait in the queue before being cancelled.
- Soft timeout cleanup must be rewritten. Celery’s soft_time_limit has no exact Hatchet equivalent, so cleanup that depends on soft-timeout exceptions should become explicit task logic.
- Review timeout defaults. Celery task time limits are not enabled unless configured; Hatchet has default timeout behavior, so check the timeout docs before relying on implicit behavior.
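One way to turn soft-timeout cleanup into explicit task logic is to have the task track its own deadline and stop early with a partial result. The following is a standard-library sketch, not a Hatchet API; run_with_soft_deadline and its injectable clock parameter are hypothetical, and the deadline should be set below whatever hard execution timeout you configure.

```python
import time
from typing import Callable, Iterable


def run_with_soft_deadline(
    batches: Iterable[str],
    soft_limit_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Process batches until a self-imposed deadline, then clean up and stop."""
    deadline = clock() + soft_limit_seconds
    done = []
    for batch in batches:
        if clock() >= deadline:
            # Cleanup that used to live in an `except SoftTimeLimitExceeded:` block.
            return {"status": "partial", "processed": done}
        done.append(batch)  # placeholder for the real per-batch work
    return {"status": "done", "processed": done}


# A fake clock makes the behavior deterministic: start at 0, then 1, 2, 30.
ticks = iter([0, 1, 2, 30])
partial = run_with_soft_deadline(["a", "b", "c"], 25, clock=lambda: next(ticks))
print(partial)
```

The check runs between units of work, so it only helps when the task body can be split into steps that each finish well within the limit.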
Step 7: Migrate delayed and periodic tasks
Delayed execution
In Celery, delayed execution is configured at the call site with countdown or eta on apply_async:
# Run 5 minutes from now
process_image.apply_async(args=["https://example.com/photo.png", ["blur"]], countdown=300)
# Run at a specific time
from datetime import datetime, timedelta, timezone
process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
    eta=datetime.now(timezone.utc) + timedelta(hours=1),
)
Celery’s documentation warns that ETA/countdown tasks remain in worker memory until their scheduled execution time, and recommends using short delays rather than scheduling far into the future.
When migrating those call sites to Hatchet, delayed execution moves to the scheduled run API. Scheduled runs are persisted by the Hatchet engine instead of being held in worker memory, which makes longer delays feasible:
What changed:
- countdown / eta becomes scheduled runs. Replace apply_async(..., countdown=...) or apply_async(..., eta=...) with task.aio_schedule(run_at, input).
- The task input shape stays the same. Scheduled runs use the same ImageInput(...) model as immediate .aio_run(...) calls.
- The worker no longer holds the delay. Hatchet persists the scheduled run in the engine until it is ready to execute.
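Since a scheduled run takes an absolute run time rather than a relative delay, call sites that used countdown need a small conversion. The helper below is a hypothetical sketch using only the standard library; run_at_from_celery is not an SDK function, and rejecting naive datetimes is a design choice made here to avoid ambiguous schedules.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def run_at_from_celery(
    countdown: Optional[float] = None,
    eta: Optional[datetime] = None,
    now: Optional[datetime] = None,
) -> datetime:
    """Turn Celery's countdown/eta options into one timezone-aware UTC run time."""
    if (countdown is None) == (eta is None):
        raise ValueError("pass exactly one of countdown or eta")
    if eta is not None:
        if eta.tzinfo is None:
            raise ValueError("eta must be timezone-aware")
        return eta.astimezone(timezone.utc)
    current = now or datetime.now(timezone.utc)
    return current + timedelta(seconds=countdown)


fixed_now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
run_at = run_at_from_celery(countdown=300, now=fixed_now)
print(run_at.isoformat())
```

The returned value is what you would pass as run_at to the scheduling call described above.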
Periodic tasks
In Celery, recurring schedules are usually defined in beat_schedule and executed by a separate Celery Beat process:
# celeryconfig.py
from celery.schedules import crontab
beat_schedule = {
    "daily-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=9, minute=0),
    },
}
In production, Beat may be run through your process supervisor, container runtime, or directly from the CLI:
celery -A tasks beat  # must run exactly one instance
In Hatchet, cron triggers are declared on the task with the on_crons parameter, which accepts a list of cron expressions managed by the Hatchet engine. When migrating periodic tasks to Hatchet, convert Beat schedules that use Celery’s crontab(...) helper into on_crons entries:
What changed:
- Celery Beat is removed. You do not run a separate celery beat process or ensure that only one Beat instance is active.
- The schedule moves onto the task. Replace the beat_schedule entry with on_crons=["0 9 * * *"].
- Celery crontab(...) becomes a cron expression. Use standard 5-field or 6-field cron syntax.
- Non-crontab schedules need review. Interval, solar, or custom Beat schedules may not convert directly to a cron expression.
- Runtime schedule management moves to the cron client. If your application creates, lists, or deletes schedules dynamically, use the hatchet.cron client.
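For projects with many Beat entries, the crontab(...) to cron-expression conversion can be done mechanically. The sketch below is a hypothetical helper, not part of Celery or Hatchet: it accepts the same keyword names as Celery’s crontab and emits the fields in standard cron order (minute, hour, day of month, month, day of week). It does not validate values, and schedule time zones still need to be reviewed separately.

```python
def crontab_to_cron(
    *,
    minute="*",
    hour="*",
    day_of_month="*",
    month_of_year="*",
    day_of_week="*",
) -> str:
    """Render Celery crontab(...) keyword arguments as a 5-field cron expression."""
    fields = (minute, hour, day_of_month, month_of_year, day_of_week)
    return " ".join(str(field) for field in fields)


# crontab(hour=9, minute=0) from the Beat schedule above.
daily_report = crontab_to_cron(hour=9, minute=0)
print(daily_report)
```

The keyword-only signature is deliberate: Celery’s positional order differs from cron’s field order, so passing names avoids silently swapping day-of-week and day-of-month.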
Step 8: Migrate chains, groups, and chords
This is the biggest conceptual change in the migration. Celery Canvas composes tasks dynamically at the call site. In contrast, Hatchet DAG workflows define the dependency graph up front in the workflow definition. Understanding Hatchet’s workflow orchestration model will make the rest of this section easier to follow.
Celery chain to Hatchet DAG
Celery:
from celery import chain
pipeline = chain(
    validate.s(order_id),
    charge.s(),
    fulfill.s(),
    notify.s(),
)
pipeline.apply_async()
In a Celery chain, each task’s return value is passed as the first argument to the next task. To migrate a Celery chain, turn each task in the chain into a task in the same Hatchet workflow, then express the order of execution with parents. The first task has no parent, the second task depends on the first, and each later task depends on the task that came before it.
Hatchet:
What changed:
- Dependencies are declared on the task, not at the call site. parents=[validate] means “run after validate finishes.”
- Parent outputs are accessed explicitly via ctx.task_output(parent_task), not passed as positional arguments.
- The workflow is triggered as a unit. Replace pipeline.apply_async() with await order_pipeline.aio_run(OrderInput(...)). You do not chain individual task calls.
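The chain-to-parents rule can be checked on paper before writing any workflow code. The sketch below uses Python’s standard graphlib module to model the dependency map only; it does not use the Hatchet SDK, and chain_to_parents is a hypothetical helper. Each task name maps to the set of tasks it waits for, which is the same information a parents=[...] declaration carries.

```python
from graphlib import TopologicalSorter


def chain_to_parents(task_names: list[str]) -> dict[str, set[str]]:
    """Give each task in a linear chain the previous task as its only parent."""
    parents: dict[str, set[str]] = {}
    previous = None
    for name in task_names:
        parents[name] = set() if previous is None else {previous}
        previous = name
    return parents


# The Celery chain above: validate -> charge -> fulfill -> notify.
chain_parents = chain_to_parents(["validate", "charge", "fulfill", "notify"])

# A topological order is an execution order that respects every parent edge.
order = list(TopologicalSorter(chain_parents).static_order())
print(order)  # ['validate', 'charge', 'fulfill', 'notify']
```

The same map describes groups (several tasks with an empty parent set) and chords (one task whose parent set lists every parallel task), which is why all three Canvas shapes reduce to parent declarations.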
Celery group to parallel DAG tasks
A Celery group runs multiple tasks in parallel. If the parallel tasks are known when you define the workflow, migrate them to Hatchet as DAG tasks with the same parent, or with no parents if they can start immediately.
Celery:
from celery import group
checks = group(
    check_inventory.s(order_id),
    check_fraud.s(order_id),
)
checks.apply_async()
To migrate this pattern, define both tasks in the same Hatchet workflow without making one depend on the other. If the Celery code also waits for and combines the group results, that is an additional aggregation step, so add a downstream convergence task with the parallel tasks as parents.
Hatchet:
What changed:
- Parallelism moves into the workflow definition. Tasks with no dependency between them can run concurrently.
- The call site starts the workflow, not a group(...). You trigger the workflow as a unit instead of constructing a Canvas group dynamically.
- Aggregation may become a convergence task. If the Celery code waits for group results and combines them, add a downstream task with the parallel tasks as parents.
- Runtime fan-out is a different pattern. If your Celery group is built from a runtime list, such as group(process.s(item) for item in items), migrate that separately using child spawning.
Dynamic Celery groups to child spawning
Some Celery groups are built from a list that is only known at runtime:
Celery:
from celery import group
items = get_items_for_order(order_id)
result = group(process_item.s(item_id) for item_id in items)()
This is different from a static group of known tasks. A Hatchet DAG is defined ahead of time, so it is not the right fit when the number of child tasks depends on runtime input. To migrate this pattern, use child spawning. The parent task receives the list, spawns one child run per item using aio_run_many, and collects the results:
What changed:
- Build the fan-out inside a parent task. Instead of constructing a group(...) at the call site, the parent task computes the runtime list and calls aio_run_many.
- Each item becomes a child run. The child task or workflow receives one item from the runtime list.
- Collect results where you spawn the children. Replace GroupResult handling with the results returned to the parent task by aio_run_many.
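The shape of the parent/child fan-out can be sketched without an engine. In the example below, asyncio.gather stands in for the child-spawning call, and process_item and process_order are plain coroutines rather than Hatchet tasks; all names are hypothetical. It only illustrates the control flow: the parent receives a runtime-sized list, starts one unit of work per item, and collects the results in order.

```python
import asyncio


async def process_item(item_id: str) -> dict:
    """Hypothetical child body; in a real migration this would be its own run."""
    await asyncio.sleep(0)  # placeholder for I/O-bound work
    return {"item_id": item_id, "status": "processed"}


async def process_order(item_ids: list[str]) -> dict:
    """Parent: build the fan-out from runtime input, then collect results."""
    # gather() returns results in the same order as the inputs.
    results = await asyncio.gather(*(process_item(i) for i in item_ids))
    return {"count": len(results), "items": results}


summary = asyncio.run(process_order(["sku-1", "sku-2", "sku-3"]))
print(summary["count"])
```

Unlike this in-process sketch, spawned child runs are scheduled by the engine and may execute on other workers, so child inputs and outputs must be serializable.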
Celery chord to DAG convergence
A Celery chord runs a group of tasks in parallel and then runs a callback after every task in the group completes. When the parallel tasks are known ahead of time, migrate this pattern to a Hatchet DAG convergence task with multiple parents.
Celery:
from celery import chord
result = chord(
    [fetch_a.s(order_id), fetch_b.s(order_id), fetch_c.s(order_id)]
)(aggregate.s())
The aggregate callback receives a list of results from the group. To migrate a static chord, make the aggregate task depend on each parallel task and read each parent output explicitly.
Hatchet:
@workflow.task(parents=[fetch_a, fetch_b, fetch_c])
async def aggregate(input, ctx):
    a = ctx.task_output(fetch_a)
    b = ctx.task_output(fetch_b)
    c = ctx.task_output(fetch_c)
    ...
What changed:
- Fan-in is declared with multiple parents. A task with parents=[fetch_a, fetch_b, fetch_c] runs only after all listed parents complete.
- Parent outputs are accessed individually. Instead of receiving a list of group results as a callback argument, the aggregate task calls ctx.task_output(parent) for each parent.
- Runtime-sized chords need child spawning. If your Celery chord is built from a runtime-sized group, such as chord([fetch.s(url) for url in urls])(aggregate.s()), migrate it with child spawning and explicit result collection rather than a static DAG convergence task.
Step 9: Replace result backend and Flower monitoring
In Celery, result retrieval and task-state inspection depend on a configured result backend. If your application calls AsyncResult.get() or checks AsyncResult.state, migrate those patterns to Hatchet’s result-returning invocation methods, TaskRunRef, or the runs client.
Flower is a separate cleanup step. Since Hatchet records run status, logs, retry attempts, timing, and workflow relationships in the dashboard, Flower is not needed for basic runtime visibility after migration is complete.
Consider the following Celery snippet that enqueues a task, retrieves its result, stores a task ID for later lookup, and checks task state:
Celery:
# Wait for the result
async_result = process_image.delay(
    "https://example.com/photo.png",
    ["thumbnail"],
)
output = async_result.get(timeout=10)
print(output["processed_url"])
# Fire-and-forget, then retrieve later
async_result = process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
)
task_id = async_result.id
# Check task state later
state = async_result.state
print(state)  # PENDING, SUCCESS, FAILURE, RETRY, or REVOKED
# Retrieve the result when ready
output = async_result.get(timeout=10)
print(output["processed_url"])
If your application branches on Celery task states, review that logic during
migration instead of renaming states one-for-one. For example, Celery only
reports STARTED when track_started=True or task_track_started is
configured. In Hatchet, logic that depends on STARTED usually maps to
checking for a RUNNING run status.
When migrating result-handling code to Hatchet, first decide whether the caller needs the result immediately or only needs a run reference. Use .run() / .aio_run() when the caller should wait for the result. Use wait_for_result=False when the caller should enqueue the work and inspect or retrieve the run later.
Hatchet:
What changed:
- Result retrieval is direct by default. Replace AsyncResult.get() with .run() or .aio_run() when the caller should wait for the task result.
- Fire-and-forget returns a run reference. Use wait_for_result=False when the caller should enqueue work and continue. The returned TaskRunRef includes .workflow_run_id and .result() / .aio_result().
- State inspection moves to the runs client. Replace AsyncResult.state checks with hatchet.runs.aio_get_status(workflow_run_id) when application code needs run status.
- Flower becomes deployment cleanup. Once the migrated workloads no longer run through Celery, remove the Flower process and use Hatchet’s dashboard for run visibility.
Step 10: Migrate other Celery project surfaces
This section covers Celery features that often appear in production projects but do not have a one-to-one Hatchet equivalent. For each one, identify the Celery behavior your application depends on, then migrate it to the closest Hatchet pattern.
Queues and routing
Celery uses task_routes and named queues to control which workers handle which tasks:
Celery:
# celeryconfig.py
task_routes = {
    "tasks.process_image": {"queue": "image-processing"},
    "tasks.send_email": {"queue": "notifications"},
}
celery -A tasks worker -Q image-processing
In Hatchet, routing is handled by registering specific tasks on each worker. To migrate your Celery routes, remove the queue route for that task and register the migrated Hatchet task on the worker that should run it.
Hatchet:
What changed:
- task_routes and queue-bound workers become worker registration. Replace Celery queue routing with Hatchet workers that register the tasks and workflows they can execute in workflows=[...].
- Advanced assignment uses worker labels. If you need weighted routing or capability-based assignment, Hatchet supports worker affinity where tasks declare desired_worker_labels and workers advertise capabilities. This replaces exchange-based routing patterns.
- This is a redesign, not a rename. Simple queue-per-worker routing becomes explicit worker registration. Complex routing rules should be evaluated against worker affinity during migration.
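A quick way to plan worker registration is to invert the existing task_routes mapping so that each former queue lists the tasks its worker must register. This is a hypothetical planning helper using only the standard library; tasks_per_worker is not an SDK function, and the third route is an invented example added to show grouping.

```python
from collections import defaultdict

task_routes = {
    "tasks.process_image": {"queue": "image-processing"},
    "tasks.send_email": {"queue": "notifications"},
    "tasks.resize_avatar": {"queue": "image-processing"},  # hypothetical extra route
}


def tasks_per_worker(routes: dict[str, dict[str, str]]) -> dict[str, list[str]]:
    """Group task names by the queue (and so the worker) that used to serve them."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for task_name, options in routes.items():
        grouped[options["queue"]].append(task_name)
    return {queue: sorted(names) for queue, names in grouped.items()}


plan = tasks_per_worker(task_routes)
for worker_name, names in plan.items():
    print(worker_name, names)
```

Each key in the result corresponds to one worker script, and each list is what that script would register in its workflows=[...] list once the tasks are migrated.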
Signals, hooks, and progress updates
Celery provides signals for task lifecycle events. A common pattern is logging or alerting on failure. Celery also supports self.update_state(state="PROGRESS", meta={...}) for progress reporting within a running task.
Celery:
from celery.signals import task_failure
@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    notify_ops_team(task_id, exception)
Migrate Celery signals and progress reporting to Hatchet workflow-level hooks and structured logging. If the signal handles workflow-level success or failure, migrate it to a workflow hook. If it reports progress, move that into the task with ctx.log() or streaming:
Hatchet:
What changed:
- task_failure / task_success signals become workflow hooks. @workflow.on_failure_task() runs after any task in the workflow fails. @workflow.on_success_task() runs after all tasks succeed. These are tasks within the workflow, not global signal handlers.
- Human-readable progress moves to logs. Use ctx.log() to send progress messages visible in the dashboard.
- Live progress data moves to streams. For real-time progress data that application code consumes, use ctx.put_stream() to push data to subscribers.
- Per-task task_prerun / task_postrun signals have no direct equivalent. If your project uses these for setup or teardown around individual tasks, consider dependency injection. Other cross-cutting behavior may require a small redesign.
Cancellation
Celery cancels tasks with revoke() on an AsyncResult:
Celery:
result = process_image.delay("https://example.com/photo.png", ["blur"])
result.revoke() # cancel if still pending
result.revoke(terminate=True)  # also terminate if running
Hatchet supports task cancellation through the runs client, the dashboard, or concurrency strategies like CANCEL_IN_PROGRESS. To cancel a run programmatically, use hatchet.runs.cancel(run_id) or, in async code, hatchet.runs.aio_cancel(run_id). Running tasks should cooperate by checking ctx.is_cancelled:
Hatchet:
ref = await process_image.aio_run(
    ImageInput(image_url="https://example.com/photo.png", filters=["blur"]),
    wait_for_result=False,
)
# Cancel a run by ID
await hatchet.runs.aio_cancel(ref.workflow_run_id)
# Inside a task: cooperate with cancellation
@hatchet.task()
async def long_running(input, ctx):
    for batch in batches:
        if ctx.is_cancelled:
            return {"status": "cancelled"}
        await process_batch(batch)
    return {"status": "done"}
What changed:
- result.revoke() becomes hatchet.runs.cancel(...) / hatchet.runs.aio_cancel(...). Celery cancels via the AsyncResult object. Hatchet cancels via the runs client or the dashboard using the workflow run ID.
- Running tasks must cooperate. Celery’s revoke(terminate=True) sends a signal to the worker process. Hatchet sets a cancellation flag that the task checks with ctx.is_cancelled. Long-running tasks should check the flag so they can stop promptly and run any cleanup logic.
- Concurrency strategies can cancel automatically. Hatchet’s CANCEL_IN_PROGRESS strategy cancels existing runs when a new run arrives for the same concurrency key. This has no Celery equivalent.
Priority
Celery priority is assigned when the task is enqueued, and the meaning of the numeric priority value depends on the broker.
Celery:
process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
    priority=9,  # broker-dependent scale
)
When migrating priority-sensitive task calls to Hatchet, replace broker-specific numeric priorities with Hatchet’s priority enum. Hatchet supports three priority levels: Priority.LOW, Priority.MEDIUM, and Priority.HIGH. You can set priority when triggering a run, or define a default priority on the workflow.
Hatchet:
from hatchet_sdk import Priority
ref = await process_image.aio_run(
    ImageInput(image_url="https://example.com/photo.png", filters=["blur"]),
    wait_for_result=False,
    priority=Priority.HIGH,
)
What changed:
- Broker-specific numbers become Hatchet priority levels. Replace Celery numeric priorities with Priority.LOW, Priority.MEDIUM, or Priority.HIGH.
- Priority can be set per run or as a workflow default. Use per-run priority when priority changes by call site; use a workflow default when all runs of that workflow should share the same priority.
- Priority is scoped to a workflow type. Higher-priority runs of the same workflow are scheduled before lower-priority runs. Priority does not affect scheduling across different workflow types.
- Priority-sensitive behavior needs validation. Celery broker priority and Hatchet scheduling priority are different models, so validate workflows that depend on precise ordering.
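Collapsing a numeric scale into three levels requires knowing which end of the scale your broker treats as urgent; with Celery this commonly differs between transports (for example, larger numbers are more urgent on RabbitMQ, while 0 is the most urgent on the Redis transport). The sketch below is one possible bucketing, offered as an assumption rather than a recommended mapping: Level is a local stand-in for the SDK’s Priority enum, and to_level with its even three-way split is hypothetical.

```python
from enum import Enum


class Level(Enum):
    """Local stand-in for the three Hatchet priority levels named above."""

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"


def to_level(celery_priority: int, higher_is_urgent: bool, max_priority: int = 9) -> Level:
    """Bucket a broker-specific numeric priority into three levels."""
    if not 0 <= celery_priority <= max_priority:
        raise ValueError("priority outside the configured range")
    # Normalize so that a larger number always means more urgent.
    urgency = celery_priority if higher_is_urgent else max_priority - celery_priority
    third = (max_priority + 1) / 3
    if urgency < third:
        return Level.LOW
    if urgency < 2 * third:
        return Level.MEDIUM
    return Level.HIGH


# The same number lands in opposite buckets depending on the broker's scale.
print(to_level(9, higher_is_urgent=True), to_level(9, higher_is_urgent=False))
```

Because three levels cannot preserve a ten-step ordering, any code that depended on fine-grained ordering between adjacent numbers needs the validation called out in the list above.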
Testing migrated tasks
Celery projects sometimes use task_always_eager for testing, though Celery’s own documentation discourages it for unit tests:
Celery:
task_always_eager = True
result = process_image.delay("https://example.com/photo.png", ["thumbnail"])
assert result.get()["processed_url"] == "https://cdn.example.com/photo.png"
Hatchet tasks can be unit-tested without a running engine using .mock_run() (sync) or .aio_mock_run() (async), which execute the task function directly with a mock context:
Hatchet:
What changed:
- task_always_eager is replaced by .mock_run() / .aio_mock_run(). These execute the task function locally without connecting to the Hatchet engine, similar to calling the function directly but with a mock context that provides retry_count, additional_metadata, and lifespan.
- Integration tests use .run() / .aio_run(). To test against the full Hatchet engine, call .run() or .aio_run() with a running local or test instance.
- See the unit testing example for patterns including sync, async, durable, and workflow-level mock testing.
Migration caveats to review
Before finishing the migration, review these areas that require design decisions rather than mechanical code changes:
- Task function signatures. Every Hatchet task receives (input, ctx) instead of positional args. Creating Pydantic input models for tasks with varied argument signatures is typically the largest mechanical effort.
- Canvas -> DAGs and child spawning. Celery Canvas composition must be restructured into Hatchet DAG workflows or child spawning. This is a design change, not a rename.
- Manual retries. Celery’s self.retry() has no Hatchet equivalent. Restructure around NonRetryableException or handle retries within the task for specific calls.
- Lifecycle hooks. Celery signals like task_prerun and task_postrun require workflow-level hooks, dependency injection, or restructuring in Hatchet.
- Queue routing. Celery’s task_routes and named queues become explicit worker registration and optionally worker affinity (a different routing model).
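For the task function signatures item, much of the mechanical effort is turning existing positional call sites into named input fields. The sketch below shows one possible way to do that binding with the standard library. It is not part of Hatchet: bind_legacy_args and legacy_process_image are hypothetical names, and ImageInput is written as a dataclass stand-in so the snippet runs without extra dependencies (in a real migration it would be the Pydantic model your task takes as input).

```python
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional


@dataclass
class ImageInput:
    """Stand-in for a Pydantic input model; fields mirror the old task's parameters."""

    image_url: str
    filters: List[str] = field(default_factory=list)


def legacy_process_image(image_url: str, filters: Optional[List[str]] = None) -> None:
    """Signature of the old Celery task body, kept only as a reference for binding."""


def bind_legacy_args(
    legacy_fn: Callable[..., Any], model: type, *args: Any, **kwargs: Any
) -> Any:
    """Map Celery-style positional/keyword arguments onto named model fields."""
    # bind() raises TypeError if the arguments do not fit the old signature.
    bound = inspect.signature(legacy_fn).bind(*args, **kwargs)
    # Drop arguments explicitly passed as None so the model's defaults apply.
    named = {name: value for name, value in bound.arguments.items() if value is not None}
    return model(**named)


# The arguments of an old .delay("https://example.com/photo.png", ["thumbnail"]) call
# become a single input object that can be passed to the migrated task.
migrated = bind_legacy_args(
    legacy_process_image, ImageInput, "https://example.com/photo.png", ["thumbnail"]
)
```

A helper like this is mainly useful as a temporary shim while call sites are being rewritten; once every caller constructs the input model directly, it can be deleted.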
Hatchet-native features to adopt after migration
Once your tasks are running on Hatchet, these features go beyond Celery parity:
- Global rate limits: Key-based rate limiting enforced across all workers. Celery’s rate_limit is per-worker only.
- Concurrency strategies: Per-key concurrency control with strategies such as cancel-in-progress or cancel-newest. Celery has no per-task concurrency control.
- Durable sleep: Pause a workflow for minutes, hours, or days without holding a worker slot.
- Durable event waits: Pause until an external event arrives. Useful for webhook-driven or human-in-the-loop workflows.
- Durable tasks: Imperative workflow composition with checkpointing for long-running, stateful workflows.
None of these are needed for a basic migration. They become useful when your workloads grow from background tasks into durable workflows.
After the basic migration is working, use async APIs for I/O-bound tasks, lifespans to initialize shared resources once per worker, and dependency injection to pass common dependencies into tasks without wiring them manually in every function.
Before migrating every workload
Migration does not have to be all-or-nothing. Consider keeping specific workloads on Celery if:
- They are simple fire-and-forget tasks that are already reliable and do not need orchestration or observability.
- They depend on Celery ecosystem integrations (django-celery-beat, django-celery-results, or broker-specific features) that would be costly to replace.
- They require extremely high throughput and do not need durable retention, observability, or workflow orchestration. Review Hatchet’s architecture and guarantees before migrating those workloads.
Final cleanup
After the last Celery task is migrated, remove the celery package declaration, including any Celery extras, from your dependency file and regenerate your lock file. Transitive dependencies like kombu and amqp will be removed automatically. Remove broker client packages (such as redis) only if no other code uses them. Delete celeryconfig.py and any Celery-specific environment variables (broker_url, result_backend, etc.) once nothing references them. Remove Celery worker, Beat, and Flower process definitions from your deployment configuration.
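Before deleting the celery dependency, it can help to confirm that nothing in the codebase still imports or calls it. The following is a minimal sketch of such a check using only the standard library. The function name and the pattern list are assumptions, and the patterns are deliberately narrow heuristics (for example, they do not try to detect @app.task, since that would also match Hatchet decorators), so extend them for your project.

```python
import re
from pathlib import Path
from typing import List, Tuple

# Patterns that usually indicate remaining Celery usage; extend for your codebase.
CELERY_PATTERNS = [
    re.compile(r"^\s*(?:from|import)\s+celery\b"),  # import celery / from celery import ...
    re.compile(r"@shared_task\b"),                  # Celery's shared_task decorator
    re.compile(r"\.(?:delay|apply_async)\("),       # Celery-style task invocation
]


def find_celery_references(root: Path) -> List[Tuple[Path, int, str]]:
    """Return (file, line number, stripped line) for each suspicious line under root."""
    hits = []
    for path in sorted(root.rglob("*.py")):
        text = path.read_text(encoding="utf-8", errors="ignore")
        for number, line in enumerate(text.splitlines(), start=1):
            if any(pattern.search(line) for pattern in CELERY_PATTERNS):
                hits.append((path, number, line.strip()))
    return hits
```

Calling find_celery_references(Path(".")) from the project root and printing each hit gives a quick checklist; an empty result suggests, but does not prove, that the package can be removed safely, since configuration files and deployment manifests are not scanned.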
Next steps
- Quickstart: set up Hatchet and run your first task
- Tasks: task definition and configuration
- Workers: worker options
- Retry policies: retries and backoff
- Timeouts: execution and scheduling timeouts
- Scheduled runs: delayed execution
- Cron runs: recurring schedules
- DAG workflows: multi-step pipelines
- Python SDK reference: full API reference