论文部分内容阅读
中图分类号:TU 文献标识码:A 文章编号:(2021)-01-252
前言
OpenStack遵循这样的设计原则:项目之间通过 RESTful API 进行通信,项目内部,不同服务进程之间通过消息队列进行通信,通信的代码实现借由 oslo_messaging 库完成
oslo_messaging 是对 Python 庫 kombu 的封装,kombu 是对 AMQP 的封装,所以在讨论 oslo_messaging 前有必要对 AMQP 进行说明
AMQP
Openstack 所支持的消息队列类型中,大部分都是基于AMQP,基于 AMQP 标准,有很多具体的实现,比如 OpenStack 默认的 RabbitMQ,Kafka 等,所以本文着重介绍 AMQP 的架构,RabbitMQ 架构与之类似
架构图如下所示:
在对上图进行说明之前需要先对其中的一些名词进行说明:
·Server/Broker:AMQP 的具体实现,如 RabbitMQ,Kafka
·Producer:生产者
·Consumer:消费者
·Virtual Host:虚拟主机,一个 broker 里可以有多个 vhost,用作不同用户的权限分离
·Exchange:接收 Producers 发送过来的消息,按照一定规则转发到相应的 Message Queues 中
·Queue:将接收到的消息转发到相应的 Consumers
·Channel:消息通道,在客户端的每个连接里,可建立多个 Channel
·Routing Key:Exchange 根据这个关键字将消息转发至不同的 Queue
流程说明:生产者将消息发送给 Exchange,由 Exchange 来决定消息的路由,即决定将消息发送到那个 Queue,然后消费者从 Queue 中取出消息,进行处理,至于 Exchange 将消息转发给哪一个 Queue,这将依赖于 Routing Key,每一个发送的消息都有一个 Routing Key,同样,每一个 Queue 也有一个 Binding Key,Exchange 在进行消息路由时,会查询每一个 Queue,如果某个 Queue 的 Binding Key 与某个消息的 Routing Key 匹配,这个消息会被转发到那个 Queue
简单来说,Exchange 依据 Routing Key 进行匹配,将消息转发给匹配成功的 Message Queue,总共有三种匹配模式:
?1?Direct:Routing Key 为一字符串,匹配规则为全值匹配
?2?Topic:Routing Key 为由 . 分隔的一个个子串组成的字符串,匹配规则为模式匹配,具有两种通配符,星号 '*' 表示任意一个子字符串,井号,'#' 表示任意多个子字符串,Producer 发送的 Routing Key 没有通配符,Message Queue 绑定的 Routing Key 可以有通配符
?3?Fanout Exchange:广播,没有 Routing Key,所有绑定到 Fanout Exchange 的 Message Queue 都能收到来自 Producer 发送的消息
Notifiy 与 oslo.messaging
oslo.messaging 库通过以下两种方式来完成项目各个服务进程之间的通信
?1?远程过程调用(Remote Procedure Call,RPC)通过远程过程调用,一个服务进程可以调用其他远程服务进程的方法,并且有两种方法:call 和 cast,通过 call 的方式调用,远程方法会被同步执行,调用者会被阻塞直到结果返回,通过 cast 的方式调用,远程方法会被异步执行,结果不会立即返回,调用者也不会被阻塞,但是调用者需要用其他方式查询这次远程调用的结果
?2?事件通知(Event Notification)某个服务进程可以把事件通知发送到消息总线上,该消息总线上所有对此类事件感兴趣的服务进程,都可以获得此事件通知并进行进一步的处理,处理的结果并不会发送给事件发送者,这种通信方式,不但可以在同一个项目内部的各个服务进程之间发送通知,还可以实现跨项目之间的通知发送,Ceilometer 就通过这种方式大量获取其他 OpenStack 项目的事件通知,从而进行计量和监控
编程实现
1.创建连接 Channel
2.创建队列,在队列中指明回调函数
3.创建回调函数
4.使用某一角色收发信息
关于回调函数,为了保证数据不被丢失,RabbitMQ 支持消息确认机制,为了保证数据能被正确处理而不仅仅是被 Consumer 收到,那么我们不能采用 no-ack,而应该是在处理完数据之后发送 ack
def process_media(body, message):
print(' ')
print(body)
message.ack()
OpenStack 组件发送消息的格式如下:
{
'message_id': six.text_type(uuid.uuid4()), #消息id号
'publisher_id': 'compute.hos1',#发送者id
'timestamp': timeutils.utcnow(),#时间戳
'priority': 'WARN',#通知优先级
'event_type': 'compute.create_instance', #通知类型
'payload': {'instance_id': 12, ...} #通知内容
}
使用 Python 的测试代码如下:
from kombu.entity import Queue
from kombu.messaging import Consumer
from kombu.connection import Connection
# 处理消息
def process_media(body, message):
print(' ')
print(body)
message.ack()
connection = Connection('amqp://openstack:[email protected]:5672//')
queue = Queue('notifications.info', durable=False)
with connection.Consumer(queues=[queue], callbacks=[process_media]) as consumer:
while True:
# 等待消息传入
connection.drain_events()
临沂大学 山东 临沂 276000
前言
OpenStack遵循这样的设计原则:项目之间通过 RESTful API 进行通信,项目内部,不同服务进程之间通过消息队列进行通信,通信的代码实现借由 oslo_messaging 库完成
oslo_messaging 是对 Python 庫 kombu 的封装,kombu 是对 AMQP 的封装,所以在讨论 oslo_messaging 前有必要对 AMQP 进行说明
AMQP
Openstack 所支持的消息队列类型中,大部分都是基于AMQP,基于 AMQP 标准,有很多具体的实现,比如 OpenStack 默认的 RabbitMQ,Kafka 等,所以本文着重介绍 AMQP 的架构,RabbitMQ 架构与之类似
架构图如下所示:
在对上图进行说明之前需要先对其中的一些名词进行说明:
·Server/Broker:AMQP 的具体实现,如 RabbitMQ,Kafka
·Producer:生产者
·Consumer:消费者
·Virtual Host:虚拟主机,一个 broker 里可以有多个 vhost,用作不同用户的权限分离
·Exchange:接收 Producers 发送过来的消息,按照一定规则转发到相应的 Message Queues 中
·Queue:将接收到的消息转发到相应的 Consumers
·Channel:消息通道,在客户端的每个连接里,可建立多个 Channel
·Routing Key:Exchange 根据这个关键字将消息转发至不同的 Queue
流程说明:生产者将消息发送给 Exchange,由 Exchange 来决定消息的路由,即决定将消息发送到那个 Queue,然后消费者从 Queue 中取出消息,进行处理,至于 Exchange 将消息转发给哪一个 Queue,这将依赖于 Routing Key,每一个发送的消息都有一个 Routing Key,同样,每一个 Queue 也有一个 Binding Key,Exchange 在进行消息路由时,会查询每一个 Queue,如果某个 Queue 的 Binding Key 与某个消息的 Routing Key 匹配,这个消息会被转发到那个 Queue
简单来说,Exchange 依据 Routing Key 进行匹配,将消息转发给匹配成功的 Message Queue,总共有三种匹配模式:
?1?Direct:Routing Key 为一字符串,匹配规则为全值匹配
?2?Topic:Routing Key 为由 . 分隔的一个个子串组成的字符串,匹配规则为模式匹配,具有两种通配符,星号 '*' 表示任意一个子字符串,井号,'#' 表示任意多个子字符串,Producer 发送的 Routing Key 没有通配符,Message Queue 绑定的 Routing Key 可以有通配符
?3?Fanout Exchange:广播,没有 Routing Key,所有绑定到 Fanout Exchange 的 Message Queue 都能收到来自 Producer 发送的消息
Notifiy 与 oslo.messaging
oslo.messaging 库通过以下两种方式来完成项目各个服务进程之间的通信
?1?远程过程调用(Remote Procedure Call,RPC)通过远程过程调用,一个服务进程可以调用其他远程服务进程的方法,并且有两种方法:call 和 cast,通过 call 的方式调用,远程方法会被同步执行,调用者会被阻塞直到结果返回,通过 cast 的方式调用,远程方法会被异步执行,结果不会立即返回,调用者也不会被阻塞,但是调用者需要用其他方式查询这次远程调用的结果
?2?事件通知(Event Notification)某个服务进程可以把事件通知发送到消息总线上,该消息总线上所有对此类事件感兴趣的服务进程,都可以获得此事件通知并进行进一步的处理,处理的结果并不会发送给事件发送者,这种通信方式,不但可以在同一个项目内部的各个服务进程之间发送通知,还可以实现跨项目之间的通知发送,Ceilometer 就通过这种方式大量获取其他 OpenStack 项目的事件通知,从而进行计量和监控
编程实现
1.创建连接 Channel
2.创建队列,在队列中指明回调函数
3.创建回调函数
4.使用某一角色收发信息
关于回调函数,为了保证数据不被丢失,RabbitMQ 支持消息确认机制,为了保证数据能被正确处理而不仅仅是被 Consumer 收到,那么我们不能采用 no-ack,而应该是在处理完数据之后发送 ack
def process_media(body, message):
print(' ')
print(body)
message.ack()
OpenStack 组件发送消息的格式如下:
{
'message_id': six.text_type(uuid.uuid4()), #消息id号
'publisher_id': 'compute.hos1',#发送者id
'timestamp': timeutils.utcnow(),#时间戳
'priority': 'WARN',#通知优先级
'event_type': 'compute.create_instance', #通知类型
'payload': {'instance_id': 12, ...} #通知内容
}
使用 Python 的测试代码如下:
from kombu.entity import Queue
from kombu.messaging import Consumer
from kombu.connection import Connection
# 处理消息
def process_media(body, message):
print(' ')
print(body)
message.ack()
connection = Connection('amqp://openstack:[email protected]:5672//')
queue = Queue('notifications.info', durable=False)
with connection.Consumer(queues=[queue], callbacks=[process_media]) as consumer:
while True:
# 等待消息传入
connection.drain_events()
临沂大学 山东 临沂 276000