From a4642d17e0e1ea018b128efdcc3db281461458b1 Mon Sep 17 00:00:00 2001 From: Frédéric Guillot Date: Wed, 4 Apr 2018 15:21:13 -0700 Subject: Move custom libs to the source tree --- libs/SimpleQueue/Adapter/AmqpQueueAdapter.php | 138 +++++++++++++++++++++ libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php | 120 ++++++++++++++++++ .../Exception/NotSupportedException.php | 14 +++ libs/SimpleQueue/Job.php | 98 +++++++++++++++ libs/SimpleQueue/Queue.php | 92 ++++++++++++++ libs/SimpleQueue/QueueAdapterInterface.php | 58 +++++++++ 6 files changed, 520 insertions(+) create mode 100644 libs/SimpleQueue/Adapter/AmqpQueueAdapter.php create mode 100644 libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php create mode 100644 libs/SimpleQueue/Exception/NotSupportedException.php create mode 100644 libs/SimpleQueue/Job.php create mode 100644 libs/SimpleQueue/Queue.php create mode 100644 libs/SimpleQueue/QueueAdapterInterface.php (limited to 'libs/SimpleQueue') diff --git a/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php new file mode 100644 index 00000000..379dd9b8 --- /dev/null +++ b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php @@ -0,0 +1,138 @@ +channel = $channel; + $this->exchange = $exchange; + $this->queue = $queue; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain')); + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2)); + $message->set('application_headers', new AMQPTable(array('x-delay' => $delay))); + + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $message = null; + + $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) { + $message = $msg; + $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); + }); + + while (count($this->channel->callbacks)) { + $this->channel->wait(); + } + + if ($message === null) { + return null; + } + + $job = new Job(); + $job->setId($message->get('delivery_tag')); + $job->unserialize($message->getBody()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->channel->basic_ack($job->getId()); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->channel->basic_nack($job->getId()); + return $this; + } +} diff --git a/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php new file mode 100644 index 00000000..407f60e2 --- /dev/null +++ b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php @@ -0,0 +1,120 @@ +beanstalk = $beanstalk; + $this->queueName = $queueName; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->beanstalk->putInTube($this->queueName, $job->serialize()); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); + + if ($beanstalkJob === false) { + return null; + } + + $job = new Job(); + $job->setId($beanstalkJob->getId()); + $job->unserialize($beanstalkJob->getData()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->delete($beanstalkJob); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->bury($beanstalkJob); + return $this; + } +} diff --git a/libs/SimpleQueue/Exception/NotSupportedException.php b/libs/SimpleQueue/Exception/NotSupportedException.php new file mode 100644 index 00000000..36106659 --- /dev/null +++ b/libs/SimpleQueue/Exception/NotSupportedException.php @@ -0,0 +1,14 @@ +body = $body; + $this->id = $id; + } + + /** + * Unserialize a payload + * + * @param string $payload + * @return $this + */ + public function unserialize($payload) + { + $this->body = json_decode($payload, true); + return $this; + } + + /** + * Serialize the body + * + * @return string + */ + public function serialize() + { + return json_encode($this->body); + } + + /** + * Set body + * + * @param mixed $body + * @return Job + */ + public function setBody($body) + { + $this->body = $body; + return $this; + } + + /** + * Get body + * + * @return mixed + */ + public function getBody() + { + return $this->body; + } + + /** + * Set job ID + * + * @param mixed $jobId + * @return Job + */ + public function setId($jobId) + { + $this->id = $jobId; + return $this; + } + + /** + * Get job ID + * @return mixed + */ + public function getId() + { + return $this->id; + } + + /** + * Execute job + */ + public function execute() + { + } +} diff --git a/libs/SimpleQueue/Queue.php b/libs/SimpleQueue/Queue.php new file mode 100644 index 00000000..a88b55cb --- /dev/null +++ b/libs/SimpleQueue/Queue.php @@ -0,0 +1,92 @@ +queueAdapter = $queueAdapter; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->queueAdapter->push($job); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $this->queueAdapter->schedule($job, $dateTime); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + return $this->queueAdapter->pull(); + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->queueAdapter->completed($job); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->queueAdapter->failed($job); + return $this; + } +} diff --git a/libs/SimpleQueue/QueueAdapterInterface.php b/libs/SimpleQueue/QueueAdapterInterface.php new file mode 100644 index 00000000..9bda3070 --- /dev/null +++ b/libs/SimpleQueue/QueueAdapterInterface.php @@ -0,0 +1,58 @@ +