1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
<?php
/**
* Refer to LICENSE.txt distributed with the Temando Shipping module for notice of license
*/
namespace Temando\Shipping\Sync;
use Magento\Framework\Exception\CouldNotDeleteException;
use Magento\Framework\Exception\LocalizedException;
use Psr\Log\LoggerInterface;
use Temando\Shipping\Model\Config\ModuleConfigInterface;
use Temando\Shipping\Model\ResourceModel\EventStream\EventRepositoryInterface;
use Temando\Shipping\Model\StreamEventInterface;
/**
* Temando Event Stream Processor
*
* @package Temando\Shipping\Sync
* @author Benjamin Heuer <benjamin.heuer@netresearch.de>
* @license http://opensource.org/licenses/osl-3.0.php Open Software License (OSL 3.0)
* @link http://www.temando.com/
*/
class EventStreamProcessor
{
/**
* @var ModuleConfigInterface
*/
private $config;
/**
* @var EventListProcessor
*/
private $eventListProcessor;
/**
* @var EventRepositoryInterface
*/
private $streamEventRepository;
/**
* @var EventFilter
*/
private $streamEventFilter;
/**
* @var LoggerInterface
*/
private $logger;
/**
* @param ModuleConfigInterface $config
* @param EventListProcessor $eventListProcessor
* @param EventRepositoryInterface $streamEventRepository
* @param EventFilter $streamEventFilter
* @param LoggerInterface $logger
*/
public function __construct(
ModuleConfigInterface $config,
EventListProcessor $eventListProcessor,
EventRepositoryInterface $streamEventRepository,
EventFilter $streamEventFilter,
LoggerInterface $logger
) {
$this->config = $config;
$this->eventListProcessor = $eventListProcessor;
$this->streamEventRepository = $streamEventRepository;
$this->streamEventFilter = $streamEventFilter;
$this->logger = $logger;
}
/**
* @param int $iterations
* @return void
*/
public function processEvents($iterations = 10)
{
$streamId = $this->config->getStreamId();
$eventList = EventList::fromArray([]);
do {
$iterations--;
try {
// obtain next events, pass on to list processor
$streamEvents = $this->streamEventRepository->getEventList($streamId);
$processableEvents = $this->streamEventFilter->filter($streamEvents);
$this->eventListProcessor->processEventList($processableEvents, $eventList);
} catch (LocalizedException $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
continue;
}
$deleteStreamEvent = function (StreamEventInterface $streamEvent) use ($streamId, $eventList) {
if (!$eventList->hasEvent($streamEvent->getEventId())) {
$this->streamEventRepository->delete($streamId, $streamEvent->getEventId());
}
};
try {
// delete events that were processed successfully (removed from event list)
array_walk($streamEvents, $deleteStreamEvent);
} catch (CouldNotDeleteException $e) {
$this->logger->error($e->getMessage(), ['exception' => $e]);
}
} while ($iterations > 0 && !empty($streamEvents));
}
}