Boiler-Go uses Asynq for background job processing. Tasks are enqueued from the API server and processed asynchronously by the worker.
Architecture
The background job system consists of three main components:
Scheduler client
Enqueues tasks from the API server
Redis queue
Stores pending and active tasks
Worker server
Processes tasks asynchronously
Queue priorities
Boiler-Go supports three priority levels defined in internal/queue/queue.go:
const (
QueueCritical = "critical" // Highest priority
QueueDefault = "default" // Standard priority
QueueLow = "low" // Lowest priority
)
func Priorities() map[string]int {
return map[string]int{
QueueCritical: 6,
QueueDefault: 3,
QueueLow: 1,
}
}
Higher weight values indicate higher priority. The worker processes tasks from higher priority queues more frequently.
Task types
Task types are defined as constants in internal/tasks/tasks.go to ensure consistency:
const (
TypeWorkerPing = "worker:ping"
)
Always define task types as constants in the tasks package. This prevents typos and ensures both the API and worker use identical task identifiers.
Enqueuing tasks
The API server uses the scheduler client to enqueue tasks. Initialize the client in cmd/api/main.go:80-85:
schedulerClient := scheduler.NewClient(asynq.RedisClientOpt{
Addr: cfg.RedisAddr,
Password: cfg.RedisPassword,
DB: cfg.RedisDB,
})
defer schedulerClient.Close()
Basic enqueue
import "boiler-go/internal/scheduler"
func EnqueueTask(ctx context.Context, client *scheduler.Client) error {
payload := map[string]string{"message": "hello"}
jsonPayload, _ := json.Marshal(payload)
return client.Enqueue(
ctx,
tasks.TypeWorkerPing,
jsonPayload,
asynq.Queue(queue.QueueDefault),
)
}
Enqueue with options
client.Enqueue(ctx, taskType, payload,
asynq.Queue(queue.QueueCritical), // Set priority queue
asynq.MaxRetry(5), // Retry up to 5 times
asynq.Timeout(5*time.Minute), // Task timeout
asynq.ProcessIn(10*time.Second), // Delay execution
asynq.Unique(24*time.Hour), // Prevent duplicates
)
Get task ID
taskID, err := client.EnqueueWithID(
ctx,
tasks.TypeWorkerPing,
payload,
asynq.Queue(queue.QueueDefault),
)
if err != nil {
return fmt.Errorf("enqueue failed: %w", err)
}
fmt.Printf("Task enqueued with ID: %s\n", taskID)
Worker configuration
The worker is configured in cmd/worker/main.go:79-105 with production-ready settings:
srv := asynq.NewServer(
redisOpt,
asynq.Config{
Concurrency: 10, // Process 10 tasks concurrently
Queues: queue.Priorities(), // Queue priority map
// Exponential backoff retry strategy
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
// 1s, 2s, 4s, 8s, 16s...
return time.Duration(1<<uint(n)) * time.Second
},
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
logg.Error().
Err(err).
Str("task_type", task.Type()).
Str("task_id", taskID).
Msg("task processing failed")
}),
},
)
Set concurrency
The Concurrency parameter controls how many tasks can run simultaneously
Configure queues
Queue priorities determine task processing order
Define retry strategy
Failed tasks are retried with exponential backoff
Handle errors
The error handler logs all task failures for debugging
Setting Concurrency too high can overwhelm your database or external services. Start with a conservative value and increase based on monitoring.
Processing tasks
Register task handlers using asynq.ServeMux in cmd/worker/main.go:107-132:
mux := asynq.NewServeMux()
// Add logging middleware
mux.Use(loggingMiddleware(logg))
// Register task handler
mux.HandleFunc(tasks.TypeWorkerPing, func(ctx context.Context, t *asynq.Task) error {
var payload PingTaskPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("invalid payload: %w", err)
}
logg.Info().
Str("task_type", t.Type()).
Str("message", payload.Message).
Msg("task processed successfully")
return nil
})
Middleware
The worker includes logging middleware that tracks task execution:
func loggingMiddleware(logg zerolog.Logger) asynq.MiddlewareFunc {
return func(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
start := time.Now()
logg.Info().
Str("task_type", task.Type()).
Msg("task started")
err := next.ProcessTask(ctx, task)
duration := time.Since(start)
if err != nil {
logg.Error().
Err(err).
Dur("duration", duration).
Msg("task failed")
} else {
logg.Info().
Dur("duration", duration).
Msg("task completed")
}
return err
})
}
}
Retry behavior
The worker uses exponential backoff for retries:
| Attempt | Delay |
|---|
| 1 | 1s |
| 2 | 2s |
| 3 | 4s |
| 4 | 8s |
| 5 | 16s |
| 6 | 32s |
The default maximum retry count is 25. Configure per-task retries using asynq.MaxRetry() when enqueuing.
Graceful shutdown
The worker implements graceful shutdown in cmd/worker/main.go:154-178:
srv.Stop() // Stop accepting new tasks
logg.Info().Msg("worker stopped accepting new tasks")
shutdownCtx, cancel := context.WithTimeout(context.Background(), cfg.WorkerShutdownTimeout)
defer cancel()
done := make(chan struct{})
go func() {
srv.Shutdown() // Wait for in-flight tasks
close(done)
}()
select {
case <-done:
logg.Info().Msg("worker shutdown completed gracefully")
case <-shutdownCtx.Done():
logg.Warn().Msg("worker shutdown timed out, forcing exit")
}
Stop accepting tasks
The worker stops pulling new tasks from Redis
Wait for completion
In-flight tasks are given time to complete
Enforce timeout
If tasks don’t complete within WORKER_SHUTDOWN_TIMEOUT, the worker forcefully exits
Best practices
- Idempotent handlers: Tasks may be retried, so ensure handlers are idempotent
- Timeout tasks: Always set reasonable timeouts using
asynq.Timeout()
- Monitor queues: Use the Asynq dashboard to monitor queue health
- Structured payloads: Use JSON for task payloads with versioned schemas
- Error handling: Return descriptive errors to aid debugging in logs
// Good: Idempotent task handler
func ProcessPayment(ctx context.Context, t *asynq.Task) error {
var payload PaymentPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("unmarshal failed: %w", err)
}
// Check if already processed (idempotency)
if alreadyProcessed(payload.PaymentID) {
return nil // Skip processing
}
// Process payment...
return nil
}