diff options
Diffstat (limited to 'vendor/fguillot/simple-queue')
10 files changed, 900 insertions, 0 deletions
diff --git a/vendor/fguillot/simple-queue/LICENSE b/vendor/fguillot/simple-queue/LICENSE new file mode 100644 index 00000000..a19d63a3 --- /dev/null +++ b/vendor/fguillot/simple-queue/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Frédéric Guillot + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php new file mode 100644 index 00000000..379dd9b8 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php @@ -0,0 +1,138 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPTable; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class AmqpQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class AmqpQueueAdapter implements QueueAdapterInterface +{ + /** + * @var AMQPChannel + */ + protected $channel; + + /** + * @var string + */ + protected $exchange = ''; + + /** + * @var string + */ + protected $queue = ''; + + /** + * AmqpQueueAdapter constructor. + * + * @param AMQPChannel $channel + * @param string $queue + * @param string $exchange + */ + public function __construct(AMQPChannel $channel, $queue, $exchange) + { + $this->channel = $channel; + $this->exchange = $exchange; + $this->queue = $queue; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain')); + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2)); + $message->set('application_headers', new AMQPTable(array('x-delay' => $delay))); + + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $message = null; + + $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) { + $message = $msg; + $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); + }); + + while (count($this->channel->callbacks)) { + $this->channel->wait(); + } + + if ($message === null) { + return null; + } + + $job = new Job(); + $job->setId($message->get('delivery_tag')); + $job->unserialize($message->getBody()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->channel->basic_ack($job->getId()); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->channel->basic_nack($job->getId()); + return $this; + } +} diff --git a/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php new file mode 100644 index 00000000..01377317 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php @@ -0,0 +1,150 @@ +<?php + +namespace SimpleQueue\Adapter; + +use Aws\Sqs\SqsClient; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; +use DateTime; + +/** + * Class AwsSqsQueueAdapter + * + * @package SimpleQueue\Adapter + * @author George Webb <george@webb.uno> + */ +class AwsSqsQueueAdapter implements QueueAdapterInterface +{ + /** + * @var string + */ + private $queueName; + + /** + * @var SqsClient + */ + private $sqsClient; + + /** + * @var string + */ + private $sqsUrl; + + /** + * @var array + */ + private $config; + + /** + * AwsSqsQueueAdapter constructor. + * + * @param string $queueName The name of the SQS queue + * @param SqsClient $sqsClient An SQS client + * @param array $config Array of config values + */ + public function __construct($queueName, SqsClient $sqsClient, $config = array()) + { + $this->queueName = $queueName; + $this->sqsClient = $sqsClient; + $this->sqsUrl = $this->sqsClient->getQueueUrl(array('QueueName' => $this->queueName))->get('QueueUrl'); + $this->config = $config; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->sqsClient->sendMessage(array( + 'QueueUrl' => $this->sqsUrl, + 'MessageBody' => $job->serialize() + )); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $this->sqsClient->sendMessage(array( + 'QueueUrl' => $this->sqsUrl, + 'MessageBody' => $job->serialize(), + 'VisibilityTimeout' => $delay + )); + + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $result = $this->sqsClient->receiveMessage(array( + 'QueueUrl' => $this->sqsUrl, + 'WaitTimeSeconds' => empty($this->config['LongPollingTime']) ? 0 : (int) $this->config['LongPollingTime'] + )); + + if ($result['Messages'] == null) { + return null; + } + + $resultMessage = array_pop($result['Messages']); + + $job = new Job(); + $job->setId($resultMessage['ReceiptHandle']); + $job->unserialize($resultMessage['Body']); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->sqsClient->deleteMessage(array( + 'QueueUrl' => $this->sqsUrl, + 'ReceiptHandle' => $job->getId() + )); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->sqsClient->changeMessageVisibility(array( + 'QueueUrl' => $this->sqsUrl, + 'ReceiptHandle' => $job->getId(), + 'VisibilityTimeout' => 0 + )); + return $this; + } +} diff --git a/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php new file mode 100644 index 00000000..407f60e2 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php @@ -0,0 +1,120 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use Pheanstalk\Job as BeanstalkJob; +use Pheanstalk\Pheanstalk; +use Pheanstalk\PheanstalkInterface; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class BeanstalkQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class BeanstalkQueueAdapter implements QueueAdapterInterface +{ + /** + * @var PheanstalkInterface + */ + protected $beanstalk; + + /** + * @var string + */ + protected $queueName = ''; + + /** + * BeanstalkQueueAdapter constructor. + * + * @param PheanstalkInterface $beanstalk + * @param string $queueName + */ + public function __construct(PheanstalkInterface $beanstalk, $queueName) + { + $this->beanstalk = $beanstalk; + $this->queueName = $queueName; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->beanstalk->putInTube($this->queueName, $job->serialize()); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); + + if ($beanstalkJob === false) { + return null; + } + + $job = new Job(); + $job->setId($beanstalkJob->getId()); + $job->unserialize($beanstalkJob->getData()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->delete($beanstalkJob); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->bury($beanstalkJob); + return $this; + } +} diff --git a/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php new file mode 100644 index 00000000..047658d7 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php @@ -0,0 +1,109 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use Disque\Client as DisqueClient; +use Disque\Queue\Job as DisqueJob; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class DisqueQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class DisqueQueueAdapter implements QueueAdapterInterface +{ + /** + * @var DisqueClient + */ + protected $disque; + + /** + * @var string + */ + protected $queueName; + + /** + * DisqueQueueAdapter constructor. + * + * @param DisqueClient $disque + * @param string $queueName + */ + public function __construct(DisqueClient $disque, $queueName) + { + $this->disque = $disque; + $this->queueName = $queueName; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->disque->queue($this->queueName)->push(new DisqueJob($job->getBody())); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $this->disque->queue($this->queueName)->schedule(new DisqueJob($job->serialize()), $dateTime); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $disqueJob = $this->disque->queue($this->queueName)->pull(); + + if ($disqueJob === null) { + return null; + } + + return new Job($disqueJob->getBody(), $disqueJob->getId()); + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->disque->queue($this->queueName)->processed(new DisqueJob($job->getBody(), $job->getId())); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->disque->queue($this->queueName)->failed(new DisqueJob($job->getBody(), $job->getId())); + return $this; + } +} diff --git a/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php new file mode 100644 index 00000000..36375d5b --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php @@ -0,0 +1,100 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use Exception; +use SimpleQueue\Exception\NotSupportedException; +use SimpleQueue\QueueAdapterInterface; +use SimpleQueue\Job; +use SplQueue; + +/** + * Class MemoryAdapter + * + * @package SimpleQueue\Adapter + */ +class MemoryQueueAdapter implements QueueAdapterInterface +{ + /** + * @var SplQueue + */ + protected $queue; + + /** + * MemoryAdapter constructor. + */ + public function __construct() + { + $this->queue = new SplQueue(); + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->queue->enqueue($job->serialize()); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return bool + * @throws NotSupportedException + */ + public function schedule(Job $job, DateTime $dateTime) + { + throw new NotSupportedException('Job delay is not supported by MemoryQueue'); + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + try { + $job = new Job(); + $payload = $this->queue->dequeue(); + return $job->unserialize($payload); + } catch (Exception $e) { + return null; + } + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->queue->enqueue($job->serialize()); + return $this; + } +} diff --git a/vendor/fguillot/simple-queue/src/Exception/NotSupportedException.php b/vendor/fguillot/simple-queue/src/Exception/NotSupportedException.php new file mode 100644 index 00000000..36106659 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Exception/NotSupportedException.php @@ -0,0 +1,14 @@ +<?php + +namespace SimpleQueue\Exception; + +use Exception; + +/** + * Class NotSupportedException + * + * @package SimpleQueue\Exception + */ +class NotSupportedException extends Exception +{ +} diff --git a/vendor/fguillot/simple-queue/src/Job.php b/vendor/fguillot/simple-queue/src/Job.php new file mode 100644 index 00000000..799bbba8 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Job.php @@ -0,0 +1,98 @@ +<?php + +namespace SimpleQueue; + +/** + * Class Job + * + * @package SimpleQueue + */ +class Job +{ + protected $id; + protected $body; + + /** + * Job constructor. + * + * @param null $body + * @param null $id + */ + public function __construct($body = null, $id = null) + { + $this->body = $body; + $this->id = $id; + } + + /** + * Unserialize a payload + * + * @param string $payload + * @return $this + */ + public function unserialize($payload) + { + $this->body = json_decode($payload, true); + return $this; + } + + /** + * Serialize the body + * + * @return string + */ + public function serialize() + { + return json_encode($this->body); + } + + /** + * Set body + * + * @param mixed $body + * @return Job + */ + public function setBody($body) + { + $this->body = $body; + return $this; + } + + /** + * Get body + * + * @return mixed + */ + public function getBody() + { + return $this->body; + } + + /** + * Set job ID + * + * @param mixed $jobId + * @return Job + */ + public function setId($jobId) + { + $this->id = $jobId; + return $this; + } + + /** + * Get job ID + * @return mixed + */ + public function getId() + { + return $this->id; + } + + /** + * Execute job + */ + public function execute() + { + } +} diff --git a/vendor/fguillot/simple-queue/src/Queue.php b/vendor/fguillot/simple-queue/src/Queue.php new file mode 100644 index 00000000..a88b55cb --- /dev/null +++ b/vendor/fguillot/simple-queue/src/Queue.php @@ -0,0 +1,92 @@ +<?php + +namespace SimpleQueue; + +use DateTime; + +/** + * Class Queue + * + * @package SimpleQueue + */ +class Queue implements QueueAdapterInterface +{ + /** + * @var QueueAdapterInterface + */ + protected $queueAdapter; + + /** + * Queue constructor. + * + * @param QueueAdapterInterface $queueAdapter + */ + public function __construct(QueueAdapterInterface $queueAdapter) + { + $this->queueAdapter = $queueAdapter; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->queueAdapter->push($job); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $this->queueAdapter->schedule($job, $dateTime); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + return $this->queueAdapter->pull(); + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->queueAdapter->completed($job); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->queueAdapter->failed($job); + return $this; + } +} diff --git a/vendor/fguillot/simple-queue/src/QueueAdapterInterface.php b/vendor/fguillot/simple-queue/src/QueueAdapterInterface.php new file mode 100644 index 00000000..9bda3070 --- /dev/null +++ b/vendor/fguillot/simple-queue/src/QueueAdapterInterface.php @@ -0,0 +1,58 @@ +<?php + +namespace SimpleQueue; + +use DateTime; + +/** + * Interface AdapterInterface + * + * @package SimpleQueue\Adapter + */ +interface QueueAdapterInterface +{ + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job); + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime); + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull(); + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job); + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job); +} |