summaryrefslogtreecommitdiff
path: root/libs/SimpleQueue
diff options
context:
space:
mode:
Diffstat (limited to 'libs/SimpleQueue')
-rw-r--r--libs/SimpleQueue/Adapter/AmqpQueueAdapter.php138
-rw-r--r--libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php120
-rw-r--r--libs/SimpleQueue/Exception/NotSupportedException.php14
-rw-r--r--libs/SimpleQueue/Job.php98
-rw-r--r--libs/SimpleQueue/Queue.php92
-rw-r--r--libs/SimpleQueue/QueueAdapterInterface.php58
6 files changed, 520 insertions, 0 deletions
diff --git a/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php
new file mode 100644
index 00000000..379dd9b8
--- /dev/null
+++ b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php
@@ -0,0 +1,138 @@
+<?php
+
+namespace SimpleQueue\Adapter;
+
+use DateTime;
+use PhpAmqpLib\Channel\AMQPChannel;
+use PhpAmqpLib\Message\AMQPMessage;
+use PhpAmqpLib\Wire\AMQPTable;
+use SimpleQueue\Job;
+use SimpleQueue\QueueAdapterInterface;
+
+/**
+ * Class AmqpQueueAdapter
+ *
+ * @package SimpleQueue\Adapter
+ */
+class AmqpQueueAdapter implements QueueAdapterInterface
+{
+ /**
+ * @var AMQPChannel
+ */
+ protected $channel;
+
+ /**
+ * @var string
+ */
+ protected $exchange = '';
+
+ /**
+ * @var string
+ */
+ protected $queue = '';
+
+ /**
+ * AmqpQueueAdapter constructor.
+ *
+ * @param AMQPChannel $channel
+ * @param string $queue
+ * @param string $exchange
+ */
+ public function __construct(AMQPChannel $channel, $queue, $exchange)
+ {
+ $this->channel = $channel;
+ $this->exchange = $exchange;
+ $this->queue = $queue;
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain'));
+ $this->channel->basic_publish($message, $this->exchange);
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ $now = new DateTime();
+ $when = clone($dateTime);
+ $delay = $when->getTimestamp() - $now->getTimestamp();
+
+ $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2));
+ $message->set('application_headers', new AMQPTable(array('x-delay' => $delay)));
+
+ $this->channel->basic_publish($message, $this->exchange);
+ return $this;
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ $message = null;
+
+ $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) {
+ $message = $msg;
+ $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
+ });
+
+ while (count($this->channel->callbacks)) {
+ $this->channel->wait();
+ }
+
+ if ($message === null) {
+ return null;
+ }
+
+ $job = new Job();
+ $job->setId($message->get('delivery_tag'));
+ $job->unserialize($message->getBody());
+
+ return $job;
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ $this->channel->basic_ack($job->getId());
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $this->channel->basic_nack($job->getId());
+ return $this;
+ }
+}
diff --git a/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php
new file mode 100644
index 00000000..407f60e2
--- /dev/null
+++ b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php
@@ -0,0 +1,120 @@
+<?php
+
+namespace SimpleQueue\Adapter;
+
+use DateTime;
+use Pheanstalk\Job as BeanstalkJob;
+use Pheanstalk\Pheanstalk;
+use Pheanstalk\PheanstalkInterface;
+use SimpleQueue\Job;
+use SimpleQueue\QueueAdapterInterface;
+
+/**
+ * Class BeanstalkQueueAdapter
+ *
+ * @package SimpleQueue\Adapter
+ */
+class BeanstalkQueueAdapter implements QueueAdapterInterface
+{
+ /**
+ * @var PheanstalkInterface
+ */
+ protected $beanstalk;
+
+ /**
+ * @var string
+ */
+ protected $queueName = '';
+
+ /**
+ * BeanstalkQueueAdapter constructor.
+ *
+ * @param PheanstalkInterface $beanstalk
+ * @param string $queueName
+ */
+ public function __construct(PheanstalkInterface $beanstalk, $queueName)
+ {
+ $this->beanstalk = $beanstalk;
+ $this->queueName = $queueName;
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $this->beanstalk->putInTube($this->queueName, $job->serialize());
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ $now = new DateTime();
+ $when = clone($dateTime);
+ $delay = $when->getTimestamp() - $now->getTimestamp();
+
+ $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay);
+ return $this;
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName);
+
+ if ($beanstalkJob === false) {
+ return null;
+ }
+
+ $job = new Job();
+ $job->setId($beanstalkJob->getId());
+ $job->unserialize($beanstalkJob->getData());
+
+ return $job;
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize());
+ $this->beanstalk->delete($beanstalkJob);
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize());
+ $this->beanstalk->bury($beanstalkJob);
+ return $this;
+ }
+}
diff --git a/libs/SimpleQueue/Exception/NotSupportedException.php b/libs/SimpleQueue/Exception/NotSupportedException.php
new file mode 100644
index 00000000..36106659
--- /dev/null
+++ b/libs/SimpleQueue/Exception/NotSupportedException.php
@@ -0,0 +1,14 @@
+<?php
+
+namespace SimpleQueue\Exception;
+
+use Exception;
+
+/**
+ * Class NotSupportedException
+ *
+ * @package SimpleQueue\Exception
+ */
+class NotSupportedException extends Exception
+{
+}
diff --git a/libs/SimpleQueue/Job.php b/libs/SimpleQueue/Job.php
new file mode 100644
index 00000000..799bbba8
--- /dev/null
+++ b/libs/SimpleQueue/Job.php
@@ -0,0 +1,98 @@
+<?php
+
+namespace SimpleQueue;
+
+/**
+ * Class Job
+ *
+ * @package SimpleQueue
+ */
+class Job
+{
+ protected $id;
+ protected $body;
+
+ /**
+ * Job constructor.
+ *
+ * @param null $body
+ * @param null $id
+ */
+ public function __construct($body = null, $id = null)
+ {
+ $this->body = $body;
+ $this->id = $id;
+ }
+
+ /**
+ * Unserialize a payload
+ *
+ * @param string $payload
+ * @return $this
+ */
+ public function unserialize($payload)
+ {
+ $this->body = json_decode($payload, true);
+ return $this;
+ }
+
+ /**
+ * Serialize the body
+ *
+ * @return string
+ */
+ public function serialize()
+ {
+ return json_encode($this->body);
+ }
+
+ /**
+ * Set body
+ *
+ * @param mixed $body
+ * @return Job
+ */
+ public function setBody($body)
+ {
+ $this->body = $body;
+ return $this;
+ }
+
+ /**
+ * Get body
+ *
+ * @return mixed
+ */
+ public function getBody()
+ {
+ return $this->body;
+ }
+
+ /**
+ * Set job ID
+ *
+ * @param mixed $jobId
+ * @return Job
+ */
+ public function setId($jobId)
+ {
+ $this->id = $jobId;
+ return $this;
+ }
+
+ /**
+ * Get job ID
+ * @return mixed
+ */
+ public function getId()
+ {
+ return $this->id;
+ }
+
+ /**
+ * Execute job
+ */
+ public function execute()
+ {
+ }
+}
diff --git a/libs/SimpleQueue/Queue.php b/libs/SimpleQueue/Queue.php
new file mode 100644
index 00000000..a88b55cb
--- /dev/null
+++ b/libs/SimpleQueue/Queue.php
@@ -0,0 +1,92 @@
+<?php
+
+namespace SimpleQueue;
+
+use DateTime;
+
+/**
+ * Class Queue
+ *
+ * @package SimpleQueue
+ */
+class Queue implements QueueAdapterInterface
+{
+ /**
+ * @var QueueAdapterInterface
+ */
+ protected $queueAdapter;
+
+ /**
+ * Queue constructor.
+ *
+ * @param QueueAdapterInterface $queueAdapter
+ */
+ public function __construct(QueueAdapterInterface $queueAdapter)
+ {
+ $this->queueAdapter = $queueAdapter;
+ }
+
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job)
+ {
+ $this->queueAdapter->push($job);
+ return $this;
+ }
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime)
+ {
+ $this->queueAdapter->schedule($job, $dateTime);
+ return $this;
+ }
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull()
+ {
+ return $this->queueAdapter->pull();
+ }
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job)
+ {
+ $this->queueAdapter->completed($job);
+ return $this;
+ }
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job)
+ {
+ $this->queueAdapter->failed($job);
+ return $this;
+ }
+}
diff --git a/libs/SimpleQueue/QueueAdapterInterface.php b/libs/SimpleQueue/QueueAdapterInterface.php
new file mode 100644
index 00000000..9bda3070
--- /dev/null
+++ b/libs/SimpleQueue/QueueAdapterInterface.php
@@ -0,0 +1,58 @@
+<?php
+
+namespace SimpleQueue;
+
+use DateTime;
+
+/**
+ * Interface AdapterInterface
+ *
+ * @package SimpleQueue\Adapter
+ */
+interface QueueAdapterInterface
+{
+ /**
+ * Send a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function push(Job $job);
+
+ /**
+ * Schedule a job in the future
+ *
+ * @access public
+ * @param Job $job
+ * @param DateTime $dateTime
+ * @return $this
+ */
+ public function schedule(Job $job, DateTime $dateTime);
+
+ /**
+ * Wait and get job from a queue
+ *
+ * @access public
+ * @return Job|null
+ */
+ public function pull();
+
+ /**
+ * Acknowledge a job
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function completed(Job $job);
+
+ /**
+ * Mark a job as failed
+ *
+ * @access public
+ * @param Job $job
+ * @return $this
+ */
+ public function failed(Job $job);
+}