Context
The Hatchet Context class provides helper methods and useful data to tasks at runtime. It is passed as the second argument to all tasks and durable tasks.
There are two types of context classes you’ll encounter:
Context
: The standard context for regular tasks with methods for logging, task output retrieval, cancellation, and more.DurableContext
: An extended context for durable tasks that includes additional methods for durable execution likeaio_wait_for
andaio_sleep_for
.
Context
Methods
Name | Description |
---|---|
was_skipped | Check if a given task was skipped. You can read about skipping in the docs. |
task_output | Get the output of a parent task in a DAG. |
cancel | Cancel the current task run. This will call the Hatchet API to cancel the step run and set the exit flag to True. |
aio_cancel | Cancel the current task run. This will call the Hatchet API to cancel the step run and set the exit flag to True. |
done | Check if the current task run has been cancelled. |
log | Log a line to the Hatchet API. This will send the log line to the Hatchet API and return immediately. |
release_slot | Manually release the slot for the current step run to free up a slot on the worker. Note that this is an advanced feature and should be used with caution. |
put_stream | Put a stream event to the Hatchet API. This will send the data to the Hatchet API and return immediately. You can then subscribe to the stream from a separate consumer. |
refresh_timeout | Refresh the timeout for the current task run. You can read about refreshing timeouts in the docs. |
fetch_task_run_error | A helper intended to be used in an on-failure step to retrieve the error that occurred in a specific upstream task run. |
Attributes
was_triggered_by_event
A property that indicates whether the workflow was triggered by an event.
Returns:
Type | Description |
---|---|
bool | True if the workflow was triggered by an event, False otherwise. |
workflow_input
The input to the workflow, as a dictionary. It’s recommended to use the input
parameter to the task (the first argument passed into the task at runtime) instead of this property.
Returns:
Type | Description |
---|---|
JSONSerializableMapping | The input to the workflow. |
lifespan
The worker lifespan, if it exists. You can read about lifespans in the docs.
Note: You’ll need to cast the return type of this property to the type returned by your lifespan generator.
workflow_run_id
The id of the current workflow run.
Returns:
Type | Description |
---|---|
str | The id of the current workflow run. |
retry_count
The retry count of the current task run, which corresponds to the number of times the task has been retried.
Returns:
Type | Description |
---|---|
int | The retry count of the current task run. |
attempt_number
The attempt number of the current task run, which corresponds to the number of times the task has been attempted, including the initial attempt. This is one more than the retry count.
Returns:
Type | Description |
---|---|
int | The attempt number of the current task run. |
additional_metadata
The additional metadata sent with the current task run.
Returns:
Type | Description |
---|---|
JSONSerializableMapping | None | The additional metadata sent with the current task run, or None if no additional metadata was sent. |
parent_workflow_run_id
The parent workflow run id of the current task run, if it exists. This is useful for knowing which workflow run spawned this run as a child.
Returns:
Type | Description |
---|---|
str | None | The parent workflow run id of the current task run, or None if it does not exist. |
priority
The priority that the current task was run with.
Returns:
Type | Description |
---|---|
int | None | The priority of the current task run, or None if no priority was set. |
workflow_id
The id of the workflow that this task belongs to.
Returns:
Type | Description |
---|---|
str | None | The id of the workflow that this task belongs to. |
workflow_version_id
The id of the workflow version that this task belongs to.
Returns:
Type | Description |
---|---|
str | None | The id of the workflow version that this task belongs to. |
task_run_errors
A helper intended to be used in an on-failure step to retrieve the errors that occurred in upstream task runs.
Returns:
Type | Description |
---|---|
dict[str, str] | A dictionary mapping task names to their error messages. |
Functions
was_skipped
Check if a given task was skipped. You can read about skipping in the docs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task[TWorkflowInput, R] | The task to check the status of (skipped or not). | required |
Returns:
Type | Description |
---|---|
bool | True if the task was skipped, False otherwise. |
task_output
Get the output of a parent task in a DAG.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task[TWorkflowInput, R] | The task whose output you want to retrieve. | required |
Returns:
Type | Description |
---|---|
R | The output of the parent task, validated against the task’s validators. |
Raises:
Type | Description |
---|---|
ValueError | If the task was skipped or if the step output for the task is not found. |
cancel
Cancel the current task run. This will call the Hatchet API to cancel the step run and set the exit flag to True.
Returns:
Type | Description |
---|---|
None | None |
aio_cancel
Cancel the current task run. This will call the Hatchet API to cancel the step run and set the exit flag to True.
Returns:
Type | Description |
---|---|
None | None |
done
Check if the current task run has been cancelled.
Returns:
Type | Description |
---|---|
bool | True if the task run has been cancelled, False otherwise. |
log
Log a line to the Hatchet API. This will send the log line to the Hatchet API and return immediately.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
line | str | JSONSerializableMapping | The line to log. Can be a string or a JSON serializable mapping. | required |
raise_on_error | bool | If True, will raise an exception if the log fails. Defaults to False. | False |
Returns:
Type | Description |
---|---|
None | None |
release_slot
Manually release the slot for the current step run to free up a slot on the worker. Note that this is an advanced feature and should be used with caution.
Returns:
Type | Description |
---|---|
None | None |
put_stream
Put a stream event to the Hatchet API. This will send the data to the Hatchet API and return immediately. You can then subscribe to the stream from a separate consumer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | str | bytes | The data to send to the Hatchet API. Can be a string or bytes. | required |
Returns:
Type | Description |
---|---|
None | None |
refresh_timeout
Refresh the timeout for the current task run. You can read about refreshing timeouts in the docs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
increment_by | str | timedelta | The amount of time to increment the timeout by. Can be a string (e.g. “5m”) or a timedelta object. | required |
Returns:
Type | Description |
---|---|
None | None |
fetch_task_run_error
A helper intended to be used in an on-failure step to retrieve the error that occurred in a specific upstream task run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task[TWorkflowInput, R] | The task whose error you want to retrieve. | required |
Returns:
Type | Description |
---|---|
str | None | The error message of the task run, or None if no error occurred. |
DurableContext
Bases: Context
Methods
Name | Description |
---|---|
aio_wait_for | Durably wait for either a sleep or an event. |
aio_sleep_for | Lightweight wrapper for durable sleep. Allows for shorthand usage of ctx.aio_wait_for when specifying a sleep condition. |
Functions
aio_wait_for
Durably wait for either a sleep or an event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
signal_key | str | The key to use for the durable event. This is used to identify the event in the Hatchet API. | required |
*conditions | SleepCondition | UserEventCondition | The conditions to wait for. Can be a SleepCondition or UserEventCondition. | () |
Returns:
Type | Description |
---|---|
dict[str, Any] | A dictionary containing the results of the wait. |
Raises:
Type | Description |
---|---|
ValueError | If the durable event listener is not available. |
aio_sleep_for
Lightweight wrapper for durable sleep. Allows for shorthand usage of ctx.aio_wait_for
when specifying a sleep condition.
For more complicated conditions, use ctx.aio_wait_for
directly.