Creating Jobs
Jobs extend the base Job class and define a handle() method:
use Lyger\Jobs\Job;
class SendWelcomeEmail extends Job
{
protected int $tries = 3;
protected int $timeout = 60;
protected int $backoff = 60;
protected ?string $queue = 'emails';
public function __construct(
private string $email,
private string $name
) {}
public function handle(): void
{
// Send the email
mail(
$this->email,
'Welcome!',
"Hello {$this->name}, welcome to our platform!"
);
logger()->info("Welcome email sent to {$this->email}");
}
public function failed(\Throwable $exception): void
{
// Log failure
logger()->error("Failed to send email to {$this->email}: " . $exception->getMessage());
// Notify admin
notifyAdmin("Email job failed for {$this->email}");
}
}
The $tries property defines how many times to retry on failure. The $backoff property sets the delay (in seconds) between retries.
Dispatching Jobs
Immediate Dispatch
Queue a job for background processing:
use Lyger\Jobs\Queue;
// Create job
$job = new SendWelcomeEmail('user@example.com', 'John Doe');
// Queue it
$jobId = Queue::getInstance()->push($job);
echo "Job queued with ID: {$jobId}";
Delayed Dispatch
Queue a job to run after a delay:
// Send email in 5 minutes (300 seconds)
$jobId = Queue::getInstance()->later(300, $job);
// Send reminder in 24 hours
$reminderJob = new SendReminderEmail('user@example.com');
Queue::getInstance()->later(86400, $reminderJob);
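Because later() takes its delay in seconds, a job that should run at a specific wall-clock time needs that time converted into a delay first. The sketch below shows one way to do this. The delayUntil() helper is hypothetical and not part of Lyger, and it assumes that a delay of zero is acceptable for times already in the past.

```php
<?php
// Hypothetical helper (not part of Lyger): converts a target time into the
// number of seconds to wait, never returning a negative delay.
function delayUntil(\DateTimeInterface $runAt, \DateTimeInterface $now): int
{
    return max(0, $runAt->getTimestamp() - $now->getTimestamp());
}

$now   = new \DateTimeImmutable('2024-01-15 09:00:00');
$runAt = new \DateTimeImmutable('2024-01-15 09:05:00');

// Five minutes ahead gives a 300-second delay.
$delay = delayUntil($runAt, $now);

// A target in the past is clamped to zero.
$pastDelay = delayUntil($now, $runAt);
```

The result could then be passed as the first argument to later(), for example Queue::getInstance()->later($delay, $job).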
Using Dispatchable Trait
Make jobs easier to dispatch with the Dispatchable trait:
use Lyger\Jobs\Job;
use Lyger\Jobs\Dispatchable;
class ProcessVideo extends Job
{
use Dispatchable;
public function __construct(
private int $videoId
) {}
public function handle(): void
{
$video = Video::find($this->videoId);
// Process video
$this->transcode($video);
$this->generateThumbnail($video);
$this->extractMetadata($video);
$video->status = 'processed';
$video->save();
}
}
// Dispatch with trait
ProcessVideo::dispatch($videoId);
// Dispatch after delay
ProcessVideo::dispatchAfter(60, $videoId);
Queue Workers
Process queued jobs with a worker:
Start Worker
use Lyger\Jobs\Queue;
$queue = Queue::getInstance();
// Load persisted jobs
$queue->loadJobs();
// Process all jobs (blocks)
$queue->work();
Worker Script
Create a worker script:
<?php
// worker.php
require __DIR__ . '/vendor/autoload.php';
use Lyger\Jobs\Queue;
$queue = Queue::getInstance();
$queue->loadJobs();
echo "Worker started. Processing jobs...\n";
while (true) {
$job = $queue->pop();
if ($job === null) {
sleep(1);
continue;
}
echo "Processing job: {$job['class']}\n";
$success = $queue->process($job);
if ($success) {
echo "Job completed successfully\n";
} else {
echo "Job failed\n";
}
}
php worker.php
Workers run indefinitely. Use a process manager like Supervisor to ensure they restart if they crash.
Job Properties
Configure job behavior with properties:
class ImportUsers extends Job
{
// Maximum attempts before giving up
protected int $tries = 5;
// Maximum execution time (seconds)
protected int $timeout = 300;
// Delay between retries (seconds)
protected int $backoff = 120;
// Queue name
protected ?string $queue = 'imports';
public function handle(): void
{
// Import logic
}
public function retryUntil(): int
{
// Give up after 1 hour
return time() + 3600;
}
}
Property Reference
| Property | Type | Description | Default |
|---|---|---|---|
| $tries | int | Max retry attempts | 3 |
| $timeout | int | Max execution time (seconds) | 60 |
| $backoff | int | Delay between retries (seconds) | 60 |
| $queue | string | Queue name | 'default' |
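To make the interaction between $tries and $backoff concrete, here is a minimal sketch of a retry loop. It is not Lyger's implementation: runWithRetries() is a hypothetical helper, and it assumes $tries counts total attempts (the source describes it both as "retry attempts" and as "attempts before giving up"). The sleep function is injectable so the example runs without actually waiting.

```php
<?php
// Hypothetical helper (not part of Lyger): calls $handle up to $tries times,
// pausing $backoff seconds between failed attempts.
function runWithRetries(callable $handle, int $tries, int $backoff, ?callable $sleep = null): array
{
    $sleep = $sleep ?? 'sleep';
    $lastError = null;

    for ($attempt = 1; $attempt <= $tries; $attempt++) {
        try {
            $handle($attempt);
            return ['ok' => true, 'attempts' => $attempt, 'error' => null];
        } catch (\Throwable $e) {
            $lastError = $e;
            // Wait before the next attempt, but not after the final one.
            if ($attempt < $tries) {
                $sleep($backoff);
            }
        }
    }

    return ['ok' => false, 'attempts' => $tries, 'error' => $lastError];
}

// Usage: a job that fails twice and succeeds on the third attempt.
$waits = [];
$result = runWithRetries(
    function (int $attempt): void {
        if ($attempt < 3) {
            throw new \RuntimeException("attempt {$attempt} failed");
        }
    },
    3,   // tries
    60,  // backoff in seconds
    function (int $seconds) use (&$waits): void {
        $waits[] = $seconds; // record the wait instead of sleeping
    }
);
```

In this sketch a job with $tries = 3 and $backoff = 60 waits at most twice, so the worst case adds roughly two minutes before the job is reported as failed.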
Multiple Queues
Organize jobs into different queues:
// High priority
class ProcessPayment extends Job
{
protected ?string $queue = 'high';
}
// Normal priority
class SendEmail extends Job
{
protected ?string $queue = 'default';
}
// Low priority
class GenerateReport extends Job
{
protected ?string $queue = 'low';
}
// Process specific queue
$queue = Queue::getInstance();
$queue->work('high'); // Only process high priority jobs
Queue Sizes
Check queue status:
$queue = Queue::getInstance();
// Total jobs
$total = $queue->size();
// Jobs in specific queue
$highPriority = $queue->size('high');
$default = $queue->size('default');
$lowPriority = $queue->size('low');
echo "Total: {$total}, High: {$highPriority}, Default: {$default}, Low: {$lowPriority}";
Job Chaining
Execute jobs in sequence:
use Lyger\Jobs\Dispatcher;
$dispatcher = new Dispatcher();
// Chain jobs
$dispatcher->chain([
new DownloadVideo($url),
new ProcessVideo($videoId),
new GenerateThumbnail($videoId),
new NotifyUser($userId)
])->dispatch();
Chained jobs run sequentially. If one fails, the chain stops.
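The stop-on-failure behavior can be illustrated with a small stand-alone sketch. runChain() below is a hypothetical stand-in, not Lyger's Dispatcher: it runs plain closures in order and reports which steps completed before the first exception.

```php
<?php
// Hypothetical stand-in for a job chain (not Lyger's Dispatcher): runs each
// step in order and stops at the first one that throws.
function runChain(array $steps): array
{
    $completed = [];

    foreach ($steps as $name => $step) {
        try {
            $step();
            $completed[] = $name;
        } catch (\Throwable $e) {
            // Later steps are never started once one step fails.
            return ['completed' => $completed, 'failed' => $name, 'error' => $e->getMessage()];
        }
    }

    return ['completed' => $completed, 'failed' => null, 'error' => null];
}

$result = runChain([
    'download'  => function (): void {},
    'process'   => function (): void { throw new \RuntimeException('transcode error'); },
    'thumbnail' => function (): void {},
    'notify'    => function (): void {},
]);
```

Here only the download step completes. The thumbnail and notify steps never run, which is why a chain suits work where each job depends on the output of the previous one.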
Task Scheduling
Schedule recurring tasks with the Scheduler:
Basic Scheduling
use Lyger\Jobs\Scheduler;
// Run every minute
Scheduler::everyMinute(function() {
echo "This runs every minute\n";
});
// Run every 5 minutes
Scheduler::everyFiveMinutes(function() {
cleanupTempFiles();
});
// Run hourly
Scheduler::hourly(function() {
generateHourlyReport();
});
// Run daily at midnight
Scheduler::daily(function() {
sendDailyNewsletter();
});
// Run weekly on Sunday
Scheduler::weekly(function() {
generateWeeklyReport();
});
// Run monthly on 1st
Scheduler::monthly(function() {
processMonthlyBilling();
});
// Check and run pending tasks
Scheduler::runPending();
Cron-Style Scheduling
Use cron expressions for precise scheduling:
// Custom schedule (cron format: minute hour day month weekday)
Scheduler::schedule('30 9 * * *', function() {
// Runs at 9:30 AM every day
sendMorningBriefing();
});
Scheduler::schedule('0 0 * * 0', function() {
// Runs at midnight every Sunday
weeklyMaintenance();
});
Scheduler::schedule('*/15 * * * *', function() {
// Runs every 15 minutes
syncWithThirdParty();
});
Scheduler::schedule('0 9-17 * * 1-5', function() {
// Runs every hour from 9 AM to 5 PM, Monday to Friday
businessHoursTask();
});
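To show how the five fields in the expressions above are read, here is a simplified matcher. It is an illustration only, not Lyger's parser: cronFieldMatches() and cronMatches() are hypothetical names, and the sketch handles only *, lists, ranges, and steps. It also requires the day-of-month and weekday fields to both match and does not accept 7 for Sunday, which differs from some cron implementations.

```php
<?php
// Simplified illustration (not Lyger's parser). $min is the smallest legal
// value of the field, used as the starting point for "*/n" steps.
function cronFieldMatches(string $field, int $value, int $min = 0): bool
{
    foreach (explode(',', $field) as $part) {
        // Split off an optional step, e.g. "*/15" or "9-17/2".
        $pieces = explode('/', $part, 2);
        $range = $pieces[0];
        $step = isset($pieces[1]) ? (int) $pieces[1] : 1;
        if ($step < 1) {
            continue; // ignore malformed steps
        }

        if ($range === '*') {
            if (($value - $min) % $step === 0) {
                return true;
            }
            continue;
        }

        // A single number is treated as a range of one value.
        $bounds = explode('-', $range, 2);
        $low = (int) $bounds[0];
        $high = isset($bounds[1]) ? (int) $bounds[1] : $low;

        if ($value >= $low && $value <= $high && ($value - $low) % $step === 0) {
            return true;
        }
    }

    return false;
}

// Field order: minute hour day month weekday (0 = Sunday).
function cronMatches(string $expression, \DateTimeInterface $time): bool
{
    $fields = preg_split('/\s+/', trim($expression));
    if (count($fields) !== 5) {
        throw new \InvalidArgumentException('Expected five cron fields');
    }
    [$minute, $hour, $day, $month, $weekday] = $fields;

    return cronFieldMatches($minute, (int) $time->format('i'))
        && cronFieldMatches($hour, (int) $time->format('G'))
        && cronFieldMatches($day, (int) $time->format('j'), 1)
        && cronFieldMatches($month, (int) $time->format('n'), 1)
        && cronFieldMatches($weekday, (int) $time->format('w'));
}

// 2024-01-15 is a Monday.
$mondayNoon = new \DateTimeImmutable('2024-01-15 12:00:00');
$isBusinessHour = cronMatches('0 9-17 * * 1-5', $mondayNoon);
```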
Schedule Runner
Run scheduled tasks:
<?php
// scheduler.php
require __DIR__ . '/vendor/autoload.php';
use Lyger\Jobs\Scheduler;
// Register all scheduled tasks
require __DIR__ . '/App/Schedule.php';
// Run pending tasks
while (true) {
Scheduler::runPending();
sleep(60); // Check every minute
}
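The loop above sleeps a fixed 60 seconds after each run, so any time spent inside runPending() pushes the next check later. Whether that drift matters depends on how runPending() decides which tasks are due, which this page does not specify. One possible adjustment, sketched here with a hypothetical secondsUntilNextMinute() helper, is to sleep only until the next minute boundary.

```php
<?php
// Hypothetical helper (not part of Lyger): seconds remaining until the next
// whole minute, given a Unix timestamp. Returns 60 when called exactly on a
// minute boundary.
function secondsUntilNextMinute(int $now): int
{
    return 60 - ($now % 60);
}

// Example values: 5 seconds past a minute leaves 55 seconds to wait.
$wait = secondsUntilNextMinute(125);

// In the runner loop this would replace the fixed sleep(60):
//     sleep(secondsUntilNextMinute(time()));
```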
Real-World Examples
Email Notification System
class SendNotificationEmail extends Job
{
use Dispatchable;
protected ?string $queue = 'emails';
protected int $tries = 5;
public function __construct(
private int $userId,
private string $subject,
private string $message
) {}
public function handle(): void
{
$user = User::find($this->userId);
if (!$user) {
throw new \Exception("User {$this->userId} not found");
}
// Send email
$sent = mail(
$user->email,
$this->subject,
$this->message,
"From: noreply@example.com\r\n"
);
if (!$sent) {
throw new \Exception("Failed to send email");
}
// Log success
logger()->info("Email sent to {$user->email}");
}
public function failed(\Throwable $exception): void
{
logger()->error("Email failed: " . $exception->getMessage());
// Update notification status
Notification::where('user_id', $this->userId)
->update(['status' => 'failed']);
}
}
Image Processing
class ProcessUploadedImage extends Job
{
use Dispatchable;
protected ?string $queue = 'images';
protected int $timeout = 300;
public function __construct(
private string $imagePath,
private int $imageId
) {}
public function handle(): void
{
$image = Image::find($this->imageId);
// Generate thumbnails
$this->generateThumbnail($this->imagePath, 150, 150);
$this->generateThumbnail($this->imagePath, 400, 400);
$this->generateThumbnail($this->imagePath, 800, 800);
// Optimize original
$this->optimizeImage($this->imagePath);
// Extract metadata
$metadata = $this->extractMetadata($this->imagePath);
// Update database
$image->status = 'processed';
$image->metadata = json_encode($metadata);
$image->save();
}
private function generateThumbnail(string $path, int $width, int $height): void
{
// Thumbnail generation logic
}
private function optimizeImage(string $path): void
{
// Image optimization logic
}
private function extractMetadata(string $path): array
{
// Metadata extraction logic
return [];
}
}
// Usage
$imagePath = '/uploads/photo.jpg';
$imageId = 123;
ProcessUploadedImage::dispatch($imagePath, $imageId);
Report Generation
class GenerateMonthlyReport extends Job
{
use Dispatchable;
protected ?string $queue = 'reports';
protected int $timeout = 600; // 10 minutes
protected int $tries = 3;
public function __construct(
private int $userId,
private int $month,
private int $year
) {}
public function handle(): void
{
$user = User::find($this->userId);
// Gather data
$orders = Order::where('user_id', $this->userId)
->whereMonth('created_at', $this->month)
->whereYear('created_at', $this->year)
->get();
$totalRevenue = $orders->sum('total');
$totalOrders = $orders->count();
// Generate PDF
$pdf = $this->generatePDF([
'user' => $user,
'orders' => $orders,
'total_revenue' => $totalRevenue,
'total_orders' => $totalOrders,
'month' => $this->month,
'year' => $this->year
]);
// Save report
$filename = "report_{$this->userId}_{$this->year}_{$this->month}.pdf";
file_put_contents("/reports/{$filename}", $pdf);
// Notify user
SendNotificationEmail::dispatch(
$this->userId,
'Your Monthly Report is Ready',
"Your report for {$this->month}/{$this->year} is ready to download."
);
}
private function generatePDF(array $data): string
{
// PDF generation logic
return '';
}
}
// Schedule monthly
Scheduler::monthly(function() {
$users = User::where('subscription', 'premium')->get();
foreach ($users as $user) {
GenerateMonthlyReport::dispatch(
$user->id,
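// The task runs on the 1st, so report on the month that just ended
(int)date('m', strtotime('first day of last month')),
(int)date('Y', strtotime('first day of last month'))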
);
}
});
Failed Jobs
Handle and inspect failed jobs:
$queue = Queue::getInstance();
// Get failed jobs
$failed = $queue->getFailed();
foreach ($failed as $failure) {
echo "Job: {$failure['job']['class']}\n";
echo "Error: {$failure['exception']}\n";
echo "Failed at: " . date('Y-m-d H:i:s', $failure['failed_at']) . "\n";
echo "---\n";
}
// Retry the first failed job, if there is one
if (!empty($failed)) {
$jobData = $failed[0]['job'];
$queue->process($jobData);
}
Supervisor Configuration
Keep workers running in production:
[program:lyger-worker-default]
command=php /var/www/worker.php default
autostart=true
autorestart=true
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/lyger-worker.log
stderr_logfile=/var/log/lyger-worker-error.log
[program:lyger-worker-high]
command=php /var/www/worker.php high
autostart=true
autorestart=true
user=www-data
numprocs=4
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/lyger-worker-high.log
[program:lyger-scheduler]
command=php /var/www/scheduler.php
autostart=true
autorestart=true
user=www-data
stdout_logfile=/var/log/lyger-scheduler.log
Performance Tips
Job Batching
Process multiple items in one job:
class ProcessBatchEmails extends Job
{
use Dispatchable;
public function __construct(private array $userIds) {}
public function handle(): void
{
// Process all emails in one job
foreach ($this->userIds as $userId) {
$this->sendEmail($userId);
}
}
}
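// Batch 100 users per job.
// array_chunk() requires a plain array; convert first if User::all() returns a collection object.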
$users = User::all();
$batches = array_chunk($users, 100);
foreach ($batches as $batch) {
$userIds = array_column($batch, 'id');
ProcessBatchEmails::dispatch($userIds);
}
Queue Priorities
Use separate workers for priority queues:
# 4 workers for high priority
php worker.php high &
php worker.php high &
php worker.php high &
php worker.php high &
# 2 workers for default
php worker.php default &
php worker.php default &
# 1 worker for low priority
php worker.php low &
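These commands pass a queue name as the first argument, but the worker script shown earlier does not read it. A minimal sketch of how worker.php might pick it up follows. resolveQueueName() is a hypothetical helper, and the allowed list is an assumption based on the queue names used in this section.

```php
<?php
// Hypothetical helper (not part of Lyger): reads the queue name from the
// command-line arguments, defaulting to 'default' and rejecting unknown names.
function resolveQueueName(array $argv, array $allowed = ['high', 'default', 'low']): string
{
    $name = $argv[1] ?? 'default';

    if (!in_array($name, $allowed, true)) {
        throw new \InvalidArgumentException("Unknown queue: {$name}");
    }

    return $name;
}

// "php worker.php high" yields $argv = ['worker.php', 'high'].
$queueName = resolveQueueName(['worker.php', 'high']);

// In worker.php the result could be handed to the worker, for example:
//     Queue::getInstance()->work(resolveQueueName($argv));
```

Validating the name up front means a typo in a Supervisor config fails immediately instead of leaving a worker polling a queue that never receives jobs.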
Memory Management
Contain memory leaks by restarting each worker after a fixed number of jobs:
$queue = Queue::getInstance();
$queue->loadJobs();
$processed = 0;
$maxJobs = 1000;
while ($processed < $maxJobs) {
$job = $queue->pop();
if ($job === null) {
sleep(1);
continue;
}
$queue->process($job);
$processed++;
}
// Worker exits and supervisor restarts it
exit(0);
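A job count is only a proxy for memory growth, so a worker could also check its actual memory use. The sketch below combines both conditions. shouldRestart() is a hypothetical helper and the 128 MB limit is an arbitrary example value, not a Lyger default.

```php
<?php
// Hypothetical helper (not part of Lyger): decides whether the worker should
// exit so that the process manager can start a fresh one.
function shouldRestart(int $processed, int $maxJobs, int $memoryBytes, int $memoryLimitBytes): bool
{
    return $processed >= $maxJobs || $memoryBytes >= $memoryLimitBytes;
}

$limit = 128 * 1024 * 1024; // example limit: 128 MB

// Few jobs processed, but memory is over the limit: restart.
$restart = shouldRestart(10, 1000, 200 * 1024 * 1024, $limit);

// Inside the worker loop, after each job, the check might look like:
//     if (shouldRestart($processed, $maxJobs, memory_get_usage(true), $limit)) {
//         exit(0);
//     }
```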
Next Steps
Cache
High-performance caching
Events
Event system and broadcasting