<?php /** * Copyright © Magento, Inc. All rights reserved. * See COPYING.txt for license details. */ namespace Magento\Deploy\Process; use Magento\Deploy\Package\Package; use Magento\Deploy\Service\DeployPackage; use Magento\Framework\App\ResourceConnection; use Psr\Log\LoggerInterface; use Magento\Framework\App\State as AppState; use Magento\Framework\Locale\ResolverInterface as LocaleResolver; /** * Deployment Queue * * Deploy packages in parallel forks (if available) */ class Queue { /** * Default max amount of processes */ const DEFAULT_MAX_PROCESSES_AMOUNT = 4; /** * Default max execution time */ const DEFAULT_MAX_EXEC_TIME = 400; /** * @var array */ private $packages = []; /** * @var int[] */ private $processIds = []; /** * @var Package[] */ private $inProgress = []; /** * @var int */ private $maxProcesses; /** * @var int */ private $maxExecTime; /** * @var AppState */ private $appState; /** * @var LocaleResolver */ private $localeResolver; /** * @var ResourceConnection */ private $resourceConnection; /** * @var LoggerInterface */ private $logger; /** * @var DeployPackage */ private $deployPackageService; /** * @var array */ private $options = []; /** * @var int */ private $start = 0; /** * @var int */ private $lastJobStarted = 0; /** * @param AppState $appState * @param LocaleResolver $localeResolver * @param ResourceConnection $resourceConnection * @param LoggerInterface $logger * @param DeployPackage $deployPackageService * @param array $options * @param int $maxProcesses * @param int $maxExecTime */ public function __construct( AppState $appState, LocaleResolver $localeResolver, ResourceConnection $resourceConnection, LoggerInterface $logger, DeployPackage $deployPackageService, array $options = [], $maxProcesses = self::DEFAULT_MAX_PROCESSES_AMOUNT, $maxExecTime = self::DEFAULT_MAX_EXEC_TIME ) { $this->appState = $appState; $this->localeResolver = $localeResolver; $this->resourceConnection = $resourceConnection; $this->logger = $logger; $this->deployPackageService = $deployPackageService; $this->options = $options; $this->maxProcesses = $maxProcesses; $this->maxExecTime = $maxExecTime; } /** * @param Package $package * @param Package[] $dependencies * @return bool true on success */ public function add(Package $package, array $dependencies = []) { $this->packages[$package->getPath()] = [ 'package' => $package, 'dependencies' => $dependencies ]; return true; } /** * @return Package[] */ public function getPackages() { return $this->packages; } /** * Process jobs * * @return int */ public function process() { $returnStatus = 0; $this->start = $this->lastJobStarted = time(); $packages = $this->packages; while (count($packages) && $this->checkTimeout()) { foreach ($packages as $name => $packageJob) { $this->assertAndExecute($name, $packages, $packageJob); } $this->logger->info('.'); sleep(3); foreach ($this->inProgress as $name => $package) { if ($this->isDeployed($package)) { unset($this->inProgress[$name]); } } } $this->awaitForAllProcesses(); return $returnStatus; } /** * Check that all depended packages deployed and execute * * @param string $name * @param array $packages * @param array $packageJob * @return void */ private function assertAndExecute($name, array & $packages, array $packageJob) { /** @var Package $package */ $package = $packageJob['package']; $dependenciesNotFinished = false; if ($package->getParent() && $package->getParent() !== $package) { foreach ($packageJob['dependencies'] as $dependencyName => $dependency) { if (!$this->isDeployed($dependency)) { //If it's not present in $packages then it's already //in progress so just waiting... if (!array_key_exists($dependencyName, $packages)) { $dependenciesNotFinished = true; } else { $this->assertAndExecute( $dependencyName, $packages, $packages[$dependencyName] ); } } } } $this->executePackage($package, $name, $packages, $dependenciesNotFinished); } /** * @param Package $package * @param string $name * @param array $packages * @param bool $dependenciesNotFinished * @return void */ private function executePackage( Package $package, string $name, array &$packages, bool $dependenciesNotFinished ) { if (!$dependenciesNotFinished && !$this->isDeployed($package) && ($this->maxProcesses < 2 || (count($this->inProgress) < $this->maxProcesses)) ) { unset($packages[$name]); $this->execute($package); } } /** * Need to wait till all processes finished * * @return void */ private function awaitForAllProcesses() { while ($this->inProgress && $this->checkTimeout()) { foreach ($this->inProgress as $name => $package) { if ($this->isDeployed($package)) { unset($this->inProgress[$name]); } } $this->logger->info('.'); sleep(5); } if ($this->isCanBeParalleled()) { // close connections only if ran with forks $this->resourceConnection->closeConnection(); } } /** * @return bool */ private function isCanBeParalleled() { return function_exists('pcntl_fork') && $this->maxProcesses > 1; } /** * @param Package $package * @return bool true on success for main process and exit for child process * @SuppressWarnings(PHPMD.ExitExpression) */ private function execute(Package $package) { $this->lastJobStarted = time(); $this->logger->info( "Execute: " . $package->getPath(), [ 'process' => $package->getPath(), 'count' => count($package->getFiles()), ] ); $this->appState->emulateAreaCode( $package->getArea() == Package::BASE_AREA ? 'global' : $package->getArea(), function () use ($package) { // emulate application locale needed for correct file path resolving $this->localeResolver->setLocale($package->getLocale()); // execute package pre-processors // (may add more files to deploy, so it needs to be executed in main thread) foreach ($package->getPreProcessors() as $processor) { $processor->process($package, $this->options); } } ); if ($this->isCanBeParalleled()) { $pid = pcntl_fork(); if ($pid === -1) { throw new \RuntimeException('Unable to fork a new process'); } if ($pid) { $this->inProgress[$package->getPath()] = $package; $this->processIds[$package->getPath()] = $pid; return true; } // process child process $this->inProgress = []; $this->deployPackageService->deploy($package, $this->options, true); exit(0); } else { $this->deployPackageService->deploy($package, $this->options); return true; } } /** * @param Package $package * @return bool */ private function isDeployed(Package $package) { if ($this->isCanBeParalleled()) { if ($package->getState() === null) { $pid = pcntl_waitpid($this->getPid($package), $status, WNOHANG); if ($pid === $this->getPid($package)) { $package->setState(Package::STATE_COMPLETED); unset($this->inProgress[$package->getPath()]); return pcntl_wexitstatus($status) === 0; } return false; } } return $package->getState(); } /** * @param Package $package * @return int|null */ private function getPid(Package $package) { return isset($this->processIds[$package->getPath()]) ? $this->processIds[$package->getPath()] : null; } /** * @return bool */ private function checkTimeout() { return time() - $this->lastJobStarted < $this->maxExecTime; } /** * Free resources * * Protect against zombie process * * @return void */ public function __destruct() { foreach ($this->inProgress as $package) { if (pcntl_waitpid($this->getPid($package), $status) === -1) { throw new \RuntimeException( 'Error while waiting for package deployed: ' . $this->getPid($package) . '; Status: ' . $status ); } } } }