再见了Kafka,MQ新王Pulsar大厂实践!

本文介绍公司选择 Apache Pulsar 的原因,使用 Apache Pulsar 的场景,Apache Pulsar 实践应用中遇到的问题及使用 Apache Pulsar 的未来规划。

1 背景

传统金融公司或券商一般使用统一接入服务或组件来处理对外业务。接收到用户请求后,根据业务规则将请求转对应业务系统 / 模块。有些请求会转发给MQ,请求写入后,下游业务系统从MQ获取请求,并在处理后通过MQ原路返给客户,整个请求过程封闭运行,功能有限。

1.1 消息队列下传统架构带来的挑战

采用上述传统架构,目前只支持MQ,但难以获取MQ细节。由于是定制系统,支持语言有限。现有MQ对业务发展和业务创新不足:

  • 黑盒系统,难观测,MQ是黑盒系统,难观测到架构细节
  • 直接交换(Direct Exchange),无法路由:由于架构目前只支持MQ,无法支持需要路由的场景
  • 弱校验接入,安全风险高:现有系统的密码认证、校验等检验较弱,安全风险高
  • 定制系统,有限语言支持:定制系统接入语言的支持有限,导致我们选择范围少,难以在原有系统基础上改造

随业务扩展和架构改进,公司现有MQ系统 / 组件面临挑战,而系统现存问题如安全性等在金融场景中刻不容缓。

2 金融场景的业务需求

业务需求主要三类:

2.1 身份识别 & 安全控制

身份识别,主要用于确定接入消息队列的客户端和接入者的身份信息,指定相应的安全规则,拒绝不合法接入者,进而实现预期的安全要求。从最基础的层面看,需要识别控制接入的系统、IP,根据业务场景及特定需求,进行权限限制。

2.2 路由分发

指消息根据规则由写入队列路由至对应的队列。现有MQ支持场景有限,若想支持更多,需投入大量时间精力开发(涉及上下游系统改造),同时引入其他问题。较好解决方案是MQ系统原生支持更多模式及特性,如 TOPIC 模式、流式消息处理。若MQ系统支持路由,则系统的接入复杂度大大降低,可通过更优方式操作接入层,每个系统只需要对接一组 topic,路由负责分发;也可更有针对性优化性能(路由、转发、协议转化都是消耗性能的操作)。

原系统架构通讯机制是点对点,封闭运行,请求消息无法共享,只能间接采用适配器或日志采集方式实现分发,此类做法难以有效满足实时性要求。

2.3 审计

消息发布者 / 接收者都属于整个系统的参与者,且是重中之重。系统安全性主要影响系统的所有参与者;因此,从安全角度出发,对消息审计要求较高。另一急迫需求是对消息流向控制。若可进行身份识别和安全控制,则可在审计时完善和优化安全信息,进而保证在业务入口处拒绝无效、非法请求,保证内部系统健壮。记录接入的消息发布者 / 接收者信息还可用于异常情况监控、稽核审计。

3 新增业务的系统需求

新增业务对消息系统提出更高要求,主要包括可用性、消息发送延迟、扩缩容、消息回溯。

3.1 需求一:高可用、低延迟

互联网行业,高可用低延迟是系统基本要求。从单点到灾备,到同城跨机房,再到异城跨多中心或先跨城、灾备,再跨城多中心(两地三中心)模式都已常态,很多公司业务系统正在或将往此发展。这样的系统对高可用、低延迟要求较高。因此需考虑当系统复杂度增加(如灾备、跨城等场景)时,如何将延迟降到最低。

3.2 需求二:快速扩容与恢复

金融业业务主要特性之一是请求可能在某时间段或某个周期激增,过了这个时间窗口,流量逐渐正常。该特性要求系统可快速横向扩缩容,出于成本考虑,按最高流量部署整个系统架构显然不合理。最好解决方案是系统可根据单层流量合理安排系统架构或系统部署方式,在流量突然增加时,系统可快速扩容,支撑业务。最理想的情况是系统的所有组件都有快速扩缩容、恢复能力。

3.3 需求三:消息有序、消息防重

一些场景需保证消息有序或防重。经常对一些接口进行幂等操作,若可保证上游消息不重复,就可减小下游压力。

3.4 需求四:可回溯、序列化

若业务系统出现问题,但测试环境难复现,就要引入消息回溯,即重放一遍出现问题的时间窗口中的所有请求,验证是否能复现并排查,这可大大减轻排查工作量。

还可以此进行灰度验证和并行验证。

4 选择 Apache Pulsar

明确业务需求和系统需求,发现 Apache Pulsar 完美契合。

4.1 集群模式

支持跨集群同步。建设系统双活,跨集群的地域复制在客户端无感的情况下实现消息同步。

4.2 算存分离

根据使用情况横向扩展存储 / 计算,客户端对此操作无感知。基于二级存储,扩展消息的使用场景,为数据分析、消息审计提供可能。

4.3 客户端接入认证模块插件化

支持自定义开发。因业务需求,在客户端接入时,要求鉴权、认证,有效保证消息的来源可靠、可控。

4.4 完善的 Rest API

可查看队列情况。之前使用的消息系统有很好性能,但可观测性欠缺,排障困难,同时消息系统管理方式原始,难适配大规模系统管理要求。而 Apache Pulsar 完善 Rest API 不仅可获取系统运行指标,且有助集群高效管理。

4.5 Functions

基于 Functions 可实现消息的路由开发、过滤和统计等。

4.6 消息重放

可设置消息的持久化模式和过期时间,允许消息重放。

