槽痞

《深入RabbitMQ》笔记

三大抽象组件: 交换器、队列、绑定。

AMQP(高级消息队列协议):

当消息与任一绑定的队列符合匹配标准时,RabbitMQ服务器将以FIFO的顺序将消息放入队列中。放入队列数据结构中的并不是实际消息,而是消息的引用

默认情况下,只要没有消费者正在监听队列,消息就会被存储在队列中。当添加更多消息时,队列的大小也会随之增加。RabbitMQ可以将这些消息保存在内存中或写入磁盘,具体取决于消息Basic.Properties中指定的delivery-mode属性。

提示 当你为生产环境编写发布者应用程序时,请使用JSON或XML等数据序列化格式以便消费者可以轻松地反序列化消息,这样在解决可能出现的任何问题时更易于阅读。

如果仔细观察,你可能已经注意到,尽管在发布消息时没有指定message_id或timestamp属性,但从消费者打印出来的每条消息中都有它们。如果不指定它们,rabbitpy客户端库将自动为你填充这些属性。另外,如果你发送了一个Python dict结构作为消息,rabbitpy会自动将数据序列化为JSON格式,并将content-type属性设置为application/json。

Basic.Properties几个重要属性:

Basic.Publish:

发布者确认作为事务的轻量级替代方法
如rabbitpy里实现的channel.enable_publisher_confirms(),是异步响应

事物机制

使用HA(高可用)队列避免节点故障
它允许队列在多个服务器上拥有冗余副本

rabbitmq提供了相关的管理API查询状态

消费消息:

在简单的消息速度测试中,使用Basic.Consume至少是使用Basic.Get的两倍,速度不同的最明显原因是使用Basic.Get会导致每条消息都会产生与RabbitMQ同步通信的开销,这一过程由发送请求帧的客户端应用程序和发送应答的RabbitMQ组成

当一个客户端发出一个Basic.Consume时,一旦有消息可用时RabbitMQ就会进行发送,直到客户端发出一个Basic.Cancel为止

优化消费者性能

大多数linux发型版可以在/etc/sysctl.conf中设置,或手动通过以下命令更改:

echo 16777216 > /proc/sys/net/core/rmem_default
echo 16777216 > /proc/sys/net/core/rmem_max

死信交换器
注意 死信交换器与第4章讨论的备用交换器不同。过期或被拒绝的消息通过死信交换器进行投递,而备用交换器则路由那些无法由RabbitMQ路由的信息。

队列的类型

临时队列

#! /usr/bin/env python3
# -*- coding:utf-8 -*
# product
import rabbitpy

url = 'amqp://guest:guest@localhost:5672/%2F'
with rabbitpy.Connection(url) as conn:
    with conn.channel() as channel:
        exchange = rabbitpy.Exchange(channel,'ad-example')
        exchange.declare()
        # queue = rabbitpy.Queue(channel,'ad_example',auto_delete=True)
        # queue.declare()
        # binded = queue.bind(exchange,'ad-example-routing-key')
        message = rabbitpy.Message(channel,
        'this_is delete',
        {'content_type':'text/plain'},
        opinionated=True
        )
        rtl = message.publish(exchange,'ad-example-routing-key')
        print(rtl)

#! /usr/bin/env python3
# -*- coding:utf-8 -*
# 自动删除的comsumer
import rabbitpy
import time

url = 'amqp://guest:guest@localhost:5672/%2F'
with rabbitpy.Connection(url) as conn:
    with conn.channel() as channel:
        exchange = rabbitpy.Exchange(channel,'ad-example')
        exchange.declare()
        queue = rabbitpy.Queue(channel,'ad_example_steve',auto_delete=True)
        queue.declare()
        binded = queue.bind(exchange,'ad-example-routing-key')
        while(True):
            if(len(queue)<=0):
                time.sleep(2)
                continue
            message = queue.get()
            message.pprint()
            message.ack()
            

永久队列

创建队列时声明:durable=True

队列消息自动过期:
创建队列时,设置arguments={"x-message-ttl":1000}
声明队列时同时指定死信交换器和TTL值将导致该队列中已到期的消息成为死信消息

