<?php
/**
 * Copyright © Magento, Inc. All rights reserved.
 * See COPYING.txt for license details.
 */
namespace Magento\MysqlMq\Model;

use Magento\Framework\MessageQueue\PublisherInterface;

/**
 * Test for MySQL publisher class.
 *
 * @magentoDbIsolation disabled
 */
class PublisherConsumerTest extends \PHPUnit\Framework\TestCase
{
    const MAX_NUMBER_OF_TRIALS = 3;

    /**
     * @var \Magento\Framework\MessageQueue\PublisherInterface
     */
    protected $publisher;

    /**
     * @var \Magento\Framework\ObjectManagerInterface
     */
    protected $objectManager;

    protected function setUp()
    {
        $this->markTestIncomplete('Should be converted to queue config v2.');
        $this->objectManager = \Magento\TestFramework\Helper\Bootstrap::getObjectManager();

        $configPath = __DIR__ . '/../etc/queue.xml';
        $fileResolverMock = $this->createMock(\Magento\Framework\Config\FileResolverInterface::class);
        $fileResolverMock->expects($this->any())
            ->method('get')
            ->willReturn([$configPath => file_get_contents(($configPath))]);

        /** @var \Magento\Framework\MessageQueue\Config\Reader\Xml $xmlReader */
        $xmlReader = $this->objectManager->create(
            \Magento\Framework\MessageQueue\Config\Reader\Xml::class,
            ['fileResolver' => $fileResolverMock]
        );

        $newData = $xmlReader->read();

        /** @var \Magento\Framework\MessageQueue\Config\Data $configData */
        $configData = $this->objectManager->get(\Magento\Framework\MessageQueue\Config\Data::class);
        $configData->reset();
        $configData->merge($newData);

        $this->publisher = $this->objectManager->create(\Magento\Framework\MessageQueue\PublisherInterface::class);
    }

    protected function tearDown()
    {
        $this->markTestIncomplete('Should be converted to queue config v2.');
        $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX);
        $this->consumeMessages('demoConsumerQueueTwo', PHP_INT_MAX);
        $this->consumeMessages('demoConsumerQueueThree', PHP_INT_MAX);
        $this->consumeMessages('demoConsumerQueueFour', PHP_INT_MAX);
        $this->consumeMessages('demoConsumerQueueFive', PHP_INT_MAX);
        $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX);

        $objectManagerConfiguration = [\Magento\Framework\MessageQueue\Config\Reader\Xml::class => [
                'arguments' => [
                    'fileResolver' => ['instance' => \Magento\Framework\Config\FileResolverInterface::class],
                ],
            ],
        ];
        $this->objectManager->configure($objectManagerConfiguration);
        /** @var \Magento\Framework\MessageQueue\Config\Data $queueConfig */
        $queueConfig = $this->objectManager->get(\Magento\Framework\MessageQueue\Config\Data::class);
        $queueConfig->reset();
    }

    /**
     * @magentoDataFixture Magento/MysqlMq/_files/queues.php
     */
    public function testPublishConsumeFlow()
    {
        /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
        $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
        /** @var \Magento\MysqlMq\Model\DataObject $object */
        $object = $objectFactory->create();
        for ($i = 0; $i < 10; $i++) {
            $object->setName('Object name ' . $i)->setEntityId($i);
            $this->publisher->publish('demo.object.created', $object);
        }
        for ($i = 0; $i < 5; $i++) {
            $object->setName('Object name ' . $i)->setEntityId($i);
            $this->publisher->publish('demo.object.updated', $object);
        }
        for ($i = 0; $i < 3; $i++) {
            $object->setName('Object name ' . $i)->setEntityId($i);
            $this->publisher->publish('demo.object.custom.created', $object);
        }

        $outputPattern = '/(Processed \d+\s)/';
        /** There are total of 10 messages in the first queue, total expected consumption is 7, 3 then 0 */
        $this->consumeMessages('demoConsumerQueueOne', 7, 7, $outputPattern);
        /** Consumer all messages which left in this queue */
        $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 3, $outputPattern);
        $this->consumeMessages('demoConsumerQueueOne', 7, 0, $outputPattern);

        /** Verify that messages were added correctly to second queue for update and create topics */
        $this->consumeMessages('demoConsumerQueueTwo', 20, 15, $outputPattern);

        /** Verify that messages were NOT added to fourth queue */
        $this->consumeMessages('demoConsumerQueueFour', 11, 0, $outputPattern);

        /** Verify that messages were added correctly by '*' pattern in bind config to third queue */
        $this->consumeMessages('demoConsumerQueueThree', 20, 15, $outputPattern);

        /** Verify that messages were added correctly by '#' pattern in bind config to fifth queue */
        $this->consumeMessages('demoConsumerQueueFive', 20, 18, $outputPattern);
    }

    /**
     * @magentoDataFixture Magento/MysqlMq/_files/queues.php
     */
    public function testPublishAndConsumeWithFailedJobs()
    {
        /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
        $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
        /** @var \Magento\MysqlMq\Model\DataObject $object */
        /** Try consume messages for MAX_NUMBER_OF_TRIALS and then consumer them without exception */
        $object = $objectFactory->create();
        for ($i = 0; $i < 5; $i++) {
            $object->setName('Object name ' . $i)->setEntityId($i);
            $this->publisher->publish('demo.object.created', $object);
        }
        $outputPattern = '/(Processed \d+\s)/';
        for ($i = 0; $i < self::MAX_NUMBER_OF_TRIALS; $i++) {
            $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX, 0, $outputPattern);
        }
        $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 0, $outputPattern);

        /** Try consume messages for MAX_NUMBER_OF_TRIALS+1 and then consumer them without exception */
        for ($i = 0; $i < 5; $i++) {
            $object->setName('Object name ' . $i)->setEntityId($i);
            $this->publisher->publish('demo.object.created', $object);
        }
        /** Try consume messages for MAX_NUMBER_OF_TRIALS and then consumer them without exception */
        for ($i = 0; $i < self::MAX_NUMBER_OF_TRIALS + 1; $i++) {
            $this->consumeMessages('demoConsumerQueueOneWithException', PHP_INT_MAX, 0, $outputPattern);
        }
        /** Make sure that messages are not accessible anymore after number of trials is exceeded */
        $this->consumeMessages('demoConsumerQueueOne', PHP_INT_MAX, 0, $outputPattern);
    }

    /**
     * @magentoDataFixture Magento/MysqlMq/_files/queues.php
     */
    public function testPublishAndConsumeSchemaDefinedByMethod()
    {
        /** @var \Magento\MysqlMq\Model\DataObjectFactory $objectFactory */
        $objectFactory = $this->objectManager->create(\Magento\MysqlMq\Model\DataObjectFactory::class);
        /** @var \Magento\MysqlMq\Model\DataObject $object */
        $object = $objectFactory->create();
        $id = 33;
        $object->setName('Object name ' . $id)->setEntityId($id);
        $requiredStringParam = 'Required value';
        $optionalIntParam = 44;
        $this->publisher->publish('test.schema.defined.by.method', [$object, $requiredStringParam, $optionalIntParam]);
        $outputPattern = "/Processed '{$object->getEntityId()}'; "
            . "Required param '{$requiredStringParam}'; Optional param '{$optionalIntParam}'/";
        $this->consumeMessages('delayedOperationConsumer', PHP_INT_MAX, 1, $outputPattern);
    }

    /**
     * Make sure that consumers consume correct number of messages.
     *
     * @param string $consumerName
     * @param int|null $messagesToProcess
     * @param int|null $expectedNumberOfProcessedMessages
     * @param string|null $outputPattern
     */
    protected function consumeMessages(
        $consumerName,
        $messagesToProcess,
        $expectedNumberOfProcessedMessages = null,
        $outputPattern = null
    ) {
        /** @var \Magento\Framework\MessageQueue\ConsumerFactory $consumerFactory */
        $consumerFactory = $this->objectManager->create(\Magento\Framework\MessageQueue\ConsumerFactory::class);
        $consumer = $consumerFactory->get($consumerName);
        ob_start();
        $consumer->process($messagesToProcess);
        $consumersOutput = ob_get_contents();
        ob_end_clean();
        if ($outputPattern) {
            $this->assertEquals(
                $expectedNumberOfProcessedMessages,
                preg_match_all($outputPattern, $consumersOutput)
            );
        }
    }
}