diff options
author | Frédéric Guillot <fred@kanboard.net> | 2018-04-04 15:21:13 -0700 |
---|---|---|
committer | Frédéric Guillot <fred@kanboard.net> | 2018-04-04 15:21:13 -0700 |
commit | a4642d17e0e1ea018b128efdcc3db281461458b1 (patch) | |
tree | 00210c3d0abd0adea7f8817e6ba1d82c1ea4b50e /libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php | |
parent | 62178b1f2b4ad6ed8eafbcd3be8ef2f46b041b82 (diff) |
Move custom libs to the source tree
Diffstat (limited to 'libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php')
-rw-r--r-- | libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php | 120 |
1 files changed, 120 insertions, 0 deletions
diff --git a/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php new file mode 100644 index 00000000..407f60e2 --- /dev/null +++ b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php @@ -0,0 +1,120 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use Pheanstalk\Job as BeanstalkJob; +use Pheanstalk\Pheanstalk; +use Pheanstalk\PheanstalkInterface; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class BeanstalkQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class BeanstalkQueueAdapter implements QueueAdapterInterface +{ + /** + * @var PheanstalkInterface + */ + protected $beanstalk; + + /** + * @var string + */ + protected $queueName = ''; + + /** + * BeanstalkQueueAdapter constructor. + * + * @param PheanstalkInterface $beanstalk + * @param string $queueName + */ + public function __construct(PheanstalkInterface $beanstalk, $queueName) + { + $this->beanstalk = $beanstalk; + $this->queueName = $queueName; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->beanstalk->putInTube($this->queueName, $job->serialize()); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); + + if ($beanstalkJob === false) { + return null; + } + + $job = new Job(); + $job->setId($beanstalkJob->getId()); + $job->unserialize($beanstalkJob->getData()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->delete($beanstalkJob); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->bury($beanstalkJob); + return $this; + } +} |