identityService = $identityService;
        $this->itemStatusInterfaceFactory = $itemStatusInterfaceFactory;
        $this->asyncResponseFactory = $asyncResponseFactory;
        $this->bulkManagement = $bulkManagement;
        $this->logger = $logger;
        $this->operationRepository = $operationRepository;
        $this->userContext = $userContext ?: ObjectManager::getInstance()->get(UserContextInterface::class);
    }

    /**
     * Schedule new bulk operation based on the list of entities
     *
     * One operation is created per element of $entitiesArray. Every element gets a request item
     * (keyed by its array key) marked as accepted or rejected. Operations that could be created
     * are scheduled under the bulk identified by $groupId. If at least one element was rejected,
     * a BulkException carrying the full async response is thrown after scheduling.
     *
     * @param string $topicName
     * @param array $entitiesArray
     * @param string|null $groupId Existing bulk UUID; when empty a new one is generated and an empty bulk
     *                             is scheduled first.
     * @param string|null $userId When empty, the ID is taken from the current user context.
     * @return AsyncResponseInterface
     * @throws BulkException When one or more entities could not be converted into an operation.
     * @throws LocalizedException When the bulk could not be scheduled.
     */
    public function publishMass($topicName, array $entitiesArray, $groupId = null, $userId = null)
    {
        $bulkDescription = __('Topic %1', $topicName);

        // NOTE(review): loose comparison is kept as-is, so '' and 0 are treated like null here;
        // tightening it to === would change behaviour for existing callers.
        if ($userId == null) {
            $userId = $this->userContext->getUserId();
        }

        if ($groupId == null) {
            $groupId = $this->identityService->generateId();

            /** create new bulk without operations */
            if (!$this->bulkManagement->scheduleBulk($groupId, [], $bulkDescription, $userId)) {
                throw new LocalizedException(
                    __('Something went wrong while processing the request.')
                );
            }
        }

        $operations = [];
        $requestItems = [];
        // Collects per-entity failures so the remaining entities are still processed.
        $bulkException = new BulkException();
        foreach ($entitiesArray as $key => $entityParams) {
            /** @var \Magento\AsynchronousOperations\Api\Data\ItemStatusInterface $requestItem */
            $requestItem = $this->itemStatusInterfaceFactory->create();

            try {
                $operations[] = $this->operationRepository->createByTopic($topicName, $entityParams, $groupId);
                $requestItem->setId($key);
                $requestItem->setStatus(ItemStatusInterface::STATUS_ACCEPTED);
                $requestItems[] = $requestItem;
            } catch (\Exception $exception) {
                $this->logger->error($exception);
                $requestItem->setId($key);
                $requestItem->setStatus(ItemStatusInterface::STATUS_REJECTED);
                // The exception object itself is handed to both setters.
                // NOTE(review): presumably they extract the message / code from it - confirm.
                $requestItem->setErrorMessage($exception);
                $requestItem->setErrorCode($exception);
                $requestItems[] = $requestItem;
                // The phrase must stay on a single line: a line break inside the literal would end up
                // in the error text and would no longer match the phrase used as translation key.
                $bulkException->addException(new LocalizedException(
                    __('Error processing %key element of input data', ['key' => $key]),
                    $exception
                ));
            }
        }

        // Scheduled even when some entities were rejected; only successfully created operations are passed.
        if (!$this->bulkManagement->scheduleBulk($groupId, $operations, $bulkDescription, $userId)) {
            throw new LocalizedException(
                __('Something went wrong while processing the request.')
            );
        }

        /** @var AsyncResponseInterface $asyncResponse */
        $asyncResponse = $this->asyncResponseFactory->create();
        $asyncResponse->setBulkUuid($groupId);
        $asyncResponse->setRequestItems($requestItems);

        if ($bulkException->wasErrorAdded()) {
            // Attach the response so the caller can still see the bulk UUID and per-item statuses.
            $asyncResponse->setErrors(true);
            $bulkException->addData($asyncResponse);
            throw $bulkException;
        }

        $asyncResponse->setErrors(false);

        return $asyncResponse;
    }
}