summaryrefslogtreecommitdiff
path: root/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php')
-rw-r--r--vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php138
1 files changed, 0 insertions, 138 deletions
diff --git a/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php
deleted file mode 100644
index 379dd9b8..00000000
--- a/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php
+++ /dev/null
@@ -1,138 +0,0 @@
-<?php
-
-namespace SimpleQueue\Adapter;
-
-use DateTime;
-use PhpAmqpLib\Channel\AMQPChannel;
-use PhpAmqpLib\Message\AMQPMessage;
-use PhpAmqpLib\Wire\AMQPTable;
-use SimpleQueue\Job;
-use SimpleQueue\QueueAdapterInterface;
-
-/**
- * Class AmqpQueueAdapter
- *
- * @package SimpleQueue\Adapter
- */
-class AmqpQueueAdapter implements QueueAdapterInterface
-{
- /**
- * @var AMQPChannel
- */
- protected $channel;
-
- /**
- * @var string
- */
- protected $exchange = '';
-
- /**
- * @var string
- */
- protected $queue = '';
-
- /**
- * AmqpQueueAdapter constructor.
- *
- * @param AMQPChannel $channel
- * @param string $queue
- * @param string $exchange
- */
- public function __construct(AMQPChannel $channel, $queue, $exchange)
- {
- $this->channel = $channel;
- $this->exchange = $exchange;
- $this->queue = $queue;
- }
-
- /**
- * Send a job
- *
- * @access public
- * @param Job $job
- * @return $this
- */
- public function push(Job $job)
- {
- $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain'));
- $this->channel->basic_publish($message, $this->exchange);
- return $this;
- }
-
- /**
- * Schedule a job in the future
- *
- * @access public
- * @param Job $job
- * @param DateTime $dateTime
- * @return $this
- */
- public function schedule(Job $job, DateTime $dateTime)
- {
- $now = new DateTime();
- $when = clone($dateTime);
- $delay = $when->getTimestamp() - $now->getTimestamp();
-
- $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2));
- $message->set('application_headers', new AMQPTable(array('x-delay' => $delay)));
-
- $this->channel->basic_publish($message, $this->exchange);
- return $this;
- }
-
- /**
- * Wait and get job from a queue
- *
- * @access public
- * @return Job|null
- */
- public function pull()
- {
- $message = null;
-
- $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) {
- $message = $msg;
- $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
- });
-
- while (count($this->channel->callbacks)) {
- $this->channel->wait();
- }
-
- if ($message === null) {
- return null;
- }
-
- $job = new Job();
- $job->setId($message->get('delivery_tag'));
- $job->unserialize($message->getBody());
-
- return $job;
- }
-
- /**
- * Acknowledge a job
- *
- * @access public
- * @param Job $job
- * @return $this
- */
- public function completed(Job $job)
- {
- $this->channel->basic_ack($job->getId());
- return $this;
- }
-
- /**
- * Mark a job as failed
- *
- * @access public
- * @param Job $job
- * @return $this
- */
- public function failed(Job $job)
- {
- $this->channel->basic_nack($job->getId());
- return $this;
- }
-}