Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mnah05-boiler-go-21-79.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

The tasks package defines shared task type constants used across the codebase. It ensures consistency between task enqueueing in handlers and task processing in workers.

Constants

Task types

const (
    // TypeWorkerPing is used to verify the worker is alive and processing tasks.
    TypeWorkerPing = "worker:ping"
)
TypeWorkerPing
string
default:"worker:ping"
Task type for worker health verification

Usage

Enqueueing tasks

import (
    "boiler-go/internal/queue"
    "boiler-go/internal/scheduler"
    "boiler-go/internal/tasks"
    "github.com/hibiken/asynq"
)

// Enqueue a worker ping task
taskID, err := client.EnqueueWithID(
    ctx,
    tasks.TypeWorkerPing,  // Use shared constant
    payloadBytes,
    asynq.Queue(queue.QueueDefault),
    asynq.MaxRetry(3),
)

Example from worker handler

// From internal/handler/worker.go
func (h *WorkerHandler) Ping(c echo.Context) error {
    log := logger.FromEchoContext(c)
    requestID := c.Request().Header.Get("X-Request-ID")

    // Build payload
    payload := PingTaskPayload{
        Message:   "ping from API",
        RequestID: requestID,
        QueuedAt:  time.Now().UTC(),
    }
    payloadBytes, err := json.Marshal(payload)
    if err != nil {
        return c.JSON(500, map[string]string{"error": "failed to marshal payload"})
    }

    // Enqueue using shared task type constant
    taskID, err := h.scheduler.EnqueueWithID(
        c.Request().Context(),
        tasks.TypeWorkerPing,  // Shared constant ensures consistency
        payloadBytes,
        asynq.Queue(queue.QueueDefault),
        asynq.MaxRetry(3),
        asynq.Timeout(30*time.Second),
    )
    if err != nil {
        log.Error().Err(err).Msg("failed to enqueue worker ping task")
        return c.JSON(503, map[string]string{"error": "failed to enqueue task"})
    }

    log.Info().
        Str("task_id", taskID).
        Str("task_type", tasks.TypeWorkerPing).
        Str("request_id", requestID).
        Msg("worker ping task enqueued")

    return c.JSON(202, map[string]any{
        "task_id":   taskID,
        "task_type": tasks.TypeWorkerPing,
        "status":    "queued",
    })
}

Processing tasks in worker

// From cmd/worker/main.go
import (
    "boiler-go/internal/tasks"
    "github.com/hibiken/asynq"
)

func main() {
    mux := asynq.NewServeMux()

    // Register handler using shared task type constant
    mux.HandleFunc(tasks.TypeWorkerPing, func(ctx context.Context, t *asynq.Task) error {
        var payload PingTaskPayload
        if err := json.Unmarshal(t.Payload(), &payload); err != nil {
            log.Error().Err(err).Msg("failed to unmarshal payload")
            return err
        }

        log.Info().
            Str("task_type", t.Type()).
            Str("payload", payload.Message).
            Str("request_id", payload.RequestID).
            Msg("worker ping task processed - worker is alive!")

        return nil
    })

    srv := asynq.NewServer(redisOpt, asynq.Config{Concurrency: 10})
    srv.Run(mux)
}

Adding custom task types

To add new task types, define them as constants in this package:
const (
    // TypeWorkerPing is used to verify the worker is alive and processing tasks.
    TypeWorkerPing = "worker:ping"
    
    // TypeEmailSend sends an email to a user.
    TypeEmailSend = "email:send"
    
    // TypeReportGenerate generates a report.
    TypeReportGenerate = "report:generate"
    
    // TypeDataSync synchronizes data with external service.
    TypeDataSync = "data:sync"
)

Example: Email sending task

// Define constant
const TypeEmailSend = "email:send"

// Enqueue in handler
type EmailPayload struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

payload := EmailPayload{
    To:      "user@example.com",
    Subject: "Welcome",
    Body:    "Thanks for signing up!",
}
payloadBytes, _ := json.Marshal(payload)

err := client.Enqueue(
    ctx,
    tasks.TypeEmailSend,
    payloadBytes,
    asynq.Queue(queue.QueueDefault),
)

// Process in worker
mux.HandleFunc(tasks.TypeEmailSend, func(ctx context.Context, t *asynq.Task) error {
    var payload EmailPayload
    if err := json.Unmarshal(t.Payload(), &payload); err != nil {
        return err
    }
    
    // Send email
    return emailService.Send(payload.To, payload.Subject, payload.Body)
})

Task naming conventions

Follow these conventions for task type names:
  • Format: category:action
  • Category: Logical grouping (worker, email, report, etc.)
  • Action: Operation to perform (ping, send, generate, etc.)
  • Case: Lowercase with colon separator
Examples:
  • worker:ping - Worker health check
  • email:send - Send email
  • report:generate - Generate report
  • data:sync - Synchronize data
  • cache:refresh - Refresh cache
  • notification:push - Send push notification

Task payloads

Define payload structs for type safety:
// PingTaskPayload for worker:ping task
type PingTaskPayload struct {
    Message   string    `json:"message"`
    RequestID string    `json:"request_id"`
    QueuedAt  time.Time `json:"queued_at"`
}

// EmailPayload for email:send task
type EmailPayload struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

// ReportPayload for report:generate task
type ReportPayload struct {
    UserID    string    `json:"user_id"`
    StartDate time.Time `json:"start_date"`
    EndDate   time.Time `json:"end_date"`
    Format    string    `json:"format"`
}

Best practices

  1. Use shared constants - Always use tasks.TypeX instead of string literals
  2. Consistent naming - Follow category:action naming convention
  3. Type-safe payloads - Define structs for task payloads
  4. Include correlation IDs - Add request_id or trace_id to payloads
  5. Document task types - Add comments explaining what each task does
  6. Version payloads - Consider adding version field for payload evolution

Testing tasks

Use the worker ping endpoint to verify task processing:
curl -X POST http://localhost:8080/worker/ping \
  -H "Content-Type: application/json" \
  -d '{"message": "testing worker"}'
Response:
{
  "success": true,
  "task_id": "d290f1ee-6c54-4b01-90e6-d701748f0851",
  "task_type": "worker:ping",
  "queued_at": "2024-01-15T10:30:00Z",
  "message": "Task queued successfully. Check worker logs to verify processing."
}
Check worker logs for task processing:
{"level":"info","task_type":"worker:ping","task_id":"d290f1ee...","message":"task started"}
{"level":"info","task_type":"worker:ping","payload":"testing worker","message":"worker ping task processed - worker is alive!"}
{"level":"info","task_type":"worker:ping","task_id":"d290f1ee...","duration":15,"message":"task completed"}