Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mnah05-boiler-go-21-79.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

The scheduler package provides a client wrapper around Asynq for scheduling and enqueueing background tasks. It simplifies task creation and provides methods for enqueuing tasks with or without tracking IDs.

Types

Client

Wraps asynq.Client for task scheduling.
// Client is the scheduler's wrapper around an Asynq client.
type Client struct {
    client *asynq.Client // wrapped Asynq client
}

Functions

NewClient

Creates a new scheduler client connected to Redis.
func NewClient(redisOpt asynq.RedisClientOpt) *Client
redisOpt
asynq.RedisClientOpt
Redis connection configuration for Asynq
Returns: *Client - Scheduler client instance

Close

Closes the client connection to Redis.
func (c *Client) Close() error
Returns: error - Error if close fails

Enqueue

Enqueues a task for background processing.
func (c *Client) Enqueue(
    ctx context.Context,
    taskType string,
    payload []byte,
    opts ...asynq.Option,
) error
ctx
context.Context
Context for cancellation and timeout
taskType
string
Task type identifier (e.g., worker:ping)
payload
[]byte
Task payload as JSON-encoded bytes
opts
...asynq.Option
Optional Asynq options for queue, retry, timeout, etc.
Returns: error - Error if enqueue fails

EnqueueWithID

Enqueues a task and returns the task ID for tracking.
func (c *Client) EnqueueWithID(
    ctx context.Context,
    taskType string,
    payload []byte,
    opts ...asynq.Option,
) (string, error)
ctx
context.Context
Context for cancellation and timeout
taskType
string
Task type identifier
payload
[]byte
Task payload as JSON-encoded bytes
opts
...asynq.Option
Optional Asynq options
Returns:
  • string - Task ID for tracking
  • error - Error if enqueue fails

Usage

Initialize scheduler client

import (
    "boiler-go/internal/scheduler"
    "github.com/hibiken/asynq"
)

func main() {
    // Create scheduler client
    client := scheduler.NewClient(asynq.RedisClientOpt{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })
    defer client.Close()
    
    // Client is ready to enqueue tasks
}

Example from API server

// From cmd/api/main.go
//
// main (excerpt) loads the configuration, builds the logger, and creates the
// scheduler client that is handed to the router.
// NOTE(review): this is an excerpt — rdb is defined outside the lines shown
// and router is presumably used further down; confirm against cmd/api/main.go.
func main() {
    cfg := config.Load(logger.New())
    logg := newLogger(cfg, "logs/api.log")
    
    // Initialize scheduler client for worker task enqueueing; the Redis
    // address, password, and DB index all come from the loaded configuration.
    schedulerClient := scheduler.NewClient(asynq.RedisClientOpt{
        Addr:     cfg.RedisAddr,
        Password: cfg.RedisPassword,
        DB:       cfg.RedisDB,
    })
    logg.Info().Msg("scheduler client initialized")
    // Close the client when main returns.
    defer schedulerClient.Close()
    
    // Pass to router for handlers
    router := handler.NewRouter(logg, cfg, db.Get(), rdb, schedulerClient)
}

Enqueue a simple task

import (
    "context"
    "encoding/json"
    "boiler-go/internal/queue"
    "boiler-go/internal/tasks"
    "github.com/hibiken/asynq"
)

// EmailPayload is the example task payload. The struct tags set the JSON
// field names used when the payload is marshaled.
type EmailPayload struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

// Build the payload for the example task.
payload := EmailPayload{
    To:      "user@example.com",
    Subject: "Welcome",
    Body:    "Thanks for signing up!",
}

// Enqueue takes the payload as bytes, so encode the struct as JSON first.
payloadBytes, err := json.Marshal(payload)
if err != nil {
    return err
}

// Enqueue on the default queue, allowing up to 3 retries.
err = client.Enqueue(
    ctx,
    "email:send",
    payloadBytes,
    asynq.Queue(queue.QueueDefault),
    asynq.MaxRetry(3),
)
// Enqueue reports failures through its error return; do not drop it.
if err != nil {
    return err
}

Enqueue with task tracking

// From internal/handler/worker.go

// Ping enqueues a worker ping task and replies with the ID of the queued task.
// It responds 202 with task_id and status on success, 500 when the payload
// cannot be marshaled, and 503 when enqueueing fails.
func (h *WorkerHandler) Ping(c echo.Context) error {
    // Build payload: a fixed message, the incoming request's X-Request-ID
    // header, and the current UTC time as the queue timestamp.
    payload := PingTaskPayload{
        Message:   "ping from API",
        RequestID: c.Request().Header.Get("X-Request-ID"),
        QueuedAt:  time.Now().UTC(),
    }
    payloadBytes, err := json.Marshal(payload)
    if err != nil {
        return c.JSON(500, map[string]string{"error": "failed to marshal payload"})
    }

    // Enqueue with ID for tracking. The request's context is passed through,
    // and the task goes to the default queue with up to 3 retries and a
    // 30-second timeout.
    taskID, err := h.scheduler.EnqueueWithID(
        c.Request().Context(),
        tasks.TypeWorkerPing,
        payloadBytes,
        asynq.Queue(queue.QueueDefault),
        asynq.MaxRetry(3),
        asynq.Timeout(30*time.Second),
    )
    if err != nil {
        // The underlying error is not included in the response body.
        return c.JSON(503, map[string]string{"error": "failed to enqueue"})
    }

    // Return task ID to client
    return c.JSON(202, map[string]any{
        "task_id": taskID,
        "status":  "queued",
    })
}

Advanced task options

import (
    "time"
    "github.com/hibiken/asynq"
)

// NOTE: the three calls below reuse err for brevity; in real code, check err
// after each Enqueue call before making the next one.

// Schedule task for later: ProcessIn delays processing by the given duration.
err = client.Enqueue(
    ctx,
    "report:generate",
    payloadBytes,
    asynq.Queue(queue.QueueDefault),
    asynq.ProcessIn(1*time.Hour), // Run in 1 hour
)

// High priority task: uses queue.QueueCritical, with up to 5 retries and a
// 2-minute execution timeout.
err = client.Enqueue(
    ctx,
    "alert:send",
    payloadBytes,
    asynq.Queue(queue.QueueCritical),
    asynq.MaxRetry(5),
    asynq.Timeout(2*time.Minute),
)

// Unique task (deduplicated)
err = client.Enqueue(
    ctx,
    "cache:refresh",
    payloadBytes,
    asynq.Queue(queue.QueueLow),
    // Per the Asynq docs, the uniqueness lock lasts at most 5 minutes and is
    // released earlier once the task is processed successfully, so this is
    // not a strict once-per-5-minutes limit.
    asynq.Unique(5*time.Minute),
)

Task options

Common Asynq options available:
  • asynq.Queue(name) - Specify queue (default, critical, low)
  • asynq.MaxRetry(n) - Maximum retry attempts
  • asynq.Timeout(duration) - Task execution timeout
  • asynq.ProcessIn(duration) - Delay before processing
  • asynq.ProcessAt(time) - Schedule for specific time
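  • asynq.Unique(ttl) - Prevent duplicate tasks: a task with the same type, payload, and queue is rejected until the TTL expires or the original task is processed successfully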
  • asynq.Retention(duration) - Keep a successfully completed task's info for the given duration, after which it is deleted

Error handling

// Enqueue and keep the returned ID so the outcome can be logged either way.
taskID, err := client.EnqueueWithID(ctx, taskType, payload, opts...)
if err != nil {
    // Record the failure with its task type, then return a wrapped error.
    failure := log.Error().Err(err).Str("task_type", taskType)
    failure.Msg("failed to enqueue task")
    return fmt.Errorf("task enqueue failed: %w", err)
}

// Success: log the task ID alongside the task type.
success := log.Info().Str("task_id", taskID).Str("task_type", taskType)
success.Msg("task enqueued successfully")