Message Handler
This guide provides detailed information about the TranslationJobMessageHandler that processes translation jobs asynchronously via Symfony Messenger.
Handler Overview
The TranslationJobMessageHandler is the core component responsible for processing translation job items in the background. It is an invokable class registered with Symfony Messenger through the #[AsMessageHandler] attribute. It classifies failures so that Messenger retries transient errors and stops on permanent ones.
Handler Registration
Service Definition
The handler is not explicitly registered in services.yml. It is auto-discovered via the #[AsMessageHandler] PHP attribute combined with the module's _defaults block:
# tmgmt_laratranslate.services.yml
services:
  _defaults:
    autowire: true
    autoconfigure: true
  # The handler (Drupal\tmgmt_laratranslate\Messenger\TranslationJobMessageHandler)
  # is auto-discovered via #[AsMessageHandler]; no explicit entry is needed.
Handler Attribute
#[AsMessageHandler]
final class TranslationJobMessageHandler {
  // Handler implementation
}
The #[AsMessageHandler] attribute registers this class as a message handler. Messenger infers the handled message type, TranslationJobMessage, from the type declared on the first parameter of __invoke().
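How the attribute arrives at TranslationJobMessage can be imitated with plain reflection on the __invoke() signature. The sketch below is a simplified illustration, not Symfony's implementation; SketchMessage, SketchHandler and handledMessageType() are hypothetical names introduced only for this example.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for TranslationJobMessage.
final class SketchMessage {
  public function __construct(public readonly int $jobItemId) {}
}

// Hypothetical stand-in for the handler; only the __invoke() signature matters.
final class SketchHandler {
  public function __invoke(SketchMessage $message): void {}
}

/**
 * Returns the class name declared on the first parameter of __invoke().
 */
function handledMessageType(string $handlerClass): ?string {
  $method = new ReflectionMethod($handlerClass, '__invoke');
  $parameters = $method->getParameters();
  if ($parameters === []) {
    return NULL;
  }
  $type = $parameters[0]->getType();
  // Only a single named class type is resolved in this sketch.
  return $type instanceof ReflectionNamedType && !$type->isBuiltin()
    ? $type->getName()
    : NULL;
}

echo handledMessageType(SketchHandler::class), PHP_EOL;
```

A practical consequence: changing the type hint on __invoke() changes which messages the handler receives, without touching any service definition.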
Handler Dependencies
Required Services
- EntityTypeManager: For loading TMGMT job items
- Logger: For detailed logging and debugging
Constructor
public function __construct(
  EntityTypeManagerInterface $entityTypeManager,
  #[Autowire(service: 'logger.channel.tmgmt_laratranslate')]
  private readonly LoggerInterface $logger,
) {
  $this->jobItemStorage = $entityTypeManager->getStorage('tmgmt_job_item');
}
Processing Flow
Main Handler Method
public function __invoke(TranslationJobMessage $message): void {
  $jobItemId = $message->getJobItemId();
  try {
    $jobItem = $this->loadAndValidateJobItem($jobItemId);
    // Early exit if job item is no longer active
    if (!$jobItem->isActive()) {
      $this->logger->info('Job item @id is no longer active (state: @state). Skipping.', [
        '@id' => $jobItemId,
        '@state' => $jobItem->getState(),
      ]);
      return;
    }
    $plugin = $this->getTranslatorPlugin($jobItem);
    $this->processTranslation($jobItem, $plugin);
  }
  catch (UnrecoverableMessageHandlingException $e) {
    // Rethrow unrecoverable exceptions without modification
    throw $e;
  }
  catch (\Throwable $e) {
    $this->handleProcessingError($e, $jobItemId);
  }
}
Step-by-Step Processing
1. Message Extraction
$jobItemId = $message->getJobItemId();
Extracts the job item ID from the message.
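The message class itself is not shown in this guide. Judging only from how it is used here (constructed with a job item ID and read back with getJobItemId()), it could be as small as the following. This is an assumed shape, not the module's actual class, hence the Sketch prefix.

```php
<?php

declare(strict_types=1);

// Assumed shape of the message: an immutable value object that carries only
// the job item ID. The real TranslationJobMessage may hold more data.
final class SketchTranslationJobMessage {

  public function __construct(private readonly int $jobItemId) {}

  public function getJobItemId(): int {
    return $this->jobItemId;
  }

}

$message = new SketchTranslationJobMessage(123);
// Queued messages are typically serialized, so a round trip must keep the ID.
$restored = unserialize(serialize($message));
echo $restored->getJobItemId(), PHP_EOL;
```

Carrying only the ID keeps the payload small and means the handler always loads the current entity, which is why it re-checks isActive() before doing any work.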
2. Job Item Loading and Validation
private function loadAndValidateJobItem(int $jobItemId): JobItemInterface {
  $jobItem = $this->jobItemStorage->load($jobItemId);
  if (!$jobItem instanceof JobItemInterface) {
    $this->logger->error('Job item @id not found.', ['@id' => $jobItemId]);
    throw new UnrecoverableMessageHandlingException(
      sprintf('Job item %d not found', $jobItemId)
    );
  }
  return $jobItem;
}
3. State Validation
if (!$jobItem->isActive()) {
  $this->logger->info('Job item @id is no longer active (state: @state). Skipping.', [
    '@id' => $jobItemId,
    '@state' => $jobItem->getState(),
  ]);
  return;
}
4. Plugin Retrieval
private function getTranslatorPlugin(JobItemInterface $jobItem): LaraTranslatorInterface {
  $job = $jobItem->getJob();
  if (!$job) {
    throw new UnrecoverableMessageHandlingException(
      sprintf('Job for item %d not found', $jobItem->id())
    );
  }
  $translator = $job->getTranslator();
  if (!$translator) {
    throw new UnrecoverableMessageHandlingException(
      sprintf('Translator for job item %d not found', $jobItem->id())
    );
  }
  $plugin = $translator->getPlugin();
  if (!$plugin instanceof LaraTranslatorInterface) {
    throw new UnrecoverableMessageHandlingException(
      sprintf('Invalid translator plugin for job item %d', $jobItem->id())
    );
  }
  return $plugin;
}
5. Translation Processing
private function processTranslation(JobItemInterface $jobItem, LaraTranslatorInterface $plugin): void {
  $this->logger->info('Processing job item @id via Symfony Messenger.', [
    '@id' => $jobItem->id(),
  ]);
  try {
    $plugin->processJobItem($jobItem);
  }
  catch (TMGMTException | LaraException $e) {
    throw new TMGMTException($e->getMessage(), [], $e->getCode());
  }
  $this->logger->info('Successfully processed job item @id.', [
    '@id' => $jobItem->id(),
  ]);
}
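processTranslation() rewraps the exception but passes the original code through. That matters because the check in Error Detection Logic compares getCode() against HTTP status codes. A minimal sketch of the effect, using hypothetical SketchTmgmtException and SketchLaraException classes; the constructor order (message, data, code) is copied from the call above and is otherwise an assumption:

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in; the argument order copies the call in the guide.
class SketchTmgmtException extends Exception {
  public function __construct(string $message = '', array $data = [], int $code = 0) {
    parent::__construct($message, $code);
  }
}

// Hypothetical stand-in for an error raised by the translation API client.
class SketchLaraException extends Exception {}

function rewrap(Throwable $e): SketchTmgmtException {
  // Same pattern as in processTranslation(): keep message and code.
  return new SketchTmgmtException($e->getMessage(), [], $e->getCode());
}

$wrapped = rewrap(new SketchLaraException('Too Many Requests', 429));
echo $wrapped->getCode(), PHP_EOL;
var_dump($wrapped->getPrevious());
```

In this sketch the status code survives the rewrap, but the original exception is not attached as the previous one, so its stack trace is not reachable from the wrapper.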
Error Handling Strategy
Error Classification
The handler distinguishes between different types of errors:
Transient Errors (Auto-Retry)
- Quota exceeded: API limit reached, will retry later
- Network issues: Connection problems, timeouts
- HTTP 429: Rate limiting
- HTTP 5xx: Server-side issues
Permanent Errors (No Retry)
- Invalid job item: Job item doesn't exist
- Wrong plugin: Not a Lara translator instance
- Authentication failures: Invalid API credentials
- Unsupported languages: Language mapping issues
Error Detection Logic
private function isTransientError(\Throwable $exception): bool {
  $message = strtolower($exception->getMessage());
  $code = $exception->getCode();
  // HTTP status codes indicating transient errors
  $http_code_errors = [
    429, // Too Many Requests
    500, // Internal Server Error
    502, // Bad Gateway
    503, // Service Unavailable
    504, // Gateway Timeout
  ];
  if (in_array($code, $http_code_errors, true)) {
    return TRUE;
  }
  // Quota exceeded errors
  if (str_contains($message, 'quota') && str_contains($message, 'exceeded')) {
    return TRUE;
  }
  // Network-related errors
  $networkKeywords = ['timeout', 'connection', 'network', 'timed out', 'unreachable'];
  foreach ($networkKeywords as $keyword) {
    if (str_contains($message, $keyword)) {
      return TRUE;
    }
  }
  return FALSE;
}
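To see how these rules behave, the sketch below runs a standalone copy of the same checks against a few sample exceptions. The sample messages and codes are invented for illustration and are not taken from real API responses.

```php
<?php

declare(strict_types=1);

// Standalone copy of the classification rules from the guide.
function isTransientError(Throwable $exception): bool {
  $message = strtolower($exception->getMessage());
  if (in_array($exception->getCode(), [429, 500, 502, 503, 504], TRUE)) {
    return TRUE;
  }
  if (str_contains($message, 'quota') && str_contains($message, 'exceeded')) {
    return TRUE;
  }
  foreach (['timeout', 'connection', 'network', 'timed out', 'unreachable'] as $keyword) {
    if (str_contains($message, $keyword)) {
      return TRUE;
    }
  }
  return FALSE;
}

// Invented sample errors, for illustration only.
$samples = [
  'rate limited' => new RuntimeException('Too Many Requests', 429),
  'quota' => new RuntimeException('Monthly quota exceeded'),
  'timeout' => new RuntimeException('Operation timed out after 30 seconds'),
  'bad credentials' => new RuntimeException('Invalid API key', 401),
  // Keyword matching is coarse: this permanent error contains "connection".
  'misleading' => new RuntimeException('Invalid connection credentials', 401),
];

foreach ($samples as $label => $exception) {
  printf("%-16s %s\n", $label, isTransientError($exception) ? 'retry' : 'fail');
}
```

The last sample is classified as transient even though retrying cannot fix it. Message-based matching trades precision for simplicity, so the retry limit remains the backstop for such cases.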
Error Processing
private function handleProcessingError(\Throwable $exception, int $jobItemId): void {
  if ($this->isTransientError($exception)) {
    $this->logger->warning('Transient error processing job item @id: @error. Will retry.', [
      '@id' => $jobItemId,
      '@error' => $exception->getMessage(),
    ]);
    // Re-throw to trigger Messenger retry mechanism
    throw $exception;
  }
  // Permanent error - log and mark as unrecoverable
  $this->logger->error('Permanent error processing job item @id: @error', [
    '@id' => $jobItemId,
    '@error' => $exception->getMessage(),
  ]);
  $this->markJobItemAsFailed($jobItemId, $exception);
  throw new UnrecoverableMessageHandlingException(
    sprintf('Permanent error processing job item %d: %s', $jobItemId, $exception->getMessage()),
    0,
    $exception
  );
}
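The two exits of handleProcessingError() differ only in what is thrown: the original exception for a retry, or a wrapper that tells Messenger to stop retrying. The sketch below reproduces that routing without Symfony. SketchUnrecoverableException and routeError() are hypothetical stand-ins, and logging and markJobItemAsFailed() are left out.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for Symfony's UnrecoverableMessageHandlingException.
class SketchUnrecoverableException extends RuntimeException {}

/**
 * Mirrors handleProcessingError(): always throws, never returns.
 */
function routeError(Throwable $e, bool $isTransient, int $jobItemId): never {
  if ($isTransient) {
    // The same instance goes back to the worker, which schedules a retry.
    throw $e;
  }
  // Wrapped so the worker stops retrying; the cause stays reachable.
  throw new SketchUnrecoverableException(
    sprintf('Permanent error processing job item %d: %s', $jobItemId, $e->getMessage()),
    0,
    $e
  );
}

$transient = new RuntimeException('Service Unavailable', 503);
$permanent = new RuntimeException('Invalid API key', 401);

$caught = [];
foreach ([[$transient, TRUE], [$permanent, FALSE]] as [$error, $isTransient]) {
  try {
    routeError($error, $isTransient, 123);
  }
  catch (Throwable $thrown) {
    $caught[] = $thrown;
  }
}

var_dump($caught[0] === $transient);
var_dump($caught[1]->getPrevious() === $permanent);
```

Passing the original exception as the previous one keeps the root cause available to whatever inspects the failed message later.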
Logging Strategy
Log Levels Used
- INFO: Normal processing progress and state changes
- WARNING: Transient errors and retry attempts
- ERROR: Permanent errors and unrecoverable failures
Key Log Messages
// Processing start
$this->logger->info('Processing job item @id via Symfony Messenger.', ['@id' => $jobItem->id()]);
// Successful completion
$this->logger->info('Successfully processed job item @id.', ['@id' => $jobItem->id()]);
// Transient error (will retry)
$this->logger->warning('Transient error processing job item @id: @error. Will retry.', [
  '@id' => $jobItemId,
  '@error' => $exception->getMessage(),
]);
// Permanent error
$this->logger->error('Permanent error processing job item @id: @error', [
  '@id' => $jobItemId,
  '@error' => $exception->getMessage(),
]);
// Job item not active
$this->logger->info('Job item @id is no longer active (state: @state). Skipping.', [
  '@id' => $jobItemId,
  '@state' => $jobItem->getState(),
]);
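The @id, @error and @state tokens in these messages are placeholders that the logging backend fills in from the context array. The sketch below shows the idea with a plain string replacement. It is a deliberate simplification, assuming nothing about Drupal's own placeholder handling beyond substitution; renderLogMessage() is a hypothetical helper and the context values are invented.

```php
<?php

declare(strict_types=1);

// Simplified substitution: replace each placeholder key with its value.
function renderLogMessage(string $message, array $context): string {
  return strtr($message, array_map('strval', $context));
}

echo renderLogMessage(
  'Job item @id is no longer active (state: @state). Skipping.',
  ['@id' => 123, '@state' => 2]
), PHP_EOL;
```

Keeping the message template constant and passing variables through the context lets log entries for the same event be grouped and filtered regardless of the job item involved.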
Performance Considerations
Memory Management
- Lazy loading: Job items loaded only when needed
- Minimal state: Handler maintains no persistent state
- Exception cleanup: Proper exception handling prevents memory leaks
Processing Efficiency
- Early validation: Quick checks before expensive operations
- State verification: Skip processing for inactive items
- Plugin caching: Reuse translator plugin instances
Scalability Features
- Stateless design: Multiple handler instances possible
- Concurrent processing: Multiple job items processed simultaneously
- Resource isolation: Translation processing separate from web requests
Testing the Handler
Unit Testing Example
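public function testHandlerProcessesValidJobItem(): void {
  // The plugin must be asked to process the job item exactly once.
  $plugin = $this->createMock(LaraTranslatorInterface::class);
  $plugin->expects($this->once())->method('processJobItem');
  // Wire job item -> job -> translator -> plugin.
  $translator = $this->createMock(TranslatorInterface::class);
  $translator->method('getPlugin')->willReturn($plugin);
  $job = $this->createMock(JobInterface::class);
  $job->method('getTranslator')->willReturn($translator);
  $jobItem = $this->createMock(JobItemInterface::class);
  $jobItem->method('isActive')->willReturn(TRUE);
  $jobItem->method('getJob')->willReturn($job);
  // The storage returns the mocked job item for ID 123.
  $storage = $this->createMock(EntityStorageInterface::class);
  $storage->method('load')->with(123)->willReturn($jobItem);
  $entityTypeManager = $this->createMock(EntityTypeManagerInterface::class);
  $entityTypeManager->method('getStorage')->with('tmgmt_job_item')->willReturn($storage);
  $logger = $this->createMock(LoggerInterface::class);
  // Create the handler and process a test message.
  $handler = new TranslationJobMessageHandler($entityTypeManager, $logger);
  $handler(new TranslationJobMessage(123));
}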
Integration Testing
public function testEndToEndProcessing() {
  // Create test job item
  $jobItem = $this->createTestJobItem();
  // Dispatch message
  $message = new TranslationJobMessage($jobItem->id());
  $this->messageBus->dispatch($message);
  // Process messages
  $this->processMessengerMessages();
  // Verify results
  $this->assertJobItemCompleted($jobItem);
}
Debugging and Troubleshooting
Common Handler Issues
Job Item Not Found
Symptoms: "Job item @id not found" error
Debugging:
# Check if job item exists
drush tmgmt-job-item:list --id=123
# Verify job item state
drush tmgmt-job-item:show 123
Plugin Not Found
Symptoms: "Invalid translator plugin" error
Debugging:
# Check translator configuration
drush config:get tmgmt.translator.lara_translate
# Verify plugin class
drush tmgmt:translator-info lara_translate
State Validation Issues
Symptoms: Job item skipped due to state
Debugging:
# Check job item state
drush tmgmt-job-item:list --status=active
# Verify job state
drush tmgmt-job:show --id=456
Handler Performance Issues
Slow Processing
Debugging:
# Check message queue depth
drush messenger:info
# Monitor processing time
drush messenger:consume async --limit=1 --verbose
Memory Issues
Debugging:
# Monitor memory usage
drush status --format=json | jq '.["php-memory"]'
# Check for memory leaks
drush messenger:consume async --limit=10 --memory-limit=256M
Configuration Options
Retry Configuration
# messenger.settings.yml
transports:
  async:
    retry_strategy: 'exponential'
    max_retries: 5
    delay: 30
    multiplier: 2
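With an exponential strategy, each retry waits the base delay multiplied by the multiplier raised to the retry index. The sketch below works this out for the values above. This guide does not state the unit of delay (Symfony's own retry strategy option is expressed in milliseconds), so the numbers are left unitless, and retrySchedule() is a hypothetical helper.

```php
<?php

declare(strict_types=1);

/**
 * Returns the wait before each retry: delay * multiplier^(retry index).
 *
 * @return int[]
 */
function retrySchedule(int $delay, int $multiplier, int $maxRetries): array {
  $schedule = [];
  for ($retry = 0; $retry < $maxRetries; $retry++) {
    $schedule[] = $delay * $multiplier ** $retry;
  }
  return $schedule;
}

$schedule = retrySchedule(30, 2, 5);
echo implode(', ', $schedule), PHP_EOL;
echo array_sum($schedule), PHP_EOL;
```

For these settings the waits are 30, 60, 120, 240 and 480, or 930 in total before the message is given up on. That total is a useful number to compare against how long a quota or rate limit typically takes to reset.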
Logging Configuration
// settings.php
$settings['tmgmt_laratranslate']['log_level'] = 'debug';
$settings['tmgmt_laratranslate']['log_transients'] = true;
Best Practices
Handler Design
- Stateless: Don't maintain state between invocations
- Idempotent: Processing the same message more than once should produce the same result
- Exception handling: Proper exception classification
- Logging: Comprehensive logging for debugging
Error Handling
- Clear classification: Distinguish transient from permanent errors
- User-friendly messages: Provide actionable error information
- Retry logic: Let Messenger handle retries when appropriate
- Cleanup: Proper resource cleanup in error conditions
Performance
- Early validation: Quick checks before expensive operations
- Minimal dependencies: Only load what's needed
- Efficient queries: Use entity storage efficiently
- Memory awareness: Avoid memory leaks
Related Documentation
- Architecture Overview - Complete system architecture
- Configuration Guide - Setup and configuration
- Troubleshooting Guide - Common issues and solutions
- Operations Guide - Production deployment