summaryrefslogtreecommitdiff
path: root/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php
diff options
context:
space:
mode:
Diffstat (limited to 'buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php')
-rw-r--r--buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php304
1 files changed, 304 insertions, 0 deletions
diff --git a/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php
new file mode 100644
index 00000000..6fe71107
--- /dev/null
+++ b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php
@@ -0,0 +1,304 @@
+<?php
+/**
+ * DocBlox
+ *
+ * PHP Version 5
+ *
+ * @category DocBlox
+ * @package Parallel
+ * @author Mike van Riel <mike.vanriel@naenius.com>
+ * @copyright 2010-2011 Mike van Riel / Naenius (http://www.naenius.com)
+ * @license http://www.opensource.org/licenses/mit-license.php MIT
+ * @link http://docblox-project.org
+ */
+
+/**
+ * Manager class for Parallel processes.
+ *
+ * This class will manage the workers and make sure all processes are executed
+ * in parallel and not too many at the same time.
+ *
+ * @category DocBlox
+ * @package Parallel
+ * @author Mike van Riel <mike.vanriel@naenius.com>
+ * @license http://www.opensource.org/licenses/mit-license.php MIT
+ * @link http://docblox-project.org
+ */
+class DocBlox_Parallel_Manager extends ArrayObject
+{
+ /** @var int The maximum number of processes to run simultaneously */
+ protected $process_limit = 2;
+
+ /** @var boolean Tracks whether this manager is currently executing */
+ protected $is_running = false;
+
+ /**
+ * Tries to autodetect the optimal number of process by counting the number
+ * of processors.
+ *
+ * @param array $input Input for the array object.
+ * @param int $flags flags for the array object.
+ * @param string $iterator_class Iterator class for this array object.
+ */
+ public function __construct(
+ $input = array(), $flags = 0, $iterator_class = "ArrayIterator"
+ ) {
+ parent::__construct($input, $flags, $iterator_class);
+
+ if (is_readable('/proc/cpuinfo')) {
+ $processors = 0;
+ exec("cat /proc/cpuinfo | grep processor | wc -l", $processors);
+ $this->setProcessLimit(reset($processors));
+ }
+ }
+
+ /**
+ * Adds a worker to to the queue.
+ *
+ * This method will prepare a worker to be executed in parallel once the
+ * execute method is invoked.
+ * A fluent interface is provided so that you can chain multiple workers
+ * in one call.
+ *
+ * Example:
+ *
+ * $cb1 = function() { var_dump('a'); sleep(1); };
+ * $cb2 = function() { var_dump('b'); sleep(1); };
+ *
+ * $mgr = new DocBlox_Parallel_Manager();
+ * $mgr->setProcessLimit(2)
+ * ->addWorker(new DocBlox_Parallel_Worker($cb1))
+ * ->addWorker(new DocBlox_Parallel_Worker($cb2))
+ * ->execute();
+ *
+ * @param int $index The key for this worker.
+ * @param DocBlox_Parallel_Worker $newval The worker to add onto the queue.
+ *
+ * @see DocBlox_Parallel_Manager::execute()
+ *
+ * @throws RuntimeException if this method is invoked while the
+ * manager is busy executing tasks.
+ * @throws InvalidArgumentException if the provided element is not of type
+ * DocBlox_Parallel_Worker.
+ *
+ * @return void
+ */
+ public function offsetSet($index, $newval)
+ {
+ if (!$newval instanceof DocBlox_Parallel_Worker) {
+ throw new InvalidArgumentException(
+ 'Provided element must be of type DocBlox_Parallel_Worker'
+ );
+ }
+ if ($this->isRunning()) {
+ throw new RuntimeException(
+ 'Workers may not be added during execution of the manager'
+ );
+ }
+
+ parent::offsetSet($index, $newval);
+ }
+
+ /**
+ * Convenience method to make the addition of workers explicit and allow a
+ * fluent interface.
+ *
+ * @param DocBlox_Parallel_Worker $worker The worker to add onto the queue.
+ *
+ * @return self
+ */
+ public function addWorker(DocBlox_Parallel_Worker $worker)
+ {
+ $this[] = $worker;
+
+ return $this;
+ }
+
+ /**
+ * Sets how many processes at most to execute at the same time.
+ *
+ * A fluent interface is provided so that you can chain multiple workers
+ * in one call.
+ *
+ * @param int $process_limit The limit, minimum of 1
+ *
+ * @see DocBlox_Parallel_Manager::addWorker() for an example
+ *
+ * @return self
+ */
+ public function setProcessLimit($process_limit)
+ {
+ if ($process_limit < 1) {
+ throw new InvalidArgumentException(
+ 'Number of simultaneous processes may not be less than 1'
+ );
+ }
+
+ $this->process_limit = $process_limit;
+
+ return $this;
+ }
+
+ /**
+ * Returns the current limit on the amount of processes that can be
+ * executed at the same time.
+ *
+ * @return int
+ */
+ public function getProcessLimit()
+ {
+ return $this->process_limit;
+ }
+
+ /**
+ * Returns whether the manager is executing the workers.
+ *
+ * @return boolean
+ */
+ public function isRunning()
+ {
+ return $this->is_running;
+ }
+
+ /**
+ * Executes each worker.
+ *
+ * This method loops through the list of workers and tries to fork as
+ * many times as the ProcessLimit dictates at the same time.
+ *
+ * @return void
+ */
+ public function execute()
+ {
+ /** @var int[] $processes */
+ $processes = $this->startExecution();
+
+ /** @var DocBlox_Parallel_Worker $worker */
+ foreach ($this as $worker) {
+
+ // if requirements are not met, execute workers in series.
+ if (!$this->checkRequirements()) {
+ $worker->execute();
+ continue;
+ }
+
+ $this->forkAndRun($worker, $processes);
+ }
+
+ $this->stopExecution($processes);
+ }
+
+ /**
+ * Notifies manager that execution has started, checks requirements and
+ * returns array for child processes.
+ *
+ * If forking is not available because library requirements are not met
+ * than the list of workers is processed in series and a E_USER_NOTICE is
+ * triggered.
+ *
+ * @return int[]
+ */
+ protected function startExecution()
+ {
+ $this->is_running = true;
+
+ // throw a E_USER_NOTICE if the requirements are not met.
+ if (!$this->checkRequirements()) {
+ trigger_error(
+ 'The PCNTL extension is not available, running workers in series '
+ . 'instead of parallel',
+ E_USER_NOTICE
+ );
+ }
+
+ return array();
+ }
+
+ /**
+ * Waits for all processes to have finished and notifies the manager that
+ * execution has stopped.
+ *
+ * @param int[] &$processes List of running processes.
+ *
+ * @return void
+ */
+ protected function stopExecution(array &$processes)
+ {
+ // starting of processes has ended but some processes might still be
+ // running wait for them to finish
+ while (!empty($processes)) {
+ pcntl_waitpid(array_shift($processes), $status);
+ }
+
+ /** @var DocBlox_Parallel_Worker $worker */
+ foreach ($this as $worker) {
+ $worker->pipe->push();
+ }
+
+ $this->is_running = false;
+ }
+
+ /**
+ * Forks the current process and calls the Worker's execute method OR
+ * handles the parent process' execution.
+ *
+ * This is the really tricky part of the forking mechanism. Here we invoke
+ * {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork}
+ * and either execute the forked process or deal with the parent's process
+ * based on in which process we are.
+ *
+ * To fully understand what is going on here it is recommended to read the
+ * PHP manual page on
+ * {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork}
+ * and associated articles.
+ *
+ * If there are more workers than may be ran simultaneously then this method
+ * will wait until a slot becomes available and then starts the next worker.
+ *
+ * @param DocBlox_Parallel_Worker $worker The worker to process.
+ * @param int[] &$processes The list of running processes.
+ *
+ * @throws RuntimeException if we are unable to fork.
+ *
+ * @return void
+ */
+ protected function forkAndRun(
+ DocBlox_Parallel_Worker $worker, array &$processes
+ ) {
+ $worker->pipe = new DocBlox_Parallel_WorkerPipe($worker);
+
+ // fork the process and register the PID
+ $pid = pcntl_fork();
+
+ switch ($pid) {
+ case -1:
+ throw new RuntimeException('Unable to establish a fork');
+ case 0: // Child process
+ $worker->execute();
+
+ $worker->pipe->pull();
+
+ // Kill -9 this process to prevent closing of shared file handlers.
+ // Not doing this causes, for example, MySQL connections to be cleaned.
+ posix_kill(getmypid(), SIGKILL);
+ default: // Parent process
+ // Keep track if the worker children
+ $processes[] = $pid;
+
+ if (count($processes) >= $this->getProcessLimit()) {
+ pcntl_waitpid(array_shift($processes), $status);
+ }
+ break;
+ }
+ }
+
+ /**
+ * Returns true when all requirements are met.
+ *
+ * @return bool
+ */
+ protected function checkRequirements()
+ {
+ return (bool)(extension_loaded('pcntl'));
+ }
+} \ No newline at end of file