|
@@ -1,6 +1,8 @@
|
|
|
package org.bfkj.protocol;
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.fasterxml.jackson.databind.SerializationFeature;
|
|
|
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
@@ -18,6 +20,12 @@ public class MyKafKa {
|
|
|
|
|
|
private boolean isPool =false;
|
|
|
|
|
|
+ private static ObjectMapper mapper = new ObjectMapper();
|
|
|
+ static {
|
|
|
+ mapper.registerModule(new JavaTimeModule());
|
|
|
+ mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); //关闭
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 采集kafka数据
|
|
|
*
|
|
@@ -75,11 +83,10 @@ public class MyKafKa {
|
|
|
connectConfigMaps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
|
|
|
if (Objects.isNull(producer)) producer = new KafkaProducer<>(connectConfigMaps);
|
|
|
- ObjectMapper objectMapper = new ObjectMapper();
|
|
|
List<Object> sendResult = new ArrayList<>();
|
|
|
for (Object o : dataContent) {
|
|
|
try {
|
|
|
- sendResult.add(producer.send(new ProducerRecord<>(sourceObjectName, objectMapper.writeValueAsString(o))));
|
|
|
+ sendResult.add(producer.send(new ProducerRecord<>(sourceObjectName, mapper.writeValueAsString(o))));
|
|
|
} catch (Exception e) {
|
|
|
sendResult.add(producer.send(new ProducerRecord<>(sourceObjectName, o.toString())));
|
|
|
}
|