
配置SASL/PLAINTEXT MECHANISM为SCRAM-SHA512
Kafka加密 配置SASL+ACL 一、 SASL配置 1. 修改zoo.cfg配置文件,开启zk的SASL认证requireClientAuthScheme=sasl
2. 创建kafka-broker-jaas.conf文件,为kafka添加认证信息kafksServer 中的username和password是broker之间通信
Client 是客户端的username和password,除了配置文件方式,也可以通过命令创建(后面会讲)
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin1234";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="kafka1234";
};
3. 在kafka的sasl_server.properties配置文件中开启SASL认证
#设置本例中admin为超级用户 super.users=User:admin;User:kafka #启用SCRAM机制,采用SCRAM-SHA-512算法 sasl.enabled.mechanisms=SCRAM-SHA-512 #为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512算法 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 #broker间通讯使用PLAINTEXT,本例中不演示SSL配置 security.inter.broker.protocol=SASL_PLAINTEXT #配置listeners使用SASL_PLAINTEXT listeners=SASL_PLAINTEXT://192.168.1.88:9092 #配置advertised.listeners advertised.listeners=SASL_PLAINTEXT://192.168.1.88:9092 security.protocol=SASL_SSL4. 修改kafka启动脚本,加载指定的properties文件 及 读取认证配置文件,这一行代码 放在zkEnv.sh脚本的最前面
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/impdatahd/kafka_2.12-2.4.0/config/kafka-broker-jaas.conf"
5.sh kafka-start.sh 启动kafka server,并验证启动成功 二、 ACL配置 1. 修改zoo.cfg配置文件,开启zk的ACL认证authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
2. 在kafka的sasl_server.properties配置文件中开启ACL认证# 启用ACL authorizer.class.name=kafka.security.authorizer.AclAuthorizer3. ACL动态创建用户、分配用户组 及 topic读、写赋权命令
创建账号:
sh kafka-configs.sh --zookeeper localhost:2181/kafka240 --alter --add-config 'SCRAM-SHA-512=[password=sasl_user_1_pwd]' --entity-type users --entity-name sasl_user_1
添加账号写权限:
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:sasl_user_1 --operation Write --topic kafka_sasl_2
添加账号读权限:
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:test3read --operation Read --topic kafka_sasl_6
创建Group:
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:sasl_user_1 --group kafka-acls-group4. 验证步骤3是否正确,通过kafka自带的producer、consumer命令测试
4.1 创建生产者配置文件 producer.properties
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka1234";
4.2 创建消费者配置文件 consumer.properties
bootstrap.servers=localhost:9092 group.id=kafka-acls-group security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test3read" password="test3read1234";
4.3 生产 和 消费命令
生产:kafka-console-producer.sh --broker-list loyx01:9092 --topic sasl_test_one --producer.config /home/impdatahd/kafka_2.12-2.4.0/config/p_sasl.properties
消费:kafka-console-consumer.sh --bootstrap-server loyx01:9092 --topic behavior_log_andr_test --consumer.config /home/impdatahd/kafka_2.12-2.4.0/config/c_sasl.properties
三、 常见问题
- kafka启动时报错: ERROR SASL authentication failed using login context ‘Client’ with exception: {}
解决:
- 使用windows拖拽的方式会导致有无法看见的结束符,不识别conf文件而造成失败。启动时没有加载zookeeper_sasl.conf文件报错
利用flume进行采集需要做以下修改。
1)创建jaas文件
需要和kafka配置保持一致!
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="youradminusername"
password="youradminpwd";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="yourusername"
password="yourpwd";
};
2)修改flume-env.sh
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/home/impdatahd/flume-1.9.0/conf/kafka-broker-jaas.conf"
3)编写flume脚本
# 命名每个组件 a1代表agent的名称 #a1.sources代表a1中配置的source,多个使用空格间隔 #a1.sinks代表a1中配置的sink,多个使用空格间隔 #a1.channels代表a1中配置的channel,多个使用空格间隔 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 1000 a1.sources.r1.batchDurationMillis = 1000 a1.sources.r1.kafka.bootstrap.servers = 192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092 a1.sources.r1.kafka.topics = kafka_sasl_1 a1.sources.r1.kafka.consumer.group.id = kafka-acls-group a1.sources.r1.kafka.consumer.security.protocol = SASL_PLAINTEXT a1.sources.r1.kafka.consumer.sasl.mechanism = SCRAM-SHA-512 a1.sources.r1.kafka.consumer.sasl.jaas.config =org.apache.kafka.common.security.scram.ScramLoginModule required username="yourusername" password="yourpwd"; a1.sources.r1.kafka.consumer.auto.offset.reset = earliest #channel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 6912212 # 配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path=/origin_data/test/sasl_test/%Y-%m-%d #上传文件的前缀 a1.sinks.k1.hdfs.filePrefix = sasl- a1.sinks.k1.hdfs.batchSize= 7500 a1.sinks.k1.hdfs.minBlockReplicas=1 #配置文件滚动 # 30MIN a1.sinks.k1.hdfs.rollInterval = 1800 #128M after codec a1.sinks.k1.hdfs.rollSize = 1580484745 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.fileType=DataStream # 绑定和连接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
启动flume即可!
采集后如图所示
1)exporter
利用github开源项目做数据收集,分布式部署到kafka所在节点。
exporter开源项目 采用版本v.1.13.1
将源文件编译后得到其脚本文件,给予可执行权限后运行以下语句开启监控。
grafana面板地址
ID:10736
下面是启停命令。
# --kafka.server=kafka_broker_address
# --kafka.version=kafka_version
# --log.leve=日志等级
nohup ./kafka_exporter
--kafka.server=192.168.1.88:9092
--kafka.server=192.168.1.89:9092
--kafka.server=192.168.1.90:9092
--kafka.version=2.4.0
--sasl.enabled
--sasl.mechanism=scram-sha512
--sasl.username=admin
--sasl.password=admin1234
--tls.insecure-skip-tls-verify
--log.level=info > kafka_exporter.log
--web.listen-address=:29092 &
ps -ef | grep kafka_exporter| grep -v grep | awk '{print $2}'| xargs kill
2)promethus
需要对promethus做相关配置。
global:
scrape_interval: 60s
evaluation_interval: 60s
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
labels:
instance: prometheus
- job_name: 'linux'
metrics_path: "/metrics"
static_configs:
- targets: ['192.168.1.88:9100','192.168.1.89:9100','192.168.1.90:9100','192.168.1.91:9100']
- job_name: 'kafka_exporter'
metrics_path: "/metrics"
scrape_interval: 5s
static_configs:
- targets: ['192.168.1.88:29092','192.168.1.89:29092','192.168.1.90:29092']
- job_name: 'flume_exporter'
metrics_path: "/metrics"
scrape_interval: 5s
static_configs:
- targets: ['192.168.1.89:9360','192.168.1.90:9360
测试
javademo关键代码
application.yml
server:
port: 9000
spring:
kafka:
consumer:
bootstrap-servers: 192.168.1.88:9092
group-id: kafka_test
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: 192.168.1.88:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: 1
retries: 3
buffer-memory: 33554432
batch-size: 16384
properties:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: SCRAM-SHA-512
ssl.endpoint.identification.algorithm: ""
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kafka' password='kafka1234';
Controller
package com.imprexion.test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final Producer producer;
@Autowired
KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.sendMessage(message);
}
}
Producer
package com.imprexion.test;
import javafx.scene.input.DataFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "kafka_sasl_1";
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String message) {
logger.info(String.format("#### -> Producing message -> %s", message));
for (int i = 0; i < 10000; i++) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = simpleDateFormat.format(new Date());
long time = System.currentTimeMillis();
message = "{ "a": " " + format + ""," +
" "app_version": "4.2.1"," +
" "device_id": "1f43f912c2"," +
" "e": "faceID_launch"," +
" "faceImage": ""," +
" "p": {" +
" "package": "com.imprexion.member"," +
" "page": "com.Orbbec.MagicSalad2"" +
" "valume": "" + i + "," +
" }," +
" "package_name": "com.imprexion.service.facerecognition"," +
" "pre_login_id": "1f43f912c2_1626330120074"," +
" "source_channel": "com.imprexion.aibar"," +
" "t": "" + time + "," +
" "uid": -1," +
" "v": 1," +
" "st":"" + time +
"}";
this.kafkaTemplate.send(TOPIC, message);
}
}
}
Consumer
package com.imprexion.test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Producer.class);
@KafkaListener(topics = "kafka_sasl_1", groupId = "kafka-acls-group")
public void consume(String message) throws IOException {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
}
Application
package com.imprexion.sendmsg;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan(basePackages = {"com.imprexion.test"})
public class SendmsgApplication {
public static void main(String[] args) {
SpringApplication.run(SendmsgApplication.class, args);
}
}
访问地址
http://localhost:9000/kafka/publish?message=hello
监控图表展示
数据生产前
数据生产后
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)