Running the OAUTHBEARER mechanism over SASL_PLAINTEXT is not secure, because the token travels unencrypted; SSL is needed to encrypt the transport.
For SSL settings, please refer to "How to run kafka in SASL_SSL Mode".
Suppose that the keystore, truststore, certificate, and private key are all ready.
- Modify the server.properties file as below
listeners=SASL_SSL://localhost:9093
advertised.listeners=SASL_SSL://localhost:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER
#Specify the JAAS login configuration for SASL/OAUTHBEARER on the SASL_SSL listener
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="alice";
#ssl configurations
ssl.keystore.location=/path_to/kafka.keystore
ssl.keystore.type=pkcs12
ssl.keystore.password=yourpass
ssl.key.password=yourpass
ssl.truststore.location=/path_to/kafka.truststore
ssl.truststore.type=pkcs12
ssl.truststore.password=yourpass
#ssl.client.auth is probably not needed in SASL mode
ssl.client.auth=required
- Then start ZooKeeper and the Kafka server in separate consoles. Once both are up, the Kafka server is set up correctly and listening on port 9093.
// .bat script
zookeeper-server-start.bat .\config\zookeeper.properties
kafka-server-start.bat .\config\server.properties
// .sh script
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
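Before moving on to the clients, it can help to confirm that the listener on port 9093 actually completes a TLS handshake. The following is a minimal sketch, assuming the openssl command-line tool is installed and a Unix-like shell is used; the function name check_tls_listener is hypothetical and not part of Kafka.

```shell
# Sketch: print the subject and expiry date of the certificate the broker presents.
# check_tls_listener is a hypothetical helper; host and port default to the values used in this guide.
check_tls_listener() {
  host="${1:-localhost}"
  port="${2:-9093}"
  # </dev/null closes stdin so s_client exits after the handshake instead of waiting for input.
  # The PEM certificate printed by s_client is piped to x509 for a short summary.
  openssl s_client -connect "${host}:${port}" </dev/null 2>/dev/null \
    | openssl x509 -noout -subject -enddate
}
```

Calling check_tls_listener with no arguments targets localhost:9093. If nothing is listening there, or the port is not serving TLS, the second openssl command receives no certificate and the function returns a non-zero status.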
- Next, modify consumer.properties and producer.properties so that the clients connect to port 9093 with the SASL_SSL protocol. You can also copy the following content to a file 'client.properties' in the config directory for kafka-topics to use.
bootstrap.servers=localhost:9093
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="alice";
#ssl configurations
ssl.truststore.location=/path_to/kafka.truststore
ssl.truststore.type=pkcs12
ssl.truststore.password=yourpass
#the following keystore setting are probably not needed
#ssl.keystore.location=/path_to/kafka.keystore
#ssl.keystore.type=pkcs12
#ssl.keystore.password=yourpass
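As one way to create the client.properties file described above, the settings can be written from the shell with a here-document. This is only a sketch: CONFIG_DIR is a hypothetical variable that defaults to ./config, and the truststore path and password are the same placeholders used in this guide, to be replaced with real values.

```shell
# Sketch: write client.properties for the command-line tools.
# CONFIG_DIR is a hypothetical variable; it defaults to the ./config directory used in this guide.
CONFIG_DIR="${CONFIG_DIR:-./config}"
mkdir -p "$CONFIG_DIR"

# The quoted 'EOF' keeps the shell from expanding anything inside the file body.
cat > "$CONFIG_DIR/client.properties" <<'EOF'
bootstrap.servers=localhost:9093
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="alice";
ssl.truststore.location=/path_to/kafka.truststore
ssl.truststore.type=pkcs12
ssl.truststore.password=yourpass
EOF

# The file holds a password, so restrict it to the current user.
chmod 600 "$CONFIG_DIR/client.properties"
```

The same body can be reused for consumer.properties and producer.properties, since all three files need the same connection and security settings.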
- Create and list the topic on port 9093 in SASL_SSL mode
// .bat script
kafka-topics.bat --create --topic gaming-events --bootstrap-server localhost:9093 --command-config ./config/client.properties
kafka-topics.bat --list --bootstrap-server localhost:9093 --command-config ./config/client.properties
// .sh script
./bin/kafka-topics.sh --create --topic gaming-events --bootstrap-server localhost:9093 --command-config ./config/client.properties
./bin/kafka-topics.sh --list --bootstrap-server localhost:9093 --command-config ./config/client.properties
- Run the consumer on port 9093 in SASL_SSL mode
// .bat script
kafka-console-consumer.bat --topic gaming-events --from-beginning --bootstrap-server localhost:9093 --consumer.config ./config/consumer.properties
// .sh script
./bin/kafka-console-consumer.sh --topic gaming-events --from-beginning --bootstrap-server localhost:9093 --consumer.config ./config/consumer.properties
- Run the producer on port 9093 in SASL_SSL mode
// .bat script
kafka-console-producer.bat --topic gaming-events --bootstrap-server localhost:9093 --producer.config ./config/producer.properties
// .sh script
./bin/kafka-console-producer.sh --topic gaming-events --bootstrap-server localhost:9093 --producer.config ./config/producer.properties
The producer and consumer can now communicate over the secured port 9093 in SASL_SSL mode, using an OAUTHBEARER token to authenticate.
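To check the whole path in one go, a single message can be sent and read back non-interactively. The sketch below assumes it is run from the Kafka installation directory with the broker already started and the properties files from the earlier steps in place; the function name smoke_test and the 10-second timeout are illustrative choices, not part of Kafka.

```shell
# Sketch: produce one uniquely tagged message and confirm the consumer can read it back.
# smoke_test is a hypothetical helper; the topic defaults to the one created in this guide.
smoke_test() {
  topic="${1:-gaming-events}"
  msg="hello-$(date +%s)"

  # The console producer reads lines from stdin and exits at end of input.
  printf '%s\n' "$msg" | ./bin/kafka-console-producer.sh --topic "$topic" \
    --bootstrap-server localhost:9093 --producer.config ./config/producer.properties

  # --timeout-ms makes the consumer exit when no message arrives within the interval.
  # grep -qF succeeds only if the exact message text shows up in the consumer output.
  ./bin/kafka-console-consumer.sh --topic "$topic" --from-beginning \
    --bootstrap-server localhost:9093 --consumer.config ./config/consumer.properties \
    --timeout-ms 10000 | grep -qF "$msg"
}
```

After defining the function, run smoke_test and inspect the exit status: 0 means the message made the round trip through the SASL_SSL listener, and any other value points at a problem with the connection, authentication, or topic.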