<?php
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */

declare(strict_types=1);

namespace Magento\TestFramework\MessageQueue;

use Magento\Framework\MessageQueue\PublisherInterface;
use Magento\Framework\OsInfo;
use Magento\TestFramework\Helper\Amqp;

/**
 * Publisher Consumer Controller
 *
 * Test helper that prepares the message-queue environment for a test run: it resets AMQP state
 * through the Amqp helper, (re)starts the configured consumers as background CLI processes,
 * exposes the publisher and offers a polling helper for waiting on asynchronous results.
 * Process handling shells out to *nix tools (ps, grep, awk, kill).
 */
class PublisherConsumerController
{
    /**
     * Names of the consumers this controller starts, stops and looks up.
     *
     * @var string[]
     */
    private $consumers = [];

    /**
     * @var PublisherInterface
     */
    private $publisher;

    /**
     * Path of the test log file that is removed during initialize().
     *
     * @var string
     */
    private $logFilePath;

    /**
     * Value for the consumers' --max-messages option; the option is omitted when this is empty.
     *
     * @var int|null
     */
    private $maxMessages = null;

    /**
     * @var OsInfo
     */
    private $osInfo;

    /**
     * Application init params forwarded to started consumers via INTEGRATION_TEST_PARAMS.
     *
     * @var array
     */
    private $appInitParams;

    /**
     * @var Amqp
     */
    private $amqpHelper;

    /**
     * PublisherConsumerController constructor.
     * @param PublisherInterface $publisher
     * @param OsInfo $osInfo
     * @param Amqp $amqpHelper
     * @param string $logFilePath
     * @param array $consumers
     * @param array $appInitParams
     * @param null|int $maxMessages
     */
    public function __construct(
        PublisherInterface $publisher,
        OsInfo $osInfo,
        Amqp $amqpHelper,
        $logFilePath,
        $consumers,
        $appInitParams,
        $maxMessages = null
    ) {
        $this->consumers = $consumers;
        $this->publisher = $publisher;
        $this->logFilePath = $logFilePath;
        $this->maxMessages = $maxMessages;
        $this->osInfo = $osInfo;
        $this->appInitParams = $appInitParams;
        $this->amqpHelper = $amqpHelper;
    }

    /**
     * Initialize Environment and Consumers
     *
     * Validates the environment, resets AMQP state, restarts the configured consumers and removes
     * a leftover test log file.
     *
     * @return void
     * @throws EnvironmentPreconditionException
     * @throws PreconditionFailedException
     */
    public function initialize()
    {
        $this->validateEnvironmentPreconditions();

        // Reset AMQP state via the helper: delete every connection it reports, then clear the queue.
        $connections = $this->amqpHelper->getConnections();
        foreach (array_keys($connections) as $connectionName) {
            $this->amqpHelper->deleteConnection($connectionName);
        }
        $this->amqpHelper->clearQueue("async.operations.all");

        // Signal consumers left over from a previous run before starting fresh ones.
        $this->stopConsumers();

        foreach ($this->consumers as $consumer) {
            // Only start a consumer when no matching process is currently listed.
            if (!$this->getConsumerProcessIds($consumer)) {
                $startCommand = $this->getConsumerStartCommand($consumer, true);
                // Output is discarded and the process is backgrounded so exec() returns immediately.
                exec("{$startCommand} > /dev/null &");
            }
            // The pause applies to every configured consumer, whether or not it was just started.
            sleep(5);
        }

        if (file_exists($this->logFilePath)) {
            // try to remove before failing the test
            unlink($this->logFilePath);
            if (file_exists($this->logFilePath)) {
                throw new PreconditionFailedException(
                    "Precondition failed: test log ({$this->logFilePath}) cannot be deleted before test execution."
                );
            }
        }
    }

    /**
     * Validate environment preconditions
     *
     * @return void
     * @throws EnvironmentPreconditionException When running on Windows.
     * @throws PreconditionFailedException When the AMQP helper reports it is not available.
     */
    private function validateEnvironmentPreconditions()
    {
        if ($this->osInfo->isWindows()) {
            throw new EnvironmentPreconditionException(
                "This test relies on *nix shell and should be skipped in Windows environment."
            );
        }

        if (!$this->amqpHelper->isAvailable()) {
            throw new PreconditionFailedException(
                'This test relies on RabbitMQ Management Plugin.'
            );
        }
    }

