
业务上有大量从硬件采集到的数据通过Kafka入库GreenPlum数据库,虽然数据表已进行分区,每个分区少的有100+万条多的时候有1000+万条记录,现在有一个接口要获取最新的20条数据用来展示,即便是从单个分区上查询由于需要全量数据排序,时间长的时候需要7~8秒,这个时候就考虑直接从Kafka获取最新数据。
2.代码实现 2.1 配置信息这里只贴出使用到的配置信息。
# kafka的服务地址
spring:
kafka:
bootstrap-servers: 127.0.0.1:xxxx
# tableName与topic的映射
tableNameKafkaTopic:
mapping: "{"table_name":"topic_name"}"
2.2 映射对象cn.hutool hutool-all5.6.6
由于Kafka内的字段跟数据库的字段名称不同,这里要创建映射关系(仅保留几个字段用来说明问题)。
@Data
@ApiModel(value = "数据封装对象", description = "用于对Kafka内的数据进行封装")
public class DataRes implements Serializable {
@ApiModelProperty(name = "LOCATION", value = "设备位置")
@JsonProperty(value = "LOCATION")
private String location;
@ApiModelProperty(name = "IP", value = "设备ID")
@JsonProperty(value = "IP")
private String equip;
@ApiModelProperty(name = "TME", value = "创建时间")
@JsonProperty(value = "TME")
private String tme;
@ConstructorProperties({"LOCATION", "IP", "TME"})
public DataGsmRes(String location, String ip, String tme) {
this.location = location;
this.equip = ip;
this.tme = tme;
}
}
Kafka的记录信息:
{"LOCATION":"河南郑州","IP":"xxxx","TME":"2022-01-12 15:29:55"}
接口返回的数据:
{
"location": "河南郑州",
"equip": "xxxx",
"tme": "2022-01-12 15:29:55"
}
2.3 代码实现
为了简洁删掉了一些业务相关的代码。
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${tableNameKafkaTopic.mapping}")
private String tableNameKafkaTopicMapping;
@Override
public baseResult> queryNewest(Map mapParam) {
// 参数解析(根据tableName获取对应的Kafka主题)
String tableName = MapUtils.getString(mapParam, "table_name", "");
if (StringUtils.isBlank(tableName)) {
return baseResult.getInstance(101, "数据源参数table_name不能为空!");
}
// 获取equip信息用来筛选数据
JSonObject jsonParam = new JSONObject();
try {
String paramStr = MapUtils.getString(mapParam, "param_json", "");
JSonObject json = JSONObject.parseObject(paramStr);
if (json != null) {
for (String key : json.keySet()) {
jsonParam.put(key.toLowerCase(), json.get(key));
}
}
} catch (Exception e) {
return baseResult.getInstance(102, "请求参数param_json非JSON格式");
}
Object equip = jsonParam.get("equip");
if (equip == null || StringUtils.isBlank(equip.toString())) {
return baseResult.getInstance(101, "请求参数param_json内的equip不能为空!");
}
List equipList = Arrays.asList(equip.toString().split(","));
// 从Kafka获取的符合条件的数据条数 equipKey用于筛选数据 timeKey用于排序
String equipKey = "IP";
String timeKey = "TME";
int pageSize = MapUtils.getInteger(mapParam, "pageSize");
int querySize = 1000;
int queryTime = 50;
int queryTotal = querySize * queryTime;
// 结果数据封装
List
3.算法分析
- 筛选的是查询时最大偏移量向前queryTotal条数据,这个可以根据业务进行调整。重新封装Kafka数据的算法实际上是修改map对象key的方法。max.poll.records参数明确了每次poll的记录数便于统计。特别注意:当前代码适用的是Partition只有1️⃣个的情况,多个分区的情况需要先查询分区数,再轮询获取每个分区的最新数据,合并后重新排序。
// 仅查询了1个分区 TopicPartition topicPartition = new TopicPartition(topicName.toString(), 0); // 获取主题的分区列表 ListpartitionInfoList = consumer.partitionsFor(topicName.toString()); //Partition(topic = gp_gsmdata, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)