KafkaHelper.java 24 KB


  1. package com.bfkj.unidia.IOUtils;
  2. import com.github.benmanes.caffeine.cache.Cache;
  3. import com.github.benmanes.caffeine.cache.Caffeine;
  4. import org.apache.kafka.clients.consumer.*;
  5. import org.apache.kafka.clients.producer.*;
  6. import org.apache.kafka.common.TopicPartition;
  7. import org.apache.kafka.common.errors.InterruptException;
  8. import org.apache.kafka.common.serialization.StringDeserializer;
  9. import org.apache.kafka.common.serialization.StringSerializer;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import java.time.Duration;
  13. import java.util.*;
  14. import java.util.concurrent.*;
  15. import java.util.concurrent.atomic.AtomicInteger;
  16. import java.util.concurrent.atomic.AtomicReference;
  17. import com.bfkj.unidia.Result;
  18. import com.bfkj.unidia.DataUtils.DataFormatConverter;
  19. import org.springframework.stereotype.Component;
  20. import javax.annotation.PreDestroy;
  21. @Component
  22. public class KafkaHelper {
  23. private static final Logger logger = LoggerFactory.getLogger(KafkaHelper.class);
  24. private static final int MAX_CACHE_SIZE = 100;
  25. private static final int MAX_POLL_RECORDS = 500;
  26. private static final Cache<String, Producer<String, String>> producerCache;
  27. static {
  28. producerCache = Caffeine.newBuilder()
  29. .maximumSize(MAX_CACHE_SIZE)
  30. .expireAfterAccess(Duration.ofMinutes(16))
  31. .removalListener((key, value, cause) -> logger.info("Producer缓存移除: {} 原因: {}", key, cause))
  32. .build();
  33. }
  34. private static final Cache<String, Consumer<String, String>> consumerCache;
  35. static {
  36. consumerCache = Caffeine.newBuilder()
  37. .maximumSize(100)
  38. .expireAfterAccess(Duration.ofMinutes(16))
  39. .removalListener((key, value, cause) -> logger.info("Consumer缓存移除: {} 原因: {}", key, cause))
  40. .build();
  41. }
  42. private static final Object producerLock = new Object();
  43. private static final Object consumerLock = new Object(); // 新增消费者锁
  44. // 私有构造函数
  45. private KafkaHelper() {}
  46. /**
  47. * 发送 Kafka 消息
  48. *
  49. * @param kafkaConfig Kafka生产者配置参数,包含broker地址、序列化器等必要配置
  50. * @param topic 目标Topic名称,消息将被发布到该主题
  51. * @param data 待发送的消息数据,支持单条对象或对象列表形式
  52. * @return 操作结果封装对象,成功时返回发送的消息数量,失败时包含错误信息
  53. */
  54. public static Result<Integer> send(Map<String, String> kafkaConfig, String topic, Object data) {
  55. try {
  56. // 校验基础参数有效性
  57. Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig,"bfkj", topic, data);
  58. if(!checkBaseParamResult.isSuccess()){
  59. return Result.fail(checkBaseParamResult.getError());
  60. }
  61. String cacheKey = checkBaseParamResult.getData();
  62. // 获取或创建Kafka生产者实例
  63. Result<Producer<String, String>> producerResult = getOrCreateProducer(kafkaConfig, cacheKey);
  64. if(!producerResult.isSuccess()){
  65. return Result.fail(producerResult.getError());
  66. }
  67. Producer<String, String> producer = producerResult.getData();
  68. Result<Integer> integerResult = null;
  69. // 处理不同数据类型的消息发送
  70. if (data instanceof List<?> dataList) {
  71. integerResult = batchSend(producer, topic, dataList, cacheKey);
  72. } else {
  73. integerResult = sendSingleMessage(producer, topic, data, cacheKey);
  74. }
  75. logger.info("Kafka发送消息返回结果:");
  76. logger.info(integerResult.toString());
  77. return integerResult;
  78. } catch (Exception e) {
  79. // 记录发送异常日志并返回错误结果
  80. logger.error("Kafka发送异常", e);
  81. return Result.fail("Kafka发送异常: " + e.getMessage());
  82. }
  83. }
  84. /**
  85. * 向指定Kafka主题发送单条消息,并通过CountDownLatch同步等待发送结果
  86. *
  87. * @param producer Kafka生产者实例,负责实际的消息发送
  88. * @param topic 目标Kafka主题名称
  89. * @param data 待发送的消息数据对象(将转换为JSON字符串)
  90. * @return Result<Integer> 包含发送结果的包装对象,成功返回1,失败返回异常信息
  91. */
  92. private static Result<Integer> sendSingleMessage(Producer<String, String> producer,
  93. String topic,
  94. Object data,
  95. String cacheKey) {
  96. String value = DataFormatConverter.convertObjectToString(data);
  97. ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
  98. AtomicReference<Result<Integer>> resultRef = new AtomicReference<>();
  99. CountDownLatch latch = new CountDownLatch(1);
  100. // 异步发送Kafka消息并处理回调结果
  101. producer.send(record, (metadata, exception) -> {
  102. if (exception != null) {
  103. if (exception instanceof InterruptException) {
  104. logger.warn("生产者操作被中断,准备关闭资源");
  105. Thread.currentThread().interrupt(); // 保留中断状态
  106. //producerCache.invalidate(generateProducerCacheKey(kafkaConfig, topic));
  107. producer.close(); // 安全关闭生产者
  108. }
  109. logger.error("Kafka消息发送失败", exception);
  110. resultRef.set(Result.fail(exception.getMessage()));
  111. } else {
  112. resultRef.set(Result.success(1));
  113. }
  114. latch.countDown();
  115. });
  116. // 等待消息发送完成并获取执行结果
  117. try {
  118. latch.await(); // 等待发送完成
  119. return resultRef.get(); // 通过原子引用获取结果
  120. } catch (InterruptedException e) {
  121. Thread.currentThread().interrupt();
  122. producerCache.invalidate(cacheKey);
  123. producer.close(); // 安全关闭生产者
  124. return Result.fail("发送线程被中断" + e.getMessage());
  125. }
  126. }
  127. /**
  128. * 批量发送消息到Kafka主题,支持异步发送并处理结果。
  129. *
  130. * @param producer Kafka生产者实例,用于发送消息
  131. * @param topic 目标Kafka主题名称
  132. * @param dataList 待发送的数据列表,元素类型为可序列化对象
  133. * @return 返回包含发送结果的Result对象:
  134. * - 成功时返回成功发送的消息数量
  135. * - 失败时返回错误信息(超时/部分失败/异常)
  136. */
  137. private static Result<Integer> batchSend(Producer<String, String> producer,
  138. String topic,
  139. List<?> dataList,
  140. String cacheKey) {
  141. try {
  142. //数据预处理阶段: 将输入数据列表转换为Kafka ProducerRecord对象集合
  143. List<ProducerRecord<String, String>> records = dataList.stream()
  144. .map(item -> new ProducerRecord<String, String>(topic, DataFormatConverter.convertObjectToString(item)))
  145. .toList();
  146. //异步发送初始化
  147. AtomicInteger successCount = new AtomicInteger(0);
  148. AtomicReference<Exception> firstError = new AtomicReference<>(null);
  149. CountDownLatch latch = new CountDownLatch(records.size());
  150. //异步发送执行
  151. records.forEach(record -> producer.send(record, (metadata, exception) -> {
  152. if (exception != null) {
  153. if (firstError.get() == null) {
  154. firstError.set(exception);
  155. logger.error("批量发送失败", exception);
  156. }
  157. } else {
  158. successCount.incrementAndGet();
  159. }
  160. latch.countDown();
  161. }));
  162. //发送结果等待与超时处理
  163. try {
  164. boolean completed = latch.await(30, TimeUnit.SECONDS); // 捕获返回值
  165. if (!completed) {
  166. logger.error("批量发送超时,强制关闭生产者");
  167. Thread.currentThread().interrupt();
  168. producer.close(); // 主动关闭生产者释放资源
  169. return Result.fail("批量发送超时超过30秒");
  170. }
  171. } catch (InterruptedException e) {
  172. Thread.currentThread().interrupt();
  173. producer.close(); // 主动关闭生产者释放资源
  174. return Result.fail("批量发送超时中断: " + e.getMessage());
  175. }
  176. //最终结果判定
  177. if (firstError.get() != null) {
  178. return Result.fail("部分消息发送失败: " + firstError.get().getMessage());
  179. }
  180. return Result.success(successCount.get());
  181. } catch (Exception e) {
  182. closeProducer(cacheKey,producer);
  183. logger.error("批量处理异常", e);
  184. return Result.fail("批量处理失败: " + e.getMessage());
  185. }
  186. }
  187. /**
  188. * 获取或创建指定主题的Kafka生产者实例。
  189. * 通过配置参数和主题名称构建缓存键,优先从生产者缓存中获取现有实例。
  190. * 若缓存中不存在,则在同步锁保护下创建新生产者,并存入缓存供后续复用。
  191. *
  192. * @param config 包含Kafka生产者配置参数的Map,必须包含"bootstrap.servers"键
  193. * @return Result对象,包含以下两种情况:
  194. * - 成功时返回可用的Producer实例
  195. * - 失败时返回包含错误信息的Result.fail对象
  196. */
  197. private static Result<Producer<String, String>> getOrCreateProducer(Map<String, String> config, String cacheKey) {
  198. Producer<String, String> existingProducer = producerCache.getIfPresent(cacheKey);
  199. if (existingProducer == null) {
  200. // 进入线程安全区域,确保生产者创建过程的原子性操作
  201. synchronized (producerLock) {
  202. // 二次检查缓存状态,防止并发创建重复实例
  203. existingProducer = producerCache.getIfPresent(cacheKey);
  204. if (existingProducer == null) {
  205. // 执行实际生产者创建流程
  206. Result<Producer<String, String>> producerResult = createProducer(config);
  207. if(!producerResult.isSuccess()){
  208. return producerResult;
  209. }
  210. Producer<String, String> newProducer = producerResult.getData();
  211. // 验证生产者创建结果有效性
  212. if(newProducer == null){
  213. return Result.fail(producerResult.getError());
  214. }
  215. // 将新建生产者存入缓存供后续复用
  216. producerCache.put(cacheKey, newProducer);
  217. return producerResult;
  218. }
  219. }
  220. }
  221. return Result.success(existingProducer);
  222. }
  223. /**
  224. * 创建Kafka生产者实例并进行基础配置优化
  225. *
  226. * @param config 用户提供的基础配置参数,用于初始化Kafka生产者
  227. * @return Result对象包含生产者实例创建结果:
  228. * - 成功时返回包含KafkaProducer的Result实例
  229. * - 失败时返回包含错误信息的Result实例
  230. * 本方法主要完成以下配置处理:
  231. * 1. 确保消息确认机制配置(acks)存在,默认使用-1(所有副本确认)
  232. * 2. 启用幂等性保障机制,默认开启
  233. * 3. 设置关键超时参数优化生产者性能
  234. * 4. 固化键值序列化器为StringSerializer
  235. */
  236. private static Result<Producer<String, String>> createProducer(Map<String, String> config) {
  237. try {
  238. //检查并设置消息确认机制配置
  239. if (!config.containsKey(ProducerConfig.ACKS_CONFIG)) {
  240. logger.warn("未配置acks参数,默认使用-1(all)");
  241. config.put(ProducerConfig.ACKS_CONFIG, "-1");
  242. }
  243. //检查并启用幂等性保障
  244. if (!config.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
  245. logger.info("启用幂等性保障");
  246. config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  247. }
  248. Properties props = new Properties();
  249. props.putAll(config);
  250. // 优化配置
  251. props.putIfAbsent(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");//最大阻塞时间60秒
  252. props.putIfAbsent(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");//消息投递超时时间120秒
  253. props.putIfAbsent(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");//请求超时时间30秒
  254. //固定使用String序列化器
  255. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  256. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  257. return Result.success(new KafkaProducer<>(props));
  258. } catch (Exception e) {
  259. logger.error("创建Kafka生产者失败", e);
  260. return Result.fail("创建Kafka生产者失败: " + e.getMessage());
  261. }
  262. }
  263. //---------------------------------------------------------------------------------------------
  264. /**
  265. * 消费 Kafka 消息
  266. * 功能说明:
  267. * 1. 校验基础参数并获取Kafka消费者实例
  268. * 2. 订阅指定主题并拉取消息
  269. * 3. 处理消息后同步提交偏移量
  270. * 4. 包含连接健康检测机制
  271. *
  272. * @param kafkaConfig Kafka基础配置参数,包含bootstrap.servers等必要配置项
  273. * @param topic 需要消费的主题名称
  274. * @param groupId 消费者组ID(可为空,单机模式可不传)
  275. * @param maxRecords 单次拉取最大记录数限制
  276. * @return Result<List<String>> 返回消费结果对象,成功时包含消息列表,失败时包含错误信息
  277. * - 成功状态:包含消费到的消息字符串列表
  278. * - 失败状态:包含具体错误原因(参数校验失败/连接异常/消费异常等)
  279. */
  280. public static Result<List<String>> consume(Map<String, String> kafkaConfig, String topic, String groupId, Integer maxRecords) {
  281. // 参数校验
  282. Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig, groupId,topic, "data");
  283. if(!checkBaseParamResult.isSuccess()){
  284. return Result.fail(checkBaseParamResult.getError());
  285. }
  286. //生成消费者缓存键
  287. String cacheKey = checkBaseParamResult.getData();
  288. Result<Consumer<String, String>> consumerResult = getOrCreateConsumer(kafkaConfig, cacheKey, groupId, maxRecords);
  289. if(!consumerResult.isSuccess()){
  290. return Result.fail(consumerResult.getError());
  291. }
  292. Consumer<String, String> consumer = consumerResult.getData();
  293. try {
  294. consumer.subscribe(Collections.singletonList(topic));
  295. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
  296. List<String> messages = new ArrayList<>();
  297. for (ConsumerRecord<String, String> record : records) {
  298. messages.add(record.value());
  299. logger.info("Kafka消费数据: {}", record.value());
  300. }
  301. if (!messages.isEmpty()) {
  302. consumer.commitSync(); // 同步提交偏移量
  303. }else {// 2. 主动连接检测
  304. if (!testKafkaConnection(consumer, Duration.ofSeconds(5)) || !isConsumerHealthy(consumer)) {
  305. closeConsumer(cacheKey,consumer);
  306. return Result.fail("Kafka消费者{".concat(cacheKey).concat("}异常"));
  307. }
  308. }
  309. logger.info("消费数量:");
  310. logger.info(String.valueOf(messages.size()));
  311. if (messages.size()>0) logger.info(messages.get(0));
  312. return Result.success(messages);
  313. }catch (Exception e) {
  314. closeConsumer(cacheKey,consumer);
  315. logger.error("Kafka消费异常{}",cacheKey, e);
  316. return Result.fail("Kafka消费异常: " + e.getMessage());
  317. }
  318. }
  319. /**
  320. * 获取或创建Kafka消费者实例,采用双重检查锁机制保证线程安全。
  321. *
  322. * @param kafkaConfig Kafka基础配置参数映射表
  323. * @param cacheKey 消费者实例的唯一标识键
  324. * @param groupId 消费者组ID
  325. * @param maxRecords 单次拉取最大记录数
  326. * @return 包含Kafka消费者实例的操作结果对象
  327. */
  328. private static Result<Consumer<String, String>> getOrCreateConsumer(Map<String, String> kafkaConfig,
  329. String cacheKey,
  330. String groupId,
  331. Integer maxRecords) {
  332. Consumer<String, String> consumer = consumerCache.getIfPresent(cacheKey);
  333. if (consumer == null) {
  334. // 使用双重检查锁确保线程安全
  335. synchronized (consumerLock) {
  336. // 二次检查缓存是否存在有效消费者
  337. consumer = consumerCache.getIfPresent(cacheKey);
  338. if (consumer == null) {
  339. // 构建消费者配置属性
  340. Properties props = buildConsumerProps(kafkaConfig, groupId, maxRecords);
  341. // 实例化Kafka消费者
  342. consumer = new KafkaConsumer<>(props);
  343. // 将新创建的消费者存入缓存
  344. consumerCache.put(cacheKey, consumer);
  345. }
  346. }
  347. }
  348. return Result.success(consumer);
  349. }
  350. /**
  351. * 构建Kafka消费者配置属性
  352. *
  353. * @param kafkaConfig 基础Kafka配置映射,包含原始配置键值对
  354. * @param groupId 消费者组ID,用于标识消费者所属的组
  355. * @param maxRecords 单次poll操作的最大记录数,若为null则使用默认值MAX_POLL_RECORDS
  356. * @return 配置好的Properties对象,包含所有消费者配置属性
  357. */
  358. private static Properties buildConsumerProps(Map<String, String> kafkaConfig, String groupId, Integer maxRecords) {
  359. Properties props = new Properties();
  360. props.putAll(kafkaConfig);
  361. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  362. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  363. props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536"); // 64KB
  364. props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
  365. props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // 1MB
  366. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecords == null?MAX_POLL_RECORDS:maxRecords);
  367. props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
  368. return props;
  369. }
  370. // ------------------------ 私有方法区 ------------------------
  371. /**
  372. * 校验Kafka基础参数合法性并生成配置标识符
  373. *
  374. * @param kafkaConfig Kafka配置参数映射表,需包含bootstrap.servers配置项
  375. * @param groupId 消费者组ID,用于标识消费者组
  376. * @param topic Kafka消息主题名称
  377. * @param data 待发送的业务数据对象
  378. * @return Result<String> 校验结果封装对象,成功时返回由bootstrap.servers:groupId:topic组成的配置标识符字符串,
  379. * 失败时返回包含具体错误信息的失败结果
  380. */
  381. private static Result<String> checkBaseParam(Map<String, String> kafkaConfig,
  382. String groupId,
  383. String topic,
  384. Object data){
  385. if (data == null) {
  386. return Result.fail("发送数据不能为空");
  387. }
  388. if (topic == null || topic.isEmpty()) {
  389. return Result.fail("Kafka配置中缺少topic");
  390. }
  391. if(kafkaConfig.isEmpty() || !kafkaConfig.containsKey("bootstrap.servers") ||
  392. kafkaConfig.get("bootstrap.servers").isEmpty()){
  393. return Result.fail("Kafka配置中缺少bootstrap.servers");
  394. }
  395. return Result.success(kafkaConfig.get("bootstrap.servers")
  396. .concat(":")
  397. .concat(groupId)
  398. .concat(":")
  399. .concat(topic));
  400. }
  401. /**
  402. * 检查Kafka消费者实例的健康状态
  403. *
  404. * @param consumer 需要检查的Kafka消费者实例
  405. * @return 如果消费者健康(成功获取非空分配分区和有效偏移量)返回true,否则返回false
  406. */
  407. private static boolean isConsumerHealthy(Consumer<String, String> consumer) {
  408. try {
  409. // 检查分配分区
  410. Set<TopicPartition> assignments = consumer.assignment();
  411. if (assignments.isEmpty()) {
  412. logger.warn("消费者未分配任何分区");
  413. return false;
  414. }
  415. // 检查最新偏移量
  416. Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
  417. logger.info("消费者最新偏移量: {}", endOffsets);
  418. return !endOffsets.isEmpty();
  419. } catch (Exception e) {
  420. logger.warn("消费者健康检查失败", e);
  421. return false;
  422. }
  423. }
  424. /**
  425. * 测试Kafka消费者的连接状态。
  426. * 通过尝试列出主题来验证连接是否正常。
  427. *
  428. * @param consumer Kafka消费者实例,用于测试连接
  429. * @param timeout 测试连接的超时时间,单位为毫秒
  430. * @return 如果连接成功返回true,否则返回false
  431. */
  432. private static boolean testKafkaConnection(Consumer<String, String> consumer, Duration timeout) {
  433. try {
  434. // 通过获取topic列表测试连接
  435. consumer.listTopics(timeout);
  436. return true;
  437. } catch (Exception e) {
  438. // 记录详细的网络异常日志
  439. logger.warn("Kafka连接测试失败", e);
  440. return false;
  441. }
  442. }
  443. /**
  444. * 关闭指定的Kafka生产者实例并清理相关缓存。
  445. *
  446. * <p>该方法安全关闭生产者实例,捕获并记录关闭过程中发生的异常,
  447. * 并确保无论关闭成功与否都强制清理缓存中的生产者实例。</p>
  448. *
  449. * @param cacheKey 用于定位缓存中生产者实例的键,不能为空
  450. * @param producer 待关闭的生产者实例,可能为null
  451. */
  452. private static void closeProducer(String cacheKey, Producer<String, String> producer) {
  453. try {
  454. if (producer != null) {
  455. producer.close();
  456. }
  457. } catch (Exception e) {
  458. logger.error("关闭生产者异常: {}", cacheKey, e);
  459. }finally {
  460. producerCache.invalidate(cacheKey);
  461. }
  462. }
  463. /**
  464. * 关闭指定的Kafka消费者并清理关联的缓存条目。
  465. *
  466. * <p>该方法安全地关闭消费者实例,记录任何关闭过程中发生的异常,
  467. * 并确保最终从缓存中移除对应的缓存键。</p>
  468. *
  469. * @param cacheKey 用于标识消费者实例的缓存键,不可为空
  470. * @param consumer 需要关闭的Kafka消费者实例,可能为null
  471. */
  472. private static void closeConsumer(String cacheKey, Consumer<String, String> consumer) {
  473. try {
  474. if (consumer != null) {
  475. consumer.close();
  476. }
  477. } catch (Exception e) {
  478. logger.error("关闭生产者异常: {}", cacheKey, e);
  479. }finally {
  480. consumerCache.invalidate(cacheKey);
  481. }
  482. }
  483. //销毁前清除缓存
  484. @PreDestroy
  485. public void destroy() {
  486. producerCache.asMap().forEach(KafkaHelper::closeProducer);
  487. producerCache.invalidateAll();
  488. logger.info("生产者缓存清除");
  489. consumerCache.asMap().forEach(KafkaHelper::closeConsumer);
  490. consumerCache.invalidateAll();
  491. logger.info("消费者缓存清除");
  492. }
  493. }