PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

互联网 20-5-15

延时队列

  • Delayproducer.Php

  • Amqpbuilder.Php

AmqpBuilder.php

<?php declare(strict_types = 1); namespace App\Components\Amqp; use Hyperf\Amqp\Builder\Builder; use Hyperf\Amqp\Builder\QueueBuilder; class AmqpBuilder extends QueueBuilder {     /**      * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments      *      * @return \Hyperf\Amqp\Builder\Builder      */     public function setArguments($arguments) : Builder     {         $this->arguments = array_merge($this->arguments, $arguments);         return $this;     }     /**      * 设置延时队列相关参数      *      * @param string $queueName      * @param int    $xMessageTtl      * @param string $xDeadLetterExchange      * @param string $xDeadLetterRoutingKey      *      * @return $this      */     public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self     {         $this->setArguments([             'x-message-ttl'             => ['I', $xMessageTtl * 1000], // 毫秒             'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],             'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],         ]);         $this->setQueue($queueName);         return $this;     } }

DelayProducer.php

<?php declare(strict_types = 1); namespace App\Components\Amqp; use Hyperf\Amqp\Annotation\Producer; use Hyperf\Amqp\Builder; use Hyperf\Amqp\Message\ProducerMessageInterface; use Hyperf\Di\Annotation\AnnotationCollector; use PhpAmqpLib\Message\AMQPMessage; use Throwable; class DelayProducer extends Builder {     /**      * @param ProducerMessageInterface $producerMessage      * @param AmqpBuilder              $queueBuilder      * @param bool                     $confirm      * @param int                      $timeout      *      * @return bool      * @throws \Throwable      */     public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool     {         return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout)         {             return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);         });     }     /**      * @param ProducerMessageInterface $producerMessage      * @param AmqpBuilder              $queueBuilder      * @param bool                     $confirm      * @param int                      $timeout      *      * @return bool      * @throws \Throwable      */     private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool     {         $result = false;         $this->injectMessageProperty($producerMessage);         $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());         $pool    = $this->getConnectionPool($producerMessage->getPoolName());         /** @var \Hyperf\Amqp\Connection $connection */         $connection = $pool->get();         if ($confirm) {             $channel = $connection->getConfirmChannel();         } else {             $channel = $connection->getChannel();         }         $channel->set_ack_handler(function () use (&$result)         {             $result = true;         });         try {             // 处理延时队列             $exchangeBuilder = $producerMessage->getExchangeBuilder();             // 队列定义             $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());             // 路由定义             $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());             // 队列绑定             $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());             // 消息发送             $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());             $channel->wait_for_pending_acks_returns($timeout);         } catch (Throwable $exception) {             // Reconnect the connection before release.             $connection->reconnect();             throw $exception;         }         finally {             $connection->release();         }         return $confirm ? $result : true;     }     /**      * @param ProducerMessageInterface $producerMessage      */     private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void     {         if (class_exists(AnnotationCollector::class)) {             /** @var \Hyperf\Amqp\Annotation\Producer $annotation */             $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);             if ($annotation) {                 $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);                 $annotation->exchange && $producerMessage->setExchange($annotation->exchange);             }         }     } }

处理超时订单

  • Orderqueueconsumer.Php

  • Orderqueueproducer.Php

Orderqueueproducer.php

<?php declare(strict_types = 1); namespace App\Amqp\Producer; use Hyperf\Amqp\Annotation\Producer; use Hyperf\Amqp\Builder\ExchangeBuilder; use Hyperf\Amqp\Message\ProducerMessage; /**  * @Producer(exchange="order_exchange", routingKey="order_exchange")  */ class OrderQueueProducer extends ProducerMessage {     public function __construct($data)     {         $this->payload = $data;     }     public function getExchangeBuilder() : ExchangeBuilder     {         return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub     } }

Orderqueueconsumer.php

<?php declare(strict_types = 1); namespace App\Amqp\Consumer; use App\Service\CityTransport\OrderService; use Hyperf\Amqp\Result; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Message\ConsumerMessage; /**  * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)  */ class OrderQueueConsumer extends ConsumerMessage {     public function consume($data) : string     {        ##业务处理     }     public function isEnable() : bool     {         return true;     } }

Demo

$builder = new AmqpBuilder();         $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');         $que = ApplicationContext::getContainer()->get(DelayProducer::class);         var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder))

推荐教程:《PHP教程》

以上就是PHP 框架 Hyperf 实现处理超时未支付订单和延时队列的详细内容,更多内容请关注技术你好其它相关文章!

来源链接:
免责声明:
1.资讯内容不构成投资建议,投资者应独立决策并自行承担风险
2.本文版权归属原作所有,仅代表作者本人观点,不代表本站的观点或立场
标签: Hyperf
上一篇:php获取远程图片并下载保存到本地的方法分析 下一篇:PHP 消息队列 Kafka 使用

相关资讯