Documentation Index
Fetch the complete documentation index at: https://mnah05-boiler-go-21-79.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
The scheduler package provides a client wrapper around Asynq for scheduling and enqueueing background tasks. It simplifies task creation and provides methods for enqueuing tasks with or without tracking IDs.
Types
Client
Wraps asynq.Client for task scheduling.
// Client is a thin wrapper that holds an *asynq.Client used for task scheduling.
type Client struct {
// client is the wrapped Asynq client. It is unexported, so code outside this
// package cannot reach it directly.
client *asynq.Client
}
Functions
NewClient
Creates a new scheduler client connected to Redis.
func NewClient(redisOpt asynq.RedisClientOpt) *Client
redisOpt (asynq.RedisClientOpt) - Redis connection configuration for Asynq
Returns: *Client - Scheduler client instance
Close
Closes the client connection to Redis.
func (c *Client) Close() error
Returns: error - Error if close fails
Enqueue
Enqueues a task for background processing.
func (c *Client) Enqueue(
ctx context.Context,
taskType string,
payload []byte,
opts ...asynq.Option,
) error
ctx (context.Context) - Context for cancellation and timeout
taskType (string) - Task type identifier (e.g., worker:ping)
payload ([]byte) - Task payload as JSON-encoded bytes
opts (...asynq.Option) - Optional Asynq options for queue, retry, timeout, etc.
Returns: error - Error if enqueue fails
EnqueueWithID
Enqueues a task and returns the task ID for tracking.
func (c *Client) EnqueueWithID(
ctx context.Context,
taskType string,
payload []byte,
opts ...asynq.Option,
) (string, error)
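ctx (context.Context) - Context for cancellation and timeout
taskType (string) - Task type identifier (e.g., worker:ping)
payload ([]byte) - Task payload as JSON-encoded bytes
opts (...asynq.Option) - Optional Asynq options for queue, retry, timeout, etc.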
Returns:
string - Task ID for tracking
error - Error if enqueue fails
Usage
Initialize scheduler client
import (
"boiler-go/internal/scheduler"
"github.com/hibiken/asynq"
)
func main() {
// Create scheduler client
client := scheduler.NewClient(asynq.RedisClientOpt{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
defer client.Close()
// Client is ready to enqueue tasks
}
Example from API server
// From cmd/api/main.go
//
// main loads the configuration, builds the API logger, creates the scheduler
// client from the Redis settings in cfg, and passes that client to the router.
// NOTE(review): this looks like an abridged excerpt — rdb is not declared here
// and router is not used after assignment; both presumably belong to code that
// was omitted from this snippet.
func main() {
// Load the configuration; the loader is given a freshly created logger.
cfg := config.Load(logger.New())
// Build the logger used below. "logs/api.log" is presumably its output file — confirm in newLogger.
logg := newLogger(cfg, "logs/api.log")
// Initialize the scheduler client for enqueueing worker tasks, using the
// Redis address, password, and DB index from cfg.
schedulerClient := scheduler.NewClient(asynq.RedisClientOpt{
Addr: cfg.RedisAddr,
Password: cfg.RedisPassword,
DB: cfg.RedisDB,
})
logg.Info().Msg("scheduler client initialized")
// Close the scheduler client when main returns.
defer schedulerClient.Close()
// Pass the scheduler client to the router (together with the logger, config,
// the value from db.Get(), and rdb) so handlers can use it.
router := handler.NewRouter(logg, cfg, db.Get(), rdb, schedulerClient)
}
Enqueue a simple task
import (
"context"
"encoding/json"
"boiler-go/internal/queue"
"boiler-go/internal/tasks"
"github.com/hibiken/asynq"
)
// EmailPayload is the payload of an email task. It is serialized to JSON
// with the lowercase keys "to", "subject", and "body".
type EmailPayload struct {
	To      string `json:"to"`      // recipient address
	Subject string `json:"subject"` // subject line
	Body    string `json:"body"`    // message text
}
// Build the payload for the welcome email.
payload := EmailPayload{
	To:      "user@example.com",
	Subject: "Welcome",
	Body:    "Thanks for signing up!",
}

// Tasks carry their payload as JSON-encoded bytes.
payloadBytes, err := json.Marshal(payload)
if err != nil {
	return err
}

// Enqueue on the default queue with a maximum of 3 retry attempts.
err = client.Enqueue(
	ctx,
	"email:send",
	payloadBytes,
	asynq.Queue(queue.QueueDefault),
	asynq.MaxRetry(3),
)
// Check the enqueue result too; otherwise a failed enqueue goes unnoticed.
if err != nil {
	return err
}
Enqueue with task tracking
// From internal/handler/worker.go
// Ping enqueues a worker ping task and replies with the ID of the queued task.
//
// It responds with 500 when the payload cannot be marshalled, 503 when the
// task cannot be enqueued, and 202 with the task ID and a "queued" status
// on success.
func (h *WorkerHandler) Ping(c echo.Context) error {
	req := c.Request()

	// Serialize the ping payload; it carries the caller's X-Request-ID header
	// and the current time in UTC.
	body, err := json.Marshal(PingTaskPayload{
		Message:   "ping from API",
		RequestID: req.Header.Get("X-Request-ID"),
		QueuedAt:  time.Now().UTC(),
	})
	if err != nil {
		return c.JSON(500, map[string]string{"error": "failed to marshal payload"})
	}

	// Default queue, at most 3 retries, 30-second timeout.
	opts := []asynq.Option{
		asynq.Queue(queue.QueueDefault),
		asynq.MaxRetry(3),
		asynq.Timeout(30 * time.Second),
	}

	// EnqueueWithID returns the task ID so it can be handed back for tracking.
	taskID, err := h.scheduler.EnqueueWithID(req.Context(), tasks.TypeWorkerPing, body, opts...)
	if err != nil {
		return c.JSON(503, map[string]string{"error": "failed to enqueue"})
	}

	// Report the queued task to the client.
	return c.JSON(202, map[string]any{
		"task_id": taskID,
		"status":  "queued",
	})
}
Advanced task options
import (
"time"
"github.com/hibiken/asynq"
)
// Schedule task for later
err = client.Enqueue(
	ctx,
	"report:generate",
	payloadBytes,
	asynq.Queue(queue.QueueDefault),
	asynq.ProcessIn(1*time.Hour), // Run in 1 hour
)
// Check each enqueue before the next call reuses err; otherwise this
// failure would be overwritten and lost.
if err != nil {
	return err
}

// High priority task
err = client.Enqueue(
	ctx,
	"alert:send",
	payloadBytes,
	asynq.Queue(queue.QueueCritical),
	asynq.MaxRetry(5),
	asynq.Timeout(2*time.Minute),
)
if err != nil {
	return err
}

// Unique task (deduplicated)
// NOTE(review): Asynq reports a duplicate enqueue inside the TTL as an error
// (asynq.ErrDuplicateTask). Confirm whether the wrapper surfaces it and
// whether callers should treat it as non-fatal instead of returning it.
err = client.Enqueue(
	ctx,
	"cache:refresh",
	payloadBytes,
	asynq.Queue(queue.QueueLow),
	asynq.Unique(5*time.Minute), // Only one instance per 5 minutes
)
if err != nil {
	return err
}
Task options
Common Asynq options available:
asynq.Queue(name) - Specify queue (default, critical, low)
asynq.MaxRetry(n) - Maximum retry attempts
asynq.Timeout(duration) - Task execution timeout
asynq.ProcessIn(duration) - Delay before processing
asynq.ProcessAt(time) - Schedule for specific time
asynq.Unique(ttl) - Reject duplicate tasks (same type, payload, and queue) until the TTL expires or the original task is processed
asynq.Retention(duration) - Keep completed task info
Error handling
taskID, err := client.EnqueueWithID(ctx, taskType, payload, opts...)
if err != nil {
	// Log the failure with the task type, then return a wrapped error to the caller.
	log.Error().Err(err).Str("task_type", taskType).Msg("failed to enqueue task")
	return fmt.Errorf("task enqueue failed: %w", err)
}

// Success: log the returned task ID together with the task type.
log.Info().Str("task_id", taskID).Str("task_type", taskType).Msg("task enqueued successfully")