pms 2 жил өмнө
parent
commit
da53aa0990

+ 26 - 13
mainFactory/src/main/java/org/bfkj/application/DataProcess.java

@@ -101,7 +101,7 @@ public class DataProcess {
     Long begin = 0L;
 
     public Map<String, Object> processData(Map<String, Object> inputData, String... user_id) { //{serviceid:1,datacontent:[{key:value},{key:value}],event:0,page:1,pagesize:10}
-         //事件判断由算法进行,不再作为入口必须条件  ?
+        //事件判断由算法进行,不再作为入口必须条件  ?
         if (!MapTools.isBlank(errorMessage)) {//如果当前服务存在问题,代表数据库对象不可用,此时应该重构当前对象,调用方控制
             LogUtils.log("processData:1", "-1", null, "服务不可用" + errorMessage, serviceId, AppConfig.WORK_ID, MapTools.jacksonObjToStr(inputData), null, null, null);
             return processFail("服务不可用" + errorMessage, null);
@@ -167,7 +167,7 @@ public class DataProcess {
         Map<String, List<Map<String, Object>>> execResult = execCalultion(calculationResult, null, event, dataObjectId, inputData.get("page"), inputData.get("pageSize"));
         calculationResult = execResult.get("calcData");//执行当前服务对应的算法库
         Map<String, Object> lastResult = calculationResult.get(calculationResult.size() - 1);//获取最后一个计算结果
-        String libraryId = lastResult.get("library_id").toString();
+        String libraryId = lastResult.containsKey("library_id") ? lastResult.get("library_id").toString() : null;
         if (Objects.equals(lastResult.get("code"), "-1")) {//最后一个计算结果为-1则记录异常,否则全部记录正常
             Object message = lastResult.get("message");
             LogUtils.log("processData:3", "-1", libraryId, Objects.nonNull(message) ? message.toString() : null, serviceId, AppConfig.WORK_ID, MapTools.jacksonObjToStr(calculationResult), dataObjectId, MapTools.jacksonObjToStr(execResult.get("calcData")), event);
@@ -321,7 +321,7 @@ public class DataProcess {
                     return returnData;
                 }
                 //算法取参
-                List<Map<String, Object>> dbPrams = new ArrayList<>();
+                List<Map<String,Object>> dbPrams = new ArrayList<>();
                 Object tempDBPrams = null;
                 try {//添加try捕获异常
                     if (Objects.isNull(calculationLibrary.get("parmIndex"))) {//未配置取参则默认取上一个算法
@@ -331,17 +331,29 @@ public class DataProcess {
                         tempDBPrams = calcData;//默认等于全量参数
                         String[] parmSplit = calculationLibrary.get("parmIndex").toString().split("\\.");//按.进行分割
                         for (String item : parmSplit) {//循环提取参数
-                            if (MapTools.isNumber(item)) {//数字代表数组中的位置
-                                tempDBPrams = ((List<?>) tempDBPrams).get(Integer.parseInt(item));
+                            if (MapTools.isNumber(item) && tempDBPrams instanceof List<?> tempList) {//数字代表数组中的位置
+                                tempDBPrams = tempList.get(Integer.parseInt(item));
                             } else {
-                                tempDBPrams = ((Map) tempDBPrams).get(item);
+                                if (tempDBPrams instanceof Map<?,?> tempMap){
+                                    tempDBPrams = tempMap.get(item);
+                                }
                             }
                         }
                     }
-                    if (tempDBPrams instanceof Map) {//最终参数是Map则转换为List<Map<String, Object>>
-                        dbPrams.add((Map) tempDBPrams);
-                    } else if (tempDBPrams instanceof List) {
-                        dbPrams = (List<Map<String, Object>>) tempDBPrams;
+                    if (tempDBPrams instanceof Map tempMap) {//最终参数是Map则转换为List<Map<String, Object>>
+                        if (tempMap.isEmpty()) {
+                            calcData.add(new HashMap<>());
+                            returnData.put("calcData", calcData);
+                            return returnData;
+                        }
+                        dbPrams.add(tempMap);
+                    } else if (tempDBPrams instanceof List tempList && tempList.get(0) instanceof Map<?,?> tempMap) {
+                        if (tempMap.isEmpty()) {
+                            calcData.add(new HashMap<>());
+                            returnData.put("calcData", calcData);
+                            return returnData;
+                        }
+                        dbPrams = tempList;
                     }
                 } catch (Exception e) {
                     calcData.add(processFail("入参数据格式错误:不是List<Map<String,Object> " + tempDBPrams, library_id));
@@ -358,9 +370,10 @@ public class DataProcess {
             } else {
                 currentResult = execEngine(calculationLibrary, inData, dataObjectId);
             }
-            if (!currentResult.get("code").equals("0")) {//算法执行异常
-                currentResult.put("library_id", library_id);
-            }
+//            if (!currentResult.get("code").equals("0")) {//算法执行异常
+//                currentResult.put("library_id", library_id);
+//            }
+            currentResult.put("library_id", library_id);
             calcData.add(currentResult);
             returnData.put("calcData", calcData);
 //            System.out.println("算法ID : " + library_id + "  执行时间为:" + (System.currentTimeMillis() - l));

+ 10 - 12
mainFactory/src/main/java/org/bfkj/protocol/MyKafKa.java

@@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.bfkj.config.AppConfig;
 import org.bfkj.utils.MapTools;
 
 import java.time.Duration;
@@ -22,7 +21,7 @@ public class MyKafKa {
      *
      * @return 读取内容信息
      */
-    public Map<String,Object> readMethod(String sourceObjectName, String library_id, String connectConfig) {
+    public Map<String, Object> readMethod(String sourceObjectName, String library_id, String connectConfig, String  pollNumber) {
         if (Objects.isNull(sourceObjectName)) {
             return MapTools.processFail("队列为空");
         }
@@ -31,7 +30,7 @@ public class MyKafKa {
         if (!connectConfigMaps.containsKey("group.id")) {
             connectConfigMaps.put("group.id", "groupid" + (String.format("%s", library_id)));
         }
-        connectConfigMaps.put("max.poll.records", 1);
+        connectConfigMaps.put("max.poll.records", pollNumber == null ? 1 : Integer.parseInt(pollNumber));
         connectConfigMaps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         connectConfigMaps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
@@ -41,13 +40,12 @@ public class MyKafKa {
         }
 //        System.out.println(consumer +"   library_id: " + library_id);
         //todo 1. 如果一次行读取多条 2.偏移修改  : // Duration.ofSeconds(1): 如果没有数据则此处会等待1秒
-        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(1));
+        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(1));
         int count = records.count();
-        Object message = null;
+        String message = "";
         if (count > 0) {
             for (ConsumerRecord<String, Object> record : records.records(sourceObjectName)) {
-                message = record.value();
-                break;
+                message = message.concat(record.value().toString());
             }
         }
         return MapTools.processSuccess(message);
@@ -58,11 +56,11 @@ public class MyKafKa {
      *
      * @return 发送结果信息
      */
-    public Map<String,Object> sendMethod(String sourceObjectName, List<Object> dataContent, String connectConfig) {
+    public Map<String, Object> sendMethod(String sourceObjectName, List<Object> dataContent, String connectConfig) {
         if (Objects.isNull(sourceObjectName)) {
             return MapTools.processFail("队列为空: " + dataContent);
         }
-        if (Objects.isNull(dataContent)){
+        if (Objects.isNull(dataContent)) {
             return MapTools.processSuccess(null);
         }
         Map<String, Object> connectConfigMaps = MapTools.mapStrToMap_connect(connectConfig);
@@ -77,12 +75,12 @@ public class MyKafKa {
         List<Object> sendResult = new ArrayList<>();
         for (Object o : dataContent) {
             try {
-                sendResult.add(producer.send(new ProducerRecord<>(sourceObjectName,objectMapper.writeValueAsString(o))));
+                sendResult.add(producer.send(new ProducerRecord<>(sourceObjectName, objectMapper.writeValueAsString(o))));
             } catch (Exception e) {
-                sendResult.add( producer.send(new ProducerRecord<>(sourceObjectName, o.toString())));
+                sendResult.add(producer.send(new ProducerRecord<>(sourceObjectName, o.toString())));
             }
         }
-         return MapTools.processSuccess(sendResult);
+        return MapTools.processSuccess(sendResult);
     }
 
     /**

+ 7 - 19
mainFactory/src/main/java/org/bfkj/utils/MapTools.java

@@ -368,7 +368,7 @@ public class MapTools implements Serializable {
         ObjectMapper mapper = new ObjectMapper();
         try {
             mapper.registerModule(new JavaTimeModule());
-            mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,false); //关闭
+            mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); //关闭
             return mapper.writeValueAsString(mapStr);
         } catch (JsonProcessingException e) {
             return null;
@@ -378,31 +378,19 @@ public class MapTools implements Serializable {
     public static String jacksonObjToStr(Object obj) { //"[1111]","[{"A":"B"}]"
         ObjectMapper mapper = new ObjectMapper();
         try {
-            if (null ==  obj){
+            if (null == obj) {
                 return "";
             }
-            mapper.registerModule(new JavaTimeModule());
-            if (!(obj instanceof String) && obj.toString().contains("{") && obj.toString().contains("}")||
-                    !(obj instanceof String) && obj.toString().contains("[") && obj.toString().contains("]")){
-                return mapper.writeValueAsString(obj);
+            if (obj instanceof String str) {
+                return str;
             }
-             return obj.toString();
+            mapper.registerModule(new JavaTimeModule());
+            mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); //关闭
+            return  mapper.writeValueAsString(obj);
         } catch (JsonProcessingException e) {
             return "日志数据转json字符串异常: " + e.getMessage() + ": " + obj.toString();
         }
     }
-
-
-    /**
-     * 验证字符串是否是map
-     *
-     * @param mapStr map字符串
-     * @return true:是map字符串,false不是map字符串
-     */
-    public static boolean isMap(String mapStr) {
-        return null != jsonStringToMap(mapStr);
-    }
-
     /**
      * list为空
      *