Hatchet Go V1 Migration Guide
This guide will help you migrate Hatchet workflows from the V0 SDK to the V1 SDK. Note that the v1 engine will continue to support v0 workflows until September 30th, 2025.
The v1 Go SDK can be found in github.com/hatchet-dev/hatchet/pkg/v1. You can instantiate a new Hatchet client via:
package main

import (
	v1 "github.com/hatchet-dev/hatchet/pkg/v1"
)

func main() {
	hatchet, err := v1.NewHatchetClient()
	if err != nil {
		panic(err) // the client could not be created
	}
	// ...
}
Declaring Tasks and Workflows
In the V1 SDK, tasks and workflows use Go generics to make typing easier. As a result, both the context methods and the way tasks are defined have been improved.
Single-Task Workflows
Single tasks are much easier to define than before using the factory package in the V1 SDK. Here’s an example of how to define a simple workflow with the V1 SDK:
import (
	"strings"

	"github.com/hatchet-dev/hatchet/pkg/client/create"
	"github.com/hatchet-dev/hatchet/pkg/v1/factory"
	"github.com/hatchet-dev/hatchet/pkg/worker" // provides worker.HatchetContext
)

// ...

type SimpleInput struct {
	Message string
}

type SimpleResult struct {
	TransformedMessage string
}

simple := factory.NewTask(
	create.StandaloneTask{
		Name: "simple-task",
	}, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
		return &SimpleResult{
			TransformedMessage: strings.ToLower(input.Message),
		}, nil
	},
	hatchet, // a Hatchet client instance
)
The simple task automatically inherits type definitions from the function signature, so invoking simple will automatically provide type-checking when the task is run:
result, err := simple.Run(ctx, SimpleInput{
	Message: "Hello, World!",
})
if err != nil {
	return err
}

fmt.Println(result.TransformedMessage)
Complex Workflows
Complex, multi-task workflows can be defined using the NewWorkflow method on the factory package. Here’s an example of how to define a workflow with multiple tasks:
import "github.com/hatchet-dev/hatchet/pkg/v1/factory"

// ...

simple := factory.NewWorkflow[DagInput, DagResult](
	create.WorkflowCreateOpts[DagInput]{
		Name: "simple-dag",
	},
	hatchet,
)

step1 := simple.Task(
	create.WorkflowTask[DagInput, DagResult]{
		Name: "Step1",
	}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
		return &SimpleOutput{
			Step: 1,
		}, nil
	},
)

simple.Task(
	create.WorkflowTask[DagInput, DagResult]{
		Name: "Step2",
		Parents: []create.NamedTask{
			step1,
		},
	}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
		// Read the output of the parent task into a typed struct.
		var step1Output SimpleOutput
		err := ctx.ParentOutput(step1, &step1Output)
		if err != nil {
			return nil, err
		}

		return &SimpleOutput{
			Step: 2,
		}, nil
	},
)

return simple
Note that due to limitations in Go’s type system, any task function defined on a workflow created with factory.NewWorkflow must return an interface{} type.
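One likely reason for this limitation is that Go methods cannot declare their own type parameters, so a Task method on a generic workflow has no way to name a per-task output type. The standard-library sketch below shows that constraint and how a typed value can still be recovered downstream. Workflow, Outputs, taskFn, and the JSON round trip are all hypothetical simplifications for this example (tasks simply run in registration order); none of it is the Hatchet implementation.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Outputs holds each finished task's result as raw JSON, keyed by task name.
type Outputs struct {
	raw map[string][]byte
}

// ParentOutput decodes a stored result into the typed value target points to.
func (o *Outputs) ParentOutput(parent string, target interface{}) error {
	data, ok := o.raw[parent]
	if !ok {
		return errors.New("no output recorded for " + parent)
	}
	return json.Unmarshal(data, target)
}

// taskFn erases the per-task output type to interface{}.
type taskFn[I any] func(input I, parents *Outputs) (interface{}, error)

// Workflow is generic over its input only (hypothetical stand-in).
type Workflow[I any] struct {
	order []string
	fns   map[string]taskFn[I]
}

// Task cannot be written as Task[O any](...): Go methods may not
// introduce type parameters, hence the interface{} return type in taskFn.
func (w *Workflow[I]) Task(name string, fn taskFn[I]) string {
	if w.fns == nil {
		w.fns = map[string]taskFn[I]{}
	}
	w.order = append(w.order, name)
	w.fns[name] = fn
	return name
}

// Run executes tasks in registration order and stores each result as JSON.
func (w *Workflow[I]) Run(input I) (*Outputs, error) {
	out := &Outputs{raw: map[string][]byte{}}
	for _, name := range w.order {
		res, err := w.fns[name](input, out)
		if err != nil {
			return nil, err
		}
		data, err := json.Marshal(res)
		if err != nil {
			return nil, err
		}
		out.raw[name] = data
	}
	return out, nil
}

type DagInput struct {
	Message string
}

type SimpleOutput struct {
	Step int
}

func buildDag() *Workflow[DagInput] {
	wf := &Workflow[DagInput]{}
	step1 := wf.Task("Step1", func(in DagInput, _ *Outputs) (interface{}, error) {
		return &SimpleOutput{Step: 1}, nil
	})
	wf.Task("Step2", func(in DagInput, parents *Outputs) (interface{}, error) {
		var prev SimpleOutput // the type is restored here, by the caller
		if err := parents.ParentOutput(step1, &prev); err != nil {
			return nil, err
		}
		return &SimpleOutput{Step: prev.Step + 1}, nil
	})
	return wf
}

func main() {
	out, err := buildDag().Run(DagInput{Message: "hi"})
	if err != nil {
		panic(err)
	}
	var final SimpleOutput
	if err := out.ParentOutput("Step2", &final); err != nil {
		panic(err)
	}
	fmt.Println(final.Step) // 2
}
```

The trade-off is that a mismatch between what a parent returns and what a child decodes is only caught at run time, which is why the typed struct passed to ParentOutput matters.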
Workflow and Task Configuration
In the V1 SDK, workflows and tasks are configured using the create package. It provides options for configuring tasks and workflows, such as the task name, the task’s parent tasks, and the task’s wait conditions:
// import the create package
import "github.com/hatchet-dev/hatchet/pkg/client/create"

// utilize the following structs to configure tasks and workflows
type WorkflowTask[I, O any] struct {
	// (required) The name of the task and workflow
	Name string

	// (optional) The version of the workflow
	Version string

	// (optional) The human-readable description of the workflow
	Description string

	// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
	ExecutionTimeout time.Duration

	// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
	ScheduleTimeout time.Duration

	// (optional) Retries defines the number of times to retry a failed task
	Retries int32

	// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
	RetryBackoffFactor float32

	// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
	RetryMaxBackoffSeconds int32

	// (optional) RateLimits define constraints on how frequently the task can be executed
	RateLimits []*types.RateLimit

	// (optional) WorkerLabels specify requirements for workers that can execute this task
	WorkerLabels map[string]*types.DesiredWorkerLabel

	// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
	Concurrency []*types.Concurrency

	// WaitFor represents a set of conditions which must be satisfied before the task can run.
	WaitFor condition.Condition

	// SkipIf represents a set of conditions which, if satisfied, will cause the task to be skipped.
	SkipIf condition.Condition

	// CancelIf represents a set of conditions which, if satisfied, will cause the task to be canceled.
	CancelIf condition.Condition

	// (optional) Parents are the tasks that must successfully complete before this task can start
	Parents []NamedTask
}
type WorkflowOnFailureTask[I, O any] struct {
	// (optional) The version of the workflow
	Version string

	// (optional) The human-readable description of the workflow
	Description string

	// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
	ExecutionTimeout time.Duration

	// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
	ScheduleTimeout time.Duration

	// (optional) Retries defines the number of times to retry a failed task
	Retries int32

	// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
	RetryBackoffFactor float32

	// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
	RetryMaxBackoffSeconds int32

	// (optional) RateLimits define constraints on how frequently the task can be executed
	RateLimits []*types.RateLimit

	// (optional) WorkerLabels specify requirements for workers that can execute this task
	WorkerLabels map[string]*types.DesiredWorkerLabel

	// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
	Concurrency []*types.Concurrency
}
// StandaloneTask defines options for creating a standalone task.
// This combines both workflow and task properties in a single type.
type StandaloneTask struct {
	// (required) The name of the task and workflow
	Name string

	// (optional) The version of the workflow
	Version string

	// (optional) The human-readable description of the workflow
	Description string

	// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
	ExecutionTimeout time.Duration

	// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
	ScheduleTimeout time.Duration

	// (optional) Retries defines the number of times to retry a failed task
	Retries int32

	// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
	RetryBackoffFactor float32

	// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
	RetryMaxBackoffSeconds int32

	// (optional) RateLimits define constraints on how frequently the task can be executed
	RateLimits []*types.RateLimit

	// (optional) WorkerLabels specify requirements for workers that can execute this task
	WorkerLabels map[string]*types.DesiredWorkerLabel

	// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
	Concurrency []*types.Concurrency

	// (optional) The event names that trigger the workflow
	OnEvents []string

	// (optional) The cron expressions for scheduled workflow runs
	OnCron []string
}
Workers
To declare workers, you can use the Worker method on the v1 Hatchet client. Note that instead of calling RegisterWorkflow on the worker, you can pass the workflows directly to the Worker method:
package main

import (
	"context"
	"fmt"

	"github.com/hatchet-dev/hatchet/pkg/client/create"
	v1 "github.com/hatchet-dev/hatchet/pkg/v1"
	"github.com/hatchet-dev/hatchet/pkg/v1/workflow"
)

func main() {
	hatchet, err := v1.NewHatchetClient()
	if err != nil {
		panic(err)
	}

	// workflowName and simple stand for a workflow name and a workflow
	// declared elsewhere, e.g. with factory.NewTask or factory.NewWorkflow.
	w, err := hatchet.Worker(
		create.WorkerOpts{
			Name:      fmt.Sprintf("%s-worker", workflowName),
			Workflows: []workflow.WorkflowBase{simple}, // add your workflow here
		},
	)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // release the context when main returns

	w.StartBlocking(ctx)
}