KafkaHelper.java 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. package com.bfkj.unidia.IOUtils;
  2. import com.bfkj.unidia.logUtils.Log;
  3. import com.github.benmanes.caffeine.cache.Cache;
  4. import org.apache.kafka.clients.consumer.*;
  5. import org.apache.kafka.clients.producer.*;
  6. import org.apache.kafka.common.TopicPartition;
  7. import org.apache.kafka.common.errors.InterruptException;
  8. import org.apache.kafka.common.serialization.StringDeserializer;
  9. import org.apache.kafka.common.serialization.StringSerializer;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import java.time.Duration;
  13. import java.util.*;
  14. import java.util.concurrent.*;
  15. import java.util.concurrent.atomic.AtomicInteger;
  16. import java.util.concurrent.atomic.AtomicReference;
  17. import com.bfkj.unidia.Result;
  18. import com.bfkj.unidia.DataUtils.DataFormatConverter;
  19. import org.springframework.stereotype.Service;
  20. import javax.annotation.PreDestroy;
  21. import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
  22. @Service
  23. public class KafkaHelper {
  24. private static final Logger logger = LoggerFactory.getLogger(KafkaHelper.class);
  25. private static final int MAX_POLL_RECORDS = 500;
  26. private static final Cache<String, Producer<String, String>> producerCache = buildCaffeineCache();
  27. private static final Cache<String, Consumer<String, String>> consumerCache = buildCaffeineCache();
  28. private static final Object producerLock = new Object();
  29. private static final Object consumerLock = new Object(); // 新增消费者锁
  30. // 私有构造函数
  31. private KafkaHelper() {}
  32. /**
  33. * 发送 Kafka 消息
  34. *
  35. * @param kafkaConfig Kafka生产者配置参数,包含broker地址、序列化器等必要配置
  36. * @param topic 目标Topic名称,消息将被发布到该主题
  37. * @param data 待发送的消息数据,支持单条对象或对象列表形式
  38. * @return 操作结果封装对象,成功时返回发送的消息数量,失败时包含错误信息
  39. */
  40. public static Result<List<?>> send(Map<String, String> kafkaConfig, String topic, Object data) {
  41. //判断消息是否为List类型,直接处理批量消息场景 非批量消息统一转换为单元素列表进行发送处理
  42. return batchSendMessage(kafkaConfig, topic,
  43. data instanceof List<?> dataList ? dataList : Collections.singletonList(data));
  44. }
  45. /**
  46. * 批量发送消息到Kafka主题,支持异步发送并处理结果。
  47. *
  48. * @param topic 目标Kafka主题名称
  49. * @param dataList 待发送的数据列表,元素类型为可序列化对象
  50. * @return 返回包含发送结果的Result对象:
  51. * - 成功时返回成功发送的消息数量
  52. * - 失败时返回错误信息(超时/部分失败/异常)
  53. */
  54. private static Result<List<?>> batchSendMessage(Map<String, String> kafkaConfig,
  55. String topic,
  56. List<?> dataList) {
  57. String cacheKey = null;
  58. Producer<String, String> producer = null;
  59. try {
  60. // 校验基础参数有效性
  61. Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig,"bfkj", topic, dataList);
  62. if(!checkBaseParamResult.isSuccess()){
  63. return Result.fail(checkBaseParamResult.getError());
  64. }
  65. cacheKey = checkBaseParamResult.getData();
  66. // 获取或创建Kafka生产者实例
  67. Result<Producer<String, String>> producerResult = getOrCreateProducer(kafkaConfig, cacheKey);
  68. if(!producerResult.isSuccess()){
  69. return Result.fail(producerResult.getError());
  70. }
  71. producer = producerResult.getData();
  72. //数据预处理阶段: 将输入数据列表转换为Kafka ProducerRecord对象集合
  73. List<ProducerRecord<String, String>> records = dataList.stream()
  74. .map(item -> new ProducerRecord<String, String>(topic, DataFormatConverter.convertObjectToString(item)))
  75. .toList();
  76. List<String> failList = new ArrayList<>();
  77. //异步发送初始化
  78. AtomicInteger successCount = new AtomicInteger(0);
  79. AtomicReference<Exception> firstError = new AtomicReference<>(null);
  80. CountDownLatch latch = new CountDownLatch(records.size());
  81. //异步发送执行
  82. Producer<String, String> finalProducer = producer;
  83. records.forEach(record -> finalProducer.send(record, (metadata, exception) -> {
  84. if (exception != null) {
  85. if (firstError.get() == null) {
  86. firstError.set(exception);
  87. Log.error("批量发送失败", exception);
  88. failList.add(record.value());
  89. }
  90. } else {
  91. successCount.incrementAndGet();
  92. }
  93. latch.countDown();
  94. }));
  95. //发送结果等待与超时处理
  96. try {
  97. boolean completed = latch.await(30, TimeUnit.SECONDS); // 捕获返回值
  98. if (!completed) {
  99. Log.error("批量发送超时,强制关闭生产者");
  100. Thread.currentThread().interrupt();
  101. producer.close(); // 主动关闭生产者释放资源
  102. return Result.fail("批量发送超时超过30秒");
  103. }
  104. } catch (InterruptedException e) {
  105. Thread.currentThread().interrupt();
  106. producer.close(); // 主动关闭生产者释放资源
  107. return Result.fail("批量发送超时中断: " + e.getMessage());
  108. }
  109. //最终结果判定
  110. if (firstError.get() != null) {
  111. return Result.fail("部分消息发送失败: " + firstError.get().getMessage());
  112. }
  113. return Result.success(failList);
  114. } catch (Exception e) {
  115. if(producer != null && cacheKey != null) {
  116. closeProducer(cacheKey,producer);
  117. }
  118. Log.error("批量处理异常", e);
  119. return Result.fail("批量处理失败: " + e.getMessage());
  120. }
  121. }
  122. /**
  123. * 获取或创建指定主题的Kafka生产者实例。
  124. * 通过配置参数和主题名称构建缓存键,优先从生产者缓存中获取现有实例。
  125. * 若缓存中不存在,则在同步锁保护下创建新生产者,并存入缓存供后续复用。
  126. *
  127. * @param config 包含Kafka生产者配置参数的Map,必须包含"bootstrap.servers"键
  128. * @return Result对象,包含以下两种情况:
  129. * - 成功时返回可用的Producer实例
  130. * - 失败时返回包含错误信息的Result.fail对象
  131. */
  132. private static Result<Producer<String, String>> getOrCreateProducer(Map<String, String> config, String cacheKey) {
  133. Producer<String, String> existingProducer = producerCache.getIfPresent(cacheKey);
  134. if (existingProducer == null) {
  135. // 进入线程安全区域,确保生产者创建过程的原子性操作
  136. synchronized (producerLock) {
  137. // 二次检查缓存状态,防止并发创建重复实例
  138. existingProducer = producerCache.getIfPresent(cacheKey);
  139. if (existingProducer == null) {
  140. // 执行实际生产者创建流程
  141. Result<Producer<String, String>> producerResult = createProducer(config);
  142. if(!producerResult.isSuccess()){
  143. return producerResult;
  144. }
  145. Producer<String, String> newProducer = producerResult.getData();
  146. // 验证生产者创建结果有效性
  147. if(newProducer == null){
  148. return Result.fail(producerResult.getError());
  149. }
  150. // 将新建生产者存入缓存供后续复用
  151. producerCache.put(cacheKey, newProducer);
  152. return producerResult;
  153. }
  154. }
  155. }
  156. return Result.success(existingProducer);
  157. }
  158. /**
  159. * 创建Kafka生产者实例并进行基础配置优化
  160. *
  161. * @param config 用户提供的基础配置参数,用于初始化Kafka生产者
  162. * @return Result对象包含生产者实例创建结果:
  163. * - 成功时返回包含KafkaProducer的Result实例
  164. * - 失败时返回包含错误信息的Result实例
  165. * 本方法主要完成以下配置处理:
  166. * 1. 确保消息确认机制配置(acks)存在,默认使用-1(所有副本确认)
  167. * 2. 启用幂等性保障机制,默认开启
  168. * 3. 设置关键超时参数优化生产者性能
  169. * 4. 固化键值序列化器为StringSerializer
  170. */
  171. private static Result<Producer<String, String>> createProducer(Map<String, String> config) {
  172. try {
  173. //检查并设置消息确认机制配置
  174. if (!config.containsKey(ProducerConfig.ACKS_CONFIG)) {
  175. logger.warn("未配置acks参数,默认使用-1(all)");
  176. config.put(ProducerConfig.ACKS_CONFIG, "-1");
  177. }
  178. //检查并启用幂等性保障
  179. if (!config.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
  180. Log.info("启用幂等性保障");
  181. config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  182. }
  183. Properties props = new Properties();
  184. props.putAll(config);
  185. // 优化配置
  186. props.putIfAbsent(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");//最大阻塞时间60秒
  187. props.putIfAbsent(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");//消息投递超时时间120秒
  188. props.putIfAbsent(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");//请求超时时间30秒
  189. //固定使用String序列化器
  190. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  191. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  192. return Result.success(new KafkaProducer<>(props));
  193. } catch (Exception e) {
  194. Log.error("创建Kafka生产者失败", e);
  195. return Result.fail("创建Kafka生产者失败: " + e.getMessage());
  196. }
  197. }
  198. //---------------------------------------------------------------------------------------------
  199. /**
  200. * 消费 Kafka 消息
  201. * 功能说明:
  202. * 1. 校验基础参数并获取Kafka消费者实例
  203. * 2. 订阅指定主题并拉取消息
  204. * 3. 处理消息后同步提交偏移量
  205. * 4. 包含连接健康检测机制
  206. *
  207. * @param kafkaConfig Kafka基础配置参数,包含bootstrap.servers等必要配置项
  208. * @param topic 需要消费的主题名称
  209. * @param groupId 消费者组ID(可为空,单机模式可不传)
  210. * @param maxRecords 单次拉取最大记录数限制
  211. * @return Result<List<String>> 返回消费结果对象,成功时包含消息列表,失败时包含错误信息
  212. * - 成功状态:包含消费到的消息字符串列表
  213. * - 失败状态:包含具体错误原因(参数校验失败/连接异常/消费异常等)
  214. */
  215. public static Result<List<String>> consume(Map<String, String> kafkaConfig, String topic, String groupId, Integer maxRecords) {
  216. // 参数校验
  217. Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig, groupId,topic, "data");
  218. if(!checkBaseParamResult.isSuccess()){
  219. return Result.fail(checkBaseParamResult.getError());
  220. }
  221. //生成消费者缓存键
  222. String cacheKey = checkBaseParamResult.getData();
  223. Result<Consumer<String, String>> consumerResult = getOrCreateConsumer(kafkaConfig, cacheKey, groupId, maxRecords);
  224. if(!consumerResult.isSuccess()){
  225. return Result.fail(consumerResult.getError());
  226. }
  227. Consumer<String, String> consumer = consumerResult.getData();
  228. try {
  229. consumer.subscribe(Collections.singletonList(topic));
  230. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
  231. List<String> messages = new ArrayList<>();
  232. for (ConsumerRecord<String, String> record : records) {
  233. messages.add(record.value());
  234. Log.info("Kafka消费数据: {}", record.value());
  235. }
  236. if (!messages.isEmpty()) {
  237. consumer.commitSync(); // 同步提交偏移量
  238. }else {// 2. 主动连接检测
  239. if (!testKafkaConnection(consumer, Duration.ofSeconds(5)) || !isConsumerHealthy(consumer)) {
  240. closeConsumer(cacheKey,consumer);
  241. return Result.fail("Kafka消费者{".concat(cacheKey).concat("}异常"));
  242. }
  243. }
  244. Log.info("消费数量:");
  245. Log.info(String.valueOf(messages.size()));
  246. if (messages.size()>0) Log.info(messages.get(0));
  247. return Result.success(messages);
  248. }catch (Exception e) {
  249. closeConsumer(cacheKey,consumer);
  250. Log.error("Kafka消费异常{}",cacheKey, e);
  251. return Result.fail("Kafka消费异常: " + e.getMessage());
  252. }
  253. }
  254. /**
  255. * 获取或创建Kafka消费者实例,采用双重检查锁机制保证线程安全。
  256. *
  257. * @param kafkaConfig Kafka基础配置参数映射表
  258. * @param cacheKey 消费者实例的唯一标识键
  259. * @param groupId 消费者组ID
  260. * @param maxRecords 单次拉取最大记录数
  261. * @return 包含Kafka消费者实例的操作结果对象
  262. */
  263. private static Result<Consumer<String, String>> getOrCreateConsumer(Map<String, String> kafkaConfig,
  264. String cacheKey,
  265. String groupId,
  266. Integer maxRecords) {
  267. Consumer<String, String> consumer = consumerCache.getIfPresent(cacheKey);
  268. if (consumer == null) {
  269. // 使用双重检查锁确保线程安全
  270. synchronized (consumerLock) {
  271. // 二次检查缓存是否存在有效消费者
  272. consumer = consumerCache.getIfPresent(cacheKey);
  273. if (consumer == null) {
  274. // 构建消费者配置属性
  275. Properties props = buildConsumerProps(kafkaConfig, groupId, maxRecords);
  276. // 实例化Kafka消费者
  277. consumer = new KafkaConsumer<>(props);
  278. // 将新创建的消费者存入缓存
  279. consumerCache.put(cacheKey, consumer);
  280. }
  281. }
  282. }
  283. return Result.success(consumer);
  284. }
  285. /**
  286. * 构建Kafka消费者配置属性
  287. *
  288. * @param kafkaConfig 基础Kafka配置映射,包含原始配置键值对
  289. * @param groupId 消费者组ID,用于标识消费者所属的组
  290. * @param maxRecords 单次poll操作的最大记录数,若为null则使用默认值MAX_POLL_RECORDS
  291. * @return 配置好的Properties对象,包含所有消费者配置属性
  292. */
  293. private static Properties buildConsumerProps(Map<String, String> kafkaConfig, String groupId, Integer maxRecords) {
  294. Properties props = new Properties();
  295. props.putAll(kafkaConfig);
  296. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  297. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  298. props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536"); // 64KB
  299. props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
  300. props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // 1MB
  301. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecords == null?MAX_POLL_RECORDS:maxRecords);
  302. props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
  303. return props;
  304. }
  305. // ------------------------ 私有方法区 ------------------------
  306. /**
  307. * 校验Kafka基础参数合法性并生成配置标识符
  308. *
  309. * @param kafkaConfig Kafka配置参数映射表,需包含bootstrap.servers配置项
  310. * @param groupId 消费者组ID,用于标识消费者组
  311. * @param topic Kafka消息主题名称
  312. * @param data 待发送的业务数据对象
  313. * @return Result<String> 校验结果封装对象,成功时返回由bootstrap.servers:groupId:topic组成的配置标识符字符串,
  314. * 失败时返回包含具体错误信息的失败结果
  315. */
  316. private static Result<String> checkBaseParam(Map<String, String> kafkaConfig,
  317. String groupId,
  318. String topic,
  319. Object data){
  320. if (data == null) {
  321. return Result.fail("发送数据不能为空");
  322. }
  323. if (topic == null || topic.isEmpty()) {
  324. return Result.fail("Kafka配置中缺少topic");
  325. }
  326. if(kafkaConfig.isEmpty() || !kafkaConfig.containsKey("bootstrap.servers") ||
  327. kafkaConfig.get("bootstrap.servers").isEmpty()){
  328. return Result.fail("Kafka配置中缺少bootstrap.servers");
  329. }
  330. return Result.success(kafkaConfig.get("bootstrap.servers")
  331. .concat(":")
  332. .concat(groupId)
  333. .concat(":")
  334. .concat(topic));
  335. }
  336. /**
  337. * 检查Kafka消费者实例的健康状态
  338. *
  339. * @param consumer 需要检查的Kafka消费者实例
  340. * @return 如果消费者健康(成功获取非空分配分区和有效偏移量)返回true,否则返回false
  341. */
  342. private static boolean isConsumerHealthy(Consumer<String, String> consumer) {
  343. try {
  344. // 检查分配分区
  345. Set<TopicPartition> assignments = consumer.assignment();
  346. if (assignments.isEmpty()) {
  347. logger.warn("消费者未分配任何分区");
  348. return false;
  349. }
  350. // 检查最新偏移量
  351. Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
  352. Log.info("消费者最新偏移量: {}", endOffsets);
  353. return !endOffsets.isEmpty();
  354. } catch (Exception e) {
  355. logger.warn("消费者健康检查失败", e);
  356. return false;
  357. }
  358. }
  359. /**
  360. * 测试Kafka消费者的连接状态。
  361. * 通过尝试列出主题来验证连接是否正常。
  362. *
  363. * @param consumer Kafka消费者实例,用于测试连接
  364. * @param timeout 测试连接的超时时间,单位为毫秒
  365. * @return 如果连接成功返回true,否则返回false
  366. */
  367. private static boolean testKafkaConnection(Consumer<String, String> consumer, Duration timeout) {
  368. try {
  369. // 通过获取topic列表测试连接
  370. consumer.listTopics(timeout);
  371. return true;
  372. } catch (Exception e) {
  373. // 记录详细的网络异常日志
  374. logger.warn("Kafka连接测试失败", e);
  375. return false;
  376. }
  377. }
  378. /**
  379. * 关闭指定的Kafka生产者实例并清理相关缓存。
  380. *
  381. * <p>该方法安全关闭生产者实例,捕获并记录关闭过程中发生的异常,
  382. * 并确保无论关闭成功与否都强制清理缓存中的生产者实例。</p>
  383. *
  384. * @param cacheKey 用于定位缓存中生产者实例的键,不能为空
  385. * @param producer 待关闭的生产者实例,可能为null
  386. */
  387. private static void closeProducer(String cacheKey, Producer<String, String> producer) {
  388. try {
  389. if (producer != null) {
  390. producer.close();
  391. }
  392. } catch (Exception e) {
  393. Log.error("关闭生产者异常: {}", cacheKey, e);
  394. }finally {
  395. producerCache.invalidate(cacheKey);
  396. }
  397. }
  398. /**
  399. * 关闭指定的Kafka消费者并清理关联的缓存条目。
  400. *
  401. * <p>该方法安全地关闭消费者实例,记录任何关闭过程中发生的异常,
  402. * 并确保最终从缓存中移除对应的缓存键。</p>
  403. *
  404. * @param cacheKey 用于标识消费者实例的缓存键,不可为空
  405. * @param consumer 需要关闭的Kafka消费者实例,可能为null
  406. */
  407. private static void closeConsumer(String cacheKey, Consumer<String, String> consumer) {
  408. try {
  409. if (consumer != null) {
  410. consumer.close();
  411. }
  412. } catch (Exception e) {
  413. Log.error("关闭生产者异常: {}", cacheKey, e);
  414. }finally {
  415. consumerCache.invalidate(cacheKey);
  416. }
  417. }
  418. //销毁前清除缓存
  419. @PreDestroy
  420. public void destroy() {
  421. producerCache.asMap().forEach(KafkaHelper::closeProducer);
  422. producerCache.invalidateAll();
  423. Log.info("生产者缓存清除");
  424. consumerCache.asMap().forEach(KafkaHelper::closeConsumer);
  425. consumerCache.invalidateAll();
  426. Log.info("消费者缓存清除");
  427. }
  428. }