
Background Job System (BullMQ)

Overview

Asynchronous task processing using BullMQ + Redis for:

  • Journal instance syncing (raids, dungeons, encounters)
  • Guild roster syncing
  • Avatar fetching (fire-and-forget)
  • Future: Email notifications, data exports, analytics

Architecture

flowchart LR
A[Schedule] --> B[Job Service]
B --> C[BullMQ Queue]
C --> D[Worker Processor]
D --> E[(Database)]
C <--> F[(Redis)]

Jobs

1. Journal Instance Sync (Queue Worker)

Queue: journal-instance-sync
Concurrency: 1 (single worker to respect Blizzard rate limits)
Purpose: Sync raids and dungeons from the Blizzard Game Data API into JournalExpansion, JournalInstance, and JournalEncounter

Manual Trigger (admin only):

curl -X POST http://localhost:3001/api/v1/admin/sync-instances \
-H "Authorization: Bearer <admin-token>"

Or via Jobs controller:

curl -X POST http://localhost:3001/api/v1/jobs/sync-instances \
-H "Authorization: Bearer <admin-token>"

Response: { message, sessionId }. Use sessionId to subscribe to the WebSocket gateway for real-time progress.

Process:

  1. Create JournalSyncSession (pending)
  2. Fetch expansions from Blizzard
  3. For each expansion, fetch journal instances (raids + dungeons)
  4. For each instance, fetch encounters (bosses)
  5. Upsert JournalExpansion, JournalInstance, JournalEncounter
  6. Emit progress via InstanceSyncGateway (WebSocket)
  7. Mark session completed/failed

Job Data: { sessionId: string }
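The process above can be sketched as a nested loop over expansions, instances, and encounters. This is a minimal sketch under stated assumptions, not the project's processor: JournalSource, JournalStore, SyncProgress, and runJournalSync are hypothetical names, the Blizzard client and the database layer are injected as plain interfaces, and progress is reported once per expansion. It upserts as it goes rather than in a separate step.

```typescript
// Hypothetical shapes: the real Blizzard client and persistence layer are not shown in this document.
interface Expansion { id: number; name: string }
interface Instance { id: number; name: string }
interface Encounter { id: number; name: string }

interface JournalSource {
  fetchExpansions(): Promise<Expansion[]>;
  fetchInstances(expansionId: number): Promise<Instance[]>;
  fetchEncounters(instanceId: number): Promise<Encounter[]>;
}

interface JournalStore {
  upsertExpansion(expansion: Expansion): Promise<void>;
  upsertInstance(expansionId: number, instance: Instance): Promise<void>;
  upsertEncounter(instanceId: number, encounter: Encounter): Promise<void>;
}

interface SyncProgress { sessionId: string; expansionsDone: number; expansionsTotal: number }

type SessionStatus = 'pending' | 'completed' | 'failed';

async function runJournalSync(
  sessionId: string,
  source: JournalSource,
  store: JournalStore,
  emit: (progress: SyncProgress) => void,
): Promise<SessionStatus> {
  try {
    const expansions = await source.fetchExpansions();
    let expansionsDone = 0;
    for (const expansion of expansions) {
      await store.upsertExpansion(expansion);
      // Requests are made one at a time, in the spirit of the queue's concurrency of 1.
      for (const instance of await source.fetchInstances(expansion.id)) {
        await store.upsertInstance(expansion.id, instance);
        for (const encounter of await source.fetchEncounters(instance.id)) {
          await store.upsertEncounter(instance.id, encounter);
        }
      }
      expansionsDone += 1;
      emit({ sessionId, expansionsDone, expansionsTotal: expansions.length });
    }
    return 'completed';
  } catch {
    // A real processor would also record the error on the session before marking it failed.
    return 'failed';
  }
}
```

The caller would persist the returned status on the JournalSyncSession; the emit callback stands in for whatever InstanceSyncGateway exposes.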

2. Guild Roster Sync (Cron Job)

Schedule: Daily at 3 AM
Queue: N/A (direct execution)
Purpose: Sync all guild rosters from the Battle.net API

@Cron(CronExpression.EVERY_DAY_AT_3AM)
async syncAllGuildRosters()

Process:

  1. Find guilds with at least one claimed member
  2. Get Battle.net access token from user
  3. Fetch roster data from API
  4. Upsert guild members in database
  5. Mark departed members by setting leftGuildAt
  6. Queue avatar fetch jobs
  7. Auto-claim matching characters
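Steps 4 and 5 amount to a diff between the stored roster and the freshly fetched one. The sketch below illustrates that comparison with hypothetical types (StoredMember, RosterEntry) and a hypothetical diffRoster helper. Matching members on a case-insensitive name plus realm key is an assumption; the real sync may match on a character ID instead.

```typescript
// Hypothetical row shapes; the actual schema is not shown in this document.
interface StoredMember { characterName: string; realm: string; leftGuildAt: Date | null }
interface RosterEntry { characterName: string; realm: string }
interface RosterDiff { toUpsert: RosterEntry[]; departed: StoredMember[] }

// Assumption: a character is identified by name + realm, compared case-insensitively.
function memberKey(member: { characterName: string; realm: string }): string {
  return `${member.characterName.toLowerCase()}@${member.realm.toLowerCase()}`;
}

function diffRoster(stored: StoredMember[], fetched: RosterEntry[]): RosterDiff {
  const fetchedKeys = new Set(fetched.map(memberKey));
  // Departed: still marked as current in the database but missing from the API roster.
  const departed = stored.filter(
    (member) => member.leftGuildAt === null && !fetchedKeys.has(memberKey(member)),
  );
  // Every fetched entry is upserted, whether it is new or already known.
  return { toUpsert: fetched, departed };
}
```

The caller would set leftGuildAt on each departed row and upsert the rest. Members who already have a leftGuildAt value are skipped so their original departure time is preserved.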

Manual Trigger:

curl -X POST http://localhost:3001/api/v1/jobs/sync-rosters \
-H "Authorization: Bearer <admin-token>"

3. Avatar Fetch (Queue Worker)

Queue: avatar-fetch
Concurrency: 10 workers
Purpose: Asynchronously fetch character avatars from Battle.net

@Process()
async handleAvatarFetch(job: Job<AvatarFetchJob>)

Job Data:

{
  guildMemberId: string;
  characterName: string;
  realm: string;
  accessToken: string;
}

Behavior:

  • Fire-and-forget (no retries)
  • Removes completed jobs from queue
  • Logs failures without throwing
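"Logs failures without throwing" can be illustrated with a small wrapper around the fetch. This is a sketch, not the project's handler: handleWithoutThrowing, fetchAvatar, and logWarning are hypothetical names, and the fetch function is injected so the example stays self-contained.

```typescript
interface AvatarFetchJob {
  guildMemberId: string;
  characterName: string;
  realm: string;
  accessToken: string;
}

// Resolves to the avatar URL, or null when the fetch failed. It never rejects.
async function handleWithoutThrowing(
  data: AvatarFetchJob,
  fetchAvatar: (data: AvatarFetchJob) => Promise<string>, // hypothetical API call
  logWarning: (message: string) => void,                  // hypothetical logger
): Promise<string | null> {
  try {
    return await fetchAvatar(data);
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    // Log identifying fields only; the access token is deliberately left out.
    logWarning(`Failed to fetch avatar for ${data.characterName}-${data.realm}: ${reason}`);
    return null;
  }
}
```

One consequence worth keeping in mind: a handler that swallows its errors resolves normally, so the queue records the job as completed rather than failed. Failure counts for such jobs then have to come from the logs.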

Configuration

Redis Connection

Environment variables:

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD= # Optional
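A sketch of how these variables might be turned into a connection object. redisConnectionFromEnv is a hypothetical helper, and falling back to localhost and 6379 when a variable is unset is an assumption based on the example values above.

```typescript
interface RedisConnection {
  host: string;
  port: number;
  password?: string;
}

function redisConnectionFromEnv(env: Record<string, string | undefined>): RedisConnection {
  const host = env.REDIS_HOST || 'localhost';
  const port = Number(env.REDIS_PORT || '6379');
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  // REDIS_PASSWORD is optional: an unset or empty value means "no password".
  const password = env.REDIS_PASSWORD;
  return password ? { host, port, password } : { host, port };
}
```

In an application this would typically be called with process.env and the result passed to the queue module's connection settings. Validating the port up front turns a typo into a startup error instead of a connection failure later.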

Queue Options

Configured in jobs.module.ts:

BullModule.registerQueue({
  name: 'avatar-fetch',
  // Queue-level defaults: 3 attempts, exponential backoff.
  // Fire-and-forget jobs override these per job (attempts: 1).
});

Monitoring

Queue Health Endpoint

Check queue status:

curl http://localhost:3001/api/v1/jobs/health \
-H "Authorization: Bearer <admin-token>"

Returns:

{
  "queues": {
    "avatar-fetch": {
      "active": 5,
      "waiting": 23,
      "completed": 150,
      "failed": 2,
      "recentFailures": []
    },
    "journal-instance-sync": {
      "active": 0,
      "waiting": 0,
      "completed": 12,
      "failed": 0
    }
  },
  "status": "active",
  "timestamp": "2026-01-31T23:00:00.000Z"
}

Note: Use GET /admin/jobs or GET /jobs/health (admin only).

Metrics

  • Job Completion Rate: completed / (completed + failed)
  • Active Workers: Number of processors
  • Queue Depth: Number of waiting jobs
  • Average Processing Time: Track via logs
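The first and third metrics can be derived directly from the counts in the health response. A minimal sketch, assuming the per-queue shape shown in the sample response; QueueCounts, completionRate, and queueDepth are hypothetical names.

```typescript
interface QueueCounts {
  active: number;
  waiting: number;
  completed: number;
  failed: number;
}

// completed / (completed + failed); null when no job has finished yet,
// which avoids a division by zero on a fresh queue.
function completionRate(counts: QueueCounts): number | null {
  const finished = counts.completed + counts.failed;
  return finished === 0 ? null : counts.completed / finished;
}

// Queue depth is the number of jobs still waiting to be picked up.
function queueDepth(counts: QueueCounts): number {
  return counts.waiting;
}

// Counts taken from the sample health response.
const avatarFetch: QueueCounts = { active: 5, waiting: 23, completed: 150, failed: 2 };
const rate = completionRate(avatarFetch); // 150 / 152, roughly 98.7%
const depth = queueDepth(avatarFetch);    // 23
```

Note that these counts only cover jobs still stored in Redis. If completed jobs are removed from the queue, the completed count (and therefore the rate) reflects only what has been retained.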

Adding New Jobs

1. Define Job Data Interface

// src/jobs/email-notification.processor.ts
export interface EmailNotificationJob {
  userId: string;
  emailType: 'event-reminder' | 'roster-update';
  // Job data is serialized to JSON in Redis, so keep values JSON-safe
  metadata: Record<string, unknown>;
}
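Because job data is stored in Redis as JSON, the interface is only a compile-time promise. A runtime type guard at the top of the processor can catch malformed payloads. The sketch below repeats the interface so it is self-contained; isEmailNotificationJob is a hypothetical helper, not part of the codebase described here.

```typescript
interface EmailNotificationJob {
  userId: string;
  emailType: 'event-reminder' | 'roster-update';
  metadata: Record<string, unknown>;
}

function isEmailNotificationJob(value: unknown): value is EmailNotificationJob {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  const metadata = candidate.metadata;
  return (
    typeof candidate.userId === 'string' &&
    (candidate.emailType === 'event-reminder' || candidate.emailType === 'roster-update') &&
    // metadata must be a plain object, not null and not an array
    typeof metadata === 'object' && metadata !== null && !Array.isArray(metadata)
  );
}

// Simulate the JSON round trip a payload takes through Redis.
const roundTripped: unknown = JSON.parse(
  JSON.stringify({ userId: '123', emailType: 'event-reminder', metadata: { eventId: 'abc' } }),
);
const payloadIsValid = isEmailNotificationJob(roundTripped);
```

A processor could log and skip a payload that fails the guard instead of crashing partway through sending.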

2. Create Processor

@Processor('email-notification')
export class EmailNotificationProcessor {
  @Process()
  async handleEmail(job: Job<EmailNotificationJob>) {
    // Send email...
  }
}

3. Register Queue

// jobs.module.ts
BullModule.registerQueue({
  name: 'email-notification',
}),

4. Queue Jobs

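// In any injectable service (queueReminder is an illustrative method name)
constructor(
  @InjectQueue('email-notification')
  private readonly emailQueue: Queue<EmailNotificationJob>,
) {}

async queueReminder() {
  // Resolves once the job is stored in Redis, not when it has been processed
  await this.emailQueue.add({
    userId: '123',
    emailType: 'event-reminder',
    metadata: { eventId: 'abc' },
  });
}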

Best Practices

  1. Fire-and-forget for non-critical jobs

    • Avatar fetches, notifications
    • Use attempts: 1, removeOnComplete: true
  2. Retry with backoff for critical jobs

    • Payment processing, data exports
    • Use exponential backoff: backoff: { type: 'exponential', delay: 2000 }
  3. Batch operations when possible

    • Use queue.addBulk() instead of multiple queue.add() calls
    • Example: Roster sync queues all avatar jobs at once
  4. Don't block request handlers

    • Enqueue the job and return; don't wait for it to finish (awaiting queue.add() only waits for the enqueue)
    • Return immediately to user
  5. Idempotent job handlers

    • Safe to retry without side effects
    • Check if work is already done
  6. Monitor queue depth

    • Alert if queue grows too large
    • Scale workers if needed
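Practices 1 and 2 can be captured as two option presets. The sketch below also computes the wait before each retry so the effect of a backoff setting is visible. RetryOptions and retryDelays are hypothetical names, attempts: 3 is taken from the queue defaults mentioned under Configuration, and the doubling schedule is a common convention; the exact formula the queue library applies may differ.

```typescript
// Mirrors the option names used in this document: attempts, removeOnComplete, backoff.
interface RetryOptions {
  attempts: number;
  removeOnComplete?: boolean;
  backoff?: { type: 'exponential' | 'fixed'; delay: number };
}

// Non-critical work: one attempt, clean up on success.
const fireAndForget: RetryOptions = { attempts: 1, removeOnComplete: true };

// Critical work: retry with growing delays.
const retryWithBackoff: RetryOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
};

// Delay in milliseconds before each retry. Assumes the delay doubles on every
// retry, starting from the base delay; this is an illustration, not the library's code.
function retryDelays(options: RetryOptions): number[] {
  const retries = Math.max(0, options.attempts - 1);
  const backoff = options.backoff;
  const delays: number[] = [];
  for (let i = 0; i < retries; i++) {
    if (!backoff) delays.push(0);
    else if (backoff.type === 'exponential') delays.push(backoff.delay * Math.pow(2, i));
    else delays.push(backoff.delay);
  }
  return delays;
}
```

Under these assumptions, retryDelays(retryWithBackoff) yields two delays (2000 ms, then 4000 ms), while retryDelays(fireAndForget) is empty because a single attempt leaves nothing to retry.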

Performance

Typical throughput:

  • Avatar fetches: ~10/second
  • Roster syncs: ~1 guild/5 seconds

Redis memory usage:

  • ~1MB per 1000 jobs in queue
  • Completed jobs removed automatically
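These figures support rough capacity estimates. The helpers below are a back-of-the-envelope sketch with hypothetical names (drainSeconds, estimatedRedisMegabytes); they assume throughput stays constant and that memory scales linearly at the ~1MB per 1000 jobs quoted above.

```typescript
// Time to work through a backlog at a steady processing rate.
function drainSeconds(waitingJobs: number, jobsPerSecond: number): number {
  if (jobsPerSecond <= 0) throw new Error('jobsPerSecond must be positive');
  return waitingJobs / jobsPerSecond;
}

// Linear estimate based on ~1MB per 1000 queued jobs.
function estimatedRedisMegabytes(queuedJobs: number): number {
  return queuedJobs / 1000;
}

// 23 waiting avatar jobs at ~10 jobs/second
const avatarBacklogSeconds = drainSeconds(23, 10);

// 120 guilds at ~1 guild per 5 seconds (0.2 guilds/second)
const rosterSyncSeconds = drainSeconds(120, 1 / 5);

// 50,000 queued jobs
const backlogMegabytes = estimatedRedisMegabytes(50000);
```

With these inputs the avatar backlog clears in about 2.3 seconds, a 120-guild roster sync takes about 10 minutes, and 50,000 queued jobs occupy roughly 50MB. The guild count and the job count are illustrative inputs, not measurements.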

Troubleshooting

Jobs stuck in queue

# Check Redis
redis-cli
> KEYS bull:avatar-fetch:*
> LLEN bull:avatar-fetch:wait

Solutions:

  • Restart API server (workers will resume)
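  • Manually remove failed jobs: await queue.clean(0, 'failed') (the argument order of clean() differs between Bull and BullMQ, so check it against the installed version)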

High failure rate

Check logs for common errors:

Failed to fetch avatar: 401 Unauthorized
→ Access token expired, user needs to re-authenticate

Queue growing too fast

  • Check if workers are processing
  • Increase concurrency in processor
  • Add more worker instances (horizontal scaling)

Future Enhancements

  • Bull Board UI for monitoring
  • Job priority levels (high/normal/low)
  • Rate limiting per API (Battle.net, Discord)
  • Job result persistence for analytics
  • Dead letter queue for failed jobs
  • Webhook notifications for job completion