messageResource = $messageResource;
        $this->scopeConfig = $scopeConfig;
        $this->dateTime = $dateTime;
        $this->messageStatusCollectionFactory = $messageStatusCollectionFactory;
    }

    /**
     * Add message to all specified queues.
     *
     * Saves the message through the message resource and links the returned message ID to the given queues.
     *
     * @param string $topic
     * @param string $message
     * @param string[] $queueNames
     * @return $this
     */
    public function addMessageToQueues($topic, $message, $queueNames)
    {
        $messageId = $this->messageResource->saveMessage($topic, $message);
        $this->messageResource->linkQueues($messageId, $queueNames);
        return $this;
    }

    /**
     * Add messages to all specified queues.
     *
     * Saves all messages through the message resource and links the returned message IDs to the given queues.
     *
     * @param string $topic
     * @param array $messages
     * @param string[] $queueNames
     * @return $this
     * @since 100.2.0
     */
    public function addMessagesToQueues($topic, $messages, $queueNames)
    {
        $messageIds = $this->messageResource->saveMessages($topic, $messages);
        $this->messageResource->linkMessagesWithQueues($messageIds, $queueNames);
        return $this;
    }

    /**
     * Delete marked messages
     *
     * Mark messages to be deleted if sufficient amount of time passed since last update,
     * then ask the message resource to delete the marked messages.
     *
     * @return void
     */
    public function markMessagesForDelete()
    {
        $statusesToClear = $this->getStatusesToClear();

        /**
         * When every lifetime is configured as 0 there is no status to scan for;
         * skip the collection instead of filtering on an empty status list.
         */
        if (!empty($statusesToClear)) {
            $collection = $this->messageStatusCollectionFactory->create()
                ->addFieldToFilter(
                    'status',
                    ['in' => $statusesToClear]
                );

            /**
             * Update messages if lifetime is expired
             */
            foreach ($collection as $messageStatus) {
                $this->processMessagePerStatus($messageStatus);
            }
        }

        /**
         * Delete all messages which have TO BE DELETED status in all the queues
         */
        $this->messageResource->deleteMarkedMessages();
    }

    /**
     * Based on message status, updated date and timeout for the status, move it to next state
     *
     * COMPLETE, ERROR and NEW records older than their configured lifetime are saved with
     * the TO BE DELETED status; IN PROGRESS records older than the retry delay are pushed
     * back to the queue for retry. Records that are not old enough are left untouched.
     *
     * @param MessageStatus $messageStatus
     * @return void
     */
    private function processMessagePerStatus($messageStatus)
    {
        $now = $this->dateTime->gmtTimestamp();
        $status = $messageStatus->getStatus();

        // NOTE(review): strtotime() interprets a zone-less string in the PHP default timezone,
        // while $now is a GMT timestamp - assumes updated_at is stored in UTC; confirm.
        $isOlderThan = function ($seconds) use ($messageStatus, $now) {
            return strtotime($messageStatus->getUpdatedAt()) < ($now - $seconds);
        };

        // Loose comparison (==) is kept on purpose.
        // NOTE(review): the status presumably may arrive as a numeric string - confirm before using ===.
        if ($status == self::MESSAGE_STATUS_COMPLETE
            && $isOlderThan($this->getCompletedMessageLifetime())
        ) {
            $messageStatus->setStatus(self::MESSAGE_STATUS_TO_BE_DELETED)
                ->save();
        } elseif ($status == self::MESSAGE_STATUS_ERROR
            && $isOlderThan($this->getErrorMessageLifetime())
        ) {
            $messageStatus->setStatus(self::MESSAGE_STATUS_TO_BE_DELETED)
                ->save();
        } elseif ($status == self::MESSAGE_STATUS_IN_PROGRESS
            && $isOlderThan($this->getInProgressRetryAfter())
        ) {
            // Stuck in progress for too long: hand it back to the queue instead of deleting it.
            $this->pushToQueueForRetry($messageStatus->getId());
        } elseif ($status == self::MESSAGE_STATUS_NEW
            && $isOlderThan($this->getNewMessageLifetime())
        ) {
            $messageStatus->setStatus(self::MESSAGE_STATUS_TO_BE_DELETED)
                ->save();
        }
    }

    /**
     * Compose a set of statuses to track for deletion based on configuration.
     *
     * A status is included only when its configured lifetime (or retry delay) is greater than 0.
     *
     * @return array
     */
    private function getStatusesToClear()
    {
        /**
         * Do not mark messages for deletion if configuration has 0 lifetime configured.
         */
        $statusesToDelete = [];
        if ($this->getCompletedMessageLifetime() > 0) {
            $statusesToDelete[] = self::MESSAGE_STATUS_COMPLETE;
        }
        if ($this->getErrorMessageLifetime() > 0) {
            $statusesToDelete[] = self::MESSAGE_STATUS_ERROR;
        }
        if ($this->getNewMessageLifetime() > 0) {
            $statusesToDelete[] = self::MESSAGE_STATUS_NEW;
        }
        if ($this->getInProgressRetryAfter() > 0) {
            $statusesToDelete[] = self::MESSAGE_STATUS_IN_PROGRESS;
        }
        return $statusesToDelete;
    }

    /**
     * Completed message lifetime
     *
     * Indicates how long message in COMPLETE state will stay in table with statuses.
     * The configured value is cast to int and multiplied by 60.
     *
     * @return int
     */
    private function getCompletedMessageLifetime()
    {
        return 60 * (int)$this->scopeConfig->getValue(
            self::XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME,
            \Magento\Store\Model\ScopeInterface::SCOPE_STORE
        );
    }

    /**
     * Failure message life time
     *
     * Indicates how long message in ERROR state will stay in table with statuses.
     * The configured value is cast to int and multiplied by 60.
     *
     * @return int
     */
    private function getErrorMessageLifetime()
    {
        return 60 * (int)$this->scopeConfig->getValue(
            self::XML_PATH_FAILED_MESSAGES_LIFETIME,
            \Magento\Store\Model\ScopeInterface::SCOPE_STORE
        );
    }

    /**
     * In progress message delay before retry
     *
     * Indicates how long message will stay in IN PROGRESS status before attempted to retry.
     * The configured value is cast to int and multiplied by 60.
     *
     * @return int
     */
    private function getInProgressRetryAfter()
    {
        return 60 * (int)$this->scopeConfig->getValue(
            self::XML_PATH_RETRY_IN_PROGRESS_AFTER,
            \Magento\Store\Model\ScopeInterface::SCOPE_STORE
        );
    }

    /**
     * New message life time
     *
     * Indicates how long message in NEW state will stay in table with statuses.
     * The configured value is cast to int and multiplied by 60.
     *
     * @return int
     */
    private function getNewMessageLifetime()
    {
        return 60 * (int)$this->scopeConfig->getValue(
            self::XML_PATH_NEW_MESSAGES_LIFETIME,
            \Magento\Store\Model\ScopeInterface::SCOPE_STORE
        );
    }

    /**
     * Read the specified number of messages from the specified queue.
     *
     * If queue does not contain enough messages, method is not waiting for more messages.
     *
     * @param string $queue
     * @param int|null $maxMessagesNumber
     * @return array
     * [
     *     [
     *          self::MESSAGE_ID => $messageId,
     *          self::MESSAGE_QUEUE_ID => $queueId,
     *          self::MESSAGE_TOPIC => $topic,
     *          self::MESSAGE_BODY => $body,
     *          self::MESSAGE_STATUS => $status,
     *          self::MESSAGE_UPDATED_AT => $updatedAt,
     *          self::MESSAGE_QUEUE_NAME => $queueName,
     *          self::MESSAGE_QUEUE_RELATION_ID => $relationId
     *     ],
     *     ...
     * ]
*/
    public function readMessages($queue, $maxMessagesNumber = null)
    {
        $selectedMessages = $this->messageResource->getMessages($queue, $maxMessagesNumber);
        /* The logic below allows to prevent the same message being processed by several consumers in parallel */
        $selectedMessagesRelatedIds = [];
        foreach ($selectedMessages as &$message) {
            /* Set message status here to avoid extra reading from DB after it is updated */
            $message[self::MESSAGE_STATUS] = self::MESSAGE_STATUS_IN_PROGRESS;
            $selectedMessagesRelatedIds[] = $message[self::MESSAGE_QUEUE_RELATION_ID];
        }
        /* Break the reference so $message no longer aliases the last element of $selectedMessages */
        unset($message);

        $takenMessagesRelationIds = $this->messageResource->takeMessagesInProgress($selectedMessagesRelatedIds);
        if (count($selectedMessages) === count($takenMessagesRelationIds)) {
            /* Every selected message was taken: return them as selected */
            return $selectedMessages;
        }

        /*
         * Only some messages were taken: keep just those.
         * Note that in this case the result is keyed by relation ID.
         */
        $selectedMessages = array_combine($selectedMessagesRelatedIds, array_values($selectedMessages));
        return array_intersect_key($selectedMessages, array_flip($takenMessagesRelationIds));
    }

    /**
     * Push message back to queue for one more processing trial. Affects message in particular queue only.
     *
     * Delegates to the message resource.
     *
     * @param int $messageRelationId
     * @return void
     */
    public function pushToQueueForRetry($messageRelationId)
    {
        $this->messageResource->pushBackForRetry($messageRelationId);
    }

    /**
     * Change status of messages.
     *
     * Delegates to the message resource.
     *
     * @param int[] $messageRelationIds
     * @param int $status
     * @return void
     */
    public function changeStatus($messageRelationIds, $status)
    {
        $this->messageResource->changeStatus($messageRelationIds, $status);
    }
}