diff options
Diffstat (limited to 'libs/SimpleQueue/Adapter')
-rw-r--r-- | libs/SimpleQueue/Adapter/AmqpQueueAdapter.php | 138 | ||||
-rw-r--r-- | libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php | 120 |
2 files changed, 258 insertions, 0 deletions
diff --git a/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php new file mode 100644 index 00000000..379dd9b8 --- /dev/null +++ b/libs/SimpleQueue/Adapter/AmqpQueueAdapter.php @@ -0,0 +1,138 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPTable; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class AmqpQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class AmqpQueueAdapter implements QueueAdapterInterface +{ + /** + * @var AMQPChannel + */ + protected $channel; + + /** + * @var string + */ + protected $exchange = ''; + + /** + * @var string + */ + protected $queue = ''; + + /** + * AmqpQueueAdapter constructor. + * + * @param AMQPChannel $channel + * @param string $queue + * @param string $exchange + */ + public function __construct(AMQPChannel $channel, $queue, $exchange) + { + $this->channel = $channel; + $this->exchange = $exchange; + $this->queue = $queue; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain')); + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2)); + $message->set('application_headers', new AMQPTable(array('x-delay' => $delay))); + + $this->channel->basic_publish($message, $this->exchange); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $message = null; + + $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) { + $message = $msg; + $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); + }); + + while (count($this->channel->callbacks)) { + $this->channel->wait(); + } + + if ($message === null) { + return null; + } + + $job = new Job(); + $job->setId($message->get('delivery_tag')); + $job->unserialize($message->getBody()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $this->channel->basic_ack($job->getId()); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $this->channel->basic_nack($job->getId()); + return $this; + } +} diff --git a/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php new file mode 100644 index 00000000..407f60e2 --- /dev/null +++ b/libs/SimpleQueue/Adapter/BeanstalkQueueAdapter.php @@ -0,0 +1,120 @@ +<?php + +namespace SimpleQueue\Adapter; + +use DateTime; +use Pheanstalk\Job as BeanstalkJob; +use Pheanstalk\Pheanstalk; +use Pheanstalk\PheanstalkInterface; +use SimpleQueue\Job; +use SimpleQueue\QueueAdapterInterface; + +/** + * Class BeanstalkQueueAdapter + * + * @package SimpleQueue\Adapter + */ +class BeanstalkQueueAdapter implements QueueAdapterInterface +{ + /** + * @var PheanstalkInterface + */ + protected $beanstalk; + + /** + * @var string + */ + protected $queueName = ''; + + /** + * BeanstalkQueueAdapter constructor. + * + * @param PheanstalkInterface $beanstalk + * @param string $queueName + */ + public function __construct(PheanstalkInterface $beanstalk, $queueName) + { + $this->beanstalk = $beanstalk; + $this->queueName = $queueName; + } + + /** + * Send a job + * + * @access public + * @param Job $job + * @return $this + */ + public function push(Job $job) + { + $this->beanstalk->putInTube($this->queueName, $job->serialize()); + return $this; + } + + /** + * Schedule a job in the future + * + * @access public + * @param Job $job + * @param DateTime $dateTime + * @return $this + */ + public function schedule(Job $job, DateTime $dateTime) + { + $now = new DateTime(); + $when = clone($dateTime); + $delay = $when->getTimestamp() - $now->getTimestamp(); + + $this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); + return $this; + } + + /** + * Wait and get job from a queue + * + * @access public + * @return Job|null + */ + public function pull() + { + $beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); + + if ($beanstalkJob === false) { + return null; + } + + $job = new Job(); + $job->setId($beanstalkJob->getId()); + $job->unserialize($beanstalkJob->getData()); + + return $job; + } + + /** + * Acknowledge a job + * + * @access public + * @param Job $job + * @return $this + */ + public function completed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->delete($beanstalkJob); + return $this; + } + + /** + * Mark a job as failed + * + * @access public + * @param Job $job + * @return $this + */ + public function failed(Job $job) + { + $beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); + $this->beanstalk->bury($beanstalkJob); + return $this; + } +} |