# Migrating from Celery to Hatchet

Celery is a mature and widely used background task system. This guide assumes you have already decided to migrate a Python project from Celery to Hatchet and want to understand what code and configuration need to change. Each section starts with a common Celery pattern, then shows what replaces it in Hatchet and what to watch out for.

> **Info:** This guide covers common Celery patterns. It does not cover every Celery
>   setting or attempt a full infrastructure migration. For a discussion of
>   Celery's operational trade-offs, see [this blog
>   post](https://hatchet.run/blog/problems-with-celery).

## Migration pattern lookup

Use this table to find the Celery pattern you have in your project and jump to the section that shows the Hatchet replacement. The table is a lookup aid; the sections below explain the migration details, caveats, and code changes.

| Step | Celery | Hatchet replacement | Migration category |
| --- | --- | --- | --- |
| [1](#step-1-update-dependencies-and-runtime-configuration) | `celery[...]` + broker dependencies | `hatchet-sdk` | Dependency change |
| [1](#step-1-update-dependencies-and-runtime-configuration) | Celery config / env vars | `HATCHET_CLIENT_TOKEN` | Operational change |
| [1](#step-1-update-dependencies-and-runtime-configuration) | Worker / Beat / Flower processes | Hatchet worker + engine/cloud | Operational change |
| [2](#step-2-replace-the-celery-app-with-a-hatchet-client) | `Celery("app", broker=..., backend=...)` | `Hatchet()` + `HATCHET_CLIENT_TOKEN` | Operational change |
| [3](#step-3-convert-task-definitions) | `@app.task` / `@shared_task` | `@hatchet.task()` | Small rewrite |
| [3](#step-3-convert-task-definitions) | `def my_task(arg1, arg2)` | `def my_task(input: MyInput, ctx: Context)` | Small rewrite |
| [3](#step-3-convert-task-definitions) | `task_serializer` (pickle/msgpack) | JSON via Pydantic | Small rewrite |
| [4](#step-4-invoke-tasks-with-input-models) | `task.delay(...)` / `.apply_async(...)` | `.run(..., wait_for_result=False)` / `.aio_run(..., wait_for_result=False)` | Small rewrite |
| [5](#step-5-run-a-hatchet-worker) | `celery -A app worker` | `hatchet worker dev` or worker script | Small rewrite |
| [6](#step-6-migrate-retries-and-timeouts) | `autoretry_for` / `self.retry()` | `retries` + `backoff_factor` + `NonRetryableException` | Small rewrite |
| [6](#step-6-migrate-retries-and-timeouts) | `time_limit` / `soft_time_limit` | `execution_timeout` (`soft_time_limit` has no direct equivalent) | Direct API swap |
| [7](#step-7-migrate-delayed-and-periodic-tasks) | `apply_async(countdown=...)` / `eta=...` | `task.aio_schedule(run_at, input)` | Small rewrite |
| [7](#step-7-migrate-delayed-and-periodic-tasks) | `beat_schedule` + `celery beat` process | `on_crons=["..."]` in task definition | Small rewrite |
| [8](#step-8-migrate-chains-groups-and-chords) | `chain(a.s(), b.s())` | DAG workflow with `parents=[a]` | Conceptual redesign |
| [8](#step-8-migrate-chains-groups-and-chords) | `group(a.s(), b.s())` | Parallel DAG tasks (no `parents`) | Conceptual redesign |
| [8](#step-8-migrate-chains-groups-and-chords) | `chord(group, callback)` | DAG task with multiple `parents` | Conceptual redesign |
| [9](#step-9-replace-result-backend-and-flower-monitoring) | Result backend + `AsyncResult` + Flower | Hatchet run history + dashboard | Operational change |
| [10](#step-10-migrate-other-celery-project-surfaces) | `task_routes` / queues / routing | Worker registration + [worker affinity](/v1/advanced-assignment/worker-affinity) | Conceptual redesign |
| [10](#step-10-migrate-other-celery-project-surfaces) | Celery signals (`task_prerun`, etc.) | `on_failure_task` / `on_success_task` / `ctx.log()` | Conceptual redesign |
| [10](#step-10-migrate-other-celery-project-surfaces) | `revoke` / task cancellation | [Cancellation API](/v1/error-handling/cancellation) + dashboard + `ctx.is_cancelled` | Small rewrite |
| [10](#step-10-migrate-other-celery-project-surfaces) | Task priority | [Priority](/v1/priority) (1-3 levels) | Operational change |
| [10](#step-10-migrate-other-celery-project-surfaces) | `task_always_eager` / testing | `.mock_run()` / `.aio_mock_run()` | Small rewrite |

## Step 1: Update dependencies and runtime configuration

### Dependencies

Install `hatchet-sdk` alongside Celery using your chosen package manager. For example:

```bash
pip install hatchet-sdk
```

Celery and Hatchet can run side-by-side during a migration, which lets you move Celery tasks to Hatchet one at a time and validate behavior incrementally. Because Celery and Hatchet do not share a workflow runtime, a Celery chain, group, or chord cannot include a Hatchet task, and a Hatchet workflow cannot include a Celery task. Migrate each task or workflow as a unit with a clear boundary. Once a task or workflow is moved to Hatchet, update the application code that enqueues it to call Hatchet instead of Celery.
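
One way to cut call sites over gradually is to route each enqueue through a small switch, so every call goes to exactly one system and a task can be flipped back if needed. This is a minimal sketch, not part of either SDK: the `HATCHET_MIGRATED_TASKS` environment variable and the `is_migrated` / `enqueue` helpers are hypothetical names, and the two sender callables are where your existing Celery call and the new Hatchet call go.

```python
import os
from typing import Any, Callable, Mapping, Optional

# Hypothetical flag: comma-separated names of tasks whose callers should use Hatchet
FLAG = "HATCHET_MIGRATED_TASKS"


def is_migrated(task_name: str, environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True if task_name is listed in the migration flag."""
    env = os.environ if environ is None else environ
    enabled = {name.strip() for name in env.get(FLAG, "").split(",") if name.strip()}
    return task_name in enabled


def enqueue(
    task_name: str,
    payload: dict[str, Any],
    via_celery: Callable[[dict[str, Any]], Any],
    via_hatchet: Callable[[dict[str, Any]], Any],
    environ: Optional[Mapping[str, str]] = None,
) -> Any:
    """Send one call to exactly one system, so a task never runs in both."""
    if is_migrated(task_name, environ):
        return via_hatchet(payload)
    return via_celery(payload)
```

For example, `via_celery` could be `lambda p: celery_process_image.delay(**p)` and `via_hatchet` could be `lambda p: process_image.run(ImageInput(**p), wait_for_result=False)`, where `celery_process_image` is a hypothetical name for the old Celery task kept importable during the transition.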

### Configuration

Celery projects configure broker and result backend connections. Hatchet does not have equivalent broker or result-backend settings. Instead, configure the Hatchet SDK so it can connect to the Hatchet engine. For [Hatchet Cloud](https://cloud.onhatchet.run), configure the SDK with an API token:

```bash
export HATCHET_CLIENT_TOKEN="your-token-here"
```

For [self-hosted](/self-hosting) or local Hatchet deployments, you may need environment-specific client settings for the Hatchet engine endpoint and TLS configuration.

Celery's default loader reads settings from a `celeryconfig.py` module on the Python path. Framework integrations may load the same settings from another source; for example, Django projects commonly load Celery settings from Django settings with a `CELERY_` namespace. See the [Celery configuration reference](https://docs.celeryq.dev/en/stable/userguide/configuration.html) for the full list. If your project has an extensive Celery configuration, review each section of this guide to determine which settings need migration and which can be removed.
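
To inventory an existing configuration, a short script can map each Celery setting to the step of this guide that covers it. This is a hedged sketch with a hypothetical `classify_settings` helper: the mapping covers only the settings this guide discusses, uses Celery's lowercase setting names, and flags everything else for manual review. The `namespace` argument handles the Django-style `CELERY_` prefix.

```python
from typing import Any, Mapping

# Lowercase Celery setting name -> where this guide covers its replacement
SETTING_TO_STEP = {
    "broker_url": "Step 1: remove; connect with HATCHET_CLIENT_* settings",
    "result_backend": "Step 9: use Hatchet run results and status",
    "task_serializer": "Step 3: JSON via Pydantic",
    "result_serializer": "Step 3: JSON via Pydantic",
    "accept_content": "Step 3: JSON via Pydantic",
    "worker_concurrency": "Step 5: worker slots",
    "task_time_limit": "Step 6: execution_timeout",
    "task_soft_time_limit": "Step 6: explicit cleanup logic",
    "beat_schedule": "Step 7: on_crons",
    "task_track_started": "Step 9: RUNNING status",
    "task_routes": "Step 10: worker registration and affinity",
    "task_queues": "Step 10: worker registration and affinity",
    "task_always_eager": "Step 10: mock_run / aio_mock_run",
}


def classify_settings(settings: Mapping[str, Any], namespace: str = "") -> dict[str, str]:
    """Return {setting name: guide step or "review manually"} for each Celery setting."""
    prefix = f"{namespace}_" if namespace else ""
    report: dict[str, str] = {}
    for name in settings:
        if not name.startswith(prefix):
            continue  # e.g. non-Celery Django settings such as DEBUG
        key = name[len(prefix):].lower()
        report[name] = SETTING_TO_STEP.get(key, "review manually")
    return report
```

For a `celeryconfig.py` module, you could pass `{k: v for k, v in vars(celeryconfig).items() if not k.startswith("_")}`; imported names such as `crontab` will appear as "review manually" and can be ignored.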

### Process and deployment cleanup

Celery projects often run multiple long-lived processes, including workers, a task scheduler named Celery Beat, and optionally Flower for runtime monitoring. Depending on your environment, these may be managed through init scripts, systemd services, supervisor, launchd, Docker Compose, Kubernetes manifests, a `Procfile`, or CI/deployment scripts. Celery's [daemonization docs](https://docs.celeryq.dev/en/stable/userguide/daemonizing.html) provide guidance on managing these processes and are a useful resource for understanding an existing deployment.

Replace Celery worker and Beat processes with Hatchet [worker scripts](#step-5-run-a-hatchet-worker). Hatchet cron runs are managed by the Hatchet engine, so there is no separate scheduler process to deploy. For basic runtime visibility, Hatchet's dashboard replaces a Flower deployment.

Broker and result backend services are a separate cleanup step. Do not remove Redis, RabbitMQ, SQS, SQL databases, Cassandra, or other infrastructure just because Celery is being removed. Remove or decommission those services only after confirming they were used exclusively for Celery and are not still used by your application.

## Step 2: Replace the Celery app with a Hatchet client

A Celery project usually has a shared `Celery` app instance that defines the broker, result backend, and task registry. In Hatchet, replace that shared app object with a shared `Hatchet` client.

**Celery:**

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379", backend="redis://localhost:6379")
```

**Hatchet:**

```python
from hatchet_sdk import Hatchet

hatchet = Hatchet()
```

The `Hatchet()` client reads its SDK configuration from `HATCHET_CLIENT_*` environment variables by default, including `HATCHET_CLIENT_TOKEN`. Create a `Hatchet` instance in a shared module and import it wherever your code needs to interact with Hatchet.

For self-hosted or local deployments, you may need additional client configuration such as the Hatchet API endpoint, gRPC host, or TLS settings. These can be supplied through environment variables, and the SDK also supports explicit [client configuration](/reference/python/client) when needed.

What changed:

- **The shared app object changes.** Replace the shared Celery `app = Celery(...)` instance with a shared `hatchet = Hatchet()` client.
- **Broker and backend settings move out of the app constructor.** Hatchet does not take Celery-style `broker=` or `backend=` arguments.
- **Connection settings move to SDK configuration.** Use `HATCHET_CLIENT_*` environment variables, or explicit client configuration when needed.
- **The same client is reused across the migration.** Import the shared `hatchet` client where you define tasks, workflows, workers, and code that triggers runs.

## Step 3: Convert task definitions

A Celery task function takes whatever positional and keyword arguments its signature declares. In Hatchet, [task functions](/v1/tasks) always receive two arguments instead: a single input object and a context object.

**Celery:**

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def process_image(image_url: str, filters: list[str]) -> dict:
    result = resize(image_url, filters)
    return {"processed_url": result}
```

To migrate the Celery task, define an input model for `image_url` and `filters`, change the decorator to `@hatchet.task(...)`, add `ctx: Context`, and return a serializable task output.

**Hatchet:**

```python
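from hatchet_sdk import Context

# `hatchet` is the shared client from Step 2; ImageInput and ImageOutput are
# Pydantic models with `image_url`/`filters` and `processed_url` fields.
@hatchet.task(name="process-image", input_validator=ImageInput)
async def process_image(input: ImageInput, ctx: Context) -> ImageOutput:
    result = await resize(input.image_url, input.filters)
    return ImageOutput(processed_url=result)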
```

What changed:

- **`@app.task` becomes `@hatchet.task(...)`.** Use `input_validator=YourModel` to validate and type the task input.
- **Positional arguments move into an input model.** Replace `(image_url, filters)` with `input: ImageInput`, where `ImageInput` is a Pydantic `BaseModel` with those fields.
- **The task receives a context object.** Add `ctx: Context` as the second argument for run metadata, retry information, parent task outputs, and logging.
- **The return value becomes the task output.** Return a value Pydantic can serialize, such as a Pydantic model or a dict.
- **Tasks can be sync or async.** Hatchet tasks can be `def` or `async def`; the SDK [recommends async](/reference/python/asyncio) for I/O-bound work.

> **Warning:** If your Celery tasks use positional arguments, `*args`, or `**kwargs`, you
>   will need to define a Pydantic model that captures the expected fields. For
>   tasks that take no meaningful input, use `EmptyModel` from `hatchet_sdk`. This
>   input-model conversion is the main mechanical cost of the migration.
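
Because that conversion is mostly mechanical, a short script can draft the input model from an existing task function's signature. This is a hedged sketch, not a Hatchet or Celery API: `scaffold_input_model` is a hypothetical helper that returns Pydantic model source for you to review and edit. It drops a leading `self` (used by `bind=True` tasks), refuses `*args`/`**kwargs` because those have no declared fields, and falls back to `Any` for unannotated parameters.

```python
import inspect
from typing import Any, Callable


def _annotation_source(annotation: Any) -> str:
    if annotation is inspect.Parameter.empty:
        return "Any"  # unannotated parameter: pick a real type by hand
    if isinstance(annotation, type) and getattr(annotation, "__origin__", None) is None:
        return annotation.__name__  # plain classes such as str or int
    return str(annotation).replace("typing.", "")  # e.g. list[str], Optional[int]


def scaffold_input_model(func: Callable[..., Any], model_name: str) -> str:
    """Return Python source for a Pydantic model whose fields mirror func's parameters."""
    params = list(inspect.signature(func).parameters.values())
    if params and params[0].name == "self":
        params = params[1:]  # bind=True tasks receive the task instance, not input
    body = []
    for p in params:
        if p.kind in (p.VAR_POSITIONAL, p.VAR_KEYWORD):
            raise ValueError(f"{func.__name__}: parameter {p.name!r} is variadic; declare its fields by hand")
        default = "" if p.default is inspect.Parameter.empty else f" = {p.default!r}"
        body.append(f"    {p.name}: {_annotation_source(p.annotation)}{default}")
    header = [
        "from typing import Any, Optional",
        "",
        "from pydantic import BaseModel",
        "",
        "",
        f"class {model_name}(BaseModel):",
    ]
    return "\n".join(header + (body or ["    pass  # no input: consider hatchet_sdk.EmptyModel"]))
```

For example, calling `print(scaffold_input_model(process_image, "ImageInput"))` on the undecorated Celery `process_image` function from this step should print a model with `image_url: str` and `filters: list[str]` fields.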

### Serialization

Celery supports serializers such as JSON, pickle, YAML, and msgpack through `task_serializer`. Hatchet [task inputs and outputs](/v1/tasks#input-and-output) are serialized through Pydantic. If your Celery project uses `pickle` or another non-JSON serializer, make sure your task payloads can be represented in Pydantic's [JSON serialization mode](https://pydantic.dev/docs/validation/latest/concepts/serialization/#json-mode). Values that Pydantic cannot serialize directly should be converted to JSON-compatible fields or handled with custom Pydantic serializers.
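
Before switching serializers, you can check sample payloads captured from your Celery calls for values that are not plain JSON types. This is a conservative stdlib sketch with a hypothetical `find_non_json_values` helper: it flags anything other than strings, numbers, booleans, `None`, lists, tuples, and string-keyed dicts. Flagged values are candidates for review, not necessarily failures. Pydantic's JSON mode can encode types such as `datetime`, `UUID`, and `set` when the model field declares them, while arbitrary objects that pickle used to handle usually need an explicit field type or a custom serializer.

```python
from typing import Any

# Scalars that JSON represents natively
JSON_SCALARS = (str, int, float, bool, type(None))


def find_non_json_values(value: Any, path: str = "$") -> list[str]:
    """List paths in a sample payload whose values are not plain JSON types."""
    if isinstance(value, JSON_SCALARS):
        return []
    problems: list[str] = []
    if isinstance(value, dict):
        for key, item in value.items():
            if not isinstance(key, str):
                problems.append(f"{path}[{key!r}]: non-string key {type(key).__name__}")
            problems.extend(find_non_json_values(item, f"{path}.{key}"))
    elif isinstance(value, (list, tuple)):  # tuples are encoded as JSON arrays
        for index, item in enumerate(value):
            problems.extend(find_non_json_values(item, f"{path}[{index}]"))
    else:
        problems.append(f"{path}: {type(value).__name__}")
    return problems
```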

## Step 4: Invoke tasks with input models

In Celery, a function decorated with `@app.task` is invoked through Celery's task calling API. Calls such as `.delay(...)` pass arguments to the underlying task function, while `apply_async(...)` passes them through `args=` and `kwargs=`. In Hatchet, invoke the task with the input model you defined in Step 3. The same task input model is used whether you wait for the result or enqueue the task and continue.

**Celery:**

```python
# Fire-and-forget
process_image.delay("https://example.com/photo.png", ["thumbnail"])

# With options
process_image.apply_async(
    args=["https://example.com/photo.png", ["thumbnail"]],
)
```

To migrate these calls, replace the task arguments with `ImageInput(...)` and call `.run()` or `.aio_run()` on the Hatchet task. By default, Hatchet waits for the task to finish and returns the typed result directly.

**Hatchet:**

```python
async def run_image_task() -> None:
    # Wait for the result (default behavior)
    result = await process_image.aio_run(
        ImageInput(image_url="https://example.com/photo.png", filters=["thumbnail"]),
    )
    print(result.processed_url)

    # Fire-and-forget: enqueue without waiting
    ref = await process_image.aio_run(
        ImageInput(image_url="https://example.com/photo.png", filters=["thumbnail"]),
        wait_for_result=False,
    )
    print(ref.workflow_run_id)  # available immediately
    # await ref.aio_result() to retrieve the result later
```

What changed:

- **Task arguments move into the input model.** Replace task arguments such as `("https://example.com/photo.png", ["thumbnail"])` with `ImageInput(image_url=..., filters=...)`.
- **Invocation methods change.** Replace `.delay()` / `.apply_async()` with `.run()` for sync code or `.aio_run()` for async code.
- **The default call waits for the result.** Celery's `.delay()` enqueues work and returns an `AsyncResult` immediately. Hatchet's `.run()` and `.aio_run()` wait until the task completes and return the typed result directly.
- **Fire-and-forget uses `wait_for_result=False`.** To enqueue without waiting, pass `wait_for_result=False`. This returns a `TaskRunRef` with the run ID and `.result()` / `.aio_result()` methods you can call later.
- **Delayed execution uses a separate API.** Replace Celery `countdown` or `eta` with scheduled runs, covered in [Step 7](#step-7-migrate-delayed-and-periodic-tasks).
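
When many call sites pass positional arguments, you can let Python's own argument binding build the input fields, so each converted call receives what the Celery task would have received. This is a minimal sketch using `inspect.signature(...).bind`; `celery_call_to_fields` is a hypothetical helper. Pass it the undecorated task function (for a function-based Celery task this should be the task's `.run` attribute, but verify that for your tasks).

```python
import inspect
from typing import Any, Callable, Mapping, Optional, Sequence


def celery_call_to_fields(
    task_func: Callable[..., Any],
    args: Sequence[Any] = (),
    kwargs: Optional[Mapping[str, Any]] = None,
) -> dict[str, Any]:
    """Bind Celery-style args/kwargs to task_func's parameters and return them by name."""
    sig = inspect.signature(task_func)
    params = list(sig.parameters.values())
    if params and params[0].name == "self":
        # bind=True task bodies take the task instance first; it is not task input
        sig = sig.replace(parameters=params[1:])
    # bind() raises TypeError for calls the original task would also have rejected
    bound = sig.bind(*args, **(kwargs or {}))
    bound.apply_defaults()  # include parameters the caller left at their defaults
    return dict(bound.arguments)
```

A converted call site could then read `ImageInput(**celery_call_to_fields(process_image_fn, args=("https://example.com/photo.png", ["thumbnail"])))`, where `process_image_fn` stands for the original Celery task function.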

## Step 5: Run a Hatchet worker

Celery workers are typically started from the CLI or through a process manager:

**Celery:**

```bash
celery -A tasks worker --loglevel=info --concurrency=4
```

In Hatchet, you write a Python worker script that explicitly registers the tasks and workflows it can execute:

**Hatchet:**

```python
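from tasks import hatchet, process_image  # adjust to where your client and tasks live

def start_worker() -> None:
    worker = hatchet.worker("image-worker", slots=4, workflows=[process_image])
    worker.start()

if __name__ == "__main__":
    start_worker()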
```

During development, start the worker with the [Hatchet CLI](/reference/cli/running-workers-locally), which handles authentication and hot reloads on code changes:

```bash
hatchet worker dev
```

In production, run the worker script directly with your `HATCHET_CLIENT_TOKEN` set:

```bash
python worker.py
```

What changed:

- **The worker is defined in code.** Replace Celery's app-based task discovery with a worker script that registers executable tasks and workflows in the worker's `workflows=[...]` list.
- **Worker capacity moves to `slots`.** Replace Celery's `--concurrency=4` flag with the `slots` parameter on the Hatchet worker.
- **Startup differs by environment.** Use `hatchet worker dev` during local development, then run the worker script through your process manager or container runtime in production.
- **Celery pool settings do not migrate directly.** Celery supports pool types such as prefork, eventlet, gevent, and threads. Hatchet Python workers use the SDK's sync/async execution model with worker slots, so CPU-bound work may need threads, subprocesses, or separate workers.
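
For CPU-bound work inside an `async def` task, one option is to hand the computation to an executor so the worker's event loop stays free. This is a minimal stdlib sketch; `hash_rounds` is a placeholder for your real work and `run_cpu_bound` is a hypothetical helper. In production you would likely pass one `concurrent.futures.ProcessPoolExecutor` created per worker process: a process pool gives CPU parallelism, while a thread pool only keeps the loop responsive. Functions sent to a process pool must be defined at module level so they can be pickled.

```python
import asyncio
import hashlib
from concurrent.futures import Executor


def hash_rounds(data: bytes, rounds: int) -> str:
    """CPU-bound stand-in for real work such as image processing."""
    digest = data
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


async def run_cpu_bound(executor: Executor, data: bytes, rounds: int) -> str:
    """Run hash_rounds in `executor` so the event loop is not blocked meanwhile."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, hash_rounds, data, rounds)
```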

## Step 6: Migrate retries and timeouts

### Retries

Celery supports automatic retries for specified exception types and manual retries inside the task body.

**Celery:**

```python
# Automatic: retry only on RequestError, with exponential backoff
@app.task(bind=True, autoretry_for=(RequestError,),
          retry_backoff=True, max_retries=5, retry_backoff_max=60)
def call_api(self, order_id: str) -> dict:
    return external_api_call(order_id)

# Manual: explicit retry with custom logic
@app.task(bind=True, max_retries=3)
def call_api_manual(self, order_id: str) -> dict:
    try:
        return external_api_call(order_id)
    except RequestError as exc:
        raise self.retry(exc=exc, countdown=30)
```

To migrate automatic retries, move the retry policy onto the Hatchet task decorator. Hatchet retries task failures when retries are configured, so model retryable failures by raising normal exceptions.

**Hatchet:**

```python
@hatchet.task(
    name="call-api",
    retries=5,
    backoff_factor=2.0,
    backoff_max_seconds=60,
    execution_timeout=timedelta(seconds=30),
    input_validator=OrderInput,
)
async def call_api(input: OrderInput, ctx: Context) -> dict[str, str]:
    result = await external_api_call(input.order_id)
    return {"status": result}
```

To prevent retries for known permanent failures, raise `NonRetryableException` from the task body:

```python
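from hatchet_sdk import Context
from hatchet_sdk.exceptions import NonRetryableException

@hatchet.task(name="submit-order", retries=5, input_validator=OrderInput)
async def submit_order(input: OrderInput, ctx: Context) -> dict[str, str]:
    response = await post_order(input.order_id)  # hypothetical HTTP helper
    if response.status_code == 400:
        # A malformed request fails the same way on every attempt, so skip retries
        raise NonRetryableException("Bad request: do not retry")
    return {"status": str(response.status_code)}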
```

See [retry policies](/v1/error-handling/retry-policies) for the full set of retry options and behavior.

What changed:

- **Retry policy moves onto the task decorator.** Replace Celery retry options such as `max_retries`, `retry_backoff`, and `retry_backoff_max` with Hatchet's `retries`, `backoff_factor`, and `backoff_max_seconds`.
- **Review retry exception rules.** Celery's `autoretry_for=(...)` lists the exceptions that should trigger retries. In Hatchet, task failures are retryable when `retries > 0`; raise `NonRetryableException` for failures that should not retry.
- **Manual `self.retry()` logic needs redesign.** Hatchet does not provide a direct `self.retry()` equivalent inside the task body; task-level retry behavior is managed by the Hatchet engine.
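
If a task relied on `autoretry_for` to retry only specific exceptions, you can approximate that rule with a small decorator that lets the listed exceptions propagate and converts everything else into a non-retryable error. This is a hedged sketch: `retry_only_on` is a hypothetical helper, the exception class to raise is passed in (in a Hatchet task you would pass `NonRetryableException`), and it assumes the Hatchet decorator accepts a `functools.wraps`-wrapped coroutine function, which you should verify for your SDK version.

```python
import functools
from typing import Any, Awaitable, Callable

AsyncFn = Callable[..., Awaitable[Any]]


def retry_only_on(
    retryable: tuple[type[BaseException], ...],
    non_retryable: type[Exception],
) -> Callable[[AsyncFn], AsyncFn]:
    """Approximate Celery's autoretry_for for an async task body.

    Exceptions in `retryable` propagate unchanged, so the retry policy still applies.
    Any other Exception is re-raised as `non_retryable`, chained to the original.
    """

    def decorator(fn: AsyncFn) -> AsyncFn:
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return await fn(*args, **kwargs)
            except (*retryable, non_retryable):
                raise  # retryable errors, or ones already marked non-retryable
            except Exception as exc:
                raise non_retryable(f"{type(exc).__name__}: {exc}") from exc

        return wrapper

    return decorator
```

Stack it beneath the task decorator, for example `@hatchet.task(name="call-api", retries=5, input_validator=OrderInput)` followed by `@retry_only_on((RequestError,), NonRetryableException)` on `call_api`, so Hatchet registers the wrapped function.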

### Timeouts

**Celery:**

```python
@app.task(time_limit=30, soft_time_limit=25)
def long_task():
    ...
```

Celery defaults to no task time limit unless you configure one. Hatchet uses an [`execution_timeout`](/v1/error-handling/timeouts) for how long a task may run and `schedule_timeout` for how long a task may wait in the queue before being cancelled. For migration purposes, Hatchet's `execution_timeout` is the closest match for Celery's `time_limit`. Celery's `soft_time_limit` has no exact Hatchet equivalent. If your task depends on soft time limits for cleanup, move that cleanup into explicit task logic during migration.

**Hatchet:**

```python
from datetime import timedelta

from hatchet_sdk import Context, EmptyModel

@hatchet.task(execution_timeout=timedelta(seconds=30))
async def long_task(input: EmptyModel, ctx: Context) -> None:
    ...
```

What changed:

- **Hard task limits move to `execution_timeout`.** Replace Celery `time_limit=30` with `execution_timeout=timedelta(seconds=30)`.
- **Queue wait time is controlled separately.** Hatchet's `schedule_timeout` controls how long a task may wait in the queue before being cancelled.
- **Soft timeout cleanup must be rewritten.** Celery's `soft_time_limit` has no exact Hatchet equivalent, so cleanup that depends on soft-timeout exceptions should become explicit task logic.
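- **Review timeout defaults.** Celery task time limits are not enabled unless configured, but Hatchet applies default timeouts. A task that ran without a limit under Celery may time out under Hatchet, so set `execution_timeout` explicitly and check the timeout docs before relying on implicit behavior.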
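
One way to keep soft-time-limit cleanup is to enforce your own, shorter deadline inside the task and run the cleanup before re-raising. This is a minimal asyncio sketch for an async task body; `with_soft_deadline` is a hypothetical helper and `cleanup` is whatever your Celery `SoftTimeLimitExceeded` handler used to do. With `execution_timeout=timedelta(seconds=30)`, a soft deadline of 25 seconds mirrors `time_limit=30, soft_time_limit=25`.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_soft_deadline(
    work: Awaitable[T],
    soft_seconds: float,
    cleanup: Callable[[], Awaitable[None]],
) -> T:
    """Await `work`; if it exceeds `soft_seconds`, cancel it, run cleanup, and re-raise."""
    try:
        # wait_for cancels `work` when the deadline passes
        return await asyncio.wait_for(work, timeout=soft_seconds)
    except asyncio.TimeoutError:
        await cleanup()
        raise
```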

## Step 7: Migrate delayed and periodic tasks

### Delayed execution

In Celery, delayed execution is configured at the call site with `countdown` or `eta` on `apply_async`:

```python
# Run 5 minutes from now
process_image.apply_async(args=["https://example.com/photo.png", ["blur"]], countdown=300)

# Run at a specific time
from datetime import datetime, timedelta, timezone
process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
    eta=datetime.now(timezone.utc) + timedelta(hours=1),
)
```

> **Warning:** Celery's documentation
>   [warns](https://docs.celeryq.dev/en/stable/userguide/calling.html#eta-and-countdown)
>   that ETA/countdown tasks remain in worker memory until their scheduled
>   execution time, and recommends using short delays rather than scheduling far
>   into the future.

When migrating those call sites to Hatchet, delayed execution moves to the scheduled run API. Scheduled runs are persisted by the Hatchet engine instead of being held in worker memory, which makes longer delays feasible:

```python
from datetime import datetime, timedelta, timezone

async def schedule_for_later() -> None:
    # Run one hour from now, using a timezone-aware datetime
    run_at = datetime.now(tz=timezone.utc) + timedelta(hours=1)
    await process_image.aio_schedule(
        run_at,
        ImageInput(image_url="https://example.com/photo.png", filters=["blur"]),
    )
```

What changed:

- **`countdown` / `eta` becomes scheduled runs.** Replace `apply_async(..., countdown=...)` or `apply_async(..., eta=...)` with `task.aio_schedule(run_at, input)`.
- **The task input shape stays the same.** Scheduled runs use the same `ImageInput(...)` model as immediate `.aio_run(...)` calls.
- **The worker no longer holds the delay.** Hatchet persists the scheduled run in the engine until it is ready to execute.
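
When converting many call sites, a small helper can turn Celery's `countdown=` or `eta=` into the `run_at` datetime that `aio_schedule` expects. This is a hedged sketch with a hypothetical `run_at_from_celery` helper: it accepts exactly one of the two options, and it rejects naive `eta` datetimes rather than guessing which time zone they meant.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def run_at_from_celery(
    countdown: Optional[float] = None,
    eta: Optional[datetime] = None,
    now: Optional[datetime] = None,
) -> datetime:
    """Translate Celery's countdown= or eta= into a run_at datetime for aio_schedule."""
    if (countdown is None) == (eta is None):
        raise ValueError("pass exactly one of countdown or eta")
    if eta is not None:
        if eta.tzinfo is None:
            # Refuse to guess which time zone a naive eta meant
            raise ValueError("eta must be timezone-aware")
        return eta
    current = now if now is not None else datetime.now(timezone.utc)
    return current + timedelta(seconds=countdown)
```

For example, `apply_async(args=[...], countdown=300)` becomes `await process_image.aio_schedule(run_at_from_celery(countdown=300), ImageInput(...))`.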

### Periodic tasks

In Celery, recurring schedules are usually defined in `beat_schedule` and executed by a separate Celery Beat process:

```python
# celeryconfig.py
from celery.schedules import crontab

beat_schedule = {
    "daily-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=9, minute=0),
    },
}
```

In production, Beat may be run through your process supervisor, container runtime, or directly from the CLI:

```bash
celery -A tasks beat  # must run exactly one instance
```

In Hatchet, [cron triggers](/v1/cron-runs) are declared on the task with the `on_crons` parameter, which accepts a list of cron expressions managed by the Hatchet engine. When migrating periodic tasks to Hatchet, convert Beat schedules that use Celery's `crontab(...)` helper into `on_crons` entries:

```python
from hatchet_sdk import Context, EmptyModel

@hatchet.task(name="DailyReport", on_crons=["0 9 * * *"])
async def generate_report(input: EmptyModel, ctx: Context) -> dict[str, str]:
    await build_report()
    return {"status": "sent"}
```

What changed:

- **Celery Beat is removed.** You no longer run a separate `celery beat` process or need to make sure that only one Beat instance is active.
- **The schedule moves onto the task.** Replace the `beat_schedule` entry with `on_crons=["0 9 * * *"]`.
- **Celery `crontab(...)` becomes a cron expression.** Use standard 5-field or 6-field cron syntax.
- **Non-crontab schedules need review.** Interval, solar, or custom Beat schedules may not convert directly to a cron expression.
- **Runtime schedule management moves to the cron client.** If your application creates, lists, or deletes schedules dynamically, use the [`hatchet.cron` client](/reference/python/feature-clients/cron).
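
For Beat entries that use `crontab(...)`, the field mapping is mechanical, but Celery's keyword arguments are not in cron's positional order. This is a minimal sketch with a hypothetical `crontab_to_cron` helper that takes the same keyword names as `celery.schedules.crontab` and returns a 5-field expression. String values are copied verbatim, so range and step syntax such as `"*/15"` or `"1-5"` carries over; named days like `"mon-fri"` are also copied as-is and should be checked against the cron syntax Hatchet accepts. As in Celery, every field defaults to `"*"`, so `crontab(hour=9)` means every minute from 9:00 to 9:59, not once at 9:00. Also compare Celery's `timezone` setting with the time zone Hatchet evaluates cron expressions in before relying on the converted times.

```python
from typing import Iterable, Union

Field = Union[int, str, Iterable[int]]


def _field(value: Field) -> str:
    if isinstance(value, (int, str)):
        return str(value)
    return ",".join(str(v) for v in value)  # e.g. hour=[9, 17] -> "9,17"


def crontab_to_cron(
    minute: Field = "*",
    hour: Field = "*",
    day_of_week: Field = "*",
    day_of_month: Field = "*",
    month_of_year: Field = "*",
) -> str:
    """Build a 5-field cron expression from celery.schedules.crontab keyword arguments."""
    # Cron's positional order is: minute, hour, day of month, month, day of week
    fields = (minute, hour, day_of_month, month_of_year, day_of_week)
    return " ".join(_field(v) for v in fields)
```

For the `daily-report` entry above, `crontab_to_cron(hour=9, minute=0)` gives `"0 9 * * *"`, the value used in `on_crons`.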

## Step 8: Migrate chains, groups, and chords

This is the biggest conceptual change in the migration. Celery Canvas composes task signatures into chains, groups, and chords at the _call site_, when the work is enqueued. In contrast, Hatchet DAG workflows declare the dependency graph up front, in the workflow definition. Understanding Hatchet's [workflow orchestration model](/cookbooks/durable-tasks-vs-dags) will make the rest of this section easier to follow.
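
To see how `parents` declarations determine order, the sketch below groups the tasks of a parent map into "waves", where each wave depends only on earlier waves. It is an illustration of the dependency model under stated assumptions, not how the Hatchet engine schedules work: `execution_waves` is a hypothetical helper, and in practice a task becomes runnable as soon as all of its own parents complete rather than waiting for a whole wave. A chain produces one task per wave, a group puts its tasks in the same wave, and a chord adds a final wave for the converging task.

```python
from typing import Mapping, Sequence


def execution_waves(parents: Mapping[str, Sequence[str]]) -> list[list[str]]:
    """Group tasks so that each wave only depends on tasks in earlier waves."""
    remaining = {task: set(deps) for task, deps in parents.items()}
    unknown = {dep for deps in remaining.values() for dep in deps} - set(remaining)
    if unknown:
        raise ValueError(f"parents that are not tasks in this workflow: {sorted(unknown)}")
    waves: list[list[str]] = []
    done: set[str] = set()
    while remaining:
        # A task is ready once every one of its parents has finished
        ready = sorted(task for task, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for task in ready:
            del remaining[task]
    return waves
```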

### Celery chain to Hatchet DAG

**Celery:**

```python
from celery import chain

pipeline = chain(
    validate.s(order_id),
    charge.s(),
    fulfill.s(),
    notify.s(),
)
pipeline.apply_async()
```

In a Celery chain, each task's return value is passed as the first argument to the next task. To migrate a Celery chain, turn each task in the chain into a task in the same Hatchet workflow, then express the order of execution with parents. The first task has no parent, the second task depends on the first, and each later task depends on the task that came before it.

**Hatchet:**

```python
order_pipeline = hatchet.workflow(name="OrderPipeline", input_validator=OrderInput)


@order_pipeline.task(execution_timeout=timedelta(seconds=30))
async def validate(input: OrderInput, ctx: Context) -> OrderValidated:
    ok = await check_inventory(input.order_id)
    return OrderValidated(order_id=input.order_id, valid=ok)


@order_pipeline.task(parents=[validate])
async def charge(input: OrderInput, ctx: Context) -> ChargeResult:
    parent = ctx.task_output(validate)
    cid = await process_charge(parent.order_id)
    return ChargeResult(order_id=input.order_id, charge_id=cid)


@order_pipeline.task(parents=[charge])
async def fulfill(input: OrderInput, ctx: Context) -> FulfillResult:
    parent = ctx.task_output(charge)
    tracking = await ship_order(parent.order_id)
    return FulfillResult(order_id=input.order_id, tracking_number=tracking)


@order_pipeline.task(parents=[fulfill])
async def notify(input: OrderInput, ctx: Context) -> NotifyResult:
    parent = ctx.task_output(fulfill)
    await send_notification(parent.order_id)
    return NotifyResult(order_id=input.order_id, notified=True)
```

What changed:

- **Dependencies are declared on the task**, not at the call site. `parents=[validate]` means "run after `validate` finishes."
- **Parent outputs are accessed explicitly** via `ctx.task_output(parent_task)`, not passed as positional arguments.
- **The workflow is triggered as a unit.** Replace `pipeline.apply_async()` with `await order_pipeline.aio_run(OrderInput(...))`. You do not chain individual task calls.

### Celery group to parallel DAG tasks

A Celery `group` runs multiple tasks in parallel. If the parallel tasks are known when you define the workflow, migrate them to Hatchet as DAG tasks with the same parent, or with no parents if they can start immediately.

**Celery:**

```python
from celery import group

checks = group(
    check_inventory.s(order_id),
    check_fraud.s(order_id),
)
checks.apply_async()
```

To migrate this pattern, define both tasks in the same Hatchet workflow without making one depend on the other. If the Celery code also waits for and combines the group results, that is an additional aggregation step, so add a downstream convergence task with the parallel tasks as parents.

**Hatchet:**

```python
order_checks = hatchet.workflow(name="OrderChecks", input_validator=OrderInput)


@order_checks.task()
async def check_inventory_task(input: OrderInput, ctx: Context) -> CheckResult:
    ok = await check_inventory(input.order_id)
    return CheckResult(passed=ok)


@order_checks.task()
async def check_fraud(input: OrderInput, ctx: Context) -> CheckResult:
    ok = await run_fraud_check(input.order_id)
    return CheckResult(passed=ok)
```

What changed:

- **Parallelism moves into the workflow definition.** Tasks with no dependency between them can run concurrently.
- **The call site starts the workflow, not a `group(...)`.** You trigger the workflow as a unit instead of constructing a Canvas group dynamically.
- **Aggregation may become a convergence task.** If the Celery code waits for group results and combines them, add a downstream task with the parallel tasks as parents.
- **Runtime fan-out is a different pattern.** If your Celery group is built from a runtime list, such as `group(process.s(item) for item in items)`, migrate that separately using [child spawning](/v1/child-spawning).

### Dynamic Celery groups to child spawning

Some Celery groups are built from a list that is only known at runtime:

**Celery:**

```python
from celery import group

items = get_items_for_order(order_id)
result = group(process_item.s(item_id) for item_id in items)()
```

This is different from a static group of known tasks. A Hatchet DAG is defined ahead of time, so it is not the right fit when the number of child tasks depends on runtime input. To migrate this pattern, use [child spawning](/v1/child-spawning). The parent task receives the list, spawns one child run per item using `aio_run_many`, and collects the results:

```python
@hatchet.task(name="process-item", input_validator=ItemInput)
async def process_item(input: ItemInput, ctx: Context) -> ItemResult:
    result = await do_work(input.item_id)
    return ItemResult(item_id=input.item_id, status=result)


@hatchet.task(name="fan-out-items", input_validator=OrderInput)
async def fan_out_items(input: OrderInput, ctx: Context) -> dict[str, list[ItemResult]]:
    items = await get_items_for_order(input.order_id)

    results = await process_item.aio_run_many(
        [
            process_item.create_bulk_run_item(input=ItemInput(item_id=item_id))
            for item_id in items
        ],
    )

    return {"results": results}
```

What changed:

- **Build the fan-out inside a parent task.** Instead of constructing a `group(...)` at the call site, the parent task computes the runtime list and calls `aio_run_many`.
- **Each item becomes a child run.** The child task or workflow receives one item from the runtime list.
- **Collect results where you spawn the children.** Replace `GroupResult` handling with the results returned to the parent task by `aio_run_many`.

### Celery chord to DAG convergence

A Celery `chord` runs a group of tasks in parallel and then runs a callback after every task in the group completes. When the parallel tasks are known ahead of time, migrate this pattern to a Hatchet DAG convergence task with multiple parents.

**Celery:**

```python
from celery import chord

result = chord(
    [fetch_a.s(order_id), fetch_b.s(order_id), fetch_c.s(order_id)]
)(aggregate.s())
```

The `aggregate` callback receives a list of results from the group. To migrate a static chord, make the aggregate task depend on each parallel task and read each parent output explicitly.

**Hatchet:**

```python
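workflow = hatchet.workflow(name="OrderData", input_validator=OrderInput)

# fetch_a, fetch_b, and fetch_c are tasks declared on `workflow` above this point,
# without parents, so they run in parallel before `aggregate`.
@workflow.task(parents=[fetch_a, fetch_b, fetch_c])
async def aggregate(input: OrderInput, ctx: Context):
    a = ctx.task_output(fetch_a)
    b = ctx.task_output(fetch_b)
    c = ctx.task_output(fetch_c)
    ...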
```

What changed:

- **Fan-in is declared with multiple parents.** A task with `parents=[fetch_a, fetch_b, fetch_c]` runs only after all listed parents complete.
- **Parent outputs are accessed individually.** Instead of receiving a list of group results as a callback argument, the aggregate task calls `ctx.task_output(parent)` for each parent.
- **Runtime-sized chords need child spawning.** If your Celery chord is built from a runtime-sized group, such as `chord([fetch.s(url) for url in urls])(aggregate.s())`, migrate it with child spawning and explicit result collection rather than a static DAG convergence task.

## Step 9: Replace result backend and Flower monitoring

In Celery, result retrieval and task-state inspection depend on a configured result backend. If your application calls `AsyncResult.get()` or checks `AsyncResult.state`, migrate those patterns to Hatchet's result-returning invocation methods, `TaskRunRef`, or the [runs client](/reference/python/feature-clients/runs).

Flower is a separate cleanup step. Since Hatchet records run status, logs, retry attempts, timing, and workflow relationships in the dashboard, Flower is not needed for basic runtime visibility after migration is complete.

Consider the following Celery snippet that enqueues a task, retrieves its result, stores a task ID for later lookup, and checks task state:

**Celery:**

```python
# Wait for the result
async_result = process_image.delay(
    "https://example.com/photo.png",
    ["thumbnail"],
)
output = async_result.get(timeout=10)
print(output["processed_url"])

# Fire-and-forget, then retrieve later
async_result = process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
)
task_id = async_result.id

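# Check task state later, for example in another request, using the stored ID
async_result = process_image.AsyncResult(task_id)
state = async_result.state
print(state)  # PENDING, SUCCESS, FAILURE, RETRY, or REVOKED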

# Retrieve the result when ready
output = async_result.get(timeout=10)
print(output["processed_url"])
```

> **Info:** If your application branches on Celery task states, review that logic during
>   migration instead of renaming states one-for-one. For example, Celery only
>   reports `STARTED` when `track_started=True` or `task_track_started` is
>   configured. In Hatchet, logic that depends on `STARTED` usually maps to
>   checking for a `RUNNING` run status.

When migrating result-handling code to Hatchet, first decide whether the caller needs the result immediately or only needs a run reference. Use `.run()` / `.aio_run()` when the caller should wait for the result. Use `wait_for_result=False` when the caller should enqueue the work and inspect or retrieve the run later.

**Hatchet:**

```python
async def result_handling_example() -> None:
    # Wait for the result directly (replaces AsyncResult.get())
    result = await process_image.aio_run(
        ImageInput(image_url="https://example.com/photo.png", filters=["thumbnail"]),
    )
    print(result.processed_url)

    # Fire-and-forget, then retrieve later (replaces AsyncResult pattern)
    ref = await process_image.aio_run(
        ImageInput(image_url="https://example.com/photo.png", filters=["blur"]),
        wait_for_result=False,
    )
    run_id = ref.workflow_run_id  # available immediately

    # Check run status (replaces AsyncResult.state)
    status = await hatchet.runs.aio_get_status(run_id)
    print(status)  # QUEUED, RUNNING, COMPLETED, FAILED, or CANCELLED

    # Retrieve the result when ready
    result = await ref.aio_result()
    print(result.processed_url)
```

What changed:

- **Result retrieval is direct by default.** Replace `AsyncResult.get()` with `.run()` or `.aio_run()` when the caller should wait for the task result.
- **Fire-and-forget returns a run reference.** Use `wait_for_result=False` when the caller should enqueue work and continue. The returned `TaskRunRef` includes `.workflow_run_id` and `.result()` / `.aio_result()`.
- **State inspection moves to the runs client.** Replace `AsyncResult.state` checks with `hatchet.runs.aio_get_status(workflow_run_id)` when application code needs run status.
- **Flower becomes deployment cleanup.** Once the migrated workloads no longer run through Celery, remove the Flower process and use Hatchet's dashboard for run visibility.
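If application code branches on Celery states, a small lookup table can help you audit each branch during migration. The sketch below is a hypothetical review aid and is not part of either SDK. The state-to-status pairings are approximate assumptions based on the status names shown above. States without a clear counterpart, such as `RETRY` or a custom `PROGRESS` state, raise an error so that you review them by hand instead of renaming them.

```python
# Hypothetical migration aid: approximate Celery state -> Hatchet run status pairings.
# These pairings are assumptions for auditing existing branches, not an official mapping.
APPROXIMATE_STATUS_MAP: dict[str, str] = {
    "PENDING": "QUEUED",    # Celery also reports PENDING for unknown task IDs
    "STARTED": "RUNNING",   # Celery only reports STARTED when track_started is enabled
    "SUCCESS": "COMPLETED",
    "FAILURE": "FAILED",
    "REVOKED": "CANCELLED",
}

# States whose meaning does not carry over cleanly and need a manual decision.
NEEDS_REVIEW: frozenset[str] = frozenset({"RETRY", "PROGRESS"})


def hatchet_status_for(celery_state: str) -> str:
    """Return the approximate Hatchet status name for a Celery state.

    Raises ValueError for states that should be reviewed by hand.
    """
    state = celery_state.upper()
    if state in NEEDS_REVIEW:
        raise ValueError(f"{state!r} has no direct Hatchet status; review this branch")
    try:
        return APPROXIMATE_STATUS_MAP[state]
    except KeyError:
        raise ValueError(f"Unknown Celery state: {celery_state!r}") from None


if __name__ == "__main__":
    for state in ("PENDING", "SUCCESS", "RETRY"):
        try:
            print(state, "->", hatchet_status_for(state))
        except ValueError as exc:
            print(state, "->", exc)
```

Run it against the state literals your code compares against; every `ValueError` marks a branch that needs a design decision rather than a rename.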

## Step 10: Migrate other Celery project surfaces

This section covers Celery features that often appear in production projects but do not have a one-to-one Hatchet equivalent. For each one, identify the Celery behavior your application depends on, then migrate it to the closest Hatchet pattern.

### Queues and routing

Celery uses `task_routes` and named queues to control which workers handle which tasks:

**Celery:**

```python
# celeryconfig.py
task_routes = {
    "tasks.process_image": {"queue": "image-processing"},
    "tasks.send_email": {"queue": "notifications"},
}
```

```bash
celery -A tasks worker -Q image-processing
```

In Hatchet, routing is handled by registering specific tasks on each worker. To migrate your Celery routes, remove the queue route for that task and register the migrated Hatchet task on the worker that should run it.

**Hatchet:**

```python
def start_image_worker() -> None:
    """Register only image-processing tasks on this worker."""
    worker = hatchet.worker(
        "image-processing-worker",
        slots=4,
        workflows=[process_image],
    )
    worker.start()
```

What changed:

- **`task_routes` and queue-bound workers become worker registration.** Replace Celery queue routing with Hatchet workers that register the tasks and workflows they can execute in `workflows=[...]`.
- **Advanced assignment uses worker labels.** If you need weighted routing or capability-based assignment, Hatchet supports [worker affinity](/v1/advanced-assignment/worker-affinity), where tasks declare `desired_worker_labels` and workers advertise their capabilities as labels. Use it in place of exchange-based routing patterns.
- **This is a redesign, not a rename.** Simple queue-per-worker routing becomes explicit worker registration. Complex routing rules should be evaluated against worker affinity during migration.
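For projects with many routes, it can help to derive the worker layout from your existing `task_routes` before writing any worker code. The sketch below is a hypothetical planning helper that uses only the standard library. It assumes the dict form of `task_routes` with exact task names; glob patterns, router functions, and list-form routes are not handled. Routes that use an exchange or routing key instead of a `queue` are returned separately, because those usually need the worker-affinity review described above.

```python
from collections import defaultdict


def tasks_by_queue(
    task_routes: dict[str, dict[str, str]],
) -> tuple[dict[str, list[str]], list[str]]:
    """Group Celery task names by routed queue.

    Returns (groups, unresolved): each group is a candidate set of tasks for one
    Hatchet worker's workflows=[...]; unresolved routes need a manual decision.
    """
    groups: dict[str, list[str]] = defaultdict(list)
    unresolved: list[str] = []
    for task_name, route in task_routes.items():
        queue = route.get("queue")
        if queue is None:
            # Exchange/routing-key routes do not map onto a single worker.
            unresolved.append(task_name)
        else:
            groups[queue].append(task_name)
    return {q: sorted(names) for q, names in groups.items()}, sorted(unresolved)


if __name__ == "__main__":
    routes = {
        "tasks.process_image": {"queue": "image-processing"},
        "tasks.send_email": {"queue": "notifications"},
    }
    plan, needs_review = tasks_by_queue(routes)
    for queue, names in plan.items():
        print(f"worker for {queue!r}: {names}")
    print("needs review:", needs_review)
```

Each key in the returned dict is a candidate Hatchet worker, and its list names the migrated tasks that worker would register.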

### Signals, hooks, and progress updates

Celery provides [signals](https://docs.celeryq.dev/en/stable/userguide/signals.html) for task lifecycle events. A common pattern is logging or alerting on failure. Celery also supports `self.update_state(state="PROGRESS", meta={...})` for progress reporting within a running task.

**Celery:**

```python
from celery.signals import task_failure

@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    notify_ops_team(task_id, exception)
```

Migrate Celery signals and progress reporting to Hatchet workflow-level hooks and structured logging. If the signal handles workflow-level success or failure, migrate it to a workflow hook. If it reports progress, move that into the task with `ctx.log()` or streaming:

**Hatchet:**

```python
hook_example = hatchet.workflow(name="HookExample", input_validator=OrderInput)


@hook_example.task()
async def process_order(input: OrderInput, ctx: Context) -> dict[str, str]:
    ctx.log(f"Processing order {input.order_id}")
    await process_charge(input.order_id)
    ctx.log(f"Order {input.order_id} charged")
    return {"status": "charged"}


@hook_example.on_failure_task()
async def on_order_failure(input: OrderInput, ctx: Context) -> None:
    ctx.log(f"Order {input.order_id} failed, notifying support")


@hook_example.on_success_task()
async def on_order_success(input: OrderInput, ctx: Context) -> None:
    ctx.log(f"Order {input.order_id} completed successfully")
```

What changed:

- **`task_failure` / `task_success` signals become workflow hooks.** `@workflow.on_failure_task()` runs after any task in the workflow fails. `@workflow.on_success_task()` runs after all tasks succeed. These are tasks within the workflow, not global signal handlers.
- **Human-readable progress moves to logs.** Use [`ctx.log()`](/v1/logging) to send progress messages visible in the dashboard.
- **Live progress data moves to streams.** For real-time progress data that application code consumes, use [`ctx.put_stream()`](/v1/streaming) to push data to subscribers.
- **Per-task `task_prerun` / `task_postrun` signals have no direct equivalent.** If your project uses these for setup or teardown around individual tasks, consider [dependency injection](/reference/python/dependency-injection). Other cross-cutting behavior may require a small redesign.
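Celery progress updates usually carry a `meta` dict such as `{"current": i, "total": n}`. One way to keep that information after migration is to format it once and pass the resulting string to `ctx.log()`. The helper below is hypothetical and uses only the standard library. It assumes your Celery `meta` used `current` and `total` keys; adapt the fields to whatever your consumers expect.

```python
import json


def progress_payload(current: int, total: int) -> str:
    """Serialize a Celery-style PROGRESS meta dict as a JSON string."""
    if total <= 0:
        raise ValueError("total must be positive")
    percent = round(100 * current / total)
    return json.dumps({"current": current, "total": total, "percent": percent})


# Sketch of use inside a migrated Hatchet task (`items` and `handle` are placeholders):
#
#     for i, item in enumerate(items, start=1):
#         await handle(item)
#         ctx.log(progress_payload(i, len(items)))
```

Keeping the payload format in one helper means the dashboard logs and any stream consumers see the same shape that `update_state(meta=...)` used to produce.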

### Cancellation

Celery cancels tasks with `revoke()` on an `AsyncResult`:

**Celery:**

```python
result = process_image.delay("https://example.com/photo.png", ["blur"])
result.revoke()                    # cancel if still pending
result.revoke(terminate=True)      # also terminate if running
```

Hatchet supports [task cancellation](/v1/error-handling/cancellation) through the runs client, the dashboard, or concurrency strategies like `CANCEL_IN_PROGRESS`. To cancel a run programmatically, use `hatchet.runs.cancel(run_id)` or its async counterpart, `hatchet.runs.aio_cancel(run_id)`. Running tasks should cooperate by checking `ctx.is_cancelled`:

**Hatchet:**

```python
from hatchet_sdk import Context, EmptyModel


# Inside a task: cooperate with cancellation
@hatchet.task()
async def long_running(input: EmptyModel, ctx: Context) -> dict[str, str]:
    for batch in batches:  # `batches` and `process_batch` are placeholders
        if ctx.is_cancelled:
            return {"status": "cancelled"}
        await process_batch(batch)
    return {"status": "done"}


async def cancel_example() -> None:
    ref = await process_image.aio_run(
        ImageInput(image_url="https://example.com/photo.png", filters=["blur"]),
        wait_for_result=False,
    )

    # Cancel a run by ID (replaces result.revoke())
    await hatchet.runs.aio_cancel(ref.workflow_run_id)
```

What changed:

- **`result.revoke()` becomes `hatchet.runs.cancel(...)` / `hatchet.runs.aio_cancel(...)`.** Celery cancels via the `AsyncResult` object. Hatchet cancels via the [runs client](/reference/python/feature-clients/runs) or the dashboard using the workflow run ID.
- **Running tasks must cooperate.** Celery's `revoke(terminate=True)` terminates the worker child process executing the task by sending it a signal (`SIGTERM` by default). Hatchet sets a cancellation flag that the task checks with `ctx.is_cancelled`. Long-running tasks should check the flag so they can stop promptly and run any cleanup logic.
- **Concurrency strategies can cancel automatically.** Hatchet's `CANCEL_IN_PROGRESS` strategy cancels existing runs when a new run arrives for the same concurrency key. Celery has no built-in equivalent.
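The cooperative pattern does not depend on Hatchet, so you can prototype it with the standard library before migrating a long-running task. In this sketch an `asyncio.Event` stands in for `ctx.is_cancelled`, and the batch work is a placeholder sleep. The loop checks the flag between units of work, so a cancel request takes effect at the next batch boundary rather than in the middle of a batch.

```python
import asyncio


async def process_batches(
    batches: list[list[int]], cancelled: asyncio.Event
) -> dict[str, object]:
    """Process batches until finished or until the cancellation flag is set."""
    processed = 0
    for batch in batches:
        # Check between units of work, the way a Hatchet task checks ctx.is_cancelled.
        if cancelled.is_set():
            return {"status": "cancelled", "processed": processed}
        await asyncio.sleep(0.01)  # placeholder for real async work on `batch`
        processed += 1
    return {"status": "done", "processed": processed}


async def demo() -> dict[str, object]:
    cancelled = asyncio.Event()
    work = asyncio.create_task(process_batches([[1, 2]] * 10, cancelled))
    await asyncio.sleep(0.025)  # let a few batches run
    cancelled.set()  # simulate a cancel request arriving from outside the task
    return await work


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The granularity of the check determines how quickly a cancellation is honored, so choose batch sizes with that latency in mind.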

### Priority

Celery priority is assigned when the task is enqueued, and the meaning of the numeric priority value depends on the broker.

**Celery:**

```python
process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
    priority=9,  # broker-dependent scale
)
```

When migrating priority-sensitive task calls to Hatchet, replace broker-specific numeric priorities with Hatchet's priority enum. Hatchet supports [three priority levels](/v1/priority): `Priority.LOW`, `Priority.MEDIUM`, and `Priority.HIGH`. You can set priority when triggering a run, or define a default priority on the workflow.

**Hatchet:**

```python
from hatchet_sdk import Priority

ref = await process_image.aio_run(
    ImageInput(image_url="https://example.com/photo.png", filters=["blur"]),
    wait_for_result=False,
    priority=Priority.HIGH,
)
```

What changed:

- **Broker-specific numbers become Hatchet priority levels.** Replace Celery numeric priorities with `Priority.LOW`, `Priority.MEDIUM`, or `Priority.HIGH`.
- **Priority can be set per run or as a workflow default.** Use per-run priority when priority changes by call site; use a workflow default when all runs of that workflow should share the same priority.
- **Priority is scoped to a workflow type.** Higher-priority runs of the same workflow are scheduled before lower-priority runs. Priority does not affect scheduling across different workflow types.
- **Priority-sensitive behavior needs validation.** Celery broker priority and Hatchet scheduling priority are different models, so validate workflows that depend on precise ordering.
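If call sites pass many different numeric priorities, a single conversion function keeps the mapping consistent across the codebase. The sketch below is hypothetical: the `Level` enum is a local stand-in for Hatchet's `Priority` so the example runs without the SDK, and the thresholds (splitting `0..max_priority` into thirds) are an assumption to tune. Because the direction of Celery's numeric scale depends on the broker, the caller states it explicitly instead of the function guessing.

```python
from enum import Enum


class Level(str, Enum):
    """Local stand-in for Hatchet's Priority.LOW / MEDIUM / HIGH (illustration only)."""

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"


def to_priority_level(
    celery_priority: int,
    *,
    max_priority: int = 9,
    higher_number_is_more_urgent: bool = True,
) -> Level:
    """Bucket a Celery numeric priority into three levels.

    Set higher_number_is_more_urgent to match how your broker interprets values.
    """
    if max_priority <= 0:
        raise ValueError("max_priority must be positive")
    if not 0 <= celery_priority <= max_priority:
        raise ValueError(f"priority must be between 0 and {max_priority}")
    urgency = (
        celery_priority if higher_number_is_more_urgent else max_priority - celery_priority
    )
    # Integer comparisons split the range into thirds without float rounding.
    if 3 * urgency >= 2 * max_priority:
        return Level.HIGH
    if 3 * urgency >= max_priority:
        return Level.MEDIUM
    return Level.LOW
```

At the call site you would then pass the matching `Priority` member (for example `Priority.HIGH` for `Level.HIGH`) to `aio_run()`.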

### Testing migrated tasks

Celery projects sometimes use `task_always_eager` for testing, though Celery's own documentation [discourages it](https://docs.celeryq.dev/en/stable/userguide/testing.html) for unit tests:

**Celery:**

```python
task_always_eager = True

result = process_image.delay("https://example.com/photo.png", ["thumbnail"])
assert result.get()["processed_url"] == "https://cdn.example.com/photo.png"
```

Hatchet tasks can be unit-tested without a running engine using `.mock_run()` (sync) or `.aio_mock_run()` (async), which execute the task function directly with a mock context:

**Hatchet:**

```python
async def test_process_image() -> None:
    result = await process_image.aio_mock_run(
        input=ImageInput(
            image_url="https://example.com/photo.png",
            filters=["thumbnail"],
        ),
    )
    assert result.processed_url == "https://cdn.example.com/photo.png"
```

What changed:

- **`task_always_eager` is replaced by `.mock_run()` / `.aio_mock_run()`.** These execute the task function locally without connecting to the Hatchet engine, similar to calling the function directly but with a mock context that provides `retry_count`, `additional_metadata`, and `lifespan`.
- **Integration tests use `.run()` / `.aio_run()`.** To test against the full Hatchet engine, call `.run()` or `.aio_run()` with a running local or test instance.
- **See the [unit testing example](https://github.com/hatchet-dev/hatchet/tree/main/sdks/python/examples/unit_testing)** for patterns including sync, async, durable, and workflow-level mock testing.

## Migration caveats to review

Before finishing the migration, review these areas that require design decisions rather than mechanical code changes:

- **Task function signatures.** Every Hatchet task receives `(input, ctx)` instead of positional args. Creating Pydantic input models for tasks with varied argument signatures is typically the largest mechanical effort.
- **Canvas -> DAGs and child spawning.** Celery Canvas composition must be restructured into Hatchet DAG workflows or child spawning. This is a design change, not a rename.
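- **Manual retries.** Celery's `self.retry()` has no direct Hatchet equivalent. Hatchet retries a task that raises an exception, up to the task's configured `retries`. Restructure around that behavior, raise `NonRetryableException` for errors that should not be retried, or handle retries within the task for specific calls.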
- **Lifecycle hooks.** Celery signals like `task_prerun` and `task_postrun` require workflow-level hooks, dependency injection, or restructuring in Hatchet.
- **Queue routing.** Celery's `task_routes` and named queues become explicit worker registration and optionally worker affinity (a different routing model).
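The signature change is most visible at call sites: `process_image.delay(url, filters)` passes positional arguments, while the Hatchet task expects one input model. When converting many call sites, `inspect.signature().bind()` from the standard library can map an old positional call onto parameter names, producing a dict you can pass to your input model (for example `ImageInput(**fields)`). This sketch assumes the parameter names of the original Celery function match your input model's fields; `legacy_process_image` is a hypothetical stub with the original signature.

```python
import inspect
from typing import Any, Callable


def celery_call_to_fields(
    func: Callable[..., Any], *args: Any, **kwargs: Any
) -> dict[str, Any]:
    """Map a Celery-style positional/keyword call onto the function's parameter names."""
    bound = inspect.signature(func).bind(*args, **kwargs)  # raises TypeError on mismatch
    bound.apply_defaults()  # include defaulted parameters the caller omitted
    return dict(bound.arguments)


# Hypothetical stub with the same signature as the original Celery task.
def legacy_process_image(image_url: str, filters: list[str]) -> None: ...


if __name__ == "__main__":
    fields = celery_call_to_fields(
        legacy_process_image, "https://example.com/photo.png", ["thumbnail"]
    )
    print(fields)  # pass as ImageInput(**fields) in the migrated call site
```

Because `bind()` raises `TypeError` for calls that do not match the signature, it also surfaces stale call sites during migration.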

## Hatchet-native features to adopt after migration

Once your tasks are running on Hatchet, these features go beyond Celery parity:

- **[Global rate limits](/v1/rate-limits):** Key-based rate limiting enforced across all workers. Celery's `rate_limit` is per-worker only.
- **[Concurrency strategies](/v1/concurrency):** Per-key concurrency control with strategies such as cancel-in-progress or cancel-newest. Celery has no built-in per-key concurrency control.
- **[Durable sleep](/v1/durable-sleep):** Pause a workflow for minutes, hours, or days without holding a worker slot.
- **[Durable event waits](/v1/durable-event-waits):** Pause until an external event arrives. Useful for webhook-driven or human-in-the-loop workflows.
- **[Durable tasks](/v1/durable-tasks):** Imperative workflow composition with checkpointing for long-running, stateful workflows.

None of these are needed for a basic migration. They become useful when your workloads grow from background tasks into durable workflows.

After the basic migration is working, use [async APIs](/reference/python/asyncio) for I/O-bound tasks, [lifespans](/reference/python/lifespans) to initialize shared resources once per worker, and [dependency injection](/reference/python/dependency-injection) to pass common dependencies into tasks without wiring them manually in every function.

## Before migrating every workload

Migration does not have to be all-or-nothing. Consider keeping specific workloads on Celery if:

- They are simple fire-and-forget tasks that are already reliable and do not need orchestration or observability.
- They depend on Celery ecosystem integrations (django-celery-beat, django-celery-results, or broker-specific features) that would be costly to replace.
- They require extremely high throughput and do not need durable retention, observability, or workflow orchestration. Review Hatchet's [architecture and guarantees](/v1/architecture-and-guarantees#good-fit-for) before migrating those workloads.

## Final cleanup

After the last Celery task is migrated:

- Remove the `celery` package declaration, including any Celery extras, from your dependency file and regenerate your lock file. Transitive dependencies such as `kombu` and `amqp` drop out of the lock file unless another package still requires them.
- Remove broker client packages (such as `redis`) only if no other code uses them.
- Delete `celeryconfig.py` and any Celery-specific settings and environment variables (such as `broker_url` and `result_backend`) once nothing references them.
- Remove Celery worker, Beat, and Flower process definitions from your deployment configuration.
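Before removing the package, it helps to confirm that no module still imports Celery. The script below is a hypothetical standard-library check that parses every `.py` file under a directory and reports absolute imports of the listed packages. By default it only looks for `celery` itself; pass extra package names (for example `django_celery_beat`) if your project uses Celery ecosystem packages. Relative imports such as `from .celery import app` are skipped, but the local module they point to is still scanned.

```python
import ast
import sys
from pathlib import Path


def find_celery_imports(
    root: str, packages: tuple[str, ...] = ("celery",)
) -> list[tuple[str, int]]:
    """Return sorted (file, line) pairs for absolute imports of the given packages."""
    hits: list[tuple[str, int]] = []
    for path in Path(root).rglob("*.py"):
        try:
            tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except (SyntaxError, UnicodeDecodeError):
            continue  # skip files that cannot be parsed
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                names = [alias.name for alias in node.names]
            elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
                names = [node.module]
            else:
                continue
            if any(n == p or n.startswith(p + ".") for n in names for p in packages):
                hits.append((str(path), node.lineno))
    return sorted(hits)


if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else "."
    for file, line in find_celery_imports(target):
        print(f"{file}:{line}")
```

An empty result is a good signal that the dependency and configuration cleanup above can proceed.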

## Next steps

- [Quickstart](/v1/quickstart): set up Hatchet and run your first task
- [Tasks](/v1/tasks): task definition and configuration
- [Workers](/v1/workers): worker options
- [Retry policies](/v1/error-handling/retry-policies): retries and backoff
- [Timeouts](/v1/error-handling/timeouts): execution and scheduling timeouts
- [Scheduled runs](/v1/scheduled-runs): delayed execution
- [Cron runs](/v1/cron-runs): recurring schedules
- [DAG workflows](/v1/directed-acyclic-graphs): multi-step pipelines
- [Python SDK reference](/reference/python/client): full API reference