4.7 多语言支持

快速便捷接入。

5 Pulsar业务实践

使用 Apache Pulsar 构建统一消息平台,期望整合客户、交易、行情、资金四大数据流,应用于行情分发、实时风控等。本文主要介绍应用场景下的新架构的优势和不足,以及其对开发、运维影响。

5.1 请求路由——简化系统

消息路由流程如图。从 A 组件发请求写入 Topic A,然后路由模块将 topic 信息路由,分发到多个对应 topic,订阅这些 topic 的下游组件就可处理相关消息。组件 A 只需向固定队列写消息,无需关注 Topic B、C、D 信息,下游系统只需了解接收消息的队列,无需关注 Topic A,从而简化整个网络结构。

这种消息路由模式简化了系统整体架构,目前路由系统仍待优化:

  • 虽路由分发的工作量减轻,但排查问题步骤增加。如组件 A 发消息后,组件 B 未收到消息时,需先检查组件 A 是否写入 Topic A、路由模块是否成功路由该消息,再看组件 B 是否正确订阅该消息
  • 目前测试效果看,由于消息链路变长,时延增加
  • 由于每个队列的消息都会持久化,导致存储和队列中都出现数据冗余
  • 路由模块是新增模块,运维学习成本高
5.2 数据广播——降低时延

数据广播采用发送 / 订阅模式,用于同步消息。之前不需要同步行情到业务系统或通过其他方式(如同步数据库)实现。但随业务增长,同步时效和用户体验竞争度越来越激烈。如何让用户更快看到信息?以同步行情场景为例,先同步数据库再查阅的方式,时延相对较长;而广播模式的业务系统只需订阅所需 Topic,查阅时即可直接读数据,有效降低时延。

5.3 消息通知——安全管控

虽消息通知涉及业务较少,但这业务场景很重要。整体业务流程图如下。由于信号源不唯一,因此消息发布到计算引擎后,计算引擎需根据信号源的信息进行逻辑、安全等计算。计算完成后调起Task,再由激活的 Task 向相关业务系统发业务请求,执行后将结果返给发起信号源的服务,该服务根据返回的结果触发下一个信号源。

该场景涉及业务对安全和管控要求严格,不仅要限制信号源发送的消息或信号,截断 / 过滤某些信号,还要对返回结果处理:哪些可返回,哪些要过滤或转换成其他内容。如不使用MQ,消息源会直接发消息给计算引擎,在计算引擎执行安全或管控策略后,将消息发到 Task;Task执行完成后,其结果要再进行一轮安全管控处理。这部分重复操作对性能影响大,同时策略更新、信号状态查看的时效性没那么实时。

引入Pulsar后,将管控审计模块剥离,专门针对信号队列和结果队列进行过滤、审计、统计,并实时输出结果到管理端。运维或审计人员看到这些信息后,可控制、更新相应策略。这模式不仅精简数据流,还可增加数据补充渠道,也更清晰定义各服务模块边界。

6 问题发现与解决方案

6.1 实现 REQ-REP 模式

通过总线模式进行兼容。

常见调用方式是客户端发起调用请求,服务端处理完成后返回响应。但引入总线(同步转异步),在多节点部署场景,节点 1 发请求,服务端收到请求后返回处理结果,所有节点都要监听这条处理结果,节点 2 收到归属节点 1 的响应消息时咋处理?节点 2 要先订阅并获取回包的消息,判断是否自身节点发起请求的响应,若不是,则丢弃该消息。若按这模式实现,则发消息时,每个节点都要缓存自身发送的消息 ID;服务端处理完后,按协议回包数据要带上请求的消息 ID,每个节点都订阅获取所有回包,并校验缓存中是否有该消息 ID,若不存在,则丢弃消息。

该实现存在严峻问题:节点发起一个查询大量数据的请求时,假定 Apache Pulsar 设置一个消息大小为8M,TPS 为 1000,那是不是每个节点都要收到这么多请求的回包流量呢?假如有 5 个节点,每个节点本应该只接收 200 个请求的回包流量就够了,但现在的模式需要每个节点承受 1000 个请求的回包流量,而其目的仅仅是为了过滤操作。如果节点负载性能达到上限,需要扩容节点,将导致网络带宽成倍增加。由于 Apache Pulsar 可以支持大量 Topic,虽然通过给每个节点配置一个回包队列等方式可以解决这一问题,但我们想尝试通过 broker 的 FILTER 功能,来解决该问题。

6.2 实现读写分离

广播场景涉及读写分离。若增加大量订阅节点,最好避免将所有节点的链接集中在 Topic 的 owner broker。针对该问题,可行的解决方案是合理分配使用的 Topic 和 Partition。

Apache Pulsar 2.7.2 还不支持读写分离,Apache Pulsar 升级到 2.8就可轻松实现读写分离,满足消息广播。

6.3 解决多网卡问题

基于公司网络安全考虑,内部存在多种网络分区及网段,不同的网络分区 / 网段使用不同IP,服务器存在多个网卡,供跨分区系统间通信。目前若使用 IP 注册 broker,只能注册某网段的 IP;如果使用域名注册 broker,则不同网络区域的 DNS 解析又需要进行不同的配置。若broker支持多网卡通信,这些问题就没了。目前解决方案是用 proxy 代理客户端的请求,外部系统也只连到 proxy,我们也会为 proxy 增加一些高可用配置。

7 容灾

先在单机房、单集群小规模运行。作为业务系统的基础设施,Pulsar自身可用性极为重要,建设同城双中心单集群的双活规划: