rabbitmq 学习教程
大约 2 分钟
安装
composer require php-amqplib/php-amqplib
发送消息
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Str;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class send extends Command
{
public $queueName = "hello";
public $vhost="hello";
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'app:send';
protected $description = 'mq 发送消息';
/**
* php artisan app:send
*/
public function handle()
{
for ($i = 0; $i < 10; $i++) {
$user = [
"name" => fake()->userName(),
"password" => Str::uuid(),
"email" => fake()->email(),
];
$this->seedMag(json_encode($user));
}
$this->info("done");
}
// 连接s
public function seedMag($msg="")
{
$conn = new AMQPStreamConnection('home.cc', 5672, 'admin', 'admin666',$this->vhost);
$channel = $conn->channel();
/**
* $queue:队列的名称。
* $passive:如果设置为true,则只检查队列是否存在,不会创建队列。如果设置为false,则如果队列不存在,则会创建队列。
* $durable:如果设置为true,则队列将在服务器重启后仍然存在。如果设置为false,则队列在服务器重启后将被删除。
* $exclusive:如果设置为true,则队列将只对当前连接可见,并在连接关闭后自动删除。如果设置为false,则队列对所有连接可见,并且在连接关闭后不会自动删除。
* $auto_delete:如果设置为true,则当队列没有消费者时,队列将被自动删除。如果设置为false,则队列将一直存在,即使没有消费者。
*/
$channel->queue_declare($this->queueName, false, false, false, false);
$msg = new AMQPMessage($msg);
$channel->basic_publish($msg, '', 'hello');
echo " [√] 发送成功!'\n";
$conn->close();
$channel->close();
return true;
}
}
接收消息
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class receive extends Command
{
public $queueName = "hello";
public $vhost="hello";
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'app:receive';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* php artisan app:receive
*/
public function handle()
{
$this->receiveMsg();
}
public function receiveMsg()
{
$con = new AMQPStreamConnection('home.cc', 5672, 'admin', 'admin666',$this->vhost);
$channel = $con->channel();
$channel->queue_declare($this->queueName, false, false, false, false);
// 接受消息
$callback = function ($msg){
$user = json_decode($msg->body,true);
dump($user);
sleep(1);
};
/**
* $queue:要订阅的队列的名称。
* $consumer_tag:消费者的标签。如果设置为空字符串,则RabbitMQ会自动生成一个唯一的标签。
* $no_local:如果设置为true,则消费者将不会接收到自己发送的消息。如果设置为false,则消费者可以接收到自己发送的消息。
* $no_ack:如果设置为true,则消费者在处理完消息后不需要手动确认消息。如果设置为false,则消费者需要手动确认消息。
* $exclusive:如果设置为true,则消费者将独占该队列。如果设置为false,则消费者可以与其他消费者共享该队列。
* $nowait:如果设置为true,则RabbitMQ不会等待消费者的确认消息。如果设置为false,则RabbitMQ会等待消费者的确认消息。
* $callback:消息处理函数。当消费者接收到消息时,RabbitMQ会调用该函数。
*/
$channel->basic_consume($this->queueName, 'delivery_tag', false, true, false, false, $callback);
try {
// 直接消费
$channel->consume();
}catch (\Throwable $e){
$this->info($e->getMessage());
}
}
}