diff options
author | Frédéric Guillot <fred@kanboard.net> | 2018-04-04 15:21:13 -0700 |
---|---|---|
committer | Frédéric Guillot <fred@kanboard.net> | 2018-04-04 15:21:13 -0700 |
commit | a4642d17e0e1ea018b128efdcc3db281461458b1 (patch) | |
tree | 00210c3d0abd0adea7f8817e6ba1d82c1ea4b50e /vendor/fguillot/simple-queue/src/Adapter | |
parent | 62178b1f2b4ad6ed8eafbcd3be8ef2f46b041b82 (diff) |
Move custom libs to the source tree
Diffstat (limited to 'vendor/fguillot/simple-queue/src/Adapter')
5 files changed, 0 insertions, 617 deletions
diff --git a/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php deleted file mode 100644 index 379dd9b8..00000000 --- a/vendor/fguillot/simple-queue/src/Adapter/AmqpQueueAdapter.php +++ /dev/null @@ -1,138 +0,0 @@ -<?php - -namespace SimpleQueue\Adapter; - -use DateTime; -use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Message\AMQPMessage; -use PhpAmqpLib\Wire\AMQPTable; -use SimpleQueue\Job; -use SimpleQueue\QueueAdapterInterface; - -/** - * Class AmqpQueueAdapter - * - * @package SimpleQueue\Adapter - */ -class AmqpQueueAdapter implements QueueAdapterInterface -{ - /** - * @var AMQPChannel - */ - protected $channel; - - /** - * @var string - */ - protected $exchange = ''; - - /** - * @var string - */ - protected $queue = ''; - - /** - * AmqpQueueAdapter constructor. - * - * @param AMQPChannel $channel - * @param string $queue - * @param string $exchange - */ - public function __construct(AMQPChannel $channel, $queue, $exchange) - { - $this->channel = $channel; - $this->exchange = $exchange; - $this->queue = $queue; - } - - /** - * Send a job - * - * @access public - * @param Job $job - * @return $this - */ - public function push(Job $job) - { - $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain')); - $this->channel->basic_publish($message, $this->exchange); - return $this; - } - - /** - * Schedule a job in the future - * - * @access public - * @param Job $job - * @param DateTime $dateTime - * @return $this - */ - public function schedule(Job $job, DateTime $dateTime) - { - $now = new DateTime(); - $when = clone($dateTime); - $delay = $when->getTimestamp() - $now->getTimestamp(); - - $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2)); - $message->set('application_headers', new AMQPTable(array('x-delay' => $delay))); - - $this->channel->basic_publish($message, $this->exchange); - return $this; - } - - /** - * Wait and get job from a queue - * - * @access public - * @return Job|null - */ - public function pull() - { - $message = null; - - $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) { - $message = $msg; - $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); - }); - - while (count($this->channel->callbacks)) { - $this->channel->wait(); - } - - if ($message === null) { - return null; - } - - $job = new Job(); - $job->setId($message->get('delivery_tag')); - $job->unserialize($message->getBody()); - - return $job; - } - - /** - * Acknowledge a job - * - * @access public - * @param Job $job - * @return $this - */ - public function completed(Job $job) - { - $this->channel->basic_ack($job->getId()); - return $this; - } - - /** - * Mark a job as failed - * - * @access public - * @param Job $job - * @return $this - */ - public function failed(Job $job) - { - $this->channel->basic_nack($job->getId()); - return $this; - } -} diff --git a/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php deleted file mode 100644 index 01377317..00000000 --- a/vendor/fguillot/simple-queue/src/Adapter/AwsSqsQueueAdapter.php +++ /dev/null @@ -1,150 +0,0 @@ -<?php - -namespace SimpleQueue\Adapter; - -use Aws\Sqs\SqsClient; -use SimpleQueue\Job; -use SimpleQueue\QueueAdapterInterface; -use DateTime; - -/** - * Class AwsSqsQueueAdapter - * - * @package SimpleQueue\Adapter - * @author George Webb <george@webb.uno> - */ -class AwsSqsQueueAdapter implements QueueAdapterInterface -{ - /** - * @var string - */ - private $queueName; - - /** - * @var SqsClient - */ - private $sqsClient; - - /** - * @var string - */ - private $sqsUrl; - - /** - * @var array - */ - private $config; - - /** - * AwsSqsQueueAdapter constructor. - * - * @param string $queueName The name of the SQS queue - * @param SqsClient $sqsClient An SQS client - * @param array $config Array of config values - */ - public function __construct($queueName, SqsClient $sqsClient, $config = array()) - { - $this->queueName = $queueName; - $this->sqsClient = $sqsClient; - $this->sqsUrl = $this->sqsClient->getQueueUrl(array('QueueName' => $this->queueName))->get('QueueUrl'); - $this->config = $config; - } - - /** - * Send a job - * - * @access public - * @param Job $job - * @return $this - */ - public function push(Job $job) - { - $this->sqsClient->sendMessage(array( - 'QueueUrl' => $this->sqsUrl, - 'MessageBody' => $job->serialize() - )); - return $this; - } - - /** - * Schedule a job in the future - * - * @access public - * @param Job $job - * @param DateTime $dateTime - * @return $this - */ - public function schedule(Job $job, DateTime $dateTime) - { - $now = new DateTime(); - $when = clone($dateTime); - $delay = $when->getTimestamp() - $now->getTimestamp(); - - $this->sqsClient->sendMessage(array( - 'QueueUrl' => $this->sqsUrl, - 'MessageBody' => $job->serialize(), - 'VisibilityTimeout' => $delay - )); - - return $this; - } - - /** - * Wait and get job from a queue - * - * @access public - * @return Job|null - */ - public function pull() - { - $result = $this->sqsClient->receiveMessage(array( - 'QueueUrl' => $this->sqsUrl, - 'WaitTimeSeconds' => empty($this->config['LongPollingTime']) ? 0 : (int) $this->config['LongPollingTime'] - )); - - if ($result['Messages'] == null) { - return null; - } - - $resultMessage = array_pop($result['Messages']); - - $job = new Job(); - $job->setId($resultMessage['ReceiptHandle']); - $job->unserialize($resultMessage['Body']); - - return $job; - } - - /** - * Acknowledge a job - * - * @access public - * @param Job $job - * @return $this - */ - public function completed(Job $job) - { - $this->sqsClient->deleteMessage(array( - 'QueueUrl' => $this->sqsUrl, - 'ReceiptHandle' => $job->getId() - )); - return $this; - } - - /** - * Mark a job as failed - * - * @access public - * @param Job $job - * @return $this - */ - public function failed(Job $job) - { - $this->sqsClient->changeMessageVisibility(array( - 'QueueUrl' => $this->sqsUrl, - 'ReceiptHandle' => $job->getId(), - 'VisibilityTimeout' => 0 - )); - return $this; - } -} diff --git a/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php deleted file mode 100644 index 407f60e2..00000000 --- a/vendor/fguillot/simple-queue/src/Adapter/BeanstalkQueueAdapter.php +++ /dev/null @@ -1,120 +0,0 @@ -<?php - -namespace SimpleQueue\Adapter; - -use DateTime; -use Pheanstalk\Job as BeanstalkJob; -use Pheanstalk\Pheanstalk; -use Pheanstalk\PheanstalkInterface; -use SimpleQueue\Job; -use SimpleQueue\QueueAdapterInterface; - -/** - * Class BeanstalkQueueAdapter - * - * @package SimpleQueue\Adapter - */ -class BeanstalkQueueAdapter implements QueueAdapterInterface -{ - /** - * @var PheanstalkInterface - */ - protected $beanstalk; - - /** - * @var string - */ - protected $queueName = ''; - - /** - * BeanstalkQueueAdapter constructor. - * - * @param PheanstalkInterface $beanstalk - * @param string $queueName - */ - public function __construct(PheanstalkInterface $beanstalk, $queueName) - { - $this->beanstalk = $beanstalk; - $this->queueName = $queueName; - } - - /** - * Send a job - * - * @access public - * @param Job $job - * @return $this - */ - public function push(Job $job) - { - $this->beanstalk->putInTube($this->queueName, $job->serialize()); - return $this; - } - - /** - * Schedule a job in the future - * - * @access public - * @param Job $job - * @param DateTime $dateTime - * @return $this - */ - public function schedule(Job $job, DateTime $dateTime) - { - $now = new DateTime(); - $when = clone($dateTime); - $delay = $when->getTimestamp() - $now->getTimestamp(); - - $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); - return $this; - } - - /** - * Wait and get job from a queue - * - * @access public - * @return Job|null - */ - public function pull() - { - $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); - - if ($beanstalkJob === false) { - return null; - } - - $job = new Job(); - $job->setId($beanstalkJob->getId()); - $job->unserialize($beanstalkJob->getData()); - - return $job; - } - - /** - * Acknowledge a job - * - * @access public - * @param Job $job - * @return $this - */ - public function completed(Job $job) - { - $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); - $this->beanstalk->delete($beanstalkJob); - return $this; - } - - /** - * Mark a job as failed - * - * @access public - * @param Job $job - * @return $this - */ - public function failed(Job $job) - { - $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); - $this->beanstalk->bury($beanstalkJob); - return $this; - } -} diff --git a/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php deleted file mode 100644 index 047658d7..00000000 --- a/vendor/fguillot/simple-queue/src/Adapter/DisqueQueueAdapter.php +++ /dev/null @@ -1,109 +0,0 @@ -<?php - -namespace SimpleQueue\Adapter; - -use DateTime; -use Disque\Client as DisqueClient; -use Disque\Queue\Job as DisqueJob; -use SimpleQueue\Job; -use SimpleQueue\QueueAdapterInterface; - -/** - * Class DisqueQueueAdapter - * - * @package SimpleQueue\Adapter - */ -class DisqueQueueAdapter implements QueueAdapterInterface -{ - /** - * @var DisqueClient - */ - protected $disque; - - /** - * @var string - */ - protected $queueName; - - /** - * DisqueQueueAdapter constructor. - * - * @param DisqueClient $disque - * @param string $queueName - */ - public function __construct(DisqueClient $disque, $queueName) - { - $this->disque = $disque; - $this->queueName = $queueName; - } - - /** - * Send a job - * - * @access public - * @param Job $job - * @return $this - */ - public function push(Job $job) - { - $this->disque->queue($this->queueName)->push(new DisqueJob($job->getBody())); - return $this; - } - - /** - * Schedule a job in the future - * - * @access public - * @param Job $job - * @param DateTime $dateTime - * @return $this - */ - public function schedule(Job $job, DateTime $dateTime) - { - $this->disque->queue($this->queueName)->schedule(new DisqueJob($job->serialize()), $dateTime); - return $this; - } - - /** - * Wait and get job from a queue - * - * @access public - * @return Job|null - */ - public function pull() - { - $disqueJob = $this->disque->queue($this->queueName)->pull(); - - if ($disqueJob === null) { - return null; - } - - return new Job($disqueJob->getBody(), $disqueJob->getId()); - } - - /** - * Acknowledge a job - * - * @access public - * @param Job $job - * @return $this - */ - public function completed(Job $job) - { - $this->disque->queue($this->queueName)->processed(new DisqueJob($job->getBody(), $job->getId())); - return $this; - } - - /** - * Mark a job as failed - * - * @access public - * @param Job $job - * @return $this - */ - public function failed(Job $job) - { - $this->disque->queue($this->queueName)->failed(new DisqueJob($job->getBody(), $job->getId())); - return $this; - } -} diff --git a/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php b/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php deleted file mode 100644 index 36375d5b..00000000 --- a/vendor/fguillot/simple-queue/src/Adapter/MemoryQueueAdapter.php +++ /dev/null @@ -1,100 +0,0 @@ -<?php - -namespace SimpleQueue\Adapter; - -use DateTime; -use Exception; -use SimpleQueue\Exception\NotSupportedException; -use SimpleQueue\QueueAdapterInterface; -use SimpleQueue\Job; -use SplQueue; - -/** - * Class MemoryAdapter - * - * @package SimpleQueue\Adapter - */ -class MemoryQueueAdapter implements QueueAdapterInterface -{ - /** - * @var SplQueue - */ - protected $queue; - - /** - * MemoryAdapter constructor. - */ - public function __construct() - { - $this->queue = new SplQueue(); - } - - /** - * Send a job - * - * @access public - * @param Job $job - * @return $this - */ - public function push(Job $job) - { - $this->queue->enqueue($job->serialize()); - return $this; - } - - /** - * Schedule a job in the future - * - * @access public - * @param Job $job - * @param DateTime $dateTime - * @return bool - * @throws NotSupportedException - */ - public function schedule(Job $job, DateTime $dateTime) - { - throw new NotSupportedException('Job delay is not supported by MemoryQueue'); - } - - /** - * Wait and get job from a queue - * - * @access public - * @return Job|null - */ - public function pull() - { - try { - $job = new Job(); - $payload = $this->queue->dequeue(); - return $job->unserialize($payload); - } catch (Exception $e) { - return null; - } - } - - /** - * Acknowledge a job - * - * @access public - * @param Job $job - * @return $this - */ - public function completed(Job $job) - { - return $this; - } - - /** - * Mark a job as failed - * - * @access public - * @param Job $job - * @return $this - */ - public function failed(Job $job) - { - $this->queue->enqueue($job->serialize()); - return $this; - } -} |