
2. 编写测试代码org.apache.kafka kafka-clients0.11.0.2
package com.demo;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);
int totalMessageCount = 10000;
for (int i = 0; i < totalMessageCount; i++) {
String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
producer.send(new ProducerRecord<>("flink-topic", value), new Callback() {
@Override
public void onCompletion(Recordmetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("Failed to send message with exception " + exception);
}
}
});
Thread.sleep(1000L);
}
producer.close();
}
private static long currentMemSize() {
return MemoryUsageExtrator.currentFreeMemorySizeInBytes();
}
}
辅助类:
package com.demo;
import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ManagementFactory;
public class MemoryUsageExtrator {
private static OperatingSystemMXBean mxBean =
(OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
public static long currentFreeMemorySizeInBytes() {
return mxBean.getFreePhysicalMemorySize();
}
}
3. 准备测试环境
启动zookeeper和kafka,并创建主题flink-topic,启动消费者
zkServer.cmd .binwindowskafka-server-start.bat .configserver.properties .binwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-topic .binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic flink-topic --from-beginning4. 查看结果
启动程序,可以看到kafka消费者接收到的数据。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)