Hatchet Go V1 Migration Guide
This guide will help you migrate from the V0 SDK to the V1 SDK. Note that the v1 engine will continue to support v0 tasks until September 30th, 2025.
The v1 Go SDK can be found in github.com/hatchet-dev/hatchet/pkg/v1. You can instantiate a new Hatchet client via:
package main

import (
	v1 "github.com/hatchet-dev/hatchet/pkg/v1"
)

func main() {
	hatchet, err := v1.NewHatchetClient()
	if err != nil {
		panic(err)
	}
	// ...
}
Declaring Tasks and Workflows
In the new SDKs, tasks use Go generics to make typing easier. As a result, there are a number of improvements made to the context methods and the way tasks are defined.
Single Function Tasks
With the factory package in the V1 SDK, single-function tasks are much easier to define than before. Here’s an example of how to define a simple task with the V1 SDK:
import (
	"strings"

	"github.com/hatchet-dev/hatchet/pkg/client/create"
	"github.com/hatchet-dev/hatchet/pkg/v1/factory"
	"github.com/hatchet-dev/hatchet/pkg/worker"
)

// ...

type SimpleInput struct {
	Message string
}

type SimpleResult struct {
	TransformedMessage string
}

simple := factory.NewTask(
	create.StandaloneTask{
		Name: "simple-task",
	}, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
		return &SimpleResult{
			TransformedMessage: strings.ToLower(input.Message),
		}, nil
	},
	hatchet, // a Hatchet client instance
)
The simple task inherits its input and output types from the function signature, so calls to simple.Run are type-checked by the compiler:
result, err := simple.Run(ctx, SimpleInput{
	Message: "Hello, World!",
})
if err != nil {
	return err
}

fmt.Println(result.TransformedMessage)
Complex Workflows
Complex, multi-task workflows can be defined using the NewWorkflow function in the factory package. Here’s an example of how to define a workflow with multiple tasks:
import (
	"github.com/hatchet-dev/hatchet/pkg/client/create"
	"github.com/hatchet-dev/hatchet/pkg/v1/factory"
	"github.com/hatchet-dev/hatchet/pkg/worker"
)

// ...

simple := factory.NewWorkflow[DagInput, DagResult](
	create.WorkflowCreateOpts[DagInput]{
		Name: "simple-dag",
	},
	hatchet,
)

step1 := simple.Task(
	create.WorkflowTask[DagInput, DagResult]{
		Name: "Step1",
	}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
		return &SimpleOutput{
			Step: 1,
		}, nil
	},
)

simple.Task(
	create.WorkflowTask[DagInput, DagResult]{
		Name: "Step2",
		Parents: []create.NamedTask{
			step1,
		},
	}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
		// Read the output of the parent task before producing this task's output.
		var step1Output SimpleOutput
		err := ctx.ParentOutput(step1, &step1Output)
		if err != nil {
			return nil, err
		}

		return &SimpleOutput{
			Step: 2,
		}, nil
	},
)

return simple
Note that due to limitations with Go’s type system, any task function defined on a workflow created with factory.NewWorkflow must return an interface{} type.
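One relevant limitation is that Go methods cannot declare their own type parameters, so a method such as Task cannot give each task a distinct output type. The sketch below shows, under stated assumptions, how untyped outputs can still be recovered into a typed struct. The results store and parentOutput helper are hypothetical names invented for this illustration, and the JSON round trip is an assumed mechanism, not a description of how ctx.ParentOutput is implemented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type SimpleOutput struct{ Step int }

// results is a hypothetical store of untyped task outputs keyed by task name.
type results map[string]interface{}

// parentOutput copies a stored output into target via a JSON round trip.
func (r results) parentOutput(name string, target interface{}) error {
	raw, ok := r[name]
	if !ok {
		return fmt.Errorf("no output recorded for task %q", name)
	}
	data, err := json.Marshal(raw)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, target)
}

func main() {
	outputs := results{}

	// A task function on a workflow returns interface{}, so the store holds untyped values.
	step1 := func() (interface{}, error) {
		return &SimpleOutput{Step: 1}, nil
	}
	out, err := step1()
	if err != nil {
		panic(err)
	}
	outputs["Step1"] = out

	// The child task names the concrete type it expects from its parent.
	var step1Output SimpleOutput
	if err := outputs.parentOutput("Step1", &step1Output); err != nil {
		panic(err)
	}
	fmt.Println(step1Output.Step) // prints 1
}
```

The cost of this pattern is that a mismatch between the stored value and the target struct surfaces at run time rather than at compile time.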
Task Configuration
In the V1 SDK, tasks are configured using the create package. The create package provides a number of options for configuring tasks, such as setting the task name, setting the task’s parent tasks, and setting the task’s wait conditions:
// import the create package
import "github.com/hatchet-dev/hatchet/pkg/client/create"

// utilize the following structs to configure tasks
type WorkflowTask[I, O any] struct {
	// (required) The name of the task
	Name string
	// (optional) The version of the task
	Version string
	// (optional) The human-readable description of the task
	Description string
	// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
	ExecutionTimeout time.Duration
	// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
	ScheduleTimeout time.Duration
	// (optional) Retries defines the number of times to retry a failed task
	Retries int32
	// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
	RetryBackoffFactor float32
	// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
	RetryMaxBackoffSeconds int32
	// (optional) RateLimits define constraints on how frequently the task can be executed
	RateLimits []*types.RateLimit
	// (optional) WorkerLabels specify requirements for workers that can execute this task
	WorkerLabels map[string]*types.DesiredWorkerLabel
	// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
	Concurrency []*types.Concurrency
	// WaitFor represents a set of conditions which must be satisfied before the task can run.
	WaitFor condition.Condition
	// SkipIf represents a set of conditions which, if satisfied, will cause the task to be skipped.
	SkipIf condition.Condition
	// CancelIf represents a set of conditions which, if satisfied, will cause the task to be canceled.
	CancelIf condition.Condition
	// (optional) Parents are the tasks that must successfully complete before this task can start
	Parents []NamedTask
}

type WorkflowOnFailureTask[I, O any] struct {
	// (optional) The version of the task
	Version string
	// (optional) The human-readable description of the task
	Description string
	// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
	ExecutionTimeout time.Duration
	// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
	ScheduleTimeout time.Duration
	// (optional) Retries defines the number of times to retry a failed task
	Retries int32
	// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
	RetryBackoffFactor float32
	// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
	RetryMaxBackoffSeconds int32
	// (optional) RateLimits define constraints on how frequently the task can be executed
	RateLimits []*types.RateLimit
	// (optional) WorkerLabels specify requirements for workers that can execute this task
	WorkerLabels map[string]*types.DesiredWorkerLabel
	// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
	Concurrency []*types.Concurrency
}

// StandaloneTask defines options for creating a standalone task.
type StandaloneTask struct {
	// (required) The name of the task
	Name string
	// (optional) The version of the task
	Version string
	// (optional) The human-readable description of the task
	Description string
	// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
	ExecutionTimeout time.Duration
	// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
	ScheduleTimeout time.Duration
	// (optional) Retries defines the number of times to retry a failed task
	Retries int32
	// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
	RetryBackoffFactor float32
	// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
	RetryMaxBackoffSeconds int32
	// (optional) RateLimits define constraints on how frequently the task can be executed
	RateLimits []*types.RateLimit
	// (optional) WorkerLabels specify requirements for workers that can execute this task
	WorkerLabels map[string]*types.DesiredWorkerLabel
	// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
	Concurrency []*types.Concurrency
	// (optional) The event names that trigger the workflow
	OnEvents []string
	// (optional) The cron expressions for scheduled workflow runs
	OnCron []string
}
Workers
To declare workers, you can use the Worker method on the v1 Hatchet client. Note that instead of calling RegisterWorkflow on the worker, you can pass the workflows directly to the Worker method:
package main

import (
	"context"
	"fmt"

	v1 "github.com/hatchet-dev/hatchet/pkg/v1"
	"github.com/hatchet-dev/hatchet/pkg/v1/worker"
	"github.com/hatchet-dev/hatchet/pkg/v1/workflow"
)

func main() {
	hatchet, err := v1.NewHatchetClient()
	if err != nil {
		panic(err)
	}

	// workflowName and myWorkflow are placeholders for your own workflow's name and value.
	w, err := hatchet.Worker(
		worker.WorkerOpts{
			Name:      fmt.Sprintf("%s-worker", workflowName),
			Workflows: []workflow.WorkflowBase{myWorkflow}, // add your workflow here
		},
	)
	if err != nil {
		panic(err)
	}

	// Release the context's resources when main returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := w.StartBlocking(ctx); err != nil {
		panic(err)
	}
}