rabbitmq 消息队列
大约 3 分钟
创建一个用户
创建一个交换机
用户绑定交换机
如何使用
1、先连接上rabbitmq
2、将需要发送的消息转为换byte,接受的时候再解析出来,绑定一个结构体即可后续的处理
比如说我需要将数据库的文章同步到es数据库中,我们可以将这篇文章全部查询出来,将查出出来的数据发送给mq,等消费端接收到这个消息的时候再转化为结构体进行处理,也可以发送一个文章的id值,通过id查询出这篇文章,再进行后续的处理,我们查询数据非常快,但是如果一下子需要处理大量的信息,服务器就会卡死了,这个时候如果使用mq,mq按照服务器所能承受的一个数值,慢慢取出这些数据,然后进行后续的操作,不至于服务器宕机
连接
var (
Client *amqp.Connection
)
func newClient() (*amqp.Connection, error) {
u := fmt.Sprintf("%s/%s", config.GetMqUrl(), "movie")
return amqp.DialConfig(u, amqp.Config{Heartbeat: 10})
}
func Connect() {
for {
conn, err := newClient()
if err != nil {
log.Println("mq已经断线,60秒后重连:", err)
time.Sleep(time.Second * 60)
newClient()
} else {
Client = conn
break
}
}
}
func Close() {
if !Client.IsClosed() {
Client.Close()
}
}
发送消息
func QueueDeclare(queueName string, data interface{}) error {
chanel, err := Client.Channel()
if err != nil {
return err
}
defer chanel.Close()
_, err = chanel.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
return err
}
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30)
defer cancelFunc()
body, _ := json.Marshal(data)
err = chanel.PublishWithContext(ctx, "", queueName, false, false, amqp.Publishing{
ContentType: "text/plan",
Body: body,
})
if err != nil {
return err
}
return nil
}
消费消息
func Consume(queueName string, size int) {
channel, err := Client.Channel()
if err != nil {
log.Printf("queue 连接失败:", err)
return
}
defer channel.Close()
msgs, err := channel.Consume(queueName, "", false, false, false, false, nil)
if err != nil {
log.Printf("【rabbitmq】消费失败:", err)
return
}
if size == 0 {
size = 1
}
err = channel.Qos(size, 0, false)
if err != nil {
log.Printf("【rabbitmq】qos 失败:", err)
return
}
var ch chan struct{}
go func() {
for d := range msgs {
switch queueName {
case MovieLabel:
var data MovieMqData
json.Unmarshal(d.Body, &data)
log.Printf("【%s】接收到的参数:%v", queueName, data)
// 爬取数据,保存到数据库中
spiderAndSave(data)
err = d.Ack(false)
if err != nil {
log.Println(err)
}
case ElasticSearchLabel:
var data models.MovieEsData
json.Unmarshal(d.Body, &data)
log.Printf("【%s】接收到的ID:%d", queueName, data.Data.ID)
pushElastic(data)
err = d.Ack(false)
if err != nil {
log.Println(err)
}
default:
var data string
json.Unmarshal(d.Body, &data)
log.Printf("msg:%s", data)
}
}
}()
<-ch
}
断线重连
func newClient() (*amqp.Connection, error) {
u := fmt.Sprintf("%s/%s", config.GetMqUrl(), "movie")
return amqp.DialConfig(u, amqp.Config{Heartbeat: 10})
}
func Connect() {
for {
conn, err := newClient()
if err != nil {
log.Println("mq已经断线,60秒后重连:", err)
time.Sleep(time.Second * 60)
newClient()
} else {
Client = conn
break
}
}
}
案例
网站文章抓取
1、分析网页结构
2、获取到网站文章的的详情链接地址
3、将连接地址发送给mq
4、监听mq的消息,如果有新的地址,直接请求文章的详情链接地址,将解析后的数据保存到数据库中
如果一次性请求太多数据,会把别人的网站压垮,第二我们自己数据库承受不住大量的数据写入,我们从mq中慢慢取数据可以减少请求的压力,自己的数据库可以保持一个稳定的值写入,避免数据库写入数据过大宕机。
消息分发
当我们在开发一个聊哦系统的时候,单个服务器可能无法承受大量的ws链接,于是我们可能将程序部署到多个服务器当中,假如说有两个服务器
a用户在A服务器,b用户在B服务器,现在a用户发消息给b,如何将消息推送给b (A服务器接收到了消息,需要通过B服务器向b用户推送消息)
使用redis进行区分,当用户建立链接的时候,创建一个 【服务器id + 用户id + 好友id】的键,当接收到消息后,将消息给mq,mq通过解析键来确定当前用户在那个服务器上,然后找到对应的服务器将消息推送给用户。