
通常ES存储数据是直接插入JSON数据(此 *** 作在另一篇博文中记录:ES中JSON文档存储),而在实际Java开发中一般定义数据接收对象是个实体对象,对ES进行新增修改 *** 作时就需要进行进一步的转换;
1.新增保存
public boolean save(T entity) {
boolean result = false;
String index = indexName(entity);
Assert.notNull(index, INDEX_NULL);
IndexRequest request = new IndexRequest(index).source(removeNull(entity));
if(isNotEmpty(entity.get_id())){
//指定id,如果ES中存在则更新,不存在则新增
//不指定时,ES自己生成唯一主键
request.id(entity.get_id());
}
if(isNotEmpty(entity.getRouting())){
//指定routing,在插入父子文档数据时为必需
request.routing(entity.getRouting());
}
try {
client.index(request, RequestOptions.DEFAULT);
result = true;
} catch (IOException e) {
log.error("保存数据失败:", e);
}
return result;
}
entity转map,过滤null
private MapremoveNull(T entity) { BeanMap beanMap = BeanMap.create(entity); Map map = new HashMap<>(); beanMap.forEach((key, value) -> { if (null != value && !EXCLUDE_FIELD.contains(String.valueOf(key))) { if(value instanceof List){ map.put(String.valueOf(key), parseArray(JSON.toJSonString(value))); }else if(value instanceof EvntRel){ map.put(String.valueOf(key), JSON.toJSON(value)); }else{ map.put(String.valueOf(key), value); } } }); return map; }
ES的entity主类
public class ESEntity implements Serializable {
protected static final long serialVersionUID = 1L;
protected String id;
private String bucket;//用于分组聚合存储聚合的key
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
protected int size = 10;
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
protected int current = 1;
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
protected transient Page page;
public Page getPage() {
if (page == null) {
page = new Page(current, size);
}
return page;
}
}
public class ESbaseEntity extends ESEntity {
private String _id;
private String _index;
private String routing = "";
}
public BulkResponse bulkAddOrUpdate(List list) {
Assert.notEmpty(list, BULK_LIST_NULL);
Assert.notNull(indexName(list.get(0)), INDEX_NULL);
BulkRequest bulkRequest = new BulkRequest();
for (T obj : list) {
IndexRequest indexRequest = new IndexRequest(obj.get_index()).source(removeNull(obj));
if(isNotEmpty(obj.get_id())){
indexRequest.id(obj.get_id());
}
if(isNotEmpty(obj.getRouting())){
indexRequest.routing(obj.getRouting());
}
bulkRequest.add(indexRequest);
}
try {
return client.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("批量插入数据失败:", e);
}
return null;
}
3.更新指定文档
public boolean updateById(String id, T entity) {
String index = indexName(Objects.requireNonNull(entity, "对象不能为null"));
return updateById(index, id, entity, entity.getRouting());
}
public boolean updateById(String index, String id, T entity, String routing) {
boolean result;
UpdateRequest request = new UpdateRequest(index, id).doc(removeNull(entity));
if (isNotEmpty(routing)) {
request.routing(routing);
}
try {
client.update(request, RequestOptions.DEFAULT);
result = true;
} catch (Exception e) {
log.error("指定索引更新失败,Exception:", e);
result = false;
}
return result;
}
3.根据查询条件批量更新文档
public boolean updateByQuery(String index, QueryBuilder queryBuilder, script script) {
boolean result = false;
try {
UpdateByQueryRequest request = new UpdateByQueryRequest(index);
request.setQuery(queryBuilder);
request.setscript(script);
request.setBatchSize(BATCH_NUM); //documents processed per batch default = 1000 大了,意味着消耗更多的内存
client.updateByQuery(request, RequestOptions.DEFAULT);
result = true;
} catch (IOException e) {
log.error("更新失败,Exception:", e);
}
return result;
}
4.大量更新
public BulkByScrollResponse updateByQueryResponse(String index, QueryBuilder queryBuilder, script script) {
Assert.notNull(index, INDEX_NULL);
try {
UpdateByQueryRequest request = new UpdateByQueryRequest(index);
request.setQuery(queryBuilder);
request.setscript(script);
request.setSize(MAX_NUM);//最大值-自己设置,一般10000
request.setBatchSize(BATCH_NUM); //批处理数量 documents processed per batch default = 1000 大了,意味着消耗更多的内存
return client.updateByQuery(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("更新失败,Exception:", e);
}
return null;
}
5.批处理
private boolean batch(final String[] ids, final List list, final T entity, final Action action) {
BulkProcessor processor = BulkProcessor.builder(
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
processorListener).build();
if (action.equals(Action.UPDATE)) {
if (!CollectionUtils.isEmpty(list)) {
log.info("多值批量更新");
list.forEach(e -> processor.add(new UpdateRequest(indexName(e), e.getId()).doc(removeNull(entity))));
} else {
log.info("单值批量更新");
Assert.notNull(ids, PARAM_IDS_NULL);
Stream.of(ids)
.forEach(id -> processor.add(new UpdateRequest(indexName(entity), id).doc(beanToMap(entity))));
}
} else if (action.equals(Action.DELETE)) {
log.info("批量删除");
if (!CollectionUtils.isEmpty(list)) {
list.forEach(e -> processor.add(new DeleteRequest(indexName(e), e.getId())));
} else {
Assert.notNull(ids, PARAM_IDS_NULL);
String index = indexName(entity);
Stream.of(ids).forEach(id -> processor.add(new DeleteRequest(index, id)));
}
} else {
log.info("参数错误没有执行,ids:{},list:{},entity:{},action:{}", ids, list, entity, action);
}
try {
processor.awaitClose(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("bulkProcessor关闭异常:", e);
}
return true;
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)