1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
<?php
/**
* Copyright © Magento, Inc. All rights reserved.
* See COPYING.txt for license details.
*/
namespace Magento\Framework\MessageQueue;
use Magento\Framework\Exception\NotFoundException;
use Magento\Framework\Phrase;
/**
 * Issues locks for queue messages on behalf of a named consumer.
 *
 * A lock is identified by a code derived from the consumer name and the
 * envelope's 'message_id' property. An existing lock for that code is reported
 * as "already processed" via MessageLockException.
 */
class MessageController
{
    /**
     * Factory used to obtain a fresh lock object for every lock() call.
     *
     * @var \Magento\Framework\MessageQueue\LockInterfaceFactory
     */
    private $lockFactory;

    /**
     * Looks up an existing lock by its code.
     *
     * @var \Magento\Framework\MessageQueue\Lock\ReaderInterface
     */
    private $reader;

    /**
     * Persists a newly acquired lock.
     *
     * @var \Magento\Framework\MessageQueue\Lock\WriterInterface
     */
    private $writer;

    /**
     * Store the collaborators needed to create, look up and save locks.
     *
     * @param \Magento\Framework\MessageQueue\LockInterfaceFactory $lockFactory
     * @param Lock\ReaderInterface $reader
     * @param Lock\WriterInterface $writer
     */
    public function __construct(
        \Magento\Framework\MessageQueue\LockInterfaceFactory $lockFactory,
        \Magento\Framework\MessageQueue\Lock\ReaderInterface $reader,
        \Magento\Framework\MessageQueue\Lock\WriterInterface $writer
    ) {
        $this->writer = $writer;
        $this->reader = $reader;
        $this->lockFactory = $lockFactory;
    }

    /**
     * Acquire a lock for the given message on behalf of the given consumer.
     *
     * The lock code is the MD5 hash of "<consumerName>-<message_id>". The reader is
     * asked to load that code into a freshly created lock object; if the lock then
     * reports a truthy id, the message is treated as already locked. Otherwise the
     * lock is handed to the writer and returned.
     *
     * @param EnvelopeInterface $envelope Envelope whose properties must contain a non-empty 'message_id'
     * @param string $consumerName Name of the consumer requesting the lock
     * @return LockInterface The lock that was just saved
     * @throws MessageLockException If a lock for this consumer/message pair already exists
     * @throws NotFoundException If the envelope has no (or an empty) 'message_id' property
     */
    public function lock(EnvelopeInterface $envelope, $consumerName)
    {
        $messageLock = $this->lockFactory->create();
        $envelopeProperties = $envelope->getProperties();

        if (empty($envelopeProperties['message_id'])) {
            $reason = new Phrase("Property 'message_id' not found in properties.");
            throw new NotFoundException($reason);
        }

        // One code per (consumer, message) pair, so each consumer gets its own lock for a message.
        $lockCode = md5($consumerName . '-' . $envelopeProperties['message_id']);

        $this->reader->read($messageLock, $lockCode);

        // No id after the lookup means no lock exists yet for this code: claim it.
        if (!$messageLock->getId()) {
            $this->writer->saveLock($messageLock);

            return $messageLock;
        }

        throw new MessageLockException(
            new Phrase('The "%1" message code was already processed.', [$lockCode])
        );
    }
}