diff options
Diffstat (limited to 'buildscripts/phing/classes/phing/contrib')
5 files changed, 797 insertions, 0 deletions
diff --git a/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php new file mode 100644 index 00000000..6fe71107 --- /dev/null +++ b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php @@ -0,0 +1,304 @@ +<?php +/** + * DocBlox + * + * PHP Version 5 + * + * @category DocBlox + * @package Parallel + * @author Mike van Riel <mike.vanriel@naenius.com> + * @copyright 2010-2011 Mike van Riel / Naenius (http://www.naenius.com) + * @license http://www.opensource.org/licenses/mit-license.php MIT + * @link http://docblox-project.org + */ + +/** + * Manager class for Parallel processes. + * + * This class will manage the workers and make sure all processes are executed + * in parallel and not too many at the same time. + * + * @category DocBlox + * @package Parallel + * @author Mike van Riel <mike.vanriel@naenius.com> + * @license http://www.opensource.org/licenses/mit-license.php MIT + * @link http://docblox-project.org + */ +class DocBlox_Parallel_Manager extends ArrayObject +{ + /** @var int The maximum number of processes to run simultaneously */ + protected $process_limit = 2; + + /** @var boolean Tracks whether this manager is currently executing */ + protected $is_running = false; + + /** + * Tries to autodetect the optimal number of process by counting the number + * of processors. + * + * @param array $input Input for the array object. + * @param int $flags flags for the array object. + * @param string $iterator_class Iterator class for this array object. + */ + public function __construct( + $input = array(), $flags = 0, $iterator_class = "ArrayIterator" + ) { + parent::__construct($input, $flags, $iterator_class); + + if (is_readable('/proc/cpuinfo')) { + $processors = 0; + exec("cat /proc/cpuinfo | grep processor | wc -l", $processors); + $this->setProcessLimit(reset($processors)); + } + } + + /** + * Adds a worker to to the queue. + * + * This method will prepare a worker to be executed in parallel once the + * execute method is invoked. + * A fluent interface is provided so that you can chain multiple workers + * in one call. + * + * Example: + * + * $cb1 = function() { var_dump('a'); sleep(1); }; + * $cb2 = function() { var_dump('b'); sleep(1); }; + * + * $mgr = new DocBlox_Parallel_Manager(); + * $mgr->setProcessLimit(2) + * ->addWorker(new DocBlox_Parallel_Worker($cb1)) + * ->addWorker(new DocBlox_Parallel_Worker($cb2)) + * ->execute(); + * + * @param int $index The key for this worker. + * @param DocBlox_Parallel_Worker $newval The worker to add onto the queue. + * + * @see DocBlox_Parallel_Manager::execute() + * + * @throws RuntimeException if this method is invoked while the + * manager is busy executing tasks. + * @throws InvalidArgumentException if the provided element is not of type + * DocBlox_Parallel_Worker. + * + * @return void + */ + public function offsetSet($index, $newval) + { + if (!$newval instanceof DocBlox_Parallel_Worker) { + throw new InvalidArgumentException( + 'Provided element must be of type DocBlox_Parallel_Worker' + ); + } + if ($this->isRunning()) { + throw new RuntimeException( + 'Workers may not be added during execution of the manager' + ); + } + + parent::offsetSet($index, $newval); + } + + /** + * Convenience method to make the addition of workers explicit and allow a + * fluent interface. + * + * @param DocBlox_Parallel_Worker $worker The worker to add onto the queue. + * + * @return self + */ + public function addWorker(DocBlox_Parallel_Worker $worker) + { + $this[] = $worker; + + return $this; + } + + /** + * Sets how many processes at most to execute at the same time. + * + * A fluent interface is provided so that you can chain multiple workers + * in one call. + * + * @param int $process_limit The limit, minimum of 1 + * + * @see DocBlox_Parallel_Manager::addWorker() for an example + * + * @return self + */ + public function setProcessLimit($process_limit) + { + if ($process_limit < 1) { + throw new InvalidArgumentException( + 'Number of simultaneous processes may not be less than 1' + ); + } + + $this->process_limit = $process_limit; + + return $this; + } + + /** + * Returns the current limit on the amount of processes that can be + * executed at the same time. + * + * @return int + */ + public function getProcessLimit() + { + return $this->process_limit; + } + + /** + * Returns whether the manager is executing the workers. + * + * @return boolean + */ + public function isRunning() + { + return $this->is_running; + } + + /** + * Executes each worker. + * + * This method loops through the list of workers and tries to fork as + * many times as the ProcessLimit dictates at the same time. + * + * @return void + */ + public function execute() + { + /** @var int[] $processes */ + $processes = $this->startExecution(); + + /** @var DocBlox_Parallel_Worker $worker */ + foreach ($this as $worker) { + + // if requirements are not met, execute workers in series. + if (!$this->checkRequirements()) { + $worker->execute(); + continue; + } + + $this->forkAndRun($worker, $processes); + } + + $this->stopExecution($processes); + } + + /** + * Notifies manager that execution has started, checks requirements and + * returns array for child processes. + * + * If forking is not available because library requirements are not met + * than the list of workers is processed in series and a E_USER_NOTICE is + * triggered. + * + * @return int[] + */ + protected function startExecution() + { + $this->is_running = true; + + // throw a E_USER_NOTICE if the requirements are not met. + if (!$this->checkRequirements()) { + trigger_error( + 'The PCNTL extension is not available, running workers in series ' + . 'instead of parallel', + E_USER_NOTICE + ); + } + + return array(); + } + + /** + * Waits for all processes to have finished and notifies the manager that + * execution has stopped. + * + * @param int[] &$processes List of running processes. + * + * @return void + */ + protected function stopExecution(array &$processes) + { + // starting of processes has ended but some processes might still be + // running wait for them to finish + while (!empty($processes)) { + pcntl_waitpid(array_shift($processes), $status); + } + + /** @var DocBlox_Parallel_Worker $worker */ + foreach ($this as $worker) { + $worker->pipe->push(); + } + + $this->is_running = false; + } + + /** + * Forks the current process and calls the Worker's execute method OR + * handles the parent process' execution. + * + * This is the really tricky part of the forking mechanism. Here we invoke + * {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork} + * and either execute the forked process or deal with the parent's process + * based on in which process we are. + * + * To fully understand what is going on here it is recommended to read the + * PHP manual page on + * {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork} + * and associated articles. + * + * If there are more workers than may be ran simultaneously then this method + * will wait until a slot becomes available and then starts the next worker. + * + * @param DocBlox_Parallel_Worker $worker The worker to process. + * @param int[] &$processes The list of running processes. + * + * @throws RuntimeException if we are unable to fork. + * + * @return void + */ + protected function forkAndRun( + DocBlox_Parallel_Worker $worker, array &$processes + ) { + $worker->pipe = new DocBlox_Parallel_WorkerPipe($worker); + + // fork the process and register the PID + $pid = pcntl_fork(); + + switch ($pid) { + case -1: + throw new RuntimeException('Unable to establish a fork'); + case 0: // Child process + $worker->execute(); + + $worker->pipe->pull(); + + // Kill -9 this process to prevent closing of shared file handlers. + // Not doing this causes, for example, MySQL connections to be cleaned. + posix_kill(getmypid(), SIGKILL); + default: // Parent process + // Keep track if the worker children + $processes[] = $pid; + + if (count($processes) >= $this->getProcessLimit()) { + pcntl_waitpid(array_shift($processes), $status); + } + break; + } + } + + /** + * Returns true when all requirements are met. + * + * @return bool + */ + protected function checkRequirements() + { + return (bool)(extension_loaded('pcntl')); + } +}
\ No newline at end of file diff --git a/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/README.md b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/README.md new file mode 100644 index 00000000..272d7191 --- /dev/null +++ b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/README.md @@ -0,0 +1,106 @@ +Parallel +======== + +This is a library for introducing Parallelization into your project. +See the `example.php` file for an example how to use this library. + +Theory of Operation +------------------- + +This library will enable the developer to execute a given amount of tasks +(workers) in parallel. This is achieved by adding workers onto a manager, +optionally defining how many processes to run simultaneously and then execute +the manager. + +Under Linux this library will try to detect the number of processors and allow +a maximum number of processes to run equal to the number of processors. If this +cannot be determined or the user is running Windows then a default of 2 is used. + +Requirements and graceful degradation +------------------------------------- + +Parallelization has several requirements. But to allow distribution, without +adding several requirements to your application, will this library execute the +given tasks in serie if the requirements are not met. And throw a E_USER_NOTICE +php error that explains to the user that dependencies are missing. + +The requirements for this library are: + +* A *NIX compatible operating system +* Scripts must not run from an apache module +* the PCNTL PHP extension (http://php.net/manual/en/book.pcntl.php) + +Workers +------- + +Workers are basically wrappers around callback functions or methods. As such you +can use anything in your existing project and parallelize it. + +Do note that each parallel process is a duplicate of the original. This means +that, for example, if you pass an object (or other reference) and change that, +that the changes that you have made do not carry over to the caller. + +The return value of the given callback is stored as result on the worker and +can be read using the `getResult()` method. + +Any exception that is thrown will result in an error, where the `getReturnCode()` +method will return the exception code (be warned: this may be 0!) and the +`getError()` method will return the exception message. + +Errors and exceptions +--------------------- + +if a task throws an exception it is caught and registered as an error. The +exception's code is used as error number, where the message is used as error +message. + +By using this, instead of dying, you can continue execution of the other parallel +processes and handle errors yourself after all processes have been executed. + +Examples +-------- + +### Fluent interface + + $mgr = new DocBlox_Parallel_Manager(); + $mgr + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'a'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'b'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'c'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'd'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'e'; })) + ->execute(); + + /** @var DocBlox_Parallel_Worker $worker */ + foreach ($mgr as $worker) { + var_dump($worker->getResult()); + } + +### Array interface + + $mgr = new DocBlox_Parallel_Manager(); + $mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'f'; }); + $mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'g'; }); + $mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'h'; }); + $mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'i'; }); + $mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'j'; }); + $mgr->execute(); + + /** @var DocBlox_Parallel_Worker $worker */ + foreach ($mgr as $worker) { + var_dump($worker->getResult()); + } + +TODO +---- + +* Improve docs +* More intelligent process slots; currently only the oldest in a 'set' of slots + is waited on but if this runs for a longer time then the other slots than + those will not be filled as long as the first slot is occupied. +* Last parts of IPC (Inter-Process Communication), to be able to return + information from Workers to the Manager. + + * STDOUT + * STDERR + diff --git a/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Worker.php b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Worker.php new file mode 100644 index 00000000..338f4b25 --- /dev/null +++ b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Worker.php @@ -0,0 +1,203 @@ +<?php +/** + * DocBlox + * + * PHP Version 5 + * + * @category DocBlox + * @package Parallel + * @author Mike van Riel <mike.vanriel@naenius.com> + * @copyright 2010-2011 Mike van Riel / Naenius (http://www.naenius.com) + * @license http://www.opensource.org/licenses/mit-license.php MIT + * @link http://docblox-project.org + */ + +/** + * Class that represents the execution of a single task within a parallelized + * frame. + * + * @category DocBlox + * @package Parallel + * @author Mike van Riel <mike.vanriel@naenius.com> + * @license http://www.opensource.org/licenses/mit-license.php MIT + * @link http://docblox-project.org + */ +class DocBlox_Parallel_Worker +{ + /** @var callback the task to execute for this worker */ + protected $task = null; + + /** @var mixed[] A list of argument to pass to the task */ + protected $arguments = array(); + + /** @var int The return code to tell the parent process how it went */ + protected $return_code = -1; + + /** @var mixed The result of the given task */ + protected $result = ''; + + /** @var string The error message, if an error occurred */ + protected $error = ''; + + /** + * Creates the worker and sets the task to execute optionally including + * the arguments that need to be passed to the task. + * + * @param callback $task The task to invoke upon execution. + * @param mixed[] $arguments The arguments to provide to the task. + */ + function __construct($task, array $arguments = array()) + { + $this->setTask($task); + $this->arguments = $arguments; + } + + /** + * Returns the list of arguments as provided int he constructor. + * + * @see DocBlox_Parallel_Worker::__construct() + * + * @return mixed[] + */ + public function getArguments() + { + return $this->arguments; + } + + /** + * Returns the task as provided in the constructor. + * + * @see DocBlox_Parallel_Worker::__construct() + * + * @return callback + */ + public function getTask() + { + return $this->task; + } + + /** + * Returns the available return code. + * + * This method may return -1 if no return code is available yet. + * + * @return int + */ + public function getReturnCode() + { + return $this->return_code; + } + + /** + * Sets the return code for this worker. + * + * Recommended is to use the same codes as are used with + * {@link http://www.gnu.org/software/bash/manual/html_node/Exit-Status.html + * exit codes}. + * + * In short: 0 means that the task succeeded and a any other positive value + * indicates an error condition. + * + * @param int $return_code Recommended to be a positive number + * + * @throw InvalidArgumentException if the code is not a number or negative + * + * @return void + */ + public function setReturnCode($return_code) + { + if (!is_numeric($return_code) || ($return_code < 0)) { + throw new InvalidArgumentException( + 'Expected the return code to be a positive number' + ); + } + + $this->return_code = $return_code; + } + + /** + * Returns the error message associated with the return code. + * + * @return string + */ + public function getError() + { + return $this->error; + } + + /** + * Sets the error message. + * + * @param string $error The error message. + * + * @return void + */ + public function setError($error) + { + $this->error = $error; + } + + /** + * Returns the result for this task run. + * + * @return null|mixed + */ + public function getResult() + { + return $this->result; + } + + /** + * Sets the result for this task run. + * + * @param mixed $result The value that is returned by the task; can be anything. + * + * @return void + */ + public function setResult($result) + { + $this->result = $result; + } + + /** + * Invokes the task with the given arguments and processes the output. + * + * @return void. + */ + public function execute() + { + $this->setReturnCode(0); + try { + $this->setResult( + call_user_func_array($this->getTask(), $this->getArguments()) + ); + } catch (Exception $e) { + $this->setError($e->getMessage()); + $this->setReturnCode($e->getCode()); + } + } + + /** + * Sets the task for this worker and validates whether it is callable. + * + * @param callback $task The task to execute when the execute method + * is invoked. + * + * @throws InvalidArgumentException if the given argument is not a callback. + * + * @see DocBlox_Parallel_Worker::__construct() + * @see DocBlox_Parallel_Worker::execute() + * + * @return void + */ + protected function setTask($task) + { + if (!is_callable($task)) { + throw new InvalidArgumentException( + 'Worker task is not a callable object' + ); + } + + $this->task = $task; + } +} diff --git a/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/WorkerPipe.php b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/WorkerPipe.php new file mode 100644 index 00000000..3b7eb7fe --- /dev/null +++ b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/WorkerPipe.php @@ -0,0 +1,127 @@ +<?php +/** + * DocBlox + * + * PHP Version 5 + * + * @category DocBlox + * @package Parallel + * @author Mike van Riel <mike.vanriel@naenius.com> + * @copyright 2010-2011 Mike van Riel / Naenius (http://www.naenius.com) + * @license http://www.opensource.org/licenses/mit-license.php MIT + * @link http://docblox-project.org + */ + +/** + * Class that represents a named pipe for a Worker. + * + * This class manages the named pipe for a worker and is able to push and pull + * specific data to facilitate IPC (interprocess communication). + * + * @category DocBlox + * @package Parallel + * @author Mike van Riel <mike.vanriel@naenius.com> + * @license http://www.opensource.org/licenses/mit-license.php MIT + * @link http://docblox-project.org + */ +class DocBlox_Parallel_WorkerPipe +{ + /** @var DocBlox_Parallel_Worker worker class that is associated */ + protected $worker; + + /** @var string Path to the pipe */ + protected $path; + + /** + * Initializes the named pipe. + * + * @param DocBlox_Parallel_Worker $worker Associated worker. + */ + public function __construct(DocBlox_Parallel_Worker $worker) + { + $this->worker = $worker; + + $this->path = tempnam(sys_get_temp_dir(), 'dpm_'); + posix_mkfifo($this->path, 0750); + } + + /** + * If the named pipe was not cleaned up, do so now. + */ + public function __destruct() + { + if (file_exists($this->path)) { + $this->release(); + } + } + + /** + * Pull the worker data into the named pipe. + * + * @return void + */ + public function pull() + { + $this->writePipeContents(); + } + + /** + * Push the worker data back onto the worker and release the pipe. + * + * @return void + */ + public function push() + { + list($result, $error, $return_code) = $this->readPipeContents(); + $this->release(); + + $this->worker->setResult($result); + $this->worker->setError($error); + $this->worker->setReturnCode($return_code); + } + + /** + * Convenience method to show relation to readPipeContents. + * + * @return void + */ + protected function writePipeContents() + { + // push the gathered data onto a name pipe + $pipe = fopen($this->path, 'w'); + fwrite( + $pipe, serialize( + array( + $this->worker->getResult(), + $this->worker->getError(), + $this->worker->getReturnCode() + ) + ) + ); + fclose($pipe); + } + + /** + * Returns the unserialized contents of the pipe. + * + * @return array + */ + protected function readPipeContents() + { + $pipe = fopen($this->path, 'r+'); + $result = unserialize(fread($pipe, filesize($this->path))); + fclose($pipe); + + return $result; + } + + /** + * Releases the pipe. + * + * @return void + */ + protected function release() + { + unlink($this->path); + } +}
\ No newline at end of file diff --git a/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/example.php b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/example.php new file mode 100644 index 00000000..3bdcd408 --- /dev/null +++ b/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/example.php @@ -0,0 +1,57 @@ +<?php +/** + * DocBlox + * + * PHP Version 5 + * + * @category DocBlox + * @package Parallel + * @author Mike van Riel <mike.vanriel@naenius.com> + * @copyright 2010-2011 Mike van Riel / Naenius (http://www.naenius.com) + * @license http://www.opensource.org/licenses/mit-license.php MIT + * @link http://docblox-project.org + */ + +/** Include the manager as we do not autoload */ +require_once 'Manager.php'; + +/** Include the worker as we do not autoload */ +require_once 'Worker.php'; + +/** Include the worker's pipe as we do not autoload */ +require_once 'WorkerPipe.php'; + +// ----------------------------------------------------------------------------- +// method 1: using a fluent interface and the addWorker helper. +// ----------------------------------------------------------------------------- + +$mgr = new DocBlox_Parallel_Manager(); +$mgr + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'a'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'b'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'c'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'd'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(1); return 'e'; })) + ->execute(); + +/** @var DocBlox_Parallel_Worker $worker */ +foreach ($mgr as $worker) { + var_dump($worker->getResult()); +} + +// ----------------------------------------------------------------------------- +// method 2: using the manager as worker array +// ----------------------------------------------------------------------------- + +$mgr = new DocBlox_Parallel_Manager(); +$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'f'; }); +$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'g'; }); +$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'h'; }); +$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'i'; }); +$mgr[] = new DocBlox_Parallel_Worker(function() { sleep(1); return 'j'; }); +$mgr->execute(); + +/** @var DocBlox_Parallel_Worker $worker */ +foreach ($mgr as $worker) { + var_dump($worker->getResult()); +} |