diff options
Diffstat (limited to 'libs/SimpleQueue')
-rw-r--r-- | libs/SimpleQueue/Adapter/AmqpQueueAdapter.php | 138 | ||||
-rw-r--r-- | libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php | 120 | ||||
-rw-r--r-- | libs/SimpleQueue/Exception/NotSupportedException.php | 14 | ||||
-rw-r--r-- | libs/SimpleQueue/Job.php | 98 | ||||
-rw-r--r-- | libs/SimpleQueue/Queue.php | 92 | ||||
-rw-r--r-- | libs/SimpleQueue/QueueAdapterInterface.php | 58 |
6 files changed, 520 insertions, 0 deletions
diff --git a/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php new file mode 100644 index 00000000..379dd9b8 --- /dev/null +++ b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php @@ -0,0 +1,138 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPTable; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class AmqpQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class AmqpQueueAdapter implements QueueAdapterInterface +{ + /** + * @var AMQPChannel + */ + protected $channel; + + /** + * @var string + */ + protected $exchange = ''; + + /** + * @var string + */ + protected $queue = ''; + + /** + * AmqpQueueAdapter constructor. + * + * @param AMQPChannel $channel + * @param string $queue + * @param string $exchange + */ + public function __construct(AMQPChannel $channel, $queue, $exchange) + { + $this->channel = $channel; + $this->exchange = $exchange; + $this->queue = $queue; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain')); + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2)); + $message->set('application_headers', new AMQPTable(array('x-delay' => $delay))); + + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $message = null; + + $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) { + $message = $msg; + $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); + }); + + while (count($this->channel->callbacks)) { + $this->channel->wait(); + } + + if ($message === null) { + return null; + } + + $job = new Job(); + $job->setId($message->get('delivery_tag')); + $job->unserialize($message->getBody()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->channel->basic_ack($job->getId()); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->channel->basic_nack($job->getId()); + return $this; + } +} diff --git a/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php new file mode 100644 index 00000000..407f60e2 --- /dev/null +++ b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php @@ -0,0 +1,120 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use Pheanstalk\Job as BeanstalkJob; +use Pheanstalk\Pheanstalk; +use Pheanstalk\PheanstalkInterface; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class BeanstalkQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class BeanstalkQueueAdapter implements QueueAdapterInterface +{ + /** + * @var PheanstalkInterface + */ + protected $beanstalk; + + /** + * @var string + */ + protected $queueName = ''; + + /** + * BeanstalkQueueAdapter constructor. + * + * @param PheanstalkInterface $beanstalk + * @param string $queueName + */ + public function __construct(PheanstalkInterface $beanstalk, $queueName) + { + $this->beanstalk = $beanstalk; + $this->queueName = $queueName; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->beanstalk->putInTube($this->queueName, $job->serialize()); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); + + if ($beanstalkJob === false) { + return null; + } + + $job = new Job(); + $job->setId($beanstalkJob->getId()); + $job->unserialize($beanstalkJob->getData()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->delete($beanstalkJob); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->bury($beanstalkJob); + return $this; + } +} diff --git a/libs/SimpleQueue/Exception/NotSupportedException.php b/libs/SimpleQueue/Exception/NotSupportedException.php new file mode 100644 index 00000000..36106659 --- /dev/null +++ b/libs/SimpleQueue/Exception/NotSupportedException.php @@ -0,0 +1,14 @@ +<?php + +namespace SimpleQueue\Exception; + +use Exception; + +/** + * Class NotSupportedException + * + * @package SimpleQueue\Exception + */ +class NotSupportedException extends Exception +{ +} diff --git a/libs/SimpleQueue/Job.php b/libs/SimpleQueue/Job.php new file mode 100644 index 00000000..799bbba8 --- /dev/null +++ b/libs/SimpleQueue/Job.php @@ -0,0 +1,98 @@ +<?php + +namespace SimpleQueue; + +/** + * Class Job + * + * @package SimpleQueue + */ +class Job +{ + protected $id; + protected $body; + + /** + * Job constructor. + * + * @param null $body + * @param null $id + */ + public function __construct($body = null, $id = null) + { + $this->body = $body; + $this->id = $id; + } + + /** + * Unserialize a payload + * + * @param string $payload + * @return $this + */ + public function unserialize($payload) + { + $this->body = json_decode($payload, true); + return $this; + } + + /** + * Serialize the body + * + * @return string + */ + public function serialize() + { + return json_encode($this->body); + } + + /** + * Set body + * + * @param mixed $body + * @return Job + */ + public function setBody($body) + { + $this->body = $body; + return $this; + } + + /** + * Get body + * + * @return mixed + */ + public function getBody() + { + return $this->body; + } + + /** + * Set job ID + * + * @param mixed $jobId + * @return Job + */ + public function setId($jobId) + { + $this->id = $jobId; + return $this; + } + + /** + * Get job ID + * @return mixed + */ + public function getId() + { + return $this->id; + } + + /** + * Execute job + */ + public function execute() + { + } +} diff --git a/libs/SimpleQueue/Queue.php b/libs/SimpleQueue/Queue.php new file mode 100644 index 00000000..a88b55cb --- /dev/null +++ b/libs/SimpleQueue/Queue.php @@ -0,0 +1,92 @@ +<?php + +namespace SimpleQueue; + +use DateTime; + +/** + * Class Queue + * + * @package SimpleQueue + */ +class Queue implements QueueAdapterInterface +{ + /** + * @var QueueAdapterInterface + */ + protected $queueAdapter; + + /** + * Queue constructor. + * + * @param QueueAdapterInterface $queueAdapter + */ + public function __construct(QueueAdapterInterface $queueAdapter) + { + $this->queueAdapter = $queueAdapter; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->queueAdapter->push($job); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $this->queueAdapter->schedule($job, $dateTime); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + return $this->queueAdapter->pull(); + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->queueAdapter->completed($job); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->queueAdapter->failed($job); + return $this; + } +} diff --git a/libs/SimpleQueue/QueueAdapterInterface.php b/libs/SimpleQueue/QueueAdapterInterface.php new file mode 100644 index 00000000..9bda3070 --- /dev/null +++ b/libs/SimpleQueue/QueueAdapterInterface.php @@ -0,0 +1,58 @@ +<?php + +namespace SimpleQueue; + +use DateTime; + +/** + * Interface AdapterInterface + * + * @package SimpleQueue\Adapter + */ +interface QueueAdapterInterface +{ + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job); + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime); + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull(); + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job); + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job); +} |