最大长度队列:
从RabbitMQ 3.1.0开始,可以在声明队列时指定最大长度arguments={"x-max-length":1000}

消息路由模式

exchange_type:

交换器间路由,将一个exchange绑定到另外一个exchange

使用替代协议

MQTT

原因:

AMQP 0-9-1这一健壮的协议可以满足大多数应用程序与RabbitMQ的通信需求。而对于特殊使用场景,我们有更好的选择。举例来说,移动设备由于其高延迟、不可靠的网络通信会给AMQP带来诸多问题。相对而言,某些应用场景下,客户端应用程序不愿维护长连接,但是却想高速发送消息。这时,基于状态的AMQP就显得过于复杂了。此外,一些应用程序可能已经支持消息通信了,但却没有采用AMQP协议MQTT被设计用来在资源约束的设备以及低带宽的环境下使用,而不必牺牲消息通信的可靠性

应用场景:

MQTT协议适用于移动端应用。STOMP相对于AMQP来说更为简单。Web版的STOMP协议被设计用于Web浏览器。statelessd适用于高速消息发送。

消息队列遥测传输(MQ Telemetry Transpor,即MQTT)协议是一种轻量级的消息通信协议,在移动端应用中应用广泛。RabbitMQ通过插件机制来支持它。

评估是否适合用mqtt协议,架构是否能从mqtt的最后遗愿功能中获益,(LWT使得客户端能够在无意间断开连接时,发送一条指定的消息)。也许你会触及到MQTT的最大消息长度:256MB。。

优缺点:发送小型消息如应用状态等,MQTT比http更合适,但如果是传输图片视频等信息的话,http更合适(http支持快文件上传)。

amq.topic交换器是默认的交换器,MQTT客户端会将消息发往该交换器上。在发布消息时,MQTT插件会自动将MQTT topic名称中的正斜杠变更为句点,用作AMQP路由键。

当通过MQTT连接RabbitMQ来订阅消息时,RabbitMQ将创建新的队列。队列名称将采用mqtt-subscriber-[NAME]qos[N]的格式。其中[NAME]是唯一的客户端名称,[N]是客户端连接设置的QoS等级。举例来说,一个名为mqtt-subscriber-facebookqos0的队列,代表订阅者名称为facebook,并且QoS设置为了0。一旦为订阅请求创建队列之后,那么该队列将会采用AMQP点分路由键的语义,被绑定到topic交换器上

对于MQTT客户端发送消息来说,加号符号(+)是用于路由键的单层匹配,而非星号(*)

STOMP

STOMP专门设计用于基于流的处理,STOMP帧是UTF-8文本,由命令和命令对应的载荷组成,并以null(0x00)字节结束。不同于AMQP和MQTT协议,STOMP是可读的,而且不需要二进制位封装信息来定义STOMP消息帧和内容,是一种人类可读的文本协议,采用utf-8编码。
STOMP协议是一种人类可读的基于文本的流协议,其设计简单易用。尽管AMQP和MQTT等二进制协议可能更高效,STOMP协议通过使用更少的数据来传输相同的消息也颇具优势,特别是在使用STOMP插件和RabbitMQ时。队列创建和绑定行为需要较少的代码,但也需要付出代价。由STOMP插件创建的代理AMQP连接,在与RabbitMQ通信进行时需要对STOMP数据进行翻译,这相对于直接使用AMQP连接来说会有额外的开销。

statelessd

个人觉得类似于一个维护连接状态的中间件。

出现背景:当我们开始使用RabbitMQ时,我们立刻发现对于我们的PHP应用程序栈来说,有状态的AMQ协议非常昂贵。我们发现PHP无法维持跨客户端请求的开放连接和信道状态。为了发布消息,PHP应用程序在处理每个请求时,都需要与RabbitMQ建立新的连接。为了解决这个问题,我们最终创建了HTTP转AMQP的发布网关statelessd。它需要接收高速的HTTP请求,同时管理用于消息发布所需的连接栈。另外,它不会成为性能瓶颈,并且能可靠地将消息发送到RabbitMQ。

当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »