|
@@ -1,165 +0,0 @@
|
|
|
-package org.bfkj.protocol;
|
|
|
-
|
|
|
-import org.apache.http.HttpHost;
|
|
|
-import org.apache.http.auth.AuthScope;
|
|
|
-import org.apache.http.auth.UsernamePasswordCredentials;
|
|
|
-import org.apache.http.client.CredentialsProvider;
|
|
|
-import org.apache.http.impl.client.BasicCredentialsProvider;
|
|
|
-import org.bfkj.utils.MapTools;
|
|
|
-import org.elasticsearch.action.ShardOperationFailedException;
|
|
|
-import org.elasticsearch.action.index.IndexRequest;
|
|
|
-import org.elasticsearch.action.index.IndexResponse;
|
|
|
-import org.elasticsearch.action.support.replication.ReplicationResponse;
|
|
|
-import org.elasticsearch.client.RequestOptions;
|
|
|
-import org.elasticsearch.client.RestClient;
|
|
|
-import org.elasticsearch.client.RestClientBuilder;
|
|
|
-import org.elasticsearch.client.RestHighLevelClient;
|
|
|
-import org.elasticsearch.xcontent.XContentType;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Objects;
|
|
|
-import java.util.stream.Collectors;
|
|
|
-
|
|
|
-
|
|
|
- * Es 算法
|
|
|
- */
|
|
|
-public class EsAlgorithm {
|
|
|
- private final HashMap<String, RestHighLevelClient> restHighLevelClientHashMap = new HashMap<>();
|
|
|
-
|
|
|
-
|
|
|
- * 创建ES客户端
|
|
|
- * @param connectConfig 连接配置信息
|
|
|
- * @return ES客户端
|
|
|
- */
|
|
|
- private Map<String, Object> getRestHighLevelClient(String connectConfig) {
|
|
|
- RestHighLevelClient restHighLevelClient = restHighLevelClientHashMap.get(connectConfig);
|
|
|
- if (null == restHighLevelClient) {
|
|
|
- Map<String, Object> connectConfigMaps = (Map<String, Object>) MapTools.strToObj(connectConfig);
|
|
|
- if (Objects.isNull(connectConfigMaps.get("scheme"))) {
|
|
|
- return MapTools.processFail("请求类型为空,此值应该为http或则https");
|
|
|
- }
|
|
|
- if (Objects.isNull(connectConfigMaps.get("hostname"))) {
|
|
|
- return MapTools.processFail("请求IP为空");
|
|
|
- }
|
|
|
- if (Objects.isNull(connectConfigMaps.get("port"))) {
|
|
|
- return MapTools.processFail("请求端口为空");
|
|
|
- }
|
|
|
- String scheme = connectConfigMaps.get("scheme").toString();;
|
|
|
- String hostname = connectConfigMaps.get("hostname").toString();
|
|
|
- int port = Integer.parseInt(connectConfigMaps.get("hostname").toString());
|
|
|
- Object account = connectConfigMaps.get("auth");
|
|
|
- if (null != account && Objects.equals("true", account)) {
|
|
|
- if (Objects.isNull(connectConfigMaps.get("username"))) {
|
|
|
- return MapTools.processFail("有权限验证用户名为空");
|
|
|
- }
|
|
|
- if (Objects.nonNull(connectConfigMaps.get("password"))) {
|
|
|
- return MapTools.processFail("有权限验证用户密码为空");
|
|
|
- }
|
|
|
- String username = connectConfigMaps.get("username").toString();
|
|
|
- String password = connectConfigMaps.get("password").toString();
|
|
|
-
|
|
|
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
|
- credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
|
|
- RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(hostname, port, scheme)).setHttpClientConfigCallback((httpAsyncClientBuilder) -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
|
|
|
- restHighLevelClient = new RestHighLevelClient(restClientBuilder);
|
|
|
- } else {
|
|
|
-
|
|
|
- restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
|
|
|
- }
|
|
|
- restHighLevelClientHashMap.put(connectConfig, restHighLevelClient);
|
|
|
- }
|
|
|
- return MapTools.processSuccess(null);
|
|
|
- }
|
|
|
-
|
|
|
- public Map<String, Object> sendMethod(String indexName, String connectConfig, Integer serviceId, String data) {
|
|
|
- RestHighLevelClient restHighLevelClient = null;
|
|
|
- try {
|
|
|
- Map<String, Object> restHighLevelClientMap = getRestHighLevelClient(connectConfig);
|
|
|
- if (restHighLevelClientMap.get("code").equals("-1")) return restHighLevelClientMap;
|
|
|
- restHighLevelClient = restHighLevelClientHashMap.get(connectConfig);
|
|
|
- IndexRequest request = new IndexRequest(indexName);
|
|
|
- request.id(createLifeCycleCol(1L, serviceId));
|
|
|
- request.timeout("1s");
|
|
|
- request.source(data, XContentType.JSON);
|
|
|
- IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
|
|
|
- ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
|
|
|
- if (shardInfo.getFailed() > 0) {
|
|
|
- return MapTools.processSuccess(Arrays.stream(shardInfo.getFailures()).map(ShardOperationFailedException::reason).collect(Collectors.joining(";\n")));
|
|
|
- } else {
|
|
|
- return MapTools.processFail("数据发送到ES异常");
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- try {
|
|
|
- Thread.sleep(1000 * 6);
|
|
|
- } catch (InterruptedException ex) {
|
|
|
- System.out.println("发送到ES出现异常,暂停6分钟时异常");
|
|
|
- }
|
|
|
- return MapTools.processFail(e.getMessage());
|
|
|
- } finally {
|
|
|
- try {
|
|
|
- restHighLevelClientHashMap.remove(connectConfig);
|
|
|
- if (null != restHighLevelClient) {
|
|
|
- restHighLevelClient.close();
|
|
|
- }
|
|
|
- } catch (IOException ex) {
|
|
|
- System.out.println("发送到ES出现异常,暂停6分钟时异常");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- * 支持的最大机器id,结果是31 (这个移位算法可以很快计算出几位二进制数所能表示的最大十进制数)
|
|
|
- */
|
|
|
- private final static String maxWorkerId = "4";
|
|
|
-
|
|
|
- * 支持的最大数据标识id,结果是31
|
|
|
- */
|
|
|
- private final static String maxServiceId = "4";
|
|
|
-
|
|
|
- * 毫秒内序列(0~4095)
|
|
|
- */
|
|
|
- private static final Long maxSequence = 999L;
|
|
|
-
|
|
|
- private static long sequence = 0L;
|
|
|
-
|
|
|
-
|
|
|
- * 上次生成ID的时间截
|
|
|
- */
|
|
|
- private static long lastTimestamp = -1L;
|
|
|
-
|
|
|
-
|
|
|
- * 生命周期ID生成
|
|
|
- *
|
|
|
- * @param workerId
|
|
|
- * @param serviceId
|
|
|
- * @return
|
|
|
- */
|
|
|
- public static synchronized String createLifeCycleCol(Long workerId, Integer serviceId) {
|
|
|
- long timestamp = System.currentTimeMillis();
|
|
|
-
|
|
|
-
|
|
|
- if (lastTimestamp == timestamp) {
|
|
|
- sequence++;
|
|
|
-
|
|
|
- if (sequence > maxSequence) {
|
|
|
-
|
|
|
- sequence = 0;
|
|
|
- while (lastTimestamp == System.currentTimeMillis()) {
|
|
|
- }
|
|
|
- timestamp = System.currentTimeMillis();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- else {
|
|
|
- sequence = 0L;
|
|
|
- }
|
|
|
-
|
|
|
- lastTimestamp = timestamp;
|
|
|
-
|
|
|
- return timestamp + "" + (String.format("%0" + (maxSequence.toString().length()) + "d", sequence)) + (String.format("%0" + maxWorkerId + "d", workerId)) + (String.format("%0" + maxServiceId + "d", serviceId));
|
|
|
- }
|
|
|
-}
|