summaryrefslogtreecommitdiff
path: root/buildscripts/phing/classes/phing/contrib/DocBlox/Parallel/Manager.php
blob: 6fe711070cc5837493a02dcec07644a35b3c6ccb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
<?php
/**
 * DocBlox
 *
 * PHP Version 5
 *
 * @category  DocBlox
 * @package   Parallel
 * @author    Mike van Riel <mike.vanriel@naenius.com>
 * @copyright 2010-2011 Mike van Riel / Naenius (http://www.naenius.com)
 * @license   http://www.opensource.org/licenses/mit-license.php MIT
 * @link      http://docblox-project.org
 */

/**
 * Manager class for Parallel processes.
 *
 * This class will manage the workers and make sure all processes are executed
 * in parallel and not too many at the same time.
 *
 * @category DocBlox
 * @package  Parallel
 * @author   Mike van Riel <mike.vanriel@naenius.com>
 * @license  http://www.opensource.org/licenses/mit-license.php MIT
 * @link     http://docblox-project.org
 */
class DocBlox_Parallel_Manager extends ArrayObject
{
    /** @var int The maximum number of processes to run simultaneously */
    protected $process_limit = 2;

    /** @var boolean Tracks whether this manager is currently executing */
    protected $is_running = false;

    /**
     * Tries to autodetect the optimal number of process by counting the number
     * of processors.
     *
     * @param array  $input          Input for the array object.
     * @param int    $flags          flags for the array object.
     * @param string $iterator_class Iterator class for this array object.
     */
    public function __construct(
        $input = array(), $flags = 0, $iterator_class = "ArrayIterator"
    ) {
        parent::__construct($input, $flags, $iterator_class);

        if (is_readable('/proc/cpuinfo')) {
            $processors = 0;
            exec("cat /proc/cpuinfo | grep processor | wc -l", $processors);
            $this->setProcessLimit(reset($processors));
        }
    }

    /**
     * Adds a worker to to the queue.
     *
     * This method will prepare a worker to be executed in parallel once the
     * execute method is invoked.
     * A fluent interface is provided so that you can chain multiple workers
     * in one call.
     *
     * Example:
     *
     *    $cb1 = function() { var_dump('a'); sleep(1); };
     *    $cb2 = function() { var_dump('b'); sleep(1); };
     *
     *    $mgr = new DocBlox_Parallel_Manager();
     *    $mgr->setProcessLimit(2)
     *        ->addWorker(new DocBlox_Parallel_Worker($cb1))
     *        ->addWorker(new DocBlox_Parallel_Worker($cb2))
     *        ->execute();
     *
     * @param int                     $index  The key for this worker.
     * @param DocBlox_Parallel_Worker $newval The worker to add onto the queue.
     *
     * @see DocBlox_Parallel_Manager::execute()
     *
     * @throws RuntimeException         if this method is invoked while the
     *     manager is busy executing tasks.
     * @throws InvalidArgumentException if the provided element is not of type
     *     DocBlox_Parallel_Worker.
     *
     * @return void
     */
    public function offsetSet($index, $newval)
    {
        if (!$newval instanceof DocBlox_Parallel_Worker) {
            throw new InvalidArgumentException(
                'Provided element must be of type DocBlox_Parallel_Worker'
            );
        }
        if ($this->isRunning()) {
            throw new RuntimeException(
                'Workers may not be added during execution of the manager'
            );
        }

        parent::offsetSet($index, $newval);
    }

    /**
     * Convenience method to make the addition of workers explicit and allow a
     * fluent interface.
     *
     * @param DocBlox_Parallel_Worker $worker The worker to add onto the queue.
     *
     * @return self
     */
    public function addWorker(DocBlox_Parallel_Worker $worker)
    {
        $this[] = $worker;

        return $this;
    }

    /**
     * Sets how many processes at most to execute at the same time.
     *
     * A fluent interface is provided so that you can chain multiple workers
     * in one call.
     *
     * @param int $process_limit The limit, minimum of 1
     *
     * @see DocBlox_Parallel_Manager::addWorker() for an example
     *
     * @return self
     */
    public function setProcessLimit($process_limit)
    {
        if ($process_limit < 1) {
            throw new InvalidArgumentException(
                'Number of simultaneous processes may not be less than 1'
            );
        }

        $this->process_limit = $process_limit;

        return $this;
    }

    /**
     * Returns the current limit on the amount of processes that can be
     * executed at the same time.
     *
     * @return int
     */
    public function getProcessLimit()
    {
        return $this->process_limit;
    }

    /**
     * Returns whether the manager is executing the workers.
     *
     * @return boolean
     */
    public function isRunning()
    {
        return $this->is_running;
    }

    /**
     * Executes each worker.
     *
     * This method loops through the list of workers and tries to fork as
     * many times as the ProcessLimit dictates at the same time.
     *
     * @return void
     */
    public function execute()
    {
        /** @var int[] $processes */
        $processes = $this->startExecution();

        /** @var DocBlox_Parallel_Worker $worker */
        foreach ($this as $worker) {

            // if requirements are not met, execute workers in series.
            if (!$this->checkRequirements()) {
                $worker->execute();
                continue;
            }

            $this->forkAndRun($worker, $processes);
        }

        $this->stopExecution($processes);
    }

    /**
     * Notifies manager that execution has started, checks requirements and
     * returns array for child processes.
     *
     * If forking is not available because library requirements are not met
     * than the list of workers is processed in series and a E_USER_NOTICE is
     * triggered.
     *
     * @return int[]
     */
    protected function startExecution()
    {
        $this->is_running = true;

        // throw a E_USER_NOTICE if the requirements are not met.
        if (!$this->checkRequirements()) {
            trigger_error(
                'The PCNTL extension is not available, running workers in series '
                . 'instead of parallel',
                E_USER_NOTICE
            );
        }

        return array();
    }

    /**
     * Waits for all processes to have finished and notifies the manager that
     * execution has stopped.
     *
     * @param int[] &$processes List of running processes.
     *
     * @return void
     */
    protected function stopExecution(array &$processes)
    {
        // starting of processes has ended but some processes might still be
        // running wait for them to finish
        while (!empty($processes)) {
            pcntl_waitpid(array_shift($processes), $status);
        }

        /** @var DocBlox_Parallel_Worker $worker */
        foreach ($this as $worker) {
            $worker->pipe->push();
        }

        $this->is_running = false;
    }

    /**
     * Forks the current process and calls the Worker's execute method OR
     * handles the parent process' execution.
     *
     * This is the really tricky part of the forking mechanism. Here we invoke
     * {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork}
     * and either execute the forked process or deal with the parent's process
     * based on in which process we are.
     *
     * To fully understand what is going on here it is recommended to read the
     * PHP manual page on
     * {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork}
     * and associated articles.
     *
     * If there are more workers than may be ran simultaneously then this method
     * will wait until a slot becomes available and then starts the next worker.
     *
     * @param DocBlox_Parallel_Worker $worker     The worker to process.
     * @param int[]                   &$processes The list of running processes.
     *
     * @throws RuntimeException if we are unable to fork.
     *
     * @return void
     */
    protected function forkAndRun(
        DocBlox_Parallel_Worker $worker, array &$processes
    ) {
        $worker->pipe = new DocBlox_Parallel_WorkerPipe($worker);

        // fork the process and register the PID
        $pid = pcntl_fork();

        switch ($pid) {
        case -1:
            throw new RuntimeException('Unable to establish a fork');
        case 0: // Child process
            $worker->execute();

            $worker->pipe->pull();

            // Kill -9 this process to prevent closing of shared file handlers.
            // Not doing this causes, for example, MySQL connections to be cleaned.
            posix_kill(getmypid(), SIGKILL);
        default: // Parent process
            // Keep track if the worker children
            $processes[] = $pid;

            if (count($processes) >= $this->getProcessLimit()) {
                pcntl_waitpid(array_shift($processes), $status);
            }
            break;
        }
    }

    /**
     * Returns true when all requirements are met.
     *
     * @return bool
     */
    protected function checkRequirements()
    {
        return (bool)(extension_loaded('pcntl'));
    }
}