Introduction: For an enterprise, the key requirements for a Kafka message queue are high availability, scalability, and ease of maintenance, and, just as importantly, saving money.
1. Purchase an instance
1) Purchase link: https://buy.cloud.tencent.com/ckafka?rid=1
2) Create a CKafka instance
Different instance specifications come with different quotas for topics and partitions.
Here you can select the VPC (private network) your business runs in; CKafka also offers multiple access methods, so it can be reached from different environments.
Take a look at the price while you are here: it is cheap, less than a single 4-core/8GB CVM, and one CVM can only host a single-node Kafka broker with no high-availability guarantee.
For comparison, this is the price of a 4-core/8GB CVM.
Back to the point: a freshly created CKafka instance looks like this. Give it an instance name that ties it to your business, so it is easy to identify and manage later.
CKafka also provides multiple access methods to cover different scenarios, so it can serve development, test, and production environments, with security-hardened options available.
Here we create a public domain access point for development and testing; for production traffic, private network access is recommended.
Create a topic.
Create a username, to be used for authentication on the public domain access point.
Configure an ACL policy that restricts which user may access which topic, keeping the online CKafka instance secure. With that in place we can reach the online CKafka from a local machine over the public domain, so let's test it (a quick connectivity check is sketched below; the full producer/consumer test follows in the next sections).
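As a quick smoke test before writing the producer and consumer, a minimal sketch like the one below can list the topics visible to the authenticated user via Kafka's AdminClient. This is my addition rather than part of the original post; it assumes the redacted endpoint and the instanceId#username credential created above, so substitute your own values.

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Properties;
import java.util.Set;

public class ConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder endpoint/credentials -- substitute the values from your own console
        props.put("bootstrap.servers", "ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"ckafka-in5yxxxx#Jensen\" password=\"xxxx\";");
        try (AdminClient admin = AdminClient.create(props)) {
            // If authentication and the ACL are set up correctly, the allowed topics appear here
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Visible topics: " + topics);
        }
    }
}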
2. Set up a Maven project with IntelliJ IDEA
Download this handy IDE yourself: https://www.jetbrains.com/products.html#type=ide. Installing the JDK, Maven, and Nexus is not covered here; see the earlier tutorials.
First, create a new project.
Select a Maven project.
Give the project a name.
pom.xml: study it carefully (a minimal sketch follows below).
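The pom.xml from the screenshot is not reproduced here. Judging from the classpath in the run logs below (kafka-clients 1.0.2 and slf4j-simple 1.7.2), a minimal pom.xml would look roughly like this; the groupId and version are my placeholders, not the original file's:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- groupId/version are placeholders; artifactId matches the project folder seen in the logs -->
    <groupId>com.example</groupId>
    <artifactId>kafka-demo2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Kafka client library; matches "Kafka version : 1.0.2" in the logs -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.2</version>
        </dependency>
        <!-- Simple SLF4J binding so the client's INFO/WARN logs print to the console -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
</project>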
3. Write the producer code: Producer.java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Public domain access point of the CKafka instance (redacted)
        props.put("bootstrap.servers", "ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("session.timeout.ms", 30000); // consumer-side setting; the producer logs a warning that it is unknown
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");               // wait for the leader's acknowledgement only
        props.put("retries", 3);              // retry transient send failures
        props.put("batch.size", 232840);      // max bytes per in-memory batch, per partition
        props.put("linger.ms", 10);           // wait up to 10 ms for a batch to fill
        props.put("buffer.memory", 33554432); // 32 MB total send buffer
        props.put("max.block.ms", 3000);      // max time send() may block when the buffer is full
        // Note: the inner quotes must be escaped, otherwise this line does not compile
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"ckafka-in5xxxxf#Jensen\" password=\"xxxx\";");
        Producer.asynSendRecord(props);
    }

    // Send messages asynchronously, kind of romantic, isn't it?
    public static void asynSendRecord(Properties props) {
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("jasen", Integer.toString(i), Integer.toString(i));
            System.out.println("record:" + record.value());
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("Message sent: offset:" + recordMetadata.offset()
                                + " timestamp:" + recordMetadata.timestamp()
                                + " topic:" + recordMetadata.topic()
                                + " partition:" + recordMetadata.partition());
                        System.out.println("Message sent successfully");
                    } else {
                        System.out.println(String.format("Message send failed: %s", e.getMessage()));
                    }
                }
            });
        }
        producer.close();
    }
}
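The code above sends asynchronously with a callback. For comparison, here is a synchronous variant; this sketch is my addition rather than part of the original post. It blocks on the Future returned by send(), trading throughput for simpler error handling, and would drop into the same class reusing the same props:

// Synchronous send sketch (hypothetical helper, not in the original post)
public static void syncSendRecord(Properties props) throws Exception {
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    try {
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("jasen", Integer.toString(i), Integer.toString(i));
            // send() returns a Future<RecordMetadata>; get() blocks until the broker acks or the send fails
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Message sent: offset:" + metadata.offset() + " partition:" + metadata.partition());
        }
    } finally {
        producer.close();
    }
}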
Let's look at the producer's log:
"C:\Program Files\Java\jdk1.8.0_161\bin\java.exe" "-javaagent:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=51023:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\programming\kafka-demo2\target\test-classes;C:\Users\Jensen\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Jensen\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Jensen\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-simple\1.7.2\slf4j-simple-1.7.2.jar" Producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 232840
bootstrap.servers = [ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 10
max.block.ms = 3000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = PLAIN
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'session.timeout.ms' was supplied but isn't a known config.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
record:0
record:1
record:2
record:3
record:4
record:5
record:6
record:7
record:8
record:9
record:10
record:11
......
record:90
record:91
record:92
record:93
record:94
record:95
record:96
record:97
record:98
record:99
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Message sent: offset:1382 timestamp:1592221545987 topic:jasen partition:0
Message sent successfully
Message sent: offset:1383 timestamp:1592221546002 topic:jasen partition:0
Message sent successfully
Message sent: offset:1384 timestamp:1592221546003 topic:jasen partition:0
Message sent successfully
Message sent: offset:1385 timestamp:1592221546003 topic:jasen partition:0
Message sent successfully
Message sent: offset:1386 timestamp:1592221546003 topic:jasen partition:0
Message sent successfully
Message sent: offset:1387 timestamp:1592221546003 topic:jasen partition:0
Message sent successfully
Message sent: offset:1388 timestamp:1592221546003 topic:jasen partition:0
Message sent successfully
Message sent: offset:1389 timestamp:1592221546003 topic:jasen partition:0
Message sent successfully
Message sent: offset:1390 timestamp:1592221546004 topic:jasen partition:0
Message sent successfully
Message sent: offset:1391 timestamp:1592221546004 topic:jasen partition:0
Message sent successfully
Message sent: offset:1392 timestamp:1592221546004 topic:jasen partition:0
Message sent successfully
Message sent: offset:1393 timestamp:1592221546004 topic:jasen partition:0
Message sent successfully
Message sent: offset:1394 timestamp:1592221546004 topic:jasen partition:0
Message sent successfully
Message sent: offset:1395 timestamp:1592221546004 topic:jasen partition:0
Message sent successfully
Message sent: offset:1396 timestamp:1592221546005 topic:jasen partition:0
Message sent successfully
Message sent: offset:1397 timestamp:1592221546005 topic:jasen partition:0
Message sent successfully
......
Message sent: offset:1476 timestamp:1592221546014 topic:jasen partition:0
Message sent successfully
Message sent: offset:1477 timestamp:1592221546014 topic:jasen partition:0
Message sent successfully
Message sent: offset:1478 timestamp:1592221546014 topic:jasen partition:0
Message sent successfully
Message sent: offset:1479 timestamp:1592221546014 topic:jasen partition:0
Message sent successfully
Message sent: offset:1480 timestamp:1592221546014 topic:jasen partition:0
Message sent successfully
Message sent: offset:1481 timestamp:1592221546014 topic:jasen partition:0
Message sent successfully
Process finished with exit code 0
4. Write the consumer code: Consumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        String topicName = "jasen";
        String groupId = "test-group";

        Properties props = new Properties();
        props.put("bootstrap.servers", "ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002");
        props.put("group.id", groupId);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("session.timeout.ms", 30000);
        props.put("enable.auto.commit", "true");      // commit offsets automatically...
        props.put("auto.commit.interval.ms", "1000"); // ...once per second
        props.put("auto.offset.reset", "earliest");   // with no valid committed offset, start from the earliest retained one
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"ckafka-in5yxxxx#Jensen\" password=\"xxxx\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topicName));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
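The consumer above relies on auto-commit, which can re-deliver (or skip) messages if the process dies between commits. A manual-commit variant is sketched below; this is my addition, not part of the original post. Set enable.auto.commit to "false" in the props above, then replace the poll loop so offsets are committed only after a batch has actually been processed:

// Manual-commit sketch: requires props.put("enable.auto.commit", "false") above
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        if (!records.isEmpty()) {
            consumer.commitSync(); // commit offsets only after this batch is fully processed
        }
    }
} finally {
    consumer.close();
}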
Now let's see whether the one hundred messages produced above can all be consumed:
"C:\Program Files\Java\jdk1.8.0_161\bin\java.exe" "-javaagent:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=51176:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\programming\kafka-demo2\target\test-classes;C:\Users\Jensen.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Jensen.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Jensen.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Jensen.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\Jensen.m2\repository\org\slf4j\slf4j-simple\1.7.2\slf4j-simple-1.7.2.jar" Consumer
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-group
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = PLAIN
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Discovered group coordinator 111.230.124.164:18006 (id: 2147472928 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 9
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions [jasen-0]
[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=test-group] Fetch offset 942 is out of range for partition jasen-0, resetting offset
(The group's last committed offset, 942, no longer exists on the broker, presumably because older messages were cleaned up; with auto.offset.reset=earliest the consumer falls back to the earliest retained offset, which is why consumption starts at 1382 below.)
offset = 1382, key = 0, value = 0
offset = 1383, key = 1, value = 1
offset = 1384, key = 2, value = 2
offset = 1385, key = 3, value = 3
offset = 1386, key = 4, value = 4
offset = 1387, key = 5, value = 5
offset = 1388, key = 6, value = 6
offset = 1389, key = 7, value = 7
offset = 1390, key = 8, value = 8
offset = 1391, key = 9, value = 9
offset = 1392, key = 10, value = 10
offset = 1393, key = 11, value = 11
offset = 1394, key = 12, value = 12
offset = 1395, key = 13, value = 13
offset = 1396, key = 14, value = 14
offset = 1397, key = 15, value = 15
offset = 1398, key = 16, value = 16
......all one hundred records are there, not one missing
offset = 1464, key = 82, value = 82
offset = 1465, key = 83, value = 83
offset = 1466, key = 84, value = 84
offset = 1467, key = 85, value = 85
offset = 1468, key = 86, value = 86
offset = 1469, key = 87, value = 87
offset = 1470, key = 88, value = 88
offset = 1471, key = 89, value = 89
offset = 1472, key = 90, value = 90
offset = 1473, key = 91, value = 91
offset = 1474, key = 92, value = 92
offset = 1475, key = 93, value = 93
offset = 1476, key = 94, value = 94
offset = 1477, key = 95, value = 95
offset = 1478, key = 96, value = 96
offset = 1479, key = 97, value = 97
offset = 1480, key = 98, value = 98
offset = 1481, key = 99, value = 99
To wrap up:
So, how is it overall, pretty sweet, right? You will only really know once you have used it. If you are not convinced, try building a Kafka cluster yourself: a ZooKeeper cluster is also a must, and what do you do when you need to scale out? For an enterprise, the requirements are high availability, scalability, and ease of maintenance, and, just as importantly, saving money.