diff options
Diffstat (limited to 'app/Core/Queue')
-rw-r--r-- | app/Core/Queue/JobHandler.php | 50 | ||||
-rw-r--r-- | app/Core/Queue/QueueManager.php | 71 |
2 files changed, 121 insertions, 0 deletions
diff --git a/app/Core/Queue/JobHandler.php b/app/Core/Queue/JobHandler.php new file mode 100644 index 00000000..a2c4a2c7 --- /dev/null +++ b/app/Core/Queue/JobHandler.php @@ -0,0 +1,50 @@ +<?php + +namespace Kanboard\Core\Queue; + +use Kanboard\Core\Base; +use Kanboard\Job\BaseJob; +use SimpleQueue\Job; + +/** + * Class JobHandler + * + * @package Kanboard\Core\Queue + * @author Frederic Guillot + */ +class JobHandler extends Base +{ + /** + * Serialize a job + * + * @access public + * @param BaseJob $job + * @return Job + */ + public function serializeJob(BaseJob $job) + { + return new Job(array( + 'class' => get_class($job), + 'params' => $job->getJobParams(), + )); + } + + /** + * Execute a job + * + * @access public + * @param Job $job + */ + public function executeJob(Job $job) + { + $payload = $job->getBody(); + $className = $payload['class']; + + if (DEBUG) { + $this->logger->debug(__METHOD__.' Received job => '.$className); + } + + $worker = new $className($this->container); + call_user_func_array(array($worker, 'execute'), $payload['params']); + } +} diff --git a/app/Core/Queue/QueueManager.php b/app/Core/Queue/QueueManager.php new file mode 100644 index 00000000..f34cb220 --- /dev/null +++ b/app/Core/Queue/QueueManager.php @@ -0,0 +1,71 @@ +<?php + +namespace Kanboard\Core\Queue; + +use Kanboard\Core\Base; +use Kanboard\Job\BaseJob; +use LogicException; +use SimpleQueue\Queue; + +/** + * Class QueueManager + * + * @package Kanboard\Core\Queue + * @author Frederic Guillot + */ +class QueueManager extends Base +{ + /** + * @var Queue + */ + protected $queue = null; + + /** + * Set queue driver + * + * @access public + * @param Queue $queue + * @return $this + */ + public function setQueue(Queue $queue) + { + $this->queue = $queue; + return $this; + } + + /** + * Send a new job to the queue + * + * @access public + * @param BaseJob $job + * @return $this + */ + public function push(BaseJob $job) + { + if ($this->queue !== null) { + $this->queue->push(JobHandler::getInstance($this->container)->serializeJob($job)); + } else { + call_user_func_array(array($job, 'execute'), $job->getJobParams()); + } + + return $this; + } + + /** + * Wait for new jobs + * + * @access public + * @throws LogicException + */ + public function listen() + { + if ($this->queue === null) { + throw new LogicException('No Queue Driver defined!'); + } + + while ($job = $this->queue->pull()) { + JobHandler::getInstance($this->container)->executeJob($job); + $this->queue->completed($job); + } + } +} |