Message Handler

This guide describes the TranslationJobMessageHandler, which processes translation jobs asynchronously via Symfony Messenger.

Handler Overview

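The TranslationJobMessageHandler is the core component responsible for processing translation job items in the background. It is registered with Symfony Messenger through the #[AsMessageHandler] attribute rather than by implementing an interface, and it separates transient failures, which Messenger retries, from permanent ones, which are not retried.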

Handler Registration

Service Definition

The handler is not explicitly registered in the module's services file (tmgmt_laratranslate.services.yml). It is auto-discovered via the #[AsMessageHandler] PHP attribute combined with that file's _defaults block:

# tmgmt_laratranslate.services.yml
services:
  _defaults:
    autowire: true
    autoconfigure: true

  # The handler (Drupal\tmgmt_laratranslate\Messenger\TranslationJobMessageHandler)
  # is auto-discovered via #[AsMessageHandler] — no explicit entry needed.

Handler Attribute

#[AsMessageHandler]
final class TranslationJobMessageHandler {
    // Handler implementation
}

The #[AsMessageHandler] attribute registers this class as a message handler. The handled message type, TranslationJobMessage, is taken from the signature of __invoke().
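To see how a message type can be read from a handler's signature, the following sketch uses reflection on stand-in classes. The two classes are empty placeholders, and handledMessageType() is a hypothetical helper written for this illustration; it is not part of Symfony or of this module.

```php
<?php
declare(strict_types=1);

// Empty stand-ins for the module's message and handler classes (assumptions).
final class TranslationJobMessage {}

final class TranslationJobMessageHandler {
    public function __invoke(TranslationJobMessage $message): void {}
}

/**
 * Hypothetical helper: returns the class named by the type hint of the
 * first __invoke() parameter, or null if there is no usable type hint.
 */
function handledMessageType(string $handlerClass): ?string {
    $method = new ReflectionMethod($handlerClass, '__invoke');
    $parameters = $method->getParameters();
    if ($parameters === []) {
        return null;
    }
    $type = $parameters[0]->getType();
    return $type instanceof ReflectionNamedType ? $type->getName() : null;
}

// Prints "TranslationJobMessage".
echo handledMessageType(TranslationJobMessageHandler::class), PHP_EOL;
```

Changing the type hint on __invoke() therefore changes which messages reach the handler, so the signature is part of the handler's registration.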

Handler Dependencies

Required Services

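  • EntityTypeManagerInterface: Provides the tmgmt_job_item storage used to load TMGMT job items
  • LoggerInterface: The logger.channel.tmgmt_laratranslate channel, used for progress and error logging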

Constructor

public function __construct(
    EntityTypeManagerInterface $entityTypeManager,
    #[Autowire(service: 'logger.channel.tmgmt_laratranslate')]
    private readonly LoggerInterface $logger,
) {
    $this->jobItemStorage = $entityTypeManager->getStorage('tmgmt_job_item');
}

Processing Flow

Main Handler Method

public function __invoke(TranslationJobMessage $message): void {
    $jobItemId = $message->getJobItemId();

    try {
        $jobItem = $this->loadAndValidateJobItem($jobItemId);

        // Early exit if job item is no longer active
        if (!$jobItem->isActive()) {
            $this->logger->info('Job item @id is no longer active (state: @state). Skipping.', [
                '@id' => $jobItemId,
                '@state' => $jobItem->getState(),
            ]);
            return;
        }

        $plugin = $this->getTranslatorPlugin($jobItem);
        $this->processTranslation($jobItem, $plugin);
    }
    catch (UnrecoverableMessageHandlingException $e) {
        // Rethrow unrecoverable exceptions without modification
        throw $e;
    }
    catch (\Throwable $e) {
        $this->handleProcessingError($e, $jobItemId);
    }
}

Step-by-Step Processing

1. Message Extraction

$jobItemId = $message->getJobItemId();

Extracts the job item ID from the message.
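The message class itself is not shown in this guide. A minimal sketch of what it might look like, assuming it carries nothing but the integer job item ID (the constructor and property below are assumptions inferred from getJobItemId() and from the tests later in this guide):

```php
<?php
declare(strict_types=1);

// Hypothetical shape of the message; the real class may differ.
final class TranslationJobMessage {
    public function __construct(private readonly int $jobItemId) {}

    public function getJobItemId(): int {
        return $this->jobItemId;
    }
}

// Messages cross a transport boundary, so they must survive serialization.
$message = new TranslationJobMessage(123);
$restored = unserialize(serialize($message));

// Prints "123".
echo $restored->getJobItemId(), PHP_EOL;
```

Carrying only the ID keeps the serialized message small and lets the handler load the current state of the job item when the message is consumed, rather than a stale copy.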

2. Job Item Loading and Validation

private function loadAndValidateJobItem(int $jobItemId): JobItemInterface {
    $jobItem = $this->jobItemStorage->load($jobItemId);

    if (!$jobItem instanceof JobItemInterface) {
        $this->logger->error('Job item @id not found.', ['@id' => $jobItemId]);
        throw new UnrecoverableMessageHandlingException(
            sprintf('Job item %d not found', $jobItemId)
        );
    }

    return $jobItem;
}

3. State Validation

if (!$jobItem->isActive()) {
    $this->logger->info('Job item @id is no longer active (state: @state). Skipping.', [
        '@id' => $jobItemId,
        '@state' => $jobItem->getState(),
    ]);
    return;
}

4. Plugin Retrieval

private function getTranslatorPlugin(JobItemInterface $jobItem): LaraTranslatorInterface {
    $job = $jobItem->getJob();
    if (!$job) {
        throw new UnrecoverableMessageHandlingException(
            sprintf('Job for item %d not found', $jobItem->id())
        );
    }

    $translator = $job->getTranslator();
    if (!$translator) {
        throw new UnrecoverableMessageHandlingException(
            sprintf('Translator for job item %d not found', $jobItem->id())
        );
    }

    $plugin = $translator->getPlugin();
    if (!$plugin instanceof LaraTranslatorInterface) {
        throw new UnrecoverableMessageHandlingException(
            sprintf('Invalid translator plugin for job item %d', $jobItem->id())
        );
    }

    return $plugin;
}

5. Translation Processing

private function processTranslation(JobItemInterface $jobItem, LaraTranslatorInterface $plugin): void {
    $this->logger->info('Processing job item @id via Symfony Messenger.', [
        '@id' => $jobItem->id(),
    ]);

    try {
        $plugin->processJobItem($jobItem);
    }
    catch (TMGMTException | LaraException $e) {
        throw new TMGMTException($e->getMessage(), [], $e->getCode());
    }

    $this->logger->info('Successfully processed job item @id.', [
        '@id' => $jobItem->id(),
    ]);
}

Error Handling Strategy

Error Classification

The handler sorts every failure into one of two classes, and the class decides whether Messenger retries the message:

Transient Errors (Auto-Retry)

  • Quota exceeded: API limit reached, will retry later
  • Network issues: Connection problems, timeouts
  • HTTP 429: Rate limiting
  • HTTP 500, 502, 503, 504: Server-side issues

Permanent Errors (No Retry)

  • Invalid job item: Job item doesn't exist
  • Wrong plugin: Not a Lara translator instance
  • Authentication failures: Invalid API credentials
  • Unsupported languages: Language mapping issues

Error Detection Logic

private function isTransientError(\Throwable $exception): bool {
    $message = strtolower($exception->getMessage());
    $code = $exception->getCode();

    // HTTP status codes indicating transient errors
    $transientHttpCodes = [
        429, // Too Many Requests
        500, // Internal Server Error
        502, // Bad Gateway
        503, // Service Unavailable
        504, // Gateway Timeout
    ];

    if (in_array($code, $transientHttpCodes, true)) {
        return TRUE;
    }

    // Quota exceeded errors
    if (str_contains($message, 'quota') && str_contains($message, 'exceeded')) {
        return TRUE;
    }

    // Network-related errors
    $networkKeywords = ['timeout', 'connection', 'network', 'timed out', 'unreachable'];
    foreach ($networkKeywords as $keyword) {
        if (str_contains($message, $keyword)) {
            return TRUE;
        }
    }

    return FALSE;
}
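The classification rules can be tried out in isolation. The sketch below copies them into a plain function and runs a few sample exceptions through it; the sample messages and codes are hypothetical, and real API errors may be worded differently.

```php
<?php
declare(strict_types=1);

/**
 * Standalone copy of the classification rules, for experimentation only.
 */
function isTransientError(Throwable $exception): bool {
    $message = strtolower($exception->getMessage());

    // Same status codes the handler treats as retryable.
    if (in_array($exception->getCode(), [429, 500, 502, 503, 504], true)) {
        return true;
    }
    if (str_contains($message, 'quota') && str_contains($message, 'exceeded')) {
        return true;
    }
    foreach (['timeout', 'connection', 'network', 'timed out', 'unreachable'] as $keyword) {
        if (str_contains($message, $keyword)) {
            return true;
        }
    }
    return false;
}

// Hypothetical errors; the labels are only used for display.
$samples = [
    'rate limited' => new RuntimeException('Too many requests', 429),
    'quota' => new RuntimeException('Monthly quota exceeded'),
    'network' => new RuntimeException('Connection timed out'),
    'auth' => new RuntimeException('Invalid API credentials', 401),
    'language' => new RuntimeException('Unsupported language pair'),
];

foreach ($samples as $label => $exception) {
    printf("%-12s %s\n", $label, isTransientError($exception) ? 'retry' : 'fail');
}
```

The first three samples are classified as retryable and the last two as permanent. Because the keyword rules match on message text, a permanent error whose message happens to contain a word such as "connection" would also be retried, which is worth keeping in mind when error wording changes upstream.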

Error Processing

private function handleProcessingError(\Throwable $exception, int $jobItemId): void {
    if ($this->isTransientError($exception)) {
        $this->logger->warning('Transient error processing job item @id: @error. Will retry.', [
            '@id' => $jobItemId,
            '@error' => $exception->getMessage(),
        ]);
        // Re-throw to trigger Messenger retry mechanism
        throw $exception;
    }

    // Permanent error - log and mark as unrecoverable
    $this->logger->error('Permanent error processing job item @id: @error', [
        '@id' => $jobItemId,
        '@error' => $exception->getMessage(),
    ]);

    $this->markJobItemAsFailed($jobItemId, $exception);

    throw new UnrecoverableMessageHandlingException(
        sprintf('Permanent error processing job item %d: %s', $jobItemId, $exception->getMessage()),
        0,
        $exception
    );
}
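The two outcomes of this method can be observed without Messenger. The following sketch mirrors the routing with a stand-in exception class (UnrecoverableStandIn replaces Symfony's UnrecoverableMessageHandlingException, and routeError() and describeOutcome() are hypothetical names); logging and markJobItemAsFailed() are left out.

```php
<?php
declare(strict_types=1);

// Stand-in for Symfony's UnrecoverableMessageHandlingException (assumption).
final class UnrecoverableStandIn extends RuntimeException {}

/**
 * Mirrors the routing above: transient errors are rethrown unchanged,
 * everything else is wrapped so the original stays reachable via getPrevious().
 */
function routeError(Throwable $exception, int $jobItemId, bool $isTransient): never {
    if ($isTransient) {
        throw $exception;
    }
    throw new UnrecoverableStandIn(
        sprintf('Permanent error processing job item %d: %s', $jobItemId, $exception->getMessage()),
        0,
        $exception
    );
}

/** Reports which of the two paths an exception took. */
function describeOutcome(Throwable $exception, bool $isTransient): string {
    try {
        routeError($exception, 123, $isTransient);
    }
    catch (UnrecoverableStandIn $wrapped) {
        return 'unrecoverable, caused by ' . get_class($wrapped->getPrevious());
    }
    catch (Throwable $rethrown) {
        return 'rethrown ' . get_class($rethrown);
    }
}

// Prints "rethrown RuntimeException".
echo describeOutcome(new RuntimeException('Gateway timeout', 504), true), PHP_EOL;
// Prints "unrecoverable, caused by InvalidArgumentException".
echo describeOutcome(new InvalidArgumentException('Invalid API credentials'), false), PHP_EOL;
```

Passing the original exception as the previous one keeps its class and stack trace available to whatever inspects the failed message later.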

Logging Strategy

Log Levels Used

  • INFO: Normal processing progress and state changes
  • WARNING: Transient errors and retry attempts
  • ERROR: Permanent errors and unrecoverable failures

Key Log Messages

// Processing start
$this->logger->info('Processing job item @id via Symfony Messenger.', ['@id' => $jobItem->id()]);

// Successful completion
$this->logger->info('Successfully processed job item @id.', ['@id' => $jobItem->id()]);

// Transient error (will retry)
$this->logger->warning('Transient error processing job item @id: @error. Will retry.', [
    '@id' => $jobItemId,
    '@error' => $exception->getMessage(),
]);

// Permanent error
$this->logger->error('Permanent error processing job item @id: @error', [
    '@id' => $jobItemId,
    '@error' => $exception->getMessage(),
]);

// Job item not active
$this->logger->info('Job item @id is no longer active (state: @state). Skipping.', [
    '@id' => $jobItemId,
    '@state' => $jobItem->getState(),
]);
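When searching logs it helps to know what a rendered line looks like. The sketch below fills the placeholders with plain strtr(); it is only an approximation, since Drupal stores the template and its variables separately and applies its own escaping when rendering. previewLogMessage() is a hypothetical helper, and the ID and state values are made up.

```php
<?php
declare(strict_types=1);

/**
 * Rough preview of a log line: substitutes each placeholder key with its
 * value. This ignores the escaping Drupal applies when it renders messages.
 */
function previewLogMessage(string $template, array $context): string {
    return strtr($template, array_map('strval', $context));
}

// Prints "Job item 123 is no longer active (state: 2). Skipping."
echo previewLogMessage(
    'Job item @id is no longer active (state: @state). Skipping.',
    ['@id' => 123, '@state' => 2]
), PHP_EOL;
```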

Performance Considerations

Memory Management

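  • Lazy loading: Each job item is loaded by ID only when its message is handled
  • Minimal state: The handler keeps only its injected storage and logger between messages
  • Exception cleanup: Every failure path ends in a rethrow or a wrapped exception, so no partially processed item is held in memory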

Processing Efficiency

  • Early validation: Quick checks before expensive operations
  • State verification: Skip processing for inactive items
  • Plugin caching: Reuse translator plugin instances

Scalability Features

  • Stateless design: Several consumer processes can run the handler side by side
  • Concurrent processing: With more than one consumer, several job items are processed at the same time
  • Resource isolation: Translation runs in the consumer process, not in web requests

Testing the Handler

Unit Testing Example

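public function testHandlerProcessesValidJobItem(): void {
    // Mock the translator plugin and expect exactly one processing call.
    $jobItem = $this->createMock(JobItemInterface::class);
    $plugin = $this->createMock(LaraTranslatorInterface::class);
    $plugin->expects($this->once())->method('processJobItem')->with($jobItem);

    // Wire job item -> job -> translator -> plugin.
    $translator = $this->createMock(TranslatorInterface::class);
    $translator->method('getPlugin')->willReturn($plugin);
    $job = $this->createMock(JobInterface::class);
    $job->method('getTranslator')->willReturn($translator);
    $jobItem->method('isActive')->willReturn(true);
    $jobItem->method('getJob')->willReturn($job);

    // The storage returned by the entity type manager loads the mocked item.
    $storage = $this->createMock(EntityStorageInterface::class);
    $storage->method('load')->with(123)->willReturn($jobItem);
    $entityTypeManager = $this->createMock(EntityTypeManagerInterface::class);
    $entityTypeManager->method('getStorage')->with('tmgmt_job_item')->willReturn($storage);
    $logger = $this->createMock(LoggerInterface::class);

    // Invoke the handler; PHPUnit verifies the expectation on $plugin afterwards.
    $handler = new TranslationJobMessageHandler($entityTypeManager, $logger);
    $handler(new TranslationJobMessage(123));
}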

Integration Testing

public function testEndToEndProcessing() {
    // Create test job item
    $jobItem = $this->createTestJobItem();

    // Dispatch message
    $message = new TranslationJobMessage($jobItem->id());
    $this->messageBus->dispatch($message);

    // Process messages
    $this->processMessengerMessages();

    // Verify results
    $this->assertJobItemCompleted($jobItem);
}

Debugging and Troubleshooting

Common Handler Issues

Job Item Not Found

Symptoms: "Job item @id not found" in the log, with @id replaced by the actual ID; the message is not retried

Debugging:

# Check if job item exists
drush tmgmt-job-item:list --id=123

# Verify job item state
drush tmgmt-job-item:show 123

Plugin Not Found

Symptoms: "Invalid translator plugin" error

Debugging:

# Check translator configuration
drush config:get tmgmt.translator.lara_translate

# Verify plugin class
drush tmgmt:translator-info lara_translate

State Validation Issues

Symptoms: "Job item @id is no longer active (state: @state). Skipping." logged at INFO level; the item is not processed

Debugging:

# Check job item state
drush tmgmt-job-item:list --status=active

# Verify job state
drush tmgmt-job:show --id=456

Handler Performance Issues

Slow Processing

Debugging:

# Check message queue depth
drush messenger:info

# Monitor processing time
drush messenger:consume async --limit=1 --verbose

Memory Issues

Debugging:

# Monitor memory usage
drush status --format=json | jq '.["php-memory"]'

# Check for memory leaks
drush messenger:consume async --limit=10 --memory-limit=256M

Configuration Options

Retry Configuration

# messenger.settings.yml
transports:
  async:
    retry_strategy: 'exponential'
    max_retries: 5
    delay: 30
    multiplier: 2
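With these values the wait grows geometrically. The sketch below computes the schedule, assuming the transport applies the usual multiplier formula (wait = delay × multiplier^(retry − 1)); whether this integration does so, and the unit of delay, are assumptions to check against the messenger module in use. Symfony's own multiplier retry strategy expresses delay in milliseconds. retrySchedule() is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/**
 * Computes the wait before each retry for a multiplier-based strategy:
 * wait(n) = delay * multiplier^(n - 1) for retry n = 1..maxRetries.
 *
 * @return array<int, int|float> Wait per retry, in the same unit as $delay.
 */
function retrySchedule(int $maxRetries, int $delay, int|float $multiplier): array {
    $schedule = [];
    for ($retry = 1; $retry <= $maxRetries; $retry++) {
        $schedule[$retry] = $delay * $multiplier ** ($retry - 1);
    }
    return $schedule;
}

// Values from the configuration above: 30, 60, 120, 240, 480.
$schedule = retrySchedule(5, 30, 2);
print_r($schedule);

// Prints "Total wait: 930".
echo 'Total wait: ', array_sum($schedule), PHP_EOL;
```

The total gives an upper bound on how long a transient failure can keep a job item in the retry loop before the message is given up on.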

Logging Configuration

// settings.php
$settings['tmgmt_laratranslate']['log_level'] = 'debug';
$settings['tmgmt_laratranslate']['log_transients'] = true;

Best Practices

Handler Design

  • Stateless: Don't maintain state between invocations
  • Idempotent: Multiple calls should produce the same result
  • Exception handling: Proper exception classification
  • Logging: Comprehensive logging for debugging
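The early exit on inactive job items is what makes a redelivered message harmless. A minimal sketch of that property, using an in-memory stand-in (FakeJobItem and handle() are hypothetical and only imitate the isActive() check):

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a TMGMT job item.
final class FakeJobItem {
    public int $timesProcessed = 0;
    private bool $active = true;

    public function isActive(): bool {
        return $this->active;
    }

    public function markReviewed(): void {
        $this->active = false;
    }
}

/**
 * Processes the item only while it is active, mirroring the early exit
 * in the handler. Returns true when work was actually done.
 */
function handle(FakeJobItem $item): bool {
    if (!$item->isActive()) {
        return false;
    }
    $item->timesProcessed++;
    $item->markReviewed();
    return true;
}

$item = new FakeJobItem();
$first = handle($item);
$second = handle($item); // Simulates a redelivered message.

// Prints bool(true), bool(false), int(1).
var_dump($first, $second, $item->timesProcessed);
```

The second call does nothing because the first one moved the item out of the active state, so delivering the same message twice translates the item only once.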

Error Handling

  • Clear classification: Distinguish transient from permanent errors
  • User-friendly messages: Provide actionable error information
  • Retry logic: Let Messenger handle retries when appropriate
  • Cleanup: Proper resource cleanup in error conditions

Performance

  • Early validation: Quick checks before expensive operations
  • Minimal dependencies: Only load what's needed
  • Efficient queries: Use entity storage efficiently
  • Memory awareness: Avoid memory leaks