diff options
Diffstat (limited to 'vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php')
-rw-r--r-- | vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php new file mode 100644 index 00000000..047658d7 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php @@ -0,0 +1,109 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use Disque\Client as DisqueClient; +use Disque\Queue\Job as DisqueJob; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class DisqueQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class DisqueQueueAdapter implements QueueAdapterInterface +{ + /** + * @var DisqueClient + */ + protected $disque; + + /** + * @var string + */ + protected $queueName; + + /** + * DisqueQueueAdapter constructor. + * + * @param DisqueClient $disque + * @param string $queueName + */ + public function __construct(DisqueClient $disque, $queueName) + { + $this->disque = $disque; + $this->queueName = $queueName; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->disque->queue($this->queueName)->push(new DisqueJob($job->getBody())); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $this->disque->queue($this->queueName)->schedule(new DisqueJob($job->serialize()), $dateTime); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $disqueJob = $this->disque->queue($this->queueName)->pull(); + + if ($disqueJob === null) { + return null; + } + + return new Job($disqueJob->getBody(), $disqueJob->getId()); + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->disque->queue($this->queueName)->processed(new DisqueJob($job->getBody(), $job->getId())); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->disque->queue($this->queueName)->failed(new DisqueJob($job->getBody(), $job->getId())); + return $this; + } +} |