    /**
     * Stop Consumers
     *
     * Sends the default `kill` signal to every process found for each configured consumer.
     *
     * @return void
     */
    public function stopConsumers()
    {
        foreach ($this->consumers as $consumer) {
            foreach ($this->getConsumerProcessIds($consumer) as $consumerProcessId) {
                exec("kill {$consumerProcessId}");
            }
        }
    }

    /**
     * Get Consumers ProcessIds
     *
     * @return array Map of consumer name => list of process ids found for that consumer.
     */
    public function getConsumersProcessIds()
    {
        $consumers = [];
        foreach ($this->consumers as $consumer) {
            $consumers[$consumer] = $this->getConsumerProcessIds($consumer);
        }
        return $consumers;
    }

    /**
     * Get Consumer ProcessIds
     *
     * Looks for processes whose `ps ax` line contains the consumer start command (without the
     * environment prefix) and returns the first column of each matching line.
     *
     * @param string $consumer
     * @return string[]
     */
    private function getConsumerProcessIds($consumer)
    {
        $startCommand = $this->getConsumerStartCommand($consumer);
        // exec() appends to the array it is given, so start from an explicit empty list.
        $processIds = [];
        // -F matches the command as a fixed string; without it the dots in consumer names and
        // paths would act as regex wildcards. `grep -v grep` drops the pipeline's own processes.
        exec("ps ax | grep -v grep | grep -F '{$startCommand}' | awk '{print \$1}'", $processIds);
        return $processIds;
    }

    /**
     * Get CLI command for starting specified consumer.
     *
     * The command without environment variables is also the text used to find running consumers,
     * so its shape must stay identical to what `ps` shows for a started process.
     *
     * @param string $consumer
     * @param bool $withEnvVariables Prefix the command with the INTEGRATION_TEST_PARAMS assignment.
     * @return string
     */
    private function getConsumerStartCommand($consumer, $withEnvVariables = false)
    {
        // NOTE(review): realpath() returns false for a missing directory, which would silently
        // produce "/magento" here - confirm the bin directory always exists in test setups.
        $binDirectory = realpath(INTEGRATION_TESTS_DIR . '/bin/');
        $magentoCli = $binDirectory . '/magento';
        $consumerStartCommand = "php {$magentoCli} queue:consumers:start -vvv " . $consumer;
        if ($this->maxMessages) {
            $consumerStartCommand .= " --max-messages={$this->maxMessages}";
        }
        if ($withEnvVariables) {
            $params = $this->appInitParams;
            $params['MAGE_DIRS']['base']['path'] = BP;
            // The init params are serialized as a decoded query string into one environment variable.
            $params = 'INTEGRATION_TEST_PARAMS="' . urldecode(http_build_query($params)) . '"';
            $consumerStartCommand = $params . ' ' . $consumerStartCommand;
        }
        return $consumerStartCommand;
    }

    /**
     * Wait for asynchronous result
     *
     * Polls $condition until it returns a truthy value. Each attempt is preceded by a pause of
     * $intervalSeconds, including the first one. The condition is invoked at most
     * 1 + $maxRetries times.
     *
     * @param callable $condition Invoked with $params as its arguments.
     * @param array $params
     * @param int $maxRetries Number of additional attempts after the first one.
     * @param int $intervalSeconds Pause before each attempt, in whole seconds; negatives count as 0.
     * @return void
     * @throws PreconditionFailedException When the condition never became truthy.
     */
    public function waitForAsynchronousResult(callable $condition, $params, $maxRetries = 180, $intervalSeconds = 1)
    {
        // sleep() rejects negative values, so normalize the interval once up front.
        $pause = max(0, (int) $intervalSeconds);
        $retries = 0;
        do {
            sleep($pause);
            // call_user_func_array keeps the original argument-passing semantics for $params.
            $assertion = call_user_func_array($condition, $params);
        } while (!$assertion && ($retries++ < $maxRetries));

        if (!$assertion) {
            throw new PreconditionFailedException("No asynchronous messages were processed.");
        }
    }

    /**
     * Get publisher
     *
     * @return PublisherInterface
     */
    public function getPublisher()
    {
        return $this->publisher;
    }
}