invoker = $invoker; $this->resource = $resource; $this->messageController = $messageController; $this->configuration = $configuration; $this->operationProcessor = $operationProcessorFactory->create([ 'configuration' => $configuration ]); $this->logger = $logger; } /** * {@inheritdoc} */ public function process($maxNumberOfMessages = null) { $queue = $this->configuration->getQueue(); if (!isset($maxNumberOfMessages)) { $queue->subscribe($this->getTransactionCallback($queue)); } else { $this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue)); } } /** * Get transaction callback. This handles the case of async. * * @param QueueInterface $queue * @return \Closure */ private function getTransactionCallback(QueueInterface $queue) { return function (EnvelopeInterface $message) use ($queue) { /** @var LockInterface $lock */ $lock = null; try { $topicName = $message->getProperties()['topic_name']; $lock = $this->messageController->lock($message, $this->configuration->getConsumerName()); $allowedTopics = $this->configuration->getTopicNames(); if (in_array($topicName, $allowedTopics)) { $this->operationProcessor->process($message->getBody()); } else { $queue->reject($message); return; } $queue->acknowledge($message); } catch (MessageLockException $exception) { $queue->acknowledge($message); } catch (ConnectionLostException $e) { if ($lock) { $this->resource->getConnection() ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]); } } catch (NotFoundException $e) { $queue->acknowledge($message); $this->logger->warning($e->getMessage()); } catch (\Exception $e) { $queue->reject($message, false, $e->getMessage()); if ($lock) { $this->resource->getConnection() ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]); } } }; } }