From 9e2b2a32fd0e967ad3184e9a5d091a29953acb91 Mon Sep 17 00:00:00 2001 From: Frederic Guillot Date: Wed, 25 Oct 2017 16:22:10 -0700 Subject: Include composer dependencies in repo --- .../src/Adapter/BeanstalkQueueAdapter.php | 120 +++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php (limited to 'vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php') diff --git a/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php new file mode 100644 index 00000000..407f60e2 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php @@ -0,0 +1,120 @@ +beanstalk = $beanstalk; + $this->queueName = $queueName; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->beanstalk->putInTube($this->queueName, $job->serialize()); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); + + if ($beanstalkJob === false) { + return null; + } + + $job = new Job(); + $job->setId($beanstalkJob->getId()); + $job->unserialize($beanstalkJob->getData()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->delete($beanstalkJob); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->bury($beanstalkJob); + return $this; + } +} -- cgit v1.2.3