|
@@ -5,18 +5,18 @@ import org.apache.http.auth.AuthScope;
|
|
|
import org.apache.http.auth.UsernamePasswordCredentials;
|
|
|
import org.apache.http.client.CredentialsProvider;
|
|
|
import org.apache.http.impl.client.BasicCredentialsProvider;
|
|
|
-import org.bfkj.config.AppConfig;
|
|
|
import org.bfkj.utils.LogUtils;
|
|
|
import org.bfkj.utils.MapTools;
|
|
|
import org.elasticsearch.action.ShardOperationFailedException;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
import org.elasticsearch.action.index.IndexResponse;
|
|
|
+import org.elasticsearch.action.support.WriteRequest;
|
|
|
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
|
|
import org.elasticsearch.client.RequestOptions;
|
|
|
import org.elasticsearch.client.RestClient;
|
|
|
import org.elasticsearch.client.RestClientBuilder;
|
|
|
import org.elasticsearch.client.RestHighLevelClient;
|
|
|
-import org.elasticsearch.xcontent.XContentType;
|
|
|
+import org.elasticsearch.common.xcontent.XContentType;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
@@ -28,18 +28,21 @@ import java.util.stream.Collectors;
|
|
|
/*es协议*/
|
|
|
public class ElasticHandler {
|
|
|
|
|
|
- private RestHighLevelClient restHighLevelClient;
|
|
|
+ private static RestHighLevelClient restHighLevelClient;
|
|
|
|
|
|
- public synchronized Map<String, Object> sendData(String indexName, List<Object> dataContent, String connectConfig, String serviceId) {
|
|
|
+ public boolean sendData(String indexName, List<Object> dataContent, String connectConfig, String serviceId) {
|
|
|
if (Objects.isNull(indexName)) {
|
|
|
- return MapTools.processFail("队列为空: " + dataContent);
|
|
|
+ MapTools.processFail("队列为空: " + dataContent);
|
|
|
+ return true;
|
|
|
}
|
|
|
if (Objects.isNull(dataContent)) {
|
|
|
- return MapTools.processSuccess(null);
|
|
|
+ MapTools.processSuccess(null);
|
|
|
+ return true;
|
|
|
}
|
|
|
- try {
|
|
|
- Map<String, Object> connectConfigMaps = (Map<String, Object>) MapTools.strToObj(connectConfig);
|
|
|
- if (Objects.isNull(restHighLevelClient)) {
|
|
|
+
|
|
|
+ if (Objects.isNull(restHighLevelClient) ){
|
|
|
+ try {
|
|
|
+ Map<String, Object> connectConfigMaps = (Map<String, Object>) MapTools.strToObj(connectConfig);
|
|
|
String scheme = connectConfigMaps.get("scheme").toString();
|
|
|
String hostname = connectConfigMaps.get("hostname").toString();
|
|
|
int port = Integer.parseInt(connectConfigMaps.get("port").toString());
|
|
@@ -56,40 +59,45 @@ public class ElasticHandler {
|
|
|
//无权限验证
|
|
|
restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
|
|
|
}
|
|
|
+ } catch (Exception e) {
|
|
|
+ MapTools.processFail("es发送参数异常:" + LogUtils.getException(e));
|
|
|
+ return false;
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- return MapTools.processFail("es发送参数异常:" + LogUtils.getException(e));
|
|
|
}
|
|
|
String dataJSON = "";
|
|
|
try {
|
|
|
- IndexRequest request = new IndexRequest(indexName);
|
|
|
- dataJSON = MapTools.objToJSONStr(dataContent);
|
|
|
- request.id(createLifeCycleCol(Long.valueOf(AppConfig.WORK_ID), Integer.valueOf(MapTools.isBlank(serviceId) ? "0" : serviceId)));
|
|
|
- request.timeout("1s");
|
|
|
- request.source(dataJSON, XContentType.JSON);
|
|
|
- IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
|
|
|
- ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
|
|
|
- if (shardInfo.getFailed() > 0) {
|
|
|
- String collect = Arrays.stream(shardInfo.getFailures()).map(ShardOperationFailedException::reason).collect(Collectors.joining(";\n"));
|
|
|
- return MapTools.processSuccess(collect);
|
|
|
+ if (dataContent.size() > 0) {
|
|
|
+ IndexRequest request = new IndexRequest(indexName);
|
|
|
+ for (Object data : dataContent) {
|
|
|
+ dataJSON = MapTools.objToJSONStr(data);
|
|
|
+ request.id(createLifeCycleCol(1L, Integer.valueOf(MapTools.isBlank(serviceId) ? "0" : serviceId)));
|
|
|
+ request.timeout("1s");
|
|
|
+ request.source(dataJSON, XContentType.JSON);
|
|
|
+ request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
+ IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
|
|
|
+ ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
|
|
|
+ if (shardInfo.getFailed() > 0) {
|
|
|
+ String collect = Arrays.stream(shardInfo.getFailures()).map(ShardOperationFailedException::reason).collect(Collectors.joining(";\n"));
|
|
|
+ MapTools.processSuccess(collect);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
try {
|
|
|
- Thread.sleep(1000 * 6);
|
|
|
- } catch (InterruptedException ex) {
|
|
|
+ restHighLevelClient.close();
|
|
|
+ } catch (IOException ex) {
|
|
|
System.out.println("es数据发送异常: ".concat(LogUtils.getException(e)));
|
|
|
}
|
|
|
- return MapTools.processFail(LogUtils.getException(e));
|
|
|
- } finally {
|
|
|
try {
|
|
|
- if (null != restHighLevelClient) {
|
|
|
- restHighLevelClient.close();
|
|
|
- }
|
|
|
- } catch (IOException ex) {
|
|
|
- System.out.println("es数据发送异常: ".concat(LogUtils.getException(ex)));
|
|
|
+ Thread.sleep(1000 * 6);
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ System.out.println("es数据发送异常: ".concat(LogUtils.getException(e)));
|
|
|
}
|
|
|
+ restHighLevelClient = null;
|
|
|
+ return false;
|
|
|
}
|
|
|
- return MapTools.processSuccess("数据发送成功:".concat(dataJSON));
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
private long sequence = 0L;
|