---
title: "Process S3 Objects at Scale: a Parallel Pipeline | Hatchet"
description: "Build a parallel pipeline to process S3 objects at scale with Hatchet: fan-out, per-bucket concurrency, and idempotent processing in Python and TypeScript."
---

import { Callout, Steps, Tabs } from "nextra/components";
import UniversalTabs from "@/components/UniversalTabs";
import { snippets } from "@/lib/generated/snippets";
import { Snippet } from "@/components/code";
import AmazonS3ImagePipelineDiagram from "@/components/AmazonS3ImagePipelineDiagram";

# Processing S3 Objects at Scale with Hatchet

## Introduction

Amazon S3 is a reliable-and-boring distributed object store with strong [read-after-write consistency](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html#ConsistencyModel), making it an ideal backend for processing massive datasets at scale. However, coordinating workloads that actually leverage S3's scale (without accruing a comically large cloud bill) is non-trivial.

This guide walks through building an _embarrassingly parallel_ Hatchet pipeline for ingesting and processing hundreds of thousands of images stored across multiple regionally bound S3 buckets. The pipeline makes extensive use of Hatchet's [concurrency control](/v1/concurrency) features, paired with aggressive fan-outs via [dynamic child spawning](/v1/child-spawning#fan-out-spawning-many-children-in-parallel), to parallelize the heck out of a data processing workflow while ensuring fair allocation of resources and idempotent processing of objects.

<figure style={{ margin: "2rem auto", maxWidth: "400px", textAlign: "center" }}>
  <img
    src="/s3-workflow/spider-man-workers.png"
    alt="Three workers all pointing at each other, each claiming to have processed the same S3 object"
    style={{ width: "100%", height: "auto", borderRadius: "8px" }}
  />
  <figcaption
    style={{
      marginTop: "0.75rem",
      fontSize: "0.875rem",
      fontStyle: "italic",
      opacity: 0.7,
    }}
  >
    Without concurrency control, nothing stops multiple workers from grabbing
    and processing the same object.
  </figcaption>
</figure>

### Scenario

For the purposes of this guide, we'll constrain the system as follows:

- Each bucket corresponds to a separate class of objects.
- Bucket sizes are unevenly distributed.
- Objects are ephemeral. Once successfully processed, an object should be deleted.
- Data can be added or removed at any point, with a sizable chunk being front-loaded at startup.

## Setup

### AWS Credentials

This pipeline requires [AWS access keys](https://docs.aws.amazon.com/sdkref/latest/guide/feature-static-credentials.html) configured with the following IAM permissions:

| Action | Resource | Reason |
| --- | --- | --- |
| `s3:ListAllMyBuckets` | `*` | Paginate across buckets in the account. |
| `s3:ListBucket` | `arn:aws:s3:::<your-bucket-prefix>-*` | Paginate object keys within each bucket. |
| `s3:GetObject` | `arn:aws:s3:::<your-bucket-prefix>-*/*` | Download each object for processing. |
| `s3:DeleteObject` | `arn:aws:s3:::<your-bucket-prefix>-*/*` | Remove the object after processing. |

_Note: replace `<your-bucket-prefix>` with the prefix of the buckets your worker targets, so the policy grants access to those buckets only._
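For reference, the table translates into a standard IAM policy document along these lines. This is a minimal sketch: `build_worker_policy` is a hypothetical helper (not part of the example code), and it follows the table's `<your-bucket-prefix>-*` resource pattern literally.

```python
import json


def build_worker_policy(bucket_prefix: str) -> dict:
    """Hypothetical helper: render the permissions table as an IAM policy.

    `bucket_prefix` is substituted for `<your-bucket-prefix>` in the table.
    """
    bucket_arn = f"arn:aws:s3:::{bucket_prefix}-*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # ListAllMyBuckets cannot be scoped to specific buckets.
                "Effect": "Allow",
                "Action": "s3:ListAllMyBuckets",
                "Resource": "*",
            },
            {
                # Bucket-level action: list keys inside the matching buckets.
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": bucket_arn,
            },
            {
                # Object-level actions apply to the objects (note the `/*`).
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:DeleteObject"],
                "Resource": f"{bucket_arn}/*",
            },
        ],
    }


print(json.dumps(build_worker_policy("my-images"), indent=2))
```

Keeping `s3:ListBucket` on the bucket ARN and the object actions on the `/*` ARN matters: S3 evaluates bucket-level and object-level actions against different resource types.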

AWS credentials can be supplied through the standard environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`), which the SDK clients pick up automatically:

#### Python

```python
import boto3
s3 = boto3.client("s3")
```

#### TypeScript

```typescript
import { S3Client } from '@aws-sdk/client-s3';
const s3 = new S3Client({ forcePathStyle: true });
```

## Workflows

To ensure maximum parallelism, we treat the processing of each object within each bucket as an independent unit of work. Concretely, the execution sequence is:

1. Fetch buckets from S3.
2. Fetch objects within each bucket.
3. Process each object.
4. Delete each object.

We implement this as a _double fan-out_: one fan-out across buckets to spawn per-bucket polling tasks, and a second fan-out within each polling task to spawn one processing task per object. This lets us distribute both the listing and processing work across a horizontally scaled fleet of workers without any single worker becoming a bottleneck.

```mermaid
flowchart LR
    A("<b>S3::ListBuckets</b><br/><i>cron trigger</i>")
    B("<b>S3::ListObjects</b><br/><i>bucket-0</i>")
    C("<b>S3::ListObjects</b><br/><i>bucket-N</i>")
    D("<b>Process</b><br/><i>bucket-0/object-0.png</i>")
    E("<b>Process</b><br/><i>bucket-0/object-N.png</i>")
    A -.-> B
    A -.-> C
    B -.-> D
    B -.-> E
    style B stroke:#a855f7
    style C stroke:#a855f7
    style D stroke:#14b8a6
    style E stroke:#14b8a6
```
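Stripped of S3 and Hatchet, the double fan-out is a nested expansion. The following is a minimal sketch, assuming an in-memory `{bucket: [keys]}` dict in place of the S3 listing calls. `double_fan_out` and `ProcessItem` are hypothetical names, not part of the example code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProcessItem:
    bucket: str
    key: str
    child_key: str


def double_fan_out(listing: dict[str, list[str]]) -> list[ProcessItem]:
    """Expand {bucket: [object keys]} into one processing item per object."""
    items: list[ProcessItem] = []
    for bucket in sorted(listing):  # fan-out 1: one poller per bucket
        for key in listing[bucket]:  # fan-out 2: one processing run per object
            items.append(ProcessItem(bucket, key, f"{bucket}/{key}"))
    return items


items = double_fan_out({"bucket-0": ["a.png", "b.png"], "bucket-1": ["a.png"]})
for item in items:
    print(item.child_key)
```

Each leaf item is independent of every other, which is what lets the processing runs spread across any number of workers.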

### Define the Models

Start by defining the data models for our workflow inputs.

#### Python

```python
from pydantic import BaseModel


class ListObjectsInput(BaseModel):
    bucket: str


class ProcessObjectInput(BaseModel):
    bucket: str
    key: str
```

#### TypeScript

```typescript
type ListObjectsInput = {
  bucket: string;
};

type ProcessObjectInput = {
  bucket: string;
  key: string;
};
```

Only the inputs need models: every task returns an empty result, and each branch of the pipeline naturally concludes when its object is deleted.

### Fetch Buckets

We start by fetching all relevant buckets from S3. For simplicity, this pipeline forgoes [S3 Event Notifications](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html) in favor of periodic scanning via a [cron trigger](/v1/cron-runs).

As the root of the pipeline, this task paginates across all buckets whose names match `BUCKET_PREFIX`, fanning out a child workflow per bucket. Since it only iterates over pages of bucket names, there's no meaningful work here to parallelize.

However, our strategy should still guard against _overlapping executions_ of the same cron job. If S3 throttles us mid-scan and a run takes longer than the cron interval, the next tick should be rejected until the in-flight run finishes, rather than piling on more concurrent listing calls and making the throttling worse.

```mermaid
flowchart LR
    A("<b>Fetch Buckets</b><br/><i>cron trigger</i>")
    B("bucket-0")
    C("bucket-1")
    D[...]
    E("bucket-N")
    A -.-> B
    A -.-> C
    A ~~~ D
    A -.-> E
    style D fill:none,stroke:none
    style B stroke:#a855f7
    style C stroke:#a855f7
    style E stroke:#a855f7
```

#### Python

```python
from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy

fetch_buckets_workflow = hatchet.workflow(
    name="fetch_s3_buckets",
    on_crons=["* * * * *"],
    concurrency=ConcurrencyExpression(
        expression="'singleton'",
        max_runs=1,
        limit_strategy=ConcurrencyLimitStrategy.CANCEL_NEWEST,
    ),
)
```

#### TypeScript

```typescript
const fetchBucketsWorkflow = hatchet.workflow({
  name: 'fetch_s3_buckets',
  on: {
    cron: '* * * * *',
  },
  concurrency: {
    expression: "'singleton'",
    maxRuns: 1,
    limitStrategy: ConcurrencyLimitStrategy.CANCEL_NEWEST,
  },
});
```

Here, we set the concurrency key to the CEL expression `'singleton'`, a string literal that evaluates to the same value for every run. Combined with `max_runs=1` and `CANCEL_NEWEST`, this rule allows **at most one in-flight run across all workers**: while one run is in flight, any new run triggered by the cron is immediately canceled.
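To make the semantics concrete, here is a toy, in-memory model of a `CANCEL_NEWEST` rule. It is not Hatchet's implementation (limits are enforced by the Hatchet engine, not in your worker code); `CancelNewestLimiter` is a hypothetical class that only mirrors the behavior described above.

```python
from collections import defaultdict


class CancelNewestLimiter:
    """Toy model of a concurrency rule with limit strategy CANCEL_NEWEST.

    Runs are grouped by the value of their concurrency key; once a group has
    `max_runs` runs in flight, any newly arriving run in that group is canceled.
    """

    def __init__(self, max_runs: int) -> None:
        self.max_runs = max_runs
        self.in_flight: dict[str, int] = defaultdict(int)

    def try_start(self, key: str) -> bool:
        if self.in_flight[key] >= self.max_runs:
            return False  # canceled: the group is already at its limit
        self.in_flight[key] += 1
        return True

    def finish(self, key: str) -> None:
        self.in_flight[key] -= 1


limiter = CancelNewestLimiter(max_runs=1)
# Every cron tick evaluates the CEL expression 'singleton' to the same key.
print(limiter.try_start("singleton"))  # True: the first tick runs
print(limiter.try_start("singleton"))  # False: an overlapping tick is canceled
limiter.finish("singleton")
print(limiter.try_start("singleton"))  # True: the next tick after completion runs
```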

The task body itself is responsible for gathering all relevant buckets from our region and spawning a new `fetch_objects` task from the `fetch_s3_objects` workflow for each:

#### Python

```python
from typing import Any

from hatchet_sdk import Context, EmptyModel


@fetch_buckets_workflow.task()
async def fetch_buckets(input: EmptyModel, ctx: Context) -> dict[str, Any]:
    paginator = s3.get_paginator("list_buckets")
    pages = paginator.paginate(
        Prefix=BUCKET_PREFIX,
        PaginationConfig={"PageSize": 10},
    )

    for page in pages:
        items = [
            fetch_objects_workflow.create_bulk_run_item(
                input=ListObjectsInput(bucket=b["Name"]),
                child_key=b["Name"],
                additional_metadata={"bucket-name": b["Name"]},
            )
            for b in page.get("Buckets", [])
        ]
        if items:
            await fetch_objects_workflow.aio_run_many(items, wait_for_result=False)

    return {}
```

#### TypeScript

```typescript
import { paginateListBuckets } from '@aws-sdk/client-s3';

fetchBucketsWorkflow.task({
  name: 'fetch_buckets',
  fn: async () => {
    for await (const page of paginateListBuckets(
      { client: s3, pageSize: 10 },
      { Prefix: BUCKET_PREFIX }
    )) {
      const items = (page.Buckets ?? [])
        .filter((bucket): bucket is { Name: string } => bucket.Name !== undefined)
        .map((bucket) => ({
          input: { bucket: bucket.Name },
          opts: {
            childKey: bucket.Name,
            additionalMetadata: { 'bucket-name': bucket.Name },
          },
        }));

      if (items.length > 0) {
        await fetchObjectsWorkflow.runManyNoWait(items);
      }
    }

    return {};
  },
});
```

Each spawn uses the bucket name as its child key, which deduplicates downstream `fetch_objects` runs for the same bucket.
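The following minimal sketch shows what stable child keys buy during a retry. It assumes, as described above, that spawning a run whose child key already exists is a no-op. `spawn_page` and the `spawned` set are hypothetical stand-ins for the Hatchet engine. The example simulates a parent that crashes after two pages of 10 buckets and is then retried from the start.

```python
def spawn_page(names: list[str], spawned: set[str]) -> int:
    """Spawn one child per name, keyed by the name; return how many are new.

    `spawned` stands in for the engine's record of existing child keys.
    """
    new = [name for name in names if name not in spawned]
    spawned.update(new)  # names already in `spawned` are skipped, not re-spawned
    return len(new)


buckets = [f"bucket-{i}" for i in range(25)]
pages = [buckets[i : i + 10] for i in range(0, len(buckets), 10)]  # PageSize=10

spawned: set[str] = set()
first_attempt = sum(spawn_page(page, spawned) for page in pages[:2])  # crashes after 2 pages
retry = sum(spawn_page(page, spawned) for page in pages)  # retry re-lists every page
print(first_attempt, retry, len(spawned))  # 20 5 25
```

On retry, only the 5 buckets the crashed attempt never reached are spawned.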

### Fetch Objects

Next, we define a workflow responsible for polling a given S3 bucket for objects, before fanning out and assigning each object a processing task.

```mermaid
flowchart LR
    A("<b>List objects</b><br/><i>input.bucket</i>")
    B("Download &<br/>process")
    C("Download &<br/>process")
    D[...]
    E("Download &<br/>process")
    F("Download &<br/>process")
    A -.-> B
    A -.-> C
    A -.-> F
    A ~~~ D
    A -.-> E
    style D fill:none,stroke:none
    classDef parent stroke:#3b82f6,stroke-width:2px
    classDef leaf stroke:#a855f7,stroke-width:2px
    class A parent
    class B,C,E,F leaf
```

As with [Fetch Buckets](#fetch-buckets), this workflow must ensure each bucket has at most one poller in flight at a time, rejecting overlapping runs with the `CANCEL_NEWEST` strategy and a limit of one run. Here, however, runs are grouped by bucket name via the CEL expression `input.bucket`, so the limit applies per bucket rather than globally.

Additionally, a second, workflow-wide concurrency expression limits how many bucket pollers can run simultaneously. Runs beyond this limit are queued under the `GROUP_ROUND_ROBIN` strategy and start as slots free up.

#### Python

```python
from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy

fetch_objects_workflow = hatchet.workflow(
    name="fetch_s3_objects",
    input_validator=ListObjectsInput,
    concurrency=[
        ConcurrencyExpression(
            expression="input.bucket",
            max_runs=1,
            limit_strategy=ConcurrencyLimitStrategy.CANCEL_NEWEST,
        ),
        ConcurrencyExpression.from_int(MAX_CONCURRENT_BUCKET_POLLERS),
    ],
)
```

#### TypeScript

```typescript
const fetchObjectsWorkflow = hatchet.workflow({
  name: 'fetch_s3_objects',
  concurrency: [
    {
      expression: 'input.bucket',
      maxRuns: 1,
      limitStrategy: ConcurrencyLimitStrategy.CANCEL_NEWEST,
    },
    {
      expression: "'constant'",
      maxRuns: MAX_CONCURRENT_BUCKET_POLLERS,
      limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    },
  ],
});
```

Together, these two concurrency entries enforce a **single poller per bucket** and a **workflow-wide cap** on total in-flight pollers.
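The interaction between the two rules can be modeled in a few lines. This is a simplified, hypothetical `PollerAdmission` class, not the Hatchet scheduler. It assumes a poller counts against its bucket's limit while it is either queued or running, and that queued pollers start in arrival order (the constant key means there is only one group to rotate over).

```python
from collections import deque


class PollerAdmission:
    """Toy model of the two concurrency rules on fetch_s3_objects.

    Rule 1 (key input.bucket, max 1, CANCEL_NEWEST): a poller for a bucket
    that already has one queued or running is canceled.
    Rule 2 (constant key, max `cap`): at most `cap` pollers run at once;
    the rest wait in arrival order.
    """

    def __init__(self, cap: int) -> None:
        self.cap = cap
        self.running: set[str] = set()
        self.waiting: deque[str] = deque()

    def submit(self, bucket: str) -> str:
        if bucket in self.running or bucket in self.waiting:
            return "canceled"  # rule 1: one poller per bucket
        if len(self.running) < self.cap:
            self.running.add(bucket)
            return "running"
        self.waiting.append(bucket)  # rule 2: over the workflow-wide cap, so queue
        return "queued"

    def finish(self, bucket: str) -> None:
        self.running.discard(bucket)
        if self.waiting and len(self.running) < self.cap:
            self.running.add(self.waiting.popleft())


admission = PollerAdmission(cap=2)
print([admission.submit(b) for b in ["bucket-0", "bucket-1", "bucket-2", "bucket-0"]])
# ['running', 'running', 'queued', 'canceled']
admission.finish("bucket-0")
print(sorted(admission.running))  # ['bucket-1', 'bucket-2']
```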

The task itself paginates through the bucket's objects and spawns a processing run per object:

#### Python

```python
from typing import Any

from hatchet_sdk import Context


@fetch_objects_workflow.task()
async def fetch_objects(input: ListObjectsInput, ctx: Context) -> dict[str, Any]:
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(
        Bucket=input.bucket,
        PaginationConfig={"PageSize": 100},
    )

    for page in pages:
        items = [
            process_object_workflow.create_bulk_run_item(
                input=ProcessObjectInput(bucket=input.bucket, key=obj["Key"]),
                child_key=f"{input.bucket}/{obj['Key']}",
            )
            for obj in page.get("Contents", [])
        ]
        if items:
            await process_object_workflow.aio_run_many(items, wait_for_result=False)

    return {}
```

#### TypeScript

```typescript
import { paginateListObjectsV2 } from '@aws-sdk/client-s3';

fetchObjectsWorkflow.task({
  name: 'fetch_objects',
  fn: async (input: ListObjectsInput) => {
    for await (const page of paginateListObjectsV2(
      { client: s3, pageSize: 100 },
      { Bucket: input.bucket }
    )) {
      const items = (page.Contents ?? [])
        .filter((obj): obj is { Key: string } => obj.Key !== undefined)
        .map((obj) => ({
          input: { bucket: input.bucket, key: obj.Key },
          opts: {
            childKey: `${input.bucket}/${obj.Key}`,
          },
        }));

      if (items.length > 0) {
        await processObjectWorkflow.runManyNoWait(items);
      }
    }

    return {};
  },
});
```

The child key combines the bucket name and object key (`{bucket}/{key}`), guaranteeing that re-triggering the parent won't spawn duplicate processing runs for the same object. S3 object keys are only unique within a bucket, so including the bucket name is mandatory.
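Here is a quick illustration of the collision that the bucket prefix prevents. `child_key` is an illustrative helper that mirrors the f-string in the task above.

```python
def child_key(bucket: str, key: str) -> str:
    """Child key for a process_object run: object keys are only unique per bucket."""
    return f"{bucket}/{key}"


objects = [("bucket-0", "cat.png"), ("bucket-1", "cat.png")]

key_only = {key for _, key in objects}  # collides: both objects map to "cat.png"
composite = {child_key(bucket, key) for bucket, key in objects}  # one entry per object
print(len(key_only), len(composite))  # 1 2
```

Because S3 bucket names cannot contain `/`, splitting a child key on its first `/` recovers the `(bucket, key)` pair unambiguously, even when the object key itself contains slashes.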

### Process Objects

Lastly, we define the processing workflow, which represents the bulk of the pipeline's actual computational work. Recall from the previous sections that:

1. The distribution of objects per bucket is not uniform.
2. Processed objects are immediately deleted.

Rather than capping concurrency globally, we scope the limit _per bucket_ with `MAX_RUNS_PER_BUCKET` (set via `S3_WORKER_MAX_RUNS_PER_BUCKET`). A global cap would let the largest buckets monopolize every processing slot, whereas a per-bucket cap keeps throughput fair across the system regardless of individual bucket size. When a bucket hits its limit, additional runs are queued and dispatched round-robin across buckets via `GROUP_ROUND_ROBIN` as slots open.


In steady state, work interleaves across buckets, the worker pool may run below full capacity, and any backlog drains in round-robin order. Even the bucket with the most pending objects can't monopolize the workers.
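The following toy model illustrates this fairness property. It assumes a single scheduling pass in which buckets are visited in a fixed rotation. `round_robin_dispatch` is hypothetical and does not reflect how the Hatchet engine schedules internally.

```python
def round_robin_dispatch(
    pending: dict[str, int], max_runs_per_bucket: int, worker_slots: int
) -> list[str]:
    """Toy model of one scheduling pass with a per-bucket cap.

    `pending` maps bucket -> queued objects. Buckets are visited in a fixed
    rotation, and a bucket is skipped once it has `max_runs_per_bucket` runs
    in flight. Returns the bucket of each dispatched run, in order.
    """
    remaining = dict(pending)
    in_flight = {bucket: 0 for bucket in pending}
    dispatched: list[str] = []
    while len(dispatched) < worker_slots:
        eligible = [
            b for b in sorted(remaining)
            if remaining[b] > 0 and in_flight[b] < max_runs_per_bucket
        ]
        if not eligible:
            break  # every bucket is drained or at its cap
        for b in eligible:
            if len(dispatched) == worker_slots:
                break
            remaining[b] -= 1
            in_flight[b] += 1
            dispatched.append(b)
    return dispatched


order = round_robin_dispatch(
    {"bucket-0": 3, "bucket-1": 1000, "bucket-2": 5},
    max_runs_per_bucket=2,
    worker_slots=8,
)
print(order)
# ['bucket-0', 'bucket-1', 'bucket-2', 'bucket-0', 'bucket-1', 'bucket-2']
```

With 8 slots available, only 6 are filled. The per-bucket cap of 2 binds before the slot count does, so the pool runs below capacity, and `bucket-1`'s 1,000-object backlog gets no more slots than the small buckets.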

There is no child-side deduplication on this workflow (no concurrency expression keyed on `{bucket}/{object-key}`). Because `process_object` runs are only ever spawned by [`fetch_objects`](#fetch-objects), which already deduplicates via the child key at spawn time, a second concurrency expression would be redundant.

#### Python

```python
from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy

process_object_workflow = hatchet.workflow(
    name="process_object",
    input_validator=ProcessObjectInput,
    concurrency=ConcurrencyExpression(
        expression="input.bucket",
        max_runs=MAX_RUNS_PER_BUCKET,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    ),
)
```

#### TypeScript

```typescript
const processObjectWorkflow = hatchet.workflow({
  name: 'process_object',
  concurrency: {
    expression: 'input.bucket',
    maxRuns: MAX_RUNS_PER_BUCKET,
    limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
  },
});
```

The final step is to download the object, process it, and safely delete it from the source bucket:

#### Python

```python
from io import BytesIO
from typing import Any

from hatchet_sdk import Context


@process_object_workflow.task()
def process_object(input: ProcessObjectInput, ctx: Context) -> dict[str, Any]:
    buf = BytesIO()
    try:
        s3.download_fileobj(input.bucket, input.key, buf)
    except s3.exceptions.ClientError as e:
        if (code := e.response["Error"]["Code"]) in {
            "NoSuchKey",
            "NoSuchBucket",
            "404",
        }:
            ctx.log(f"skipping {input.bucket}/{input.key}: not found ({code})")
            return {}
        raise

    buf.seek(0)
    # TODO: actual image processing here

    s3.delete_object(Bucket=input.bucket, Key=input.key)
    return {}
```

#### TypeScript

```typescript
import {
  DeleteObjectCommand,
  GetObjectCommand,
  NoSuchBucket,
  NoSuchKey,
} from '@aws-sdk/client-s3';

processObjectWorkflow.task({
  name: 'process_object',
  fn: async (input: ProcessObjectInput, ctx) => {
    let body: Buffer;

    try {
      const response = await s3.send(
        new GetObjectCommand({ Bucket: input.bucket, Key: input.key })
      );
      if (!response.Body) {
        return {};
      }
      body = Buffer.from(await response.Body.transformToByteArray());
    } catch (err) {
      if (err instanceof NoSuchKey || err instanceof NoSuchBucket) {
        await ctx.log(`skipping ${input.bucket}/${input.key}: not found`);
        return {};
      }
      throw err;
    }

    // TODO: actual image processing here

    await s3.send(new DeleteObjectCommand({ Bucket: input.bucket, Key: input.key }));
    return {};
  },
});
```

This architecture provides an idempotent, effectively-exactly-once processing pipeline via two core mechanisms:

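1. **Idempotent runs:** Each `process_object` task fetches, processes, then deletes the target object. If the object no longer exists, the `NoSuchKey` / `NoSuchBucket` / `404` branch catches the error and returns a no-op. S3's strong read-after-write consistency ensures that a "not found" response is a reliable completion signal. Because the delete happens only after processing, a run that fails between the two steps may process the object again on retry, so the processing step itself should be idempotent.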
2. **Spawn-time deduplication:** Concurrent duplicate runs for the exact same `(bucket, object)` are actively prevented by the parent's `child_key` constraint, meaning the 404 no-op path operates purely as a safety net rather than the norm.
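The no-op path can be exercised without AWS using an in-memory fake. `FakeBucket`, `NotFound`, and this `process_object` function are hypothetical stand-ins (not boto3 or Hatchet APIs) that mirror the fetch, process, then delete order of the task above.

```python
class NotFound(Exception):
    """Stand-in for S3's NoSuchKey / NoSuchBucket / 404 errors."""


class FakeBucket:
    """Hypothetical in-memory stand-in for one S3 bucket."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self.objects = dict(objects)

    def get(self, key: str) -> bytes:
        if key not in self.objects:
            raise NotFound(key)
        return self.objects[key]

    def delete(self, key: str) -> None:
        # Mirrors S3, where DeleteObject on a missing key still succeeds
        # in an unversioned bucket.
        self.objects.pop(key, None)


processed: list[str] = []


def process_object(bucket: FakeBucket, key: str) -> str:
    """Fetch, process, then delete; a missing object means it was already handled."""
    try:
        body = bucket.get(key)
    except NotFound:
        return "skipped"
    processed.append(f"{key}:{len(body)}")  # placeholder for real image processing
    bucket.delete(key)
    return "processed"


bucket = FakeBucket({"cat.png": b"\x89PNG"})
print(process_object(bucket, "cat.png"))  # processed
print(process_object(bucket, "cat.png"))  # skipped
```

The second call models a duplicate or retried run: it finds nothing to fetch and returns without side effects.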

## Register and Start the Worker

Start the worker as follows. It begins polling every S3 bucket whose name starts with the value of `S3_WORKER_BUCKET_PREFIX`:

#### Python

From `sdks/python/`:

```bash
S3_WORKER_BUCKET_PREFIX="bucket-" \
S3_WORKER_MAX_CONCURRENT_BUCKET_POLLERS=10 \
S3_WORKER_MAX_RUNS_PER_BUCKET=20 \
S3_WORKER_SLOTS=40 \
poetry run python -m examples.aws.s3.worker
```

#### TypeScript

From `sdks/typescript/`:

```bash
S3_WORKER_BUCKET_PREFIX="bucket-" \
S3_WORKER_MAX_CONCURRENT_BUCKET_POLLERS=10 \
S3_WORKER_MAX_RUNS_PER_BUCKET=20 \
S3_WORKER_SLOTS=40 \
npx ts-node src/v1/examples/aws/s3/worker.ts
```

> **Info:** **Running the Worker Locally**
>
> Start LocalStack (`localstack/localstack:4.14`) using the provided `docker-compose.yml`:
>
> ```bash
> docker compose -f examples/aws/s3/docker-compose.yml up -d --wait
> ```
>
> Then export the following environment variables in the same shell before starting the worker:
>
> ```bash
> export AWS_ENDPOINT_URL=http://localhost:4566
> export AWS_ACCESS_KEY_ID=test
> export AWS_SECRET_ACCESS_KEY=test
> export AWS_REGION=us-east-1
> ```

## Testing

#### Python

Run the Python worker test suite to exercise the end-to-end workflow, with startup and teardown managed by `pytest` fixtures. This will:

- Spin up a LocalStack instance via `docker compose`.
- Seed it with S3 data.
- Start the worker, which will initiate polling.
- Validate that every seeded object is processed and deleted.

```bash
pytest examples/aws/s3/test_worker.py
```

#### TypeScript

There is no automated test suite for the TypeScript worker. Use the steps above to run it manually against LocalStack.
