Эх сурвалжийг харах

ibmmq 部署发送数据调试通过

andy 1 жил өмнө
parent
commit
a813368565

+ 23 - 23
pom.xml

@@ -24,29 +24,6 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.mysql</groupId>
-            <artifactId>mysql-connector-j</artifactId>
-            <scope>runtime</scope>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/cn.com.kingbase/kingbase8 -->
-        <dependency>
-            <groupId>cn.com.kingbase</groupId>
-            <artifactId>kingbase8</artifactId>
-            <version>8.6.0</version>
-            <scope>runtime</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-test</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-test</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
@@ -120,6 +97,29 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/cn.com.kingbase/kingbase8 -->
+        <dependency>
+            <groupId>cn.com.kingbase</groupId>
+            <artifactId>kingbase8</artifactId>
+            <version>8.6.0</version>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 

+ 8 - 1
src/main/java/com/scbfkj/uni/library/DataFormatUtil.java

@@ -94,7 +94,7 @@ public final class DataFormatUtil {
         return LocalDateTime.parse(dateTimeStr);
     }
 
-    public static List<Object> toList(Object value) {
+    public static List toList(Object value) {
         if (Objects.isNull(value)) {
             return null;
         }
@@ -107,6 +107,13 @@ public final class DataFormatUtil {
             result.addAll(Arrays.asList(((Object[]) value)));
             return result;
         }
+        if (value instanceof String) {
+            try {
+                return getObjectMapper().readValue(value.toString(), List.class);
+            } catch (JsonProcessingException e) {
+                return Collections.singletonList(value);
+            }
+        }
         return Collections.singletonList(value);
     }
 

+ 5 - 2
src/main/java/com/scbfkj/uni/library/script/JsScriptEngineUtil.java

@@ -13,6 +13,7 @@ import org.graalvm.polyglot.Source;
 import org.graalvm.polyglot.Value;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -44,12 +45,14 @@ public final class JsScriptEngineUtil {
     }, new GenericKeyedObjectPoolConfig<>());
 
 
-    public static Map<String, Object> eval(String script, Object... args) throws Exception {
+    public static Map<String, Object> eval( Object[] args) throws Exception {
         Value function = null;
+        String script = ((String) args[0]);
+        List<Object> arg = Arrays.stream(args).skip(1).toList();
         try {
             function = scriptPool.borrowObject(script);
             if (function.canExecute()) {
-                Value result = function.execute(Arrays.stream(args).map(Value::asValue).toArray());
+                Value result = function.execute(arg.stream().map(Value::asValue).toArray());
                 return UniReturnUtil.success(toHostObject(result));
             } else {
                 return UniReturnUtil.success(toString(function));

+ 13 - 13
src/main/java/com/scbfkj/uni/process/Kafka.java

@@ -8,13 +8,11 @@ import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.util.StringUtils;
 
 import java.time.Duration;
@@ -75,8 +73,10 @@ public class Kafka {
 
     public static Map<String, Object> receptionMessage(String connectConfig, String topic, String groupId) throws Exception {
         Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.stringToMap(connectConfig);
+            connectConfigMaps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+            connectConfigMaps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         if (Objects.nonNull(groupId)) {
-            connectConfigMaps.put("group.id", groupId);
+            connectConfigMaps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         }
 
         String key = DataFormatUtil.toString(connectConfigMaps);
@@ -84,16 +84,16 @@ public class Kafka {
         try {
             List<String> messageList = new ArrayList<>();
             consumer.subscribe(Collections.singleton(topic));
-            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-            if (records.count() > 0) {
-                for (ConsumerRecord<String, String> record : records.records(topic)) {
-                    String readValue = record.value().trim();
-                    if (!StringUtils.hasLength(readValue) || readValue.equals("{}")) {
-                        continue;
-                    }
-                    messageList.add(readValue);
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+            for (ConsumerRecord<String, String> record : records.records(topic)) {
+                String readValue = record.value().trim();
+                if (!StringUtils.hasLength(readValue) || readValue.equals("{}")) {
+                    continue;
                 }
+                messageList.add(readValue);
             }
+            consumer.commitAsync();
+
             System.out.println(DataFormatUtil.toString(messageList));
             return UniReturnUtil.success(messageList);
         } finally {

+ 13 - 6
src/main/java/com/scbfkj/uni/service/DataProcessService.java

@@ -132,7 +132,8 @@ public class DataProcessService {
                     String preCode;
 //                    前置算法参数
                     List<Object> params = getParams(Optional.ofNullable(preparameterset).map(DataFormatUtil::toString).orElse(null), source);
-                    Map<String, Object> eval = JsScriptEngineUtil.eval(DataFormatUtil.toString(preConditions), params.toArray());
+                    params.add(0, DataFormatUtil.toString(preConditions));
+                    Map<String, Object> eval = JsScriptEngineUtil.eval(params.toArray());
 
                     preData.put("preResult", eval);
                     if (!Objects.equals(eval.get("code"), "0")) {
@@ -242,8 +243,14 @@ public class DataProcessService {
                     HashMap<String, Object> configMap = new HashMap<>();
                     configMap.put("methodName", algorithmsourcelibrary.get("code"));
                     configMap.put("path", algorithmsourcelibrary.get("filepath"));
-                    configMap.put("className", algorithmsourcelibrary.get("library"));
-                    Map<String, Object> result = JavaScriptEngineUtil.invoke(configMap, algorithmsourcelibrary.get("code").toString(), parameters.toArray());
+                    Object className = algorithmsourcelibrary.get("library");
+                    configMap.put("className", className);
+                    Map<String, Object> result;
+//                    if ("com.scbfkj.uni.library.script.JsScriptEngineUtil".equals(className)) {
+//                        result = JsScriptEngineUtil.eval(parameters.toArray());
+//                    } else {
+                        result = JavaScriptEngineUtil.invoke(configMap, algorithmsourcelibrary.get("code").toString(), parameters.toArray());
+//                    }
 
                     if ("0".equals(result.get("code"))) {
                         Object returnData = result.get("returnData");
@@ -310,7 +317,7 @@ public class DataProcessService {
                     logData.put("endtime", dateTime);
                     logData.put("serviceid", finalServiceId);
                     String string = DataFormatUtil.toString(resource);
-                    System.out.println("resources:"+string);
+                    System.out.println("resources:" + string);
                     logData.put("inputdata", string);
                     logData.put("prepesource", DataFormatUtil.toString(preResource));
                     logData.put("returnmessage", finalMessage);
@@ -326,7 +333,7 @@ public class DataProcessService {
                     DATA_BASE.update(Config.getCenterConnectionStr(), """
                             update servicestate
                             set lasttime = ?,workpath = ?
-                            where serviceid=? and containercode=?""", LocalDateTime.now(), new File(".").getAbsolutePath(), finalServiceId1, Config.getContainerCode());
+                            where serviceid=? and containercode=?""", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyy-MM-dd HH:mm:ss")), new File(".").getAbsolutePath(), finalServiceId1, Config.getContainerCode());
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -352,7 +359,7 @@ public class DataProcessService {
             }
             // JS表达式
             case "2", "JS" -> {
-                return JsScriptEngineUtil.eval(DataFormatUtil.toString(parameters.get(0)), parameters.subList(1, parameters.size()).toArray());
+                return JsScriptEngineUtil.eval(parameters.toArray());
             }
             // 数据库
             case "3", "DB" -> {

+ 1 - 0
src/main/resources/application-dev.yml

@@ -4,6 +4,7 @@ app:
   debug: true
   security:
     encrypt: false
+    enable: false
 db:
   center:
     config: '{"jdbcUrl":"jdbc:mysql://120.26.64.82:3306/systemset","username":"root","password":"123@bigdata","driverClassName":"com.mysql.cj.jdbc.Driver"}'

BIN
systemset.sqlite


BIN
uniauth.sqlite