From a4642d17e0e1ea018b128efdcc3db281461458b1 Mon Sep 17 00:00:00 2001 From: Frédéric Guillot Date: Wed, 4 Apr 2018 15:21:13 -0700 Subject: Move custom libs to the source tree --- libs/SimpleQueue/Adapter/AmqpQueueAdapter.php | 138 ++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 libs/SimpleQueue/Adapter/AmqpQueueAdapter.php (limited to 'libs/SimpleQueue/Adapter/AmqpQueueAdapter.php') diff --git a/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php new file mode 100644 index 00000000..379dd9b8 --- /dev/null +++ b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php @@ -0,0 +1,138 @@ +channel = $channel; + $this->exchange = $exchange; + $this->queue = $queue; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain')); + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2)); + $message->set('application_headers', new AMQPTable(array('x-delay' => $delay))); + + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $message = null; + + $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) { + $message = $msg; + $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); + }); + + while (count($this->channel->callbacks)) { + $this->channel->wait(); + } + + if ($message === null) { + return null; + } + + $job = new Job(); + $job->setId($message->get('delivery_tag')); + $job->unserialize($message->getBody()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->channel->basic_ack($job->getId()); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->channel->basic_nack($job->getId()); + return $this; + } +} -- cgit v1.2.3