summaryrefslogtreecommitdiff
path: root/vendor/fguillot/simple-queue/src
diff options
context:
space:
mode:
authorFrederic Guillot <fred@kanboard.net>2017-10-25 16:22:10 -0700
committerFrederic Guillot <fred@kanboard.net>2017-10-25 16:22:10 -0700
commit9e2b2a32fd0e967ad3184e9a5d091a29953acb91 (patch)
tree00822e24aa1110c73ca455a8d096ef296c008cbc /vendor/fguillot/simple-queue/src
parentc507c5416251c505cb3e088a03c6664bed73c812 (diff)
Include composer dependencies in repo
Diffstat (limited to 'vendor/fguillot/simple-queue/src')
-rw-r--r--vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php138
-rw-r--r--vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php150
-rw-r--r--vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php120
-rw-r--r--vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php109
-rw-r--r--vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php100
-rw-r--r--vendor/fguillot/simple-queue/src/Exception/NotSupportedException.php14
-rw-r--r--vendor/fguillot/simple-queue/src/Job.php98
-rw-r--r--vendor/fguillot/simple-queue/src/Queue.php92
-rw-r--r--vendor/fguillot/simple-queue/src/QueueAdapterInterface.php58
9 files changed, 879 insertions, 0 deletions
diff --git a/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php
new file mode 100644
index 00000000..379dd9b8
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php
@@ -0,0 +1,138 @@
+<?php
+
+namespace SimpleQueue\Adapter;
+
+use DateTime;
+use PhpAmqpLib\Channel\AMQPChannel;
+use PhpAmqpLib\Message\AMQPMessage;
+use PhpAmqpLib\Wire\AMQPTable;
+use SimpleQueue\Job;
+use SimpleQueue\QueueAdapterInterface;
+
+/**
+ * Class AmqpQueueAdapter
+ *
+ * @package SimpleQueue\Adapter
+ */
+class AmqpQueueAdapter implements QueueAdapterInterface
+{
+ /**
+ * @var AMQPChannel
+ */
+ protected $channel;
+
+ /**
+ * @var string
+ */
+ protected $exchange = '';
+
+ /**
+ * @var string
+ */
+ protected $queue = '';
+
+ /**
+ * AmqpQueueAdapter constructor.
+ *
+ * @param AMQPChannel $channel
+ * @param string $queue
+ * @param string $exchange
+ */
+ public function __construct(AMQPChannel $channel, $queue, $exchange)
+ {
+ $this->channel = $channel;
+ $this->exchange = $exchange;
+ $this->queue = $queue;
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain'));
+ $this->channel->basic_publish($message, $this->exchange);
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ $now = new DateTime();
+ $when = clone($dateTime);
+ $delay = $when->getTimestamp() - $now->getTimestamp();
+
+ $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2));
+ $message->set('application_headers', new AMQPTable(array('x-delay' => $delay)));
+
+ $this->channel->basic_publish($message, $this->exchange);
+ return $this;
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ $message = null;
+
+ $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) {
+ $message = $msg;
+ $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
+ });
+
+ while (count($this->channel->callbacks)) {
+ $this->channel->wait();
+ }
+
+ if ($message === null) {
+ return null;
+ }
+
+ $job = new Job();
+ $job->setId($message->get('delivery_tag'));
+ $job->unserialize($message->getBody());
+
+ return $job;
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ $this->channel->basic_ack($job->getId());
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $this->channel->basic_nack($job->getId());
+ return $this;
+ }
+}
diff --git a/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php
new file mode 100644
index 00000000..01377317
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php
@@ -0,0 +1,150 @@
+<?php
+
+namespace SimpleQueue\Adapter;
+
+use Aws\Sqs\SqsClient;
+use SimpleQueue\Job;
+use SimpleQueue\QueueAdapterInterface;
+use DateTime;
+
+/**
+ * Class AwsSqsQueueAdapter
+ *
+ * @package SimpleQueue\Adapter
+ * @author George Webb <george@webb.uno>
+ */
+class AwsSqsQueueAdapter implements QueueAdapterInterface
+{
+ /**
+ * @var string
+ */
+ private $queueName;
+
+ /**
+ * @var SqsClient
+ */
+ private $sqsClient;
+
+ /**
+ * @var string
+ */
+ private $sqsUrl;
+
+ /**
+ * @var array
+ */
+ private $config;
+
+ /**
+ * AwsSqsQueueAdapter constructor.
+ *
+ * @param string $queueName The name of the SQS queue
+ * @param SqsClient $sqsClient An SQS client
+ * @param array $config Array of config values
+ */
+ public function __construct($queueName, SqsClient $sqsClient, $config = array())
+ {
+ $this->queueName = $queueName;
+ $this->sqsClient = $sqsClient;
+ $this->sqsUrl = $this->sqsClient->getQueueUrl(array('QueueName' => $this->queueName))->get('QueueUrl');
+ $this->config = $config;
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $this->sqsClient->sendMessage(array(
+ 'QueueUrl' => $this->sqsUrl,
+ 'MessageBody' => $job->serialize()
+ ));
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ $now = new DateTime();
+ $when = clone($dateTime);
+ $delay = $when->getTimestamp() - $now->getTimestamp();
+
+ $this->sqsClient->sendMessage(array(
+ 'QueueUrl' => $this->sqsUrl,
+ 'MessageBody' => $job->serialize(),
+ 'VisibilityTimeout' => $delay
+ ));
+
+ return $this;
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ $result = $this->sqsClient->receiveMessage(array(
+ 'QueueUrl' => $this->sqsUrl,
+ 'WaitTimeSeconds' => empty($this->config['LongPollingTime']) ? 0 : (int) $this->config['LongPollingTime']
+ ));
+
+ if ($result['Messages'] == null) {
+ return null;
+ }
+
+ $resultMessage = array_pop($result['Messages']);
+
+ $job = new Job();
+ $job->setId($resultMessage['ReceiptHandle']);
+ $job->unserialize($resultMessage['Body']);
+
+ return $job;
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ $this->sqsClient->deleteMessage(array(
+ 'QueueUrl' => $this->sqsUrl,
+ 'ReceiptHandle' => $job->getId()
+ ));
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $this->sqsClient->changeMessageVisibility(array(
+ 'QueueUrl' => $this->sqsUrl,
+ 'ReceiptHandle' => $job->getId(),
+ 'VisibilityTimeout' => 0
+ ));
+ return $this;
+ }
+}
diff --git a/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php
new file mode 100644
index 00000000..407f60e2
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php
@@ -0,0 +1,120 @@
+<?php
+
+namespace SimpleQueue\Adapter;
+
+use DateTime;
+use Pheanstalk\Job as BeanstalkJob;
+use Pheanstalk\Pheanstalk;
+use Pheanstalk\PheanstalkInterface;
+use SimpleQueue\Job;
+use SimpleQueue\QueueAdapterInterface;
+
+/**
+ * Class BeanstalkQueueAdapter
+ *
+ * @package SimpleQueue\Adapter
+ */
+class BeanstalkQueueAdapter implements QueueAdapterInterface
+{
+ /**
+ * @var PheanstalkInterface
+ */
+ protected $beanstalk;
+
+ /**
+ * @var string
+ */
+ protected $queueName = '';
+
+ /**
+ * BeanstalkQueueAdapter constructor.
+ *
+ * @param PheanstalkInterface $beanstalk
+ * @param string $queueName
+ */
+ public function __construct(PheanstalkInterface $beanstalk, $queueName)
+ {
+ $this->beanstalk = $beanstalk;
+ $this->queueName = $queueName;
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $this->beanstalk->putInTube($this->queueName, $job->serialize());
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ $now = new DateTime();
+ $when = clone($dateTime);
+ $delay = $when->getTimestamp() - $now->getTimestamp();
+
+ $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay);
+ return $this;
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName);
+
+ if ($beanstalkJob === false) {
+ return null;
+ }
+
+ $job = new Job();
+ $job->setId($beanstalkJob->getId());
+ $job->unserialize($beanstalkJob->getData());
+
+ return $job;
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize());
+ $this->beanstalk->delete($beanstalkJob);
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize());
+ $this->beanstalk->bury($beanstalkJob);
+ return $this;
+ }
+}
diff --git a/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php
new file mode 100644
index 00000000..047658d7
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php
@@ -0,0 +1,109 @@
+<?php
+
+namespace SimpleQueue\Adapter;
+
+use DateTime;
+use Disque\Client as DisqueClient;
+use Disque\Queue\Job as DisqueJob;
+use SimpleQueue\Job;
+use SimpleQueue\QueueAdapterInterface;
+
+/**
+ * Class DisqueQueueAdapter
+ *
+ * @package SimpleQueue\Adapter
+ */
+class DisqueQueueAdapter implements QueueAdapterInterface
+{
+ /**
+ * @var DisqueClient
+ */
+ protected $disque;
+
+ /**
+ * @var string
+ */
+ protected $queueName;
+
+ /**
+ * DisqueQueueAdapter constructor.
+ *
+ * @param DisqueClient $disque
+ * @param string $queueName
+ */
+ public function __construct(DisqueClient $disque, $queueName)
+ {
+ $this->disque = $disque;
+ $this->queueName = $queueName;
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $this->disque->queue($this->queueName)->push(new DisqueJob($job->getBody()));
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ $this->disque->queue($this->queueName)->schedule(new DisqueJob($job->serialize()), $dateTime);
+ return $this;
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ $disqueJob = $this->disque->queue($this->queueName)->pull();
+
+ if ($disqueJob === null) {
+ return null;
+ }
+
+ return new Job($disqueJob->getBody(), $disqueJob->getId());
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ $this->disque->queue($this->queueName)->processed(new DisqueJob($job->getBody(), $job->getId()));
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $this->disque->queue($this->queueName)->failed(new DisqueJob($job->getBody(), $job->getId()));
+ return $this;
+ }
+}
diff --git a/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php
new file mode 100644
index 00000000..36375d5b
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php
@@ -0,0 +1,100 @@
+<?php
+
+namespace SimpleQueue\Adapter;
+
+use DateTime;
+use Exception;
+use SimpleQueue\Exception\NotSupportedException;
+use SimpleQueue\QueueAdapterInterface;
+use SimpleQueue\Job;
+use SplQueue;
+
+/**
+ * Class MemoryAdapter
+ *
+ * @package SimpleQueue\Adapter
+ */
+class MemoryQueueAdapter implements QueueAdapterInterface
+{
+ /**
+ * @var SplQueue
+ */
+ protected $queue;
+
+ /**
+ * MemoryAdapter constructor.
+ */
+ public function __construct()
+ {
+ $this->queue = new SplQueue();
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $this->queue->enqueue($job->serialize());
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return bool
+ * @throws NotSupportedException
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ throw new NotSupportedException('Job delay is not supported by MemoryQueue');
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ try {
+ $job = new Job();
+ $payload = $this->queue->dequeue();
+ return $job->unserialize($payload);
+ } catch (Exception $e) {
+ return null;
+ }
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $this->queue->enqueue($job->serialize());
+ return $this;
+ }
+}
diff --git a/vendor/fguillot/simple-queue/src/Exception/NotSupportedException.php b/vendor/fguillot/simple-queue/src/Exception/NotSupportedException.php
new file mode 100644
index 00000000..36106659
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/Exception/NotSupportedException.php
@@ -0,0 +1,14 @@
+<?php
+
+namespace SimpleQueue\Exception;
+
+use Exception;
+
+/**
+ * Class NotSupportedException
+ *
+ * @package SimpleQueue\Exception
+ */
+class NotSupportedException extends Exception
+{
+}
diff --git a/vendor/fguillot/simple-queue/src/Job.php b/vendor/fguillot/simple-queue/src/Job.php
new file mode 100644
index 00000000..799bbba8
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/Job.php
@@ -0,0 +1,98 @@
+<?php
+
+namespace SimpleQueue;
+
+/**
+ * Class Job
+ *
+ * @package SimpleQueue
+ */
+class Job
+{
+ protected $id;
+ protected $body;
+
+ /**
+ * Job constructor.
+ *
+ * @param null $body
+ * @param null $id
+ */
+ public function __construct($body = null, $id = null)
+ {
+ $this->body = $body;
+ $this->id = $id;
+ }
+
+ /**
+ * Unserialize a payload
+ *
+ * @param string $payload
+ * @return $this
+ */
+ public function unserialize($payload)
+ {
+ $this->body = json_decode($payload, true);
+ return $this;
+ }
+
+ /**
+ * Serialize the body
+ *
+ * @return string
+ */
+ public function serialize()
+ {
+ return json_encode($this->body);
+ }
+
+ /**
+ * Set body
+ *
+ * @param mixed $body
+ * @return Job
+ */
+ public function setBody($body)
+ {
+ $this->body = $body;
+ return $this;
+ }
+
+ /**
+ * Get body
+ *
+ * @return mixed
+ */
+ public function getBody()
+ {
+ return $this->body;
+ }
+
+ /**
+ * Set job ID
+ *
+ * @param mixed $jobId
+ * @return Job
+ */
+ public function setId($jobId)
+ {
+ $this->id = $jobId;
+ return $this;
+ }
+
+ /**
+ * Get job ID
+ * @return mixed
+ */
+ public function getId()
+ {
+ return $this->id;
+ }
+
+ /**
+ * Execute job
+ */
+ public function execute()
+ {
+ }
+}
diff --git a/vendor/fguillot/simple-queue/src/Queue.php b/vendor/fguillot/simple-queue/src/Queue.php
new file mode 100644
index 00000000..a88b55cb
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/Queue.php
@@ -0,0 +1,92 @@
+<?php
+
+namespace SimpleQueue;
+
+use DateTime;
+
+/**
+ * Class Queue
+ *
+ * @package SimpleQueue
+ */
+class Queue implements QueueAdapterInterface
+{
+ /**
+ * @var QueueAdapterInterface
+ */
+ protected $queueAdapter;
+
+ /**
+ * Queue constructor.
+ *
+ * @param QueueAdapterInterface $queueAdapter
+ */
+ public function __construct(QueueAdapterInterface $queueAdapter)
+ {
+ $this->queueAdapter = $queueAdapter;
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $this->queueAdapter->push($job);
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ $this->queueAdapter->schedule($job, $dateTime);
+ return $this;
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ return $this->queueAdapter->pull();
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ $this->queueAdapter->completed($job);
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $this->queueAdapter->failed($job);
+ return $this;
+ }
+}
diff --git a/vendor/fguillot/simple-queue/src/QueueAdapterInterface.php b/vendor/fguillot/simple-queue/src/QueueAdapterInterface.php
new file mode 100644
index 00000000..9bda3070
--- /dev/null
+++ b/vendor/fguillot/simple-queue/src/QueueAdapterInterface.php
@@ -0,0 +1,58 @@
+<?php
+
+namespace SimpleQueue;
+
+use DateTime;
+
+/**
+ * Interface AdapterInterface
+ *
+ * @package SimpleQueue\Adapter
+ */
+interface QueueAdapterInterface
+{
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job);
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime);
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull();
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job);
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job);
+}