1. 简介
1.1 消息队列简介
1.1.1 什么是消息队列
消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:
// 1. 创建一个保存字符串的队列 QueuestringQueue = new LinkedList();
// 2. 往消息队列中放入消息 stringQueue.offer( "hello" );
// 3. 从消息队列中取出消息并打印 System.out.println(stringQueue.poll());
上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。
我们可以简单理解消息队列就是将需要传输的数据存放在队列中。
1.1.1 消息队列中间件
消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。
目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。
消息队列的应用场景
电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。
image.png
image.png
1.1.1.1 日志处理(大数据领域常见)
大型电商网站(淘宝、京东、国美、苏宁...)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。
image.png
image.png
image.png
image.png
image.png
点对点模式特点:
每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
image.png
image.png
Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。
Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
2. 以容错的持久化方式存储数据流
处理数据流
1. Publish and subscribe:发布与订阅
2. Store:存储
3. Process:处理
我们通常将Apache Kafka用在两类程序:
1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据
2. 构建实时流应用程序,以转换或响应数据流
image.png
上图,我们可以看到:
1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到
数据库中。
4. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。
Kafka比ActiveMQ牛逼得多
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
所属社区/公司 | Apache | Mozilla Public License | Apache | Apache/Ali |
成熟度 | 成熟 | 成熟 | 成熟 | 比较成熟 |
生产者-消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级(最差) | 万级 | 十万级 | 十万级(最高) |
消息延迟 | - | 微秒级 | 毫秒级 | - |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
事务 | 支持 | 不支持 | 支持 | 支持 |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。http://kafka.apache.org/downloads可以查看到每个版本的发布时间。
image.png
image.png
image.png
image.png
image.png
目录名称 | 说明 |
---|---|
bin | Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等 |
config | Kafka的所有配置文件 |
libs | 运行Kafka所需要的所有JAR包 |
logs | Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息 |
site-docs | Kafka的网站帮助文件 |
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
基于1个分区1个副本的基准测试
测试步骤:
1. 启动Kafka集群
2. 创建一个1个分区1个副本的topic: benchmark
3. 同时运行生产者、消费者基准测试程序
4. 观察结果
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
<repositories><!-- 代码库 -->
<repository>
<id>central</id>
<url>http://maven.aliyun.com/nexus/content/groups/public//</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- kafka客户端工具 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<!-- SLF桥接LOG4J日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
<!-- SLOG4J日志 -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
将log4j.properties配置文件放入到resources文件夹中
log4j.rootLogger=INFO,stdout**
**log4j.appender.stdout=**org.apache.log4j.ConsoleAppender****
****log4j.appender.stdout.layout**=**org.apache.log4j.PatternLayout****
****log4j.appender.stdout.layout.ConversionPattern**= **%5p - %m%n**
image.png
**public class** KafkaProducerTest {
**public static void** main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = **new** Properties();
props.put( **"bootstrap.servers"** , **"192.168.88.100:9092"** );
props.put( **"acks"** , **"all"** );
props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
**for**(**int** i = 0; i < 100; ++i) {
**try** {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ));
// 调用一个Future.get()方法等待响应
future.get();
} **catch** (InterruptedException e) {
e.printStackTrace();
} **catch** (ExecutionException e) {
e.printStackTrace();
}
}
// 5. 关闭生产者
producer.close();
}
}
image.png
image.png
**public class** KafkaProducerTest {
**public static void** main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = **new** Properties();
props.put( **"bootstrap.servers"** , **"node1.itcast.cn:9092"** );
props.put( **"acks"** , **"all"** );
props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
**for**(**int** i = 0; i < 100; ++i) {
**try** {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ));
// 调用一个Future.get()方法等待响应
future.get();
} **catch** (InterruptedException e) {
e.printStackTrace();
} **catch** (ExecutionException e) {
e.printStackTrace();
}
}
// 5. 关闭生产者
producer.close();
}
}
**public class** KafkaProducerTest {
**public static void** main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = **new** Properties();
props.put( **"bootstrap.servers"** , **"node1.itcast.cn:9092"** );
props.put( **"acks"** , **"all"** );
props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
**for**(**int** i = 0; i < 100; ++i) {
// 一、同步方式
// 获取返回值Future,该对象封装了返回值
// Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
// 调用一个Future.get()方法等待响应
// future.get();
// 二、带回调函数异步方式
producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ), **new** Callback() {
@Override
**public void** onCompletion(RecordMetadata metadata, Exception exception) {
**if**(exception != **null**) {
System.***out***.println( **"** **发送消息出现异常** **"** );
}
**else** {
String topic = metadata.topic();
**int** partition = metadata.partition();
**long** offset = metadata.offset();
System.***out***.println( **"** **发送消息到** **Kafka** **中的名字为** **"** + topic + **"** **的主题,第** **"** + partition + **"** **分区,第** **"** + offset + **"** **条数据成功** **!"** );
}
}
});
}
// 5. 关闭生产者
producer.close();
}
}
image.png
image.png
image.png
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。
Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
1.1.1 producer(生产者)
生产者负责将数据推送给broker的topic
1.1.2 consumer(消费者)
消费者负责从broker的topic中拉取数据,并自己进行处理
1.1.3 consumer group(消费者组)
image.png
image.png
image.png
image.png
image.png
image.png
// 3. 发送1-100数字到Kafka的test主题中
**while**(**true**) {
**for** (**int** i = 1; i <= 100; ++i) {
// 注意:send方法是一个异步方法,它会将要发送的数据放入到一个buffer中,然后立即返回
// 这样可以让消息发送变得更高效
producer.send(**new** ProducerRecord<>( **"test"** , i + **""** ));
}
Thread.*sleep*(3000);
}
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
*// 1. 创建消费者*
public static Consumer < String, String > createConsumer() {
// 1. 创建Kafka消费者配置
Properties props = new Properties();
props.setProperty( " bootstrap.servers " , " node1.itcast.cn:9092 " );
props.setProperty( " group.id " , " ods_user " );
props.put( " isolation.level " , " read_committed " );
props.setProperty( " enable.auto.commit " , " false " );
props.setProperty( " key.deserializer " , " org.apache.kafka.common.serialization.StringDeserializer " );
props.setProperty( " value.deserializer " , " org.apache.kafka.common.serialization.StringDeserializer " );
// 2. 创建Kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3. 订阅要消费的主题
consumer.subscribe(Arrays.asList( " ods_user " ));
return consumer;
}
编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。
image.png
image.png
1.1.1.1 编写代码消费并生产数据
实现步骤:
1. 调用之前实现的方法,创建消费者、生产者对象
2. 生产者调用initTransactions初始化事务
3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
(1) 生产者开启事务
(2) 消费者拉取消息
(3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
(4) 生产消息到dwd_user topic中
(5) 提交偏移量到事务中
(6) 提交事务
(7) 捕获异常,如果出现异常,则取消事务
public static void main(String[] args) {
Consumer<String, String> consumer = createConsumer();
Producer<String, String> producer = createProducer();
// 初始化事务
producer.initTransactions();
while(true) {
try {
// 1. 开启事务
producer.beginTransaction();
// 2. 定义Map结构,用于保存分区对应的offset
Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
// 2. 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
// 3. 保存偏移量
offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
// 4. 进行转换处理
String[] fields = record.value().split( " , " );
fields[1] = fields[1].equalsIgnoreCase( " 1 " ) ? " 男 " : " 女 " ;
String message = fields[0] + " , " + fields[1] + " , " + fields[2];
// 5. 生产消息到dwd_user
producer.send(new ProducerRecord<>( " dwd_user " , message));
}
// 6. 提交偏移量到事务
producer.sendOffsetsToTransaction(offsetCommits, " ods_user " );
// 7. 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 8. 放弃事务
producer.abortTransaction();
}
}
}
image.png
image.png
1. 分区和副本机制
1.1 生产者分区写入策略
生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中
1. 轮询分区策略
2. 随机分区策略
3. 按key分区分配策略
4. 自定义分区策略
1.1.1 轮询策略
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
指标 | 意义 |
---|---|
Brokers Spread | broker使用率 |
Brokers Skew | 分区是否倾斜 |
Brokers Leader Skew | leader partition是否存在倾斜 |
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
指标**** | 单分区单副本(ack=0)**** | 单分区单副本(ack=1)**** | 单分区单副本(ack=-1/all)**** |
---|---|---|---|
吞吐量 | 165875.991109/s每秒16.5W条记录 | 93092.533979/s每秒9.3W条记录 | 73586.766156 /s每秒7.3W调记录 |
吞吐速率 | 158.19 MB/sec | 88.78 MB/sec | 70.18 MB |
平均延迟时间 | 192.43 ms | 346.62 ms | 438.77 ms |
最大延迟时间 | 670.00 ms | 1003.00 ms | 1884.00 ms |
image.png
image.png
指标**** | 单分区单副本(ack=0)**** | 单分区单副本(ack=1)**** |
---|---|---|
吞吐量 | 165875.991109 records/sec每秒16.5W条记录 | 93092.533979 records/sec每秒9.3W条记录 |
吞吐速率 | 158.19 MB/sec每秒约160MB数据 | 88.78 MB/sec每秒约89MB数据 |
平均延迟时间 | 192.43 ms avg latency | 346.62 ms avg latency |
最大延迟时间 | 670.00 ms max latency | 1003.00 ms max latency |