1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
<?php
/**
* Copyright © Magento, Inc. All rights reserved.
* See COPYING.txt for license details.
*/
namespace Magento\Framework\Amqp\Bulk;
use Magento\Framework\MessageQueue\Bulk\ExchangeInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Magento\Framework\Communication\ConfigInterface as CommunicationConfigInterface;
use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
/**
 * Bulk exchange for AMQP: accepts a list of envelopes for a single topic and
 * hands them to the broker.
 *
 * Topics flagged as synchronous are delegated envelope-by-envelope to the
 * single-message exchange; all other topics are published as one batch on the
 * AMQP channel.
 */
class Exchange implements ExchangeInterface
{
    /**
     * Source of the AMQP channel used for batch publishing.
     *
     * @var \Magento\Framework\Amqp\Config
     */
    private $amqpConfig;

    /**
     * Topic configuration; consulted for the "is synchronous" flag.
     *
     * @var CommunicationConfigInterface
     */
    private $communicationConfig;

    /**
     * Publisher configuration; used to look up the exchange for a topic.
     *
     * @var PublisherConfig
     */
    private $publisherConfig;

    /**
     * Single-message exchange that handles synchronous topics.
     *
     * @var \Magento\Framework\Amqp\Exchange
     */
    private $exchange;

    /**
     * Store the collaborators required for publishing.
     *
     * @param \Magento\Framework\Amqp\Config $amqpConfig
     * @param PublisherConfig $publisherConfig
     * @param CommunicationConfigInterface $communicationConfig
     * @param \Magento\Framework\Amqp\Exchange $exchange
     */
    public function __construct(
        \Magento\Framework\Amqp\Config $amqpConfig,
        PublisherConfig $publisherConfig,
        CommunicationConfigInterface $communicationConfig,
        \Magento\Framework\Amqp\Exchange $exchange
    ) {
        $this->amqpConfig = $amqpConfig;
        $this->publisherConfig = $publisherConfig;
        $this->communicationConfig = $communicationConfig;
        $this->exchange = $exchange;
    }

    /**
     * Send every envelope in the list to the given topic.
     *
     * @param string $topic Topic name; also passed to the channel as the third
     *                      publish argument.
     * @param array $envelopes Envelopes exposing getBody() and getProperties().
     * @return array|null List of per-envelope results from the single-message
     *                    exchange when the topic is synchronous, otherwise null.
     */
    public function enqueue($topic, array $envelopes)
    {
        $topicConfig = $this->communicationConfig->getTopic($topic);

        if ($topicConfig[CommunicationConfigInterface::TOPIC_IS_SYNCHRONOUS]) {
            return $this->enqueueOneByOne($topic, $envelopes);
        }

        $this->publishAsBatch($topic, $envelopes);

        return null;
    }

    /**
     * Delegate each envelope to the single-message exchange and collect the results.
     *
     * @param string $topic
     * @param array $envelopes
     * @return array Results in iteration order, re-indexed from zero.
     */
    private function enqueueOneByOne($topic, array $envelopes)
    {
        $results = [];
        foreach ($envelopes as $item) {
            $results[] = $this->exchange->enqueue($topic, $item);
        }

        return $results;
    }

    /**
     * Queue all envelopes on the AMQP channel and flush them in a single batch.
     *
     * @param string $topic
     * @param array $envelopes
     * @return void
     */
    private function publishAsBatch($topic, array $envelopes)
    {
        // The channel is obtained before the publisher lookup, as in the original flow.
        $amqpChannel = $this->amqpConfig->getChannel();
        $exchangeName = $this->publisherConfig
            ->getPublisher($topic)
            ->getConnection()
            ->getExchange();

        foreach ($envelopes as $item) {
            $amqpChannel->batch_basic_publish(
                new AMQPMessage($item->getBody(), $item->getProperties()),
                $exchangeName,
                $topic
            );
        }

        // Messages are only buffered above; this call sends the accumulated batch.
        $amqpChannel->publish_batch();
    }
}