rabbitmq 学习教程
大约 5 分钟
安装
composer require php-amqplib/php-amqplib
发送消息
<?php
namespace App\Console\Commands;
use App\Service\RabbitmqService;
use Illuminate\Console\Command;
class send extends Command
{
public $queueName = "log";
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'app:send';
protected $description = 'mq 发送消息';
/**
* php artisan app:send
*/
public function handle()
{
$id = 0;
while (true) {
$id++;
$user = [
"id" => $id,
"name" => fake()->userName(),
];
$data = json_encode($user);
// $this->sendMag($data);
// $this->sendSubMsg("laravel_pubsub",$data);
// $this->sendRoutingMsg("laravel_routing","logx",$data);
if ($id % 2 == 0) {
$user["name"] = "info";
$data = json_encode($user);
$this->sendTopicMsg("laravel_topic", "laravel.log.info", $data);
} else {
$user["name"] = "debug";
$data = json_encode($user);
$this->sendTopicMsg("laravel_topic", "laravel.log.debug", $data);
}
sleep(1);
}
}
public function sendMag($data = "")
{
$app = new RabbitmqService();
$app->pushMsg($this->queueName, $data);
}
public function sendSubMsg($exchangeName, $data)
{
$app = new RabbitmqService();
$app->pubSubMsg($exchangeName, $data);
}
public function sendRoutingMsg($exchangeName, $key, $data)
{
$app = new RabbitmqService();
$app->routingMsg($exchangeName, $key, $data);
}
public function sendTopicMsg($exchangeName, $key, $data)
{
$app = new RabbitmqService();
$app->topicMsg($exchangeName, $key, $data);
}
}
接收消息
<?php
namespace App\Console\Commands;
use App\Service\RabbitmqService;
use Illuminate\Console\Command;
class receive extends Command
{
public $queueName = "log";
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'app:receive';
/**
* The console command description.
*
* @var string
*/
protected $description = '接受mq消息';
/**
* php artisan app:receive
*/
public function handle()
{
// $this->receiveMsg();
// $this->receivePubSubMsg("laravel_pubsub");
// $this->receiveRoutingSubMsg("laravel_routing","laravel_routing_queue","logx");
// $this->receiveTopicMsg("laravel_topic","laravel_topic_info","laravel.log.info");
// $this->receiveTopicMsg("laravel_topic","laravel_topic_debug","laravel.log.debug");
$this->receiveTopicMsg("laravel_topic","laravel_topic_debug","laravel.log.*");
}
public function receiveMsg()
{
// 接受消息
$callback = function ($msg){
$user = json_decode($msg->body,true);
dump($user);
};
$app = new RabbitmqService();
$app->receiveMsg($this->queueName,$callback);
}
public function receivePubSubMsg($exchangeName)
{
// 接受消息
$callback = function ($msg){
$user = json_decode($msg->body,true);
dump($user);
// 消息确认
$msg->ack();
};
$app = new RabbitmqService();
$app->receivePubSubMsg($exchangeName,$callback);
}
public function receiveRoutingSubMsg($exchangeName,$queueName,$key)
{
// 接受消息
$callback = function ($msg){
$user = json_decode($msg->body,true);
dump($user);
// 消息确认
$msg->ack();
};
$app = new RabbitmqService();
$app->receiveRoutingMsg($exchangeName,$queueName,$key,$callback);
}
public function receiveTopicMsg($exchangeName,$queueName,$key)
{
// 接受消息
$callback = function ($msg){
$user = json_decode($msg->body,true);
dump($user);
// 消息确认
$msg->ack();
};
$app = new RabbitmqService();
$app->receiveTopicMsg($exchangeName,$queueName,$key,$callback);
}
}
公共方法
<?php
namespace App\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqService
{
public $client = null; // 连接对象
public $channel = null; // 通道对象
public $vhost="laravel";
/**
* @throws \Exception
*/
public function __construct()
{
$conn = new AMQPStreamConnection('home.cc', 5672, 'rabbitmq', 'rabbitmq',$this->vhost);
$this->client = $conn;
$this->channel = $conn->channel();
}
/**
* $queue:队列的名称。
* $passive:如果设置为true,则只检查队列是否存在,不会创建队列。如果设置为false,则如果队列不存在,则会创建队列。
* $durable:如果设置为true,则队列将在服务器重启后仍然存在。如果设置为false,则队列在服务器重启后将被删除。
* $exclusive:如果设置为true,则队列将只对当前连接可见,并在连接关闭后自动删除。如果设置为false,则队列对所有连接可见,并且在连接关闭后不会自动删除。
* $auto_delete:如果设置为true,则当队列没有消费者时,队列将被自动删除。如果设置为false,则队列将一直存在,即使没有消费者。
* @return bool
*/
public function pushMsg($queueName,$data)
{
try {
$this->channel->queue_declare($queueName, false, true, false, false);
$msg = new AMQPMessage($data,['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$this->channel->basic_publish($msg, '', $queueName);
$this->channel->close();
$this->client->close();
}catch (\Exception $e){
echo "[$queueName]".$e->getMessage()."\n";
return false;
}
echo "[{$queueName}]消息发送成功!\n";
return true;
}
public function receiveMsg($queueName ,$callback)
{
try {
$this->channel->queue_declare($queueName, false, true, false, false);
/**
* $queue:要订阅的队列的名称。
* $consumer_tag:消费者的标签。如果设置为空字符串,则RabbitMQ会自动生成一个唯一的标签。
* $no_local:如果设置为true,则消费者将不会接收到自己发送的消息。如果设置为false,则消费者可以接收到自己发送的消息。
* $no_ack:如果设置为true,则消费者在处理完消息后不需要手动确认消息。如果设置为false,则消费者需要手动确认消息。
* $exclusive:如果设置为true,则消费者将独占该队列。如果设置为false,则消费者可以与其他消费者共享该队列。
* $nowait:如果设置为true,则RabbitMQ不会等待消费者的确认消息。如果设置为false,则RabbitMQ会等待消费者的确认消息。
* $callback:消息处理函数。当消费者接收到消息时,RabbitMQ会调用该函数。
*/
$this->channel->basic_consume($queueName, '', false, true, false, false, $callback);
$this->channel->consume();
}catch (\ErrorException $e){
dump($e->getMessage());
}
}
public function pubSubMsg($exchangeName,$data)
{
try {
$this->channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$msg = new AMQPMessage($data,['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$this->channel->basic_publish($msg, $exchangeName);
$this->channel->close();
$this->client->close();
}catch (\Exception $e){
echo "[$exchangeName]".$e->getMessage()."\n";
return false;
}
echo "[{$exchangeName}]消息发送成功!\n";
return true;
}
public function receivePubSubMsg($exchangeName,$callback)
{
try {
$this->channel->exchange_declare($exchangeName, 'fanout', false, false, false);
// 声明一个非持久化、排他的、自动删除的队列
list($queue_name, ,) = $this->channel->queue_declare("", false, false, true, true);
$this->channel->queue_bind($queue_name, $exchangeName);
$this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);
$this->channel->consume();
$this->channel->close();
$this->client->close();
}catch (\Exception $e){
echo "[$exchangeName]".$e->getMessage()."\n";
return false;
}
echo "[$exchangeName]消息发送成功!\n";
return true;
}
public function routingMsg($exchangeName,$key,$data)
{
try {
$this->channel->exchange_declare($exchangeName, 'direct', false, false, false);
$msg = new AMQPMessage($data,['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$this->channel->basic_publish($msg, $exchangeName,$key);
$this->channel->close();
$this->client->close();
}catch (\Exception $e){
echo "[$exchangeName]".$e->getMessage()."\n";
return false;
}
echo "[{$exchangeName}]消息发送成功!\n";
return true;
}
public function receiveRoutingMsg($exchangeName,$queueName,$key,$callback)
{
try {
$this->channel->exchange_declare($exchangeName, 'direct', false, false, false);
$this->channel->queue_declare($queueName, false, false, false, false);
$this->channel->queue_bind($queueName, $exchangeName,$key);
$this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
$this->channel->consume();
$this->channel->close();
$this->client->close();
}catch (\Exception $e){
echo "[$exchangeName]".$e->getMessage()."\n";
return false;
}
echo "[$exchangeName]消息发送成功!\n";
return true;
}
public function topicMsg($exchangeName,$key,$data)
{
try {
$this->channel->exchange_declare($exchangeName, 'topic', false, false, false);
$msg = new AMQPMessage($data,['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$this->channel->basic_publish($msg, $exchangeName,$key);
$this->channel->close();
$this->client->close();
}catch (\Exception $e){
echo "[$exchangeName]".$e->getMessage()."\n";
return false;
}
echo "[{$exchangeName}]消息发送成功!\n";
return true;
}
public function receiveTopicMsg($exchangeName,$queueName,$key,$callback)
{
try {
$this->channel->exchange_declare($exchangeName, 'topic', false, false, false);
$this->channel->queue_declare($queueName, false, false, false, false);
$this->channel->queue_bind($queueName, $exchangeName,$key);
$this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
$this->channel->consume();
$this->channel->close();
$this->client->close();
}catch (\Exception $e){
echo "[$exchangeName]".$e->getMessage()."\n";
return false;
}
echo "[$exchangeName]消息发送成功!\n";
return true;
}
}
延迟队列
安装插件:
public function dlx($queueName,$key,$data,$seconds = 1)
{
$delayExchangeName = "dlx_exchange"; // 私信交换机
try {
// 1、声明私信交换机
$this->channel->exchange_declare($delayExchangeName, 'x-delayed-message', false, true, false);
// 2、声明私信队列
$this->channel->queue_declare($queueName, false, true, false, false);
// 3、绑定私信队列绑定到私信交换机
$this->channel->queue_bind($queueName, $delayExchangeName, $key);
// 4、发送消息
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'text/plain',
'headers' => [
'x-delay' => $seconds*1000, // 延迟时间,单位毫秒
// 'x-dead-letter-exchange' => 'dlx_exchange', // 私信交换机
// 'x-dead-letter-routing-key' => $key, // 私信队列
]
]);
$this->channel->basic_publish($msg, $delayExchangeName, $key);
$this->channel->close();
$this->client->close();
}catch (\Exception $e){
dump($e->getMessage());
return false;
}
return true;
}
public function receiveDlx($queueName,$callback)
{
try{
$this->channel->queue_declare($queueName, false, true, false, false);
$this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
$this->channel->consume();
$this->channel->close();
$this->client->close();
}catch (\Exception){
return false;
}
return true;
}
发送
$app = new RabbitmqService();
$app->dlx($queueName, $key, $data,5); // 5秒
接收
public function receiveDlxMsg($queueName)
{
// 接受消息
$callback = function ($msg){
$user = json_decode($msg->body,true);
dump($user);
// 消息确认
$msg->ack();
};
$app = new RabbitmqService();
$app->receiveDlx($queueName,$callback);
}