
SDK Improvements in V1

The Hatchet SDKs have seen considerable improvements with the V1 release.

🪓 The examples in our documentation now use the V1 SDKs, so following individual examples will help you get familiar with the new SDKs and understand how to migrate from V0.

Highlights

The Python SDK has a number of notable improvements to showcase for V1. Many of them are covered elsewhere, such as in the migration guide, on the Pydantic page, and in various examples. Here, we’ll list each of them, along with their motivations and benefits.

First and foremost: many of the changes in the V1 Python SDK are motivated by improved support for type checking and validation across large codebases and in production use cases. With that in mind, the main highlights of the V1 Python SDK are:

  1. Workflows are now declared with hatchet.workflow, which returns a Workflow object, or with hatchet.task (for simple cases), which returns a Standalone object. Workflows then have their corresponding tasks registered with Workflow.task. The Workflow object (and the Standalone object) can be reused easily across the codebase, and has wrapper methods like run and schedule that make it easy to run workflows. In these wrapper methods, inputs to the workflow are type checked, and you no longer need to specify the name of the workflow to run as a magic string. (See the sketch after this list.)
  2. Tasks have their inputs type checked, and inputs are now Pydantic models. The input field is either the model you provide to the workflow as the input_validator, or an EmptyModel, a helper Pydantic model that Hatchet provides and uses as a default.
  3. In the new SDK, we define the parents of a task as a list of Task objects, as opposed to a list of strings. This also allows us to use ctx.task_output(my_task) to access the output of the my_task task in a downstream task, while allowing that output to be type checked correctly.
  4. In the new SDK, inputs are injected directly into the task as the first positional argument, so the signature of a task is now Callable[[YourWorkflowInputType, Context], Any]. This replaces the old method of accessing workflow inputs via context.workflow_input().
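
To make these points concrete, here is a minimal sketch of a V1-style workflow. The workflow name, the GreetInput model, and the task bodies are hypothetical; the declaration pattern (hatchet.workflow with an input_validator, Workflow.task with parents, and ctx.task_output) follows the highlights above.

```python
from pydantic import BaseModel

from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()  # requires a configured Hatchet environment to run


class GreetInput(BaseModel):  # hypothetical input model
    name: str


# the Workflow object can be imported and reused anywhere in the codebase
greet_workflow = hatchet.workflow(name="greet", input_validator=GreetInput)


@greet_workflow.task()
def say_hello(input: GreetInput, ctx: Context) -> dict:
    # the validated input is the first positional argument
    return {"greeting": f"Hello, {input.name}!"}


@greet_workflow.task(parents=[say_hello])  # parents are Task objects, not strings
def shout(input: GreetInput, ctx: Context) -> dict:
    # access a parent's output via the Task object, with type checking
    greeting = ctx.task_output(say_hello)["greeting"]
    return {"shout": greeting.upper()}


# inputs are type checked at the call site; no magic string workflow names
# result = greet_workflow.run(GreetInput(name="Ada"))
```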

Other Breaking Changes

There have been a number of other breaking changes throughout the SDK in V1.

Typing improvements:

  1. External-facing protobuf objects, such as StickyStrategy and ConcurrencyLimitStrategy, have been replaced by native Python enums to make working with them easier.
  2. All external-facing types that are used for triggering workflows, scheduling workflows, etc. are now Pydantic objects, as opposed to being TypedDicts.
  3. The return type of each Task is restricted to a JSONSerializableMapping or a Pydantic model, to better align with what the Hatchet Engine expects.
  4. The ClientConfig now uses Pydantic Settings, and we’ve removed the static methods on the Client for from_environment and from_config in favor of passing configuration in directly. See the configuration example for more details, and the sketch after this list.
  5. The REST API wrappers, which previously were under hatchet.rest, have been completely overhauled.
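
As a minimal sketch of the new configuration flow (the namespace value is a hypothetical example; see the configuration example for the authoritative set of fields):

```python
from hatchet_sdk import ClientConfig, Hatchet

# configuration is passed in directly rather than via from_environment or
# from_config; since ClientConfig is a Pydantic Settings model, fields left
# unset still fall back to their corresponding environment variables
hatchet = Hatchet(
    config=ClientConfig(
        namespace="my-namespace",  # hypothetical value
    )
)
```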

Naming changes:

  1. We no longer have nested aio clients for async methods. Instead, async methods throughout the entire SDK are prefixed with aio_, similar to LangChain’s use of the a prefix to indicate async. For example, to run a workflow, you may now use either workflow.run() or workflow.aio_run(). (See the sketch after this list.)
  2. All functions on Hatchet clients are now verbs. If something was named hatchet.nounVerb before, it will now be something more like hatchet.verb_noun. For example, hatchet.runs.get_result gets the result of a workflow run.
  3. timeout, the execution timeout of a task, has been renamed to execution_timeout for clarity.
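
For instance, a minimal sketch of the sync/async pairing, reusing the hypothetical greet_workflow and GreetInput from the earlier sketch:

```python
import asyncio

# synchronous call
result = greet_workflow.run(GreetInput(name="Ada"))


# the async counterpart carries the aio_ prefix
async def main() -> None:
    result = await greet_workflow.aio_run(GreetInput(name="Ada"))


asyncio.run(main())
```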

Removals:

  1. sync_to_async has been removed. We recommend reading our asyncio documentation for guidance on handling blocking work in otherwise-async tasks.
  2. The AdminClient has been removed, and refactored into individual clients. For example, if you absolutely need to create a workflow run manually without using Workflow.run or Standalone.run, you can use hatchet.runs.create. This replaces the old hatchet.admin.run_workflow.

Other miscellaneous changes:

  1. As shown in the Pydantic example above, there is no longer a spawn_workflow (or spawn_workflows) method on the Context. run is now the preferred way to spawn workflows from within a task, and it automatically propagates the parent’s metadata to the child workflow.
  2. All times and durations, such as execution_timeout and schedule_timeout, now accept datetime.timedelta objects instead of only strings (e.g. "10s" can be written as timedelta(seconds=10)). (See the sketch after this list.)
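
A minimal sketch of the two equivalent spellings (the workflow and task names are hypothetical):

```python
from datetime import timedelta

from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()
wf = hatchet.workflow(name="timeouts-demo")  # hypothetical workflow


# string durations still work...
@wf.task(execution_timeout="10s", schedule_timeout="5m")
def string_style(input: EmptyModel, ctx: Context) -> None: ...


# ...and timedelta objects are now accepted as well
@wf.task(execution_timeout=timedelta(seconds=10), schedule_timeout=timedelta(minutes=5))
def timedelta_style(input: EmptyModel, ctx: Context) -> None: ...
```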

Other New Features

There are a handful of other new features that will make interfacing with the SDK easier, which are listed below.

  1. Concurrency keys using the input to a workflow are now checked for validity: if the workflow’s input_validator does not contain a field that’s used in a key, Hatchet will reject the workflow when it’s created. For example, if the key is input.user_id, the input_validator Pydantic model must contain a user_id field. (See the sketch after this list.)
  2. There is now an on_success_task on the Workflow object, which works just like an on-failure task, but it runs after all upstream tasks in the workflow have succeeded.
  3. We’ve exposed feature clients on the Hatchet client to make it easier to interact with and control your environment.
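
A minimal sketch of the concurrency-key validation from item 1 (the workflow name and input model are hypothetical; ConcurrencyExpression and ConcurrencyLimitStrategy are the V1 SDK’s concurrency types):

```python
from pydantic import BaseModel

from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Hatchet

hatchet = Hatchet()


class MyInput(BaseModel):
    user_id: str  # must exist, since the key below references input.user_id


# if MyInput lacked a user_id field, Hatchet would reject this workflow at creation
per_user = hatchet.workflow(
    name="per-user",  # hypothetical name
    input_validator=MyInput,
    concurrency=ConcurrencyExpression(
        expression="input.user_id",
        max_runs=1,
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    ),
)
```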

As an example of the feature clients, you can write scripts that find all runs matching certain criteria and replay or cancel them:

```python
from datetime import datetime, timedelta

# imports assume the top-level hatchet_sdk exports in V1
from hatchet_sdk import BulkCancelReplayOpts, Hatchet, RunFilter, V1TaskStatus

hatchet = Hatchet()

workflows = hatchet.workflows.list()

assert workflows.rows

workflow = workflows.rows[0]

# list all runs of the selected workflow
workflow_runs = hatchet.runs.list(workflow_ids=[workflow.metadata.id])

workflow_run_ids = [workflow_run.metadata.id for workflow_run in workflow_runs.rows]

# bulk cancel by explicit run ids...
bulk_cancel_by_ids = BulkCancelReplayOpts(ids=workflow_run_ids)

hatchet.runs.bulk_cancel(bulk_cancel_by_ids)

# ...or by a filter over time range, status, workflow, and metadata
bulk_cancel_by_filters = BulkCancelReplayOpts(
    filters=RunFilter(
        since=datetime.today() - timedelta(days=1),
        until=datetime.now(),
        statuses=[V1TaskStatus.RUNNING],
        workflow_ids=[workflow.metadata.id],
        additional_metadata={"key": "value"},
    )
)

hatchet.runs.bulk_cancel(bulk_cancel_by_filters)
```

The Hatchet client also exposes feature clients for workflows (declarations), schedules, crons, metrics (e.g. queue depth), events, and workers.