phpExecutableFinder = $phpExecutableFinder; $this->consumerConfig = $consumerConfig; $this->deploymentConfig = $deploymentConfig; $this->shellBackground = $shellBackground; $this->pidConsumerManager = $pidConsumerManager; $this->mqConnectionTypeResolver = $mqConnectionTypeResolver ?: ObjectManager::getInstance()->get(ConnectionTypeResolver::class); $this->logger = $logger ?: ObjectManager::getInstance()->get(LoggerInterface::class); }

    /**
     * Runs consumers processes
     *
     * For every configured consumer that passes canBeRun(), hands a
     * `bin/magento queue:consumers:start` command to the background shell.
     * Does nothing when the deployment config value
     * `cron_consumers_runner/cron_run` is falsy (it defaults to true).
     *
     * @return void
     */
    public function run()
    {
        $runByCron = $this->deploymentConfig->get('cron_consumers_runner/cron_run', true);

        if (!$runByCron) {
            return;
        }

        // A value of 0 disables the --max-messages option entirely.
        $maxMessages = (int) $this->deploymentConfig->get('cron_consumers_runner/max_messages', 10000);
        $allowedConsumers = $this->deploymentConfig->get('cron_consumers_runner/consumers', []);
        $php = $this->phpExecutableFinder->find() ?: 'php';

        // Loop-invariant part of the command line.
        $baseCommand = $php . ' ' . BP . '/bin/magento queue:consumers:start';

        foreach ($this->consumerConfig->getConsumers() as $consumer) {
            if (!$this->canBeRun($consumer, $allowedConsumers)) {
                continue;
            }

            $consumerName = $consumer->getName();

            $arguments = [
                $consumerName,
                '--pid-file-path=' . $this->getPidFilePath($consumerName),
            ];

            if ($maxMessages) {
                $arguments[] = '--max-messages=' . $maxMessages;
            }

            // One "%s" placeholder per argument, so the format string can never
            // drift out of sync with the argument list.
            // NOTE(review): presumably the shell wrapper substitutes (and escapes)
            // $arguments into the placeholders - confirm against its implementation.
            $command = $baseCommand . str_repeat(' %s', count($arguments));

            $this->shellBackground->execute($command, $arguments);
        }
    }

    /**
     * Checks that the consumer can be run
     *
     * A consumer is rejected when it is missing from a non-empty allow-list,
     * when the PID manager reports it as already running, or when resolving
     * its connection type throws a \LogicException (logged at info level).
     *
     * @param ConsumerConfigItemInterface $consumerConfig The consumer config
     * @param array $allowedConsumers The list of allowed consumers
     * If $allowedConsumers is empty it means that all consumers are allowed
     * @return bool Returns true if the consumer can be run
     * @throws \Magento\Framework\Exception\FileSystemException
     */
    private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $allowedConsumers = []): bool
    {
        $consumerName = $consumerConfig->getName();

        // Strict comparison: a loosely-typed entry (e.g. 0 or true) in the
        // allow-list must not match arbitrary consumer names.
        if (!empty($allowedConsumers) && !in_array($consumerName, $allowedConsumers, true)) {
            return false;
        }

        if ($this->pidConsumerManager->isRun($this->getPidFilePath($consumerName))) {
            return false;
        }

        $connectionName = $consumerConfig->getConnection();

        try {
            // Called only for its side effect of throwing; the result is unused.
            $this->mqConnectionTypeResolver->getConnectionType($connectionName);
        } catch (\LogicException $e) {
            $this->logger->info(
                sprintf(
                    'Consumer "%s" skipped as required connection "%s" is not configured. %s',
                    $consumerName,
                    $connectionName,
                    $e->getMessage()
                )
            );

            return false;
        }

        return true;
    }

    /**
     * Returns default path to file with PID by consumers name
     *
     * The result has the form "<consumer name>-<hostname><PID_FILE_EXT>", where
     * the hostname is reduced to its ASCII letters and digits.
     *
     * @param string $consumerName The consumers name
     * @return string The path to file with PID
     */
    private function getPidFilePath($consumerName)
    {
        // gethostname() returns false on failure; cast so preg_replace() always
        // receives a string (false becomes an empty hostname part).
        $sanitizedHostname = preg_replace('/[^a-z0-9]/i', '', (string) gethostname());

        return $consumerName . '-' . $sanitizedHostname . static::PID_FILE_EXT;
    }
}