Bläddra i källkod

kafak 获取数据 新增获取条数参数

andy 1 år sedan
förälder
incheckning
dad1eecadb
1 ändrade filer med 5 tillägg och 5 borttagningar
  1. 5 5
      src/main/java/com/scbfkj/uni/library/script/KafkaScriptUtil.java

+ 5 - 5
src/main/java/com/scbfkj/uni/library/script/KafkaScriptUtil.java

@@ -13,16 +13,16 @@ public class KafkaScriptUtil {
 
     private static final DataBase DATA_BASE = new DataBase();
 
-    public static Map<String, Object> receptionMessage(String dataSourceId, String topic, String groupId) throws Exception {
-        return Kafka.receptionMessage(queryConnectionStr(dataSourceId), topic, groupId);
+    public static Map<String, Object> receptionMessage(String dataSourceId, String topic, String groupId,Long pollSize) throws Exception {
+        return Kafka.receptionMessage(queryConnectionStr(dataSourceId,pollSize), topic, groupId);
     }
 
     public static Map<String, Object> sendMessage(String dataSourceId, String topic, Object datas) throws Exception {
-        return Kafka.sendMessage(queryConnectionStr(dataSourceId), topic, DataFormatUtil.toList(datas));
+        return Kafka.sendMessage(queryConnectionStr(dataSourceId, 1000L), topic, DataFormatUtil.toList(datas));
     }
 
 
-    private static String queryConnectionStr(String datasourceId) throws Exception {
+    private static String queryConnectionStr(String datasourceId, Long pollSize) throws Exception {
         List<Map<String, Object>> result = DATA_BASE.query(Config.getCenterConnectionStr(), """
                 select host
                 from datasource
@@ -33,7 +33,7 @@ public class KafkaScriptUtil {
         return result.stream().findFirst().map(it -> {
             HashMap<String, Object> hashMap = new HashMap<>();
             hashMap.put("bootstrap.servers", it.get("host"));
-            hashMap.put("max.poll.records", "100");
+            hashMap.put("max.poll.records", pollSize);
             return hashMap;
         }).map(DataFormatUtil::toString).get();
     }