如何用Java向kafka发送json数据

如何用Java向kafka发送json数据,第1张

发送json也可以看成字符串处理

We have 2 Options as listed below

1) If we intend to send custom java objects to producer, We need to create a serializer which implements org.apache.kafka.common.serialization.Serializer and pass that Serializer class during creation of your producer

Code Reference below

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {

public void configure(Map map, boolean b) {

}

public byte[] serialize(String s, Object o) {

try {

ByteArrayOutputStream baos = new ByteArrayOutputStream()

ObjectOutputStream oos = new ObjectOutputStream(baos)

oos.writeObject(o)

oos.close()

byte[] b = baos.toByteArray()

return b

} catch (IOException e) {

return new byte[0]

}

}

public void close() {

}

}

And set the value serializer accordingly

<entry key="value.serializer"

value="com.spring.kafka.PayloadSerializer" />

2) No need to create custom serializer class. Use the existing ByteArraySerializer, but during send follow the process

Java Object ->String (Preferrably JSON represenation instead of toString)->byteArray

从复杂json中提取关心的字段数据,利用ROW的方式, 可以让复杂的json转变为可 *** 作的schema,然后可以通过 field as xx.xx.xx 来使用

version flink 1.13.0

参考

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/json/

https://blog.csdn.net/xianpanjia4616/article/details/112690791

https://blog.csdn.net/YouLoveItY/article/details/108276799

最近我们生产环境的kafka集群有增加节点的需求,然而kafka在新增节点后并不会像elasticsearch那样感知到新节点加入后自动将数据reblance到新集群中,因此这个过程需要我们手动分配。一番折腾之后,实现了增加kafka集群节点并将原有数据均匀分配到扩容后的集群。下面结合一个例子谈一下整个过程。

假定当前的cluster中只有(101,102,103)三个kafka节点,有一个名为think_tank的topic,该topic有2个replica,均匀分布在三个节点上.

我们要做的是在cluster中新增两个节点(记为104,105)后,将的数据均匀分到新集群中的5个节点上。

其实官方文档的这一小节关于集群扩容讲解很详细: Expanding your cluster ,整个过程需要分为三个步骤:获取kafka给出的建议分配方案、按照给出的分配方案执行分配、查看分配的进度以及状态。这三个步骤对应了kafka脚本提供的三个partition reassigment工具。

结合例子具体说明:

脚本的参数是以json文件的形式传入的,首先要新建一个json文件并设置需要分配哪些topic,think_tank-to-move.json:

使用/bin目录中提供的 kafka-reassign-partitions.sh 的脚本请求获取生成分配方案:

--broker-lsit 的参数 "101,102,103,104,105"是指集群中每个broker的id,由于我们是需要将所有topic均匀分配到扩完结点的5台机器上,所以要指定。同理,当业务改变为将原来的所有数据从旧节点(01,102,103)迁移到新节点(104,105)实现数据平滑迁移,这时的参数应"104,105".

脚本执行后返回的结果如下:

可以看出当前正在运行的方案中,think_tank的replica都是分布在101,102,103这3个节点,新给出的建议方案中replica均匀分布在扩容后的5个节点中。

将上一个步骤中生成的建议方案复制到新建的think_tank_reassignment.json中:

使用脚本执行:

脚本执行,返回内容:

如上,成功开始执行分配数据,同时提示你如果有需要将之前的分配方案备份便于回滚到原方案。

查看脚本的方法如下,注意这次的json文件要和执行步骤中的json是同一个文件:

返回结果:

is still in progress表示还在处理中,全部迁移成功后每个partition都会显示 completed successfully.注意如果topic数据量大,这个过程可能会时间长一些, 不要轻易重启节点! 可能会导致数据不一致!!!

这个partion reassignment工具同样可以按需手动地将某个特定的topic指定到特定的broker上,所要做的就是按照步骤一给定的格式关联partition到borker即可,如,将think_tank的partition0指定到101、102两节点上:

另外,如果有增加replica的个数的需求,同样可以使用这个脚本,可以翻一下官网文档。

一点儿感触,在确定问题所在后,官方的文档应该作为我们优先考虑的一个重要资料源,网上的资料由于时间较早、版本不同的原因,解决方式可能需要细微的改动才能达到目的,这些坑在官方的一手资料上其实是可以规避的。

欢迎拍砖,欢迎交流~


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/sjk/6748243.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-03-27
下一篇2023-03-27

发表评论

登录后才能评论

评论列表(0条)

    保存