Boiler-Go uses Asynq for background job processing. Tasks are enqueued from the API server and processed asynchronously by the worker.

Architecture

The background job system consists of three main components:

  1. Scheduler client: Enqueues tasks from the API server
  2. Redis queue: Stores pending and active tasks
  3. Worker server: Processes tasks asynchronously

Queue priorities

Boiler-Go supports three priority levels defined in internal/queue/queue.go:
const (
    QueueCritical = "critical"  // Highest priority
    QueueDefault  = "default"   // Standard priority
    QueueLow      = "low"       // Lowest priority
)

func Priorities() map[string]int {
    return map[string]int{
        QueueCritical: 6,
        QueueDefault:  3,
        QueueLow:      1,
    }
}
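Higher weight values indicate higher priority. Asynq treats these values as relative weights rather than a strict ordering: when all three queues hold tasks, the worker pulls from critical, default, and low roughly 60%, 30%, and 10% of the time, so low-priority tasks still make progress.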
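To make the weighting concrete, here is a minimal sketch of how a weight map translates into approximate processing shares. It assumes Asynq's weighted (non-strict) mode and that every queue has pending tasks. The names priorityWeights and queueSharePercent are illustrative and are not part of Boiler-Go or Asynq.

```go
package main

// priorityWeights mirrors the map returned by Priorities() above.
var priorityWeights = map[string]int{
    "critical": 6,
    "default":  3,
    "low":      1,
}

// queueSharePercent returns the approximate percentage of dequeue attempts
// that go to the named queue: its weight divided by the sum of all weights.
func queueSharePercent(weights map[string]int, name string) int {
    total := 0
    for _, w := range weights {
        total += w
    }
    if total == 0 {
        return 0
    }
    return weights[name] * 100 / total
}
```

With the weights above, queueSharePercent(priorityWeights, "critical") evaluates to 60, and the default and low queues come out at 30 and 10.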

Task types

Task types are defined as constants in internal/tasks/tasks.go to ensure consistency:
const (
    TypeWorkerPing = "worker:ping"
)
Always define task types as constants in the tasks package. This prevents typos and ensures both the API and worker use identical task identifiers.

Enqueuing tasks

The API server uses the scheduler client to enqueue tasks. Initialize the client in cmd/api/main.go:80-85:
schedulerClient := scheduler.NewClient(asynq.RedisClientOpt{
    Addr:     cfg.RedisAddr,
    Password: cfg.RedisPassword,
    DB:       cfg.RedisDB,
})
defer schedulerClient.Close()

Basic enqueue

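import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/hibiken/asynq"

    "boiler-go/internal/queue"
    "boiler-go/internal/scheduler"
    "boiler-go/internal/tasks"
)

// EnqueueTask sends a worker:ping task to the default queue.
func EnqueueTask(ctx context.Context, client *scheduler.Client) error {
    payload := map[string]string{"message": "hello"}
    jsonPayload, err := json.Marshal(payload)
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }

    return client.Enqueue(
        ctx,
        tasks.TypeWorkerPing,
        jsonPayload,
        asynq.Queue(queue.QueueDefault),
    )
}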

Enqueue with options

client.Enqueue(ctx, taskType, payload,
    asynq.Queue(queue.QueueCritical),  // Set priority queue
    asynq.MaxRetry(5),                  // Retry up to 5 times
    asynq.Timeout(5*time.Minute),       // Task timeout
    asynq.ProcessIn(10*time.Second),    // Delay execution
    asynq.Unique(24*time.Hour),         // Prevent duplicates
)

Get task ID

taskID, err := client.EnqueueWithID(
    ctx,
    tasks.TypeWorkerPing,
    payload,
    asynq.Queue(queue.QueueDefault),
)
if err != nil {
    return fmt.Errorf("enqueue failed: %w", err)
}
fmt.Printf("Task enqueued with ID: %s\n", taskID)

Worker configuration

The worker is configured in cmd/worker/main.go:79-105 with production-ready settings:
srv := asynq.NewServer(
    redisOpt,
    asynq.Config{
        Concurrency: 10,              // Process 10 tasks concurrently
        Queues: queue.Priorities(),   // Queue priority map
        
        // Exponential backoff retry strategy
        RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
            // 1s, 2s, 4s, 8s, 16s...
            return time.Duration(1<<uint(n)) * time.Second
        },
        
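        ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
            // The task ID travels in the handler context; it is empty if unavailable.
            taskID, _ := asynq.GetTaskID(ctx)
            logg.Error().
                Err(err).
                Str("task_type", task.Type()).
                Str("task_id", taskID).
                Msg("task processing failed")
        }),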
    },
)
  1. Set concurrency: The Concurrency parameter controls how many tasks can run simultaneously
  2. Configure queues: Queue priorities determine task processing order
  3. Define retry strategy: Failed tasks are retried with exponential backoff
  4. Handle errors: The error handler logs all task failures for debugging
Setting Concurrency too high can overwhelm your database or external services. Start with a conservative value and increase based on monitoring.

Processing tasks

Register task handlers using asynq.ServeMux in cmd/worker/main.go:107-132:
mux := asynq.NewServeMux()

// Add logging middleware
mux.Use(loggingMiddleware(logg))

// Register task handler
mux.HandleFunc(tasks.TypeWorkerPing, func(ctx context.Context, t *asynq.Task) error {
    var payload PingTaskPayload
    if err := json.Unmarshal(t.Payload(), &payload); err != nil {
        return fmt.Errorf("invalid payload: %w", err)
    }
    
    logg.Info().
        Str("task_type", t.Type()).
        Str("message", payload.Message).
        Msg("task processed successfully")
    
    return nil
})
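The handler above decodes into PingTaskPayload, which this page does not define. A minimal sketch of what such a type and its decoding step might look like follows. The struct fields and the decodePing helper are assumptions for illustration, and the real Boiler-Go type may differ.

```go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

// PingTaskPayload is a hypothetical shape for the worker:ping payload.
type PingTaskPayload struct {
    Message string `json:"message"`
}

// decodePing parses raw task bytes and rejects payloads without a message.
func decodePing(raw []byte) (PingTaskPayload, error) {
    var p PingTaskPayload
    if err := json.Unmarshal(raw, &p); err != nil {
        return PingTaskPayload{}, fmt.Errorf("invalid payload: %w", err)
    }
    if p.Message == "" {
        return PingTaskPayload{}, errors.New("invalid payload: message is required")
    }
    return p, nil
}
```

Keeping decoding in a small function like this makes it testable without Redis. A malformed payload will never succeed on retry, so one option is to wrap the returned error with asynq.SkipRetry so the task is archived instead of retried.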

Middleware

The worker includes logging middleware that tracks task execution:
func loggingMiddleware(logg zerolog.Logger) asynq.MiddlewareFunc {
    return func(next asynq.Handler) asynq.Handler {
        return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
            start := time.Now()
            
            logg.Info().
                Str("task_type", task.Type()).
                Msg("task started")
            
            err := next.ProcessTask(ctx, task)
            duration := time.Since(start)
            
            if err != nil {
                logg.Error().
                    Err(err).
                    Dur("duration", duration).
                    Msg("task failed")
            } else {
                logg.Info().
                    Dur("duration", duration).
                    Msg("task completed")
            }
            
            return err
        })
    }
}

Retry behavior

The worker uses exponential backoff for retries:
Attempt  Delay
1        1s
2        2s
3        4s
4        8s
5        16s
6        32s
The default maximum retry count is 25. Configure per-task retries using asynq.MaxRetry() when enqueuing.
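Because the delay doubles on every retry, the gap grows quickly: with the default of 25 retries, the last uncapped delay would be 2^24 seconds, roughly 194 days. A common variation is to clamp the delay at a ceiling. The sketch below shows one way to do that. The cappedBackoff function and the ten-minute maxRetryDelay are assumptions for illustration, not Boiler-Go settings.

```go
package main

import "time"

// maxRetryDelay is an assumed ceiling chosen for this example.
const maxRetryDelay = 10 * time.Minute

// cappedBackoff returns 1s, 2s, 4s, ... for retry counts 0, 1, 2, ...
// and never returns more than ceiling. The loop stops doubling once the
// ceiling is reached, so large retry counts cannot overflow.
func cappedBackoff(retried int, ceiling time.Duration) time.Duration {
    d := time.Second
    for i := 0; i < retried && d < ceiling; i++ {
        d *= 2
    }
    if d > ceiling {
        return ceiling
    }
    return d
}
```

To use it with the worker, the RetryDelayFunc body could return cappedBackoff(n, maxRetryDelay) instead of the raw shift.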

Graceful shutdown

The worker implements graceful shutdown in cmd/worker/main.go:154-178:
srv.Stop() // Stop accepting new tasks
logg.Info().Msg("worker stopped accepting new tasks")

shutdownCtx, cancel := context.WithTimeout(context.Background(), cfg.WorkerShutdownTimeout)
defer cancel()

done := make(chan struct{})
go func() {
    srv.Shutdown() // Wait for in-flight tasks
    close(done)
}()

select {
case <-done:
    logg.Info().Msg("worker shutdown completed gracefully")
case <-shutdownCtx.Done():
    logg.Warn().Msg("worker shutdown timed out, forcing exit")
}
  1. Stop accepting tasks: The worker stops pulling new tasks from Redis
  2. Wait for completion: In-flight tasks are given time to complete
  3. Enforce timeout: If tasks don't complete within WORKER_SHUTDOWN_TIMEOUT, the worker forcefully exits
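The wait-or-time-out step can be isolated into a small helper, which makes the pattern easier to test without a running worker. This is a minimal sketch using only the standard library. The helper name waitWithTimeout is hypothetical and does not appear in Boiler-Go.

```go
package main

import "time"

// waitWithTimeout runs shutdown in a goroutine and reports whether it
// finished before the timeout elapsed.
func waitWithTimeout(shutdown func(), timeout time.Duration) bool {
    done := make(chan struct{})
    go func() {
        shutdown()
        close(done)
    }()

    timer := time.NewTimer(timeout)
    defer timer.Stop()

    select {
    case <-done:
        return true // shutdown finished in time
    case <-timer.C:
        return false // caller decides whether to force exit
    }
}
```

In the worker, a call such as waitWithTimeout(srv.Shutdown, cfg.WorkerShutdownTimeout) would stand in for the goroutine and select block. If it returns false, the shutdown goroutine is still running, so the process is expected to exit shortly afterwards.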

Best practices

  1. Idempotent handlers: Tasks may be retried, so ensure handlers are idempotent
  2. Timeout tasks: Always set reasonable timeouts using asynq.Timeout()
  3. Monitor queues: Use the Asynq dashboard to monitor queue health
  4. Structured payloads: Use JSON for task payloads with versioned schemas
  5. Error handling: Return descriptive errors to aid debugging in logs
// Good: Idempotent task handler
func ProcessPayment(ctx context.Context, t *asynq.Task) error {
    var payload PaymentPayload
    if err := json.Unmarshal(t.Payload(), &payload); err != nil {
        return fmt.Errorf("unmarshal failed: %w", err)
    }
    
    // Check if already processed (idempotency)
    if alreadyProcessed(payload.PaymentID) {
        return nil // Skip processing
    }
    
    // Process payment...
    return nil
}
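The alreadyProcessed check above is left undefined. One way to back it is a set of handled IDs where checking and recording happen as a single atomic step, so two concurrent attempts cannot both pass the check. The sketch below is an in-memory illustration only. The processedSet type and its claim method are hypothetical, and a deployment with several workers would need a shared store, such as a database unique constraint, instead.

```go
package main

import "sync"

// processedSet is a hypothetical in-memory record of handled payment IDs.
type processedSet struct {
    mu   sync.Mutex
    seen map[string]struct{}
}

func newProcessedSet() *processedSet {
    return &processedSet{seen: make(map[string]struct{})}
}

// claim records id and reports whether this caller is the first to see it.
// The check and the write happen under one lock, so only one caller wins.
func (s *processedSet) claim(id string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, ok := s.seen[id]; ok {
        return false
    }
    s.seen[id] = struct{}{}
    return true
}
```

A handler would call claim(payload.PaymentID) and return nil when it reports false. Claiming before the work runs means a failure partway through needs to release the claim, otherwise the retry is skipped.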