andy 1 anno fa
parent
commit
58f0e8e74e

+ 9 - 9
src/main/java/com/scbfkj/uni/library/script/DatabaseScriptUtil.java

@@ -11,9 +11,9 @@ import java.util.stream.Collectors;
 
 public class DatabaseScriptUtil {
 
-    public static Map<String, Object> exec(String connectionStr, String expression, List<Map<String, Object>> args, Object executionNumber, List<String> filterColumns, List<Map<String, Object>> filterLines) throws Exception {
+    public static Map<String, Object> exec(String connectionStr, String expression, List<Map<String, Object>> args, Object event, List<String> filterColumns, List<Map<String, Object>> filterLines) throws Exception {
 
-        if (Objects.isNull(executionNumber) || !StringUtils.hasText(executionNumber.toString())) {
+        if (Objects.isNull(event) || !StringUtils.hasText(event.toString())) {
             throw new RuntimeException("执行编号不能为空");
         }
         expression = expression.replaceAll("\\s*(\\r)?\\n\\s*", " ").trim();
@@ -41,7 +41,7 @@ public class DatabaseScriptUtil {
             List<String> valueNames = null;
             List<String> filterNames = null;
 //            查询
-            if (Objects.equals("0", executionNumber)) {
+            if (Objects.equals("0", event)) {
                 Map<String, Object> map = args.get(0);
                 Map<String, Object> filter = ((Map<String, Object>) map.getOrDefault("filter", map));
                 filterNames = filter.keySet().stream().toList();
@@ -84,7 +84,7 @@ public class DatabaseScriptUtil {
                 List<Map<String, Object>> result = DataBase.query(connectionStr, expression, values);
                 return UniReturnUtil.success(result);
 //                更新或新增
-            } else if (Objects.equals("6", executionNumber)) {
+            } else if (Objects.equals("6", event)) {
                 Map<String, Object> map = args.get(0);
                 Map<String, Object> filter = ((Map<String, Object>) map.get("filter"));
                 filterNames = filter.keySet().stream().toList();
@@ -114,13 +114,13 @@ public class DatabaseScriptUtil {
             } else {
 
 //                新增
-                if (Objects.equals("1", executionNumber)) {
+                if (Objects.equals("1", event)) {
                     Map<String, Object> map = args.get(0);
                     Map<String, Object> value = ((Map<String, Object>) map.getOrDefault("value", map));
                     valueNames = value.keySet().stream().toList();
                     expression = "insert into %s ( %s) values(%s)".formatted(expression, String.join(",", valueNames), valueNames.stream().map(it -> "?").collect(Collectors.joining(",")));
 //                    更新
-                } else if (Objects.equals("2", executionNumber)) {
+                } else if (Objects.equals("2", event)) {
                     Map<String, Object> map = args.get(0);
                     Map<String, Object> value = ((Map<String, Object>) map.get("value"));
                     valueNames = value.keySet().stream().toList();
@@ -131,7 +131,7 @@ public class DatabaseScriptUtil {
                             filterNames.stream().map("%s = ?"::formatted).collect(Collectors.joining(" and ")),
                             Objects.isNull(filterLineWhereStr)  ? " 1=1 " : filterLineWhereStr);
 //                    删除
-                } else if (Objects.equals("3", executionNumber)) {
+                } else if (Objects.equals("3", event)) {
                     Map<String, Object> map = args.get(0);
                     Map<String, Object> filter = ((Map<String, Object>) map.getOrDefault("filter", map));
                     filterNames = filter.keySet().stream().toList();
@@ -212,10 +212,10 @@ public class DatabaseScriptUtil {
             return UniReturnUtil.success(queryResult);
 
         } else {
-            if (Objects.equals(executionNumber, "0")) {
+            if (Objects.equals(event, "0")) {
                 List<Map<String, Object>> queryResult =  DataBase.query(connectionStr, expression, args, filterColumns, filterLines);
                 return UniReturnUtil.success(queryResult);
-            } else if (Objects.equals(executionNumber, "1")) {
+            } else if (Objects.equals(event, "1")) {
                 if (sqlStrVarList.containsKey(expression)) {
                     getSQLVarList(expression);
                 }

+ 20 - 13
src/main/java/com/scbfkj/uni/process/Kafka.java

@@ -19,10 +19,13 @@ import org.springframework.util.StringUtils;
 
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 public class Kafka {
-
-    private static final KeyedObjectPool<String, Producer<String, String>> producerPool = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<String, Producer<String, String>>() {
+    private static final KeyedObjectPool<String, Producer<String, String>> producerPool = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<>() {
         @Override
         public Producer<String, String> create(String key) throws Exception {
             Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.stringToMap(key);
@@ -40,7 +43,7 @@ public class Kafka {
             return new DefaultPooledObject<>(value);
         }
     });
-    private static final KeyedObjectPool<String, Consumer<String, String>> consumerPool = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<String, Consumer<String, String>>() {
+    private static final KeyedObjectPool<String, Consumer<String, String>> consumerPool = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<>() {
         @Override
         public Consumer<String, String> create(String key) throws Exception {
             Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.stringToMap(key);
@@ -57,22 +60,26 @@ public class Kafka {
         }
     });
 
-    public static Map<String, Object> sendMessage(String connection, String topic, Object data) throws Exception {
+    public static Map<String, Object> sendMessage(String connection, String topic, List<String> datas) throws Exception {
 
-        if (Objects.isNull(data)) {
+        if (Objects.isNull(datas) || datas.isEmpty()) {
             return UniReturnUtil.fail("数据为空");
         }
 
         List<Object> sendResult = new ArrayList<>();
-        if (data instanceof List<?> datas) {
-            for (Object o : datas) {
-                sendResult.add(sendMessage(connection, topic, DataFormatUtil.toString(o)));
-            }
-        } else {
-            Producer<String, String> producer = producerPool.borrowObject(connection);
-            sendResult.add(producer.send(new ProducerRecord<>(topic, DataFormatUtil.toString(data))));
-            producerPool.returnObject(connection, producer);
+        Producer<String, String> producer = null;
+        try {
+            producer = producerPool.borrowObject(connection);
+//            多线程发送
+            Producer<String, String> finalProducer = producer;
+            datas.parallelStream().forEach(it -> {
+                sendResult.add(finalProducer.send(new ProducerRecord<>(topic, DataFormatUtil.toString(it))));
+            });
+        } finally {
+            if (Objects.nonNull(producer))
+                producerPool.returnObject(connection, producer);
         }
+
         return UniReturnUtil.success(sendResult);
     }
 

+ 7 - 7
src/main/java/com/scbfkj/uni/service/DataProcessService.java

@@ -72,6 +72,9 @@ public class DataProcessService {
 //                记录生命周期ID
                 data.put("lifecycleid", lifecycleid);
                 data.put("algorithmlibraryid", algorithmlibraryid);
+                HashMap<String, Object> source = new HashMap<>();
+                source.put("args", resource);
+                source.put("algorithm", algorithmLibrary);
                 if (Objects.nonNull(preConditions)) {
                     HashMap<String, Object> preData = new HashMap<>();
                     preResource.add(preData);
@@ -80,7 +83,7 @@ public class DataProcessService {
 
 
                     String preCode;
-                    List<Object> params = getParams(Optional.ofNullable(preparameterset).map(DataFormatUtil::toString).orElse(null), resource);
+                    List<Object> params = getParams(Optional.ofNullable(preparameterset).map(DataFormatUtil::toString).orElse(null), source);
                     Map<String, Object> eval = JsScriptEngineUtil.eval(DataFormatUtil.toString(preConditions), params.toArray());
 
                     preData.put("preResult", eval);
@@ -108,9 +111,6 @@ public class DataProcessService {
 //        获取入参列表
                 parameters = new ArrayList<>();
                 if (Objects.nonNull(parameterSet)) {
-                    HashMap<String, Object> source = new HashMap<>();
-                    source.put("args", resource);
-                    source.put("algorithm", algorithmLibrary);
                     source.put("datasource", datasource);
                     parameters.addAll(getParams(parameterSet.toString(), source));
 //                    算法入参
@@ -132,7 +132,7 @@ public class DataProcessService {
                     throw new RuntimeException(message);
                 }
             }
-            return resource.get(resource.size() - 1);
+            return UniReturnUtil.success(((Map) resource.get(resource.size() - 1).get("result")).get("returnData"));
         } catch (Exception e) {
             message = e.getMessage();
             return UniReturnUtil.fail(e);
@@ -198,13 +198,13 @@ public class DataProcessService {
         for (String it : paths) {
             it = it.trim();
             String type = it.split("\\.")[0];
-            if(!List.of().contains(type)){
+            if(!List.of("string","list","map","array","long","integer","boolean","double","float","datetime").contains(type.toLowerCase())){
                 if(Objects.equals("null",type)) {
                     result.add(null);
                 }else{
                     result.add(type);
                 }
-                    break;
+                    continue;
             }
             String path = it.replace(type + ".", "");
             Object value = null;

+ 15 - 15
src/main/java/com/scbfkj/uni/service/LoggerService.java

@@ -64,20 +64,20 @@ public class LoggerService {
         //调用工具类的获取雪花编号getUniqueNumber(时间戳+当前容器编号+0)
         //打开雪花编号对应的临时Sqlite(日志文件夹下)
         //写入数据
-
-        try {
-            String tableName = "servicelog";
-            String connectionStr = getCurrentThreadConnection(tableName);
-            if (!Objects.equals(returnCode, "0")) {
-                tableName = "serviceerrlog";
-            }
-//            insertLog(connectionStr,
-//                    "insert into %s ('begintime','endtime','serviceid','resource','preresource','returncode','returnmessage','lifecycleid') values (?,?,?,?,?,?,?,?,?)".formatted(tableName),
-//                    List.of(beginTime, endTime, serviceID, resource, preResource, returnCode, returnMessage, dataObjectID));
-        } catch (Exception e) {
-            e.printStackTrace();
-            System.out.println(UniReturnUtil.getMessage(e));
-        }
+//
+//        try {
+//            String tableName = "servicelog";
+//            String connectionStr = getCurrentThreadConnection(tableName);
+//            if (!Objects.equals(returnCode, "0")) {
+//                tableName = "serviceerrlog";
+//            }
+////            insertLog(connectionStr,
+////                    "insert into %s ('begintime','endtime','serviceid','resource','preresource','returncode','returnmessage','lifecycleid') values (?,?,?,?,?,?,?,?,?)".formatted(tableName),
+////                    List.of(beginTime, endTime, serviceID, resource, preResource, returnCode, returnMessage, dataObjectID));
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//            System.out.println(UniReturnUtil.getMessage(e));
+//        }
     }
 
     //系统异常日志
@@ -250,7 +250,7 @@ public class LoggerService {
                             DatabaseScriptUtil.exec(target, tableName, result, "1", null, null);
                         }
                         case "KAFKA" -> {
-                            Kafka.sendMessage(target, tableName, result);
+                            Kafka.sendMessage(target, tableName, result.stream().map(DataFormatUtil::toString).toList());
                         }
                         case "ES" -> {
                             Elasticsearch.send(target, tableName, result);

+ 6 - 0
src/main/resources/application-dev.properties

@@ -0,0 +1,6 @@
+db.center.config=hLcDKcDr4MgqYdb8j0gF0nF806yUy1UdEp1nmztEdo5rNL8IZliDj7/feOp2Fc7j19m9jtiwFp5nPvClI1Ni4kxluI8MQepS8nBK3bEzRzsGSswNHa/Sjyw0GK9/ZnOaiD+lDQyI7+fVbmpdvkLy7QE07bpTIjdI1tcLx8Z9QWs=
+db.security.config=Jnj84d14EmSgKEXyAbSH+bratWGkpV89/VA5Er4yQOt7qlnKtGYJzBVJNNYMBdmSlW0G+nqDHMhJQcmHrwbjjChYuGeDcmKSRmvFQ9u7LwqmgEfazzKKoVawXmJ40dMsec2yaFyNnCM92xn1hzHvle5BL7x3kza2htGm+iOqO7Y=
+log.target=B7xSbq4imA5zapX8kEO42mU/5sA2TyF/Ba2Y/++F3z9Np7iT4ywDUkbRC4w/Xrxv1kMSR8PQMJ4dfYwc3mYj0SJJivN5A5/6hI+ZSQBabfZZrYwaIIRdM1XIk4wo1SIrSCXKzef8X6YUH70R2tnh+Uq6KNNp08KaZ2ZXM8vX5Ss=
+server.port=9500
+app.container.code=dev
+app.security.enable=false

+ 7 - 0
src/test/java/com/scbfkj/uni/ControllerBaseTest.java

@@ -1,17 +1,24 @@
 package com.scbfkj.uni;
 
+import jakarta.servlet.http.Cookie;
 import org.junit.jupiter.api.BeforeEach;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.mock.web.MockCookie;
+import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 
 @SpringBootTest
+@ActiveProfiles("dev")
 public class ControllerBaseTest {
 
 
     protected MockMvc mockMvc;
 
+    protected Cookie cookie = MockCookie.parse("JSESSIONID=78C3FBDF21FFFBE46B8DD728D5C5DBF7");
+    protected static String token;
+
     @BeforeEach
     public void before (WebApplicationContext webApplicationContext){
         mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();

+ 4 - 8
src/test/java/com/scbfkj/uni/api/SecurityApiTest.java

@@ -2,24 +2,18 @@ package com.scbfkj.uni.api;
 
 import com.jayway.jsonpath.JsonPath;
 import com.scbfkj.uni.ControllerBaseTest;
-import jakarta.servlet.http.Cookie;
 import org.hamcrest.core.IsEqual;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 import org.springframework.http.MediaType;
-import org.springframework.mock.web.MockCookie;
-import org.springframework.mock.web.MockHttpSession;
 import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
 import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
 import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 class SecurityApiTest extends ControllerBaseTest {
-    private Cookie cookie = MockCookie.parse("JSESSIONID=78C3FBDF21FFFBE46B8DD728D5C5DBF7");
-    private static String token;
-
     public void testHello() throws Exception {
 
         /*
@@ -68,8 +62,10 @@ class SecurityApiTest extends ControllerBaseTest {
     void refreshToken() throws Exception {
         mockMvc.perform(MockMvcRequestBuilders.post("/user/refreshToken")
                         .contentType(MediaType.APPLICATION_JSON_VALUE)
-                        .header("token", token)
-                        .cookie(cookie))
+//                        .header("token", token)
+                        .cookie(cookie)
+                                .content("")
+                )
 
                 .andExpect(MockMvcResultMatchers.status().isOk())
                 .andExpect(MockMvcResultMatchers.jsonPath("code", IsEqual.equalTo("0")))

+ 30 - 0
src/test/java/com/scbfkj/uni/api/TableConfigurationControllerTest.java

@@ -0,0 +1,30 @@
+package com.scbfkj.uni.api;
+
+import com.scbfkj.uni.ControllerBaseTest;
+import org.hamcrest.collection.IsCollectionWithSize;
+import org.hamcrest.core.IsEqual;
+import org.junit.jupiter.api.Test;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
+import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
+
+public class TableConfigurationControllerTest extends ControllerBaseTest {
+
+    @Test
+    void testQuery() throws Exception {
+
+        mockMvc.perform(MockMvcRequestBuilders.post("/openApi/query")
+                        .contentType(MediaType.APPLICATION_JSON_VALUE)
+//                        .header("token", token)
+                        .content("""
+                                {"serviceid":3,"datacontent":{"filter":{"pagecode":"applicationManagementTable"}},"event":"0"}
+                                """)
+                        .cookie(cookie))
+
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andExpect(MockMvcResultMatchers.jsonPath("code", IsEqual.equalTo("0")))
+                .andExpect(MockMvcResultMatchers.jsonPath("returnData", IsCollectionWithSize.hasSize(15)))
+                .andDo(MockMvcResultHandlers.print());
+    }
+}