package com.bfkj.unidia.IOUtils; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import com.bfkj.unidia.Result; import com.bfkj.unidia.DataUtils.DataFormatConverter; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; @Component public class KafkaHelper { private static final Logger logger = LoggerFactory.getLogger(KafkaHelper.class); private static final int MAX_CACHE_SIZE = 100; private static final int MAX_POLL_RECORDS = 500; private static final Cache> producerCache; static { producerCache = Caffeine.newBuilder() .maximumSize(MAX_CACHE_SIZE) .expireAfterAccess(Duration.ofMinutes(16)) .removalListener((key, value, cause) -> logger.info("Producer缓存移除: {} 原因: {}", key, cause)) .build(); } private static final Cache> consumerCache; static { consumerCache = Caffeine.newBuilder() .maximumSize(100) .expireAfterAccess(Duration.ofMinutes(16)) .removalListener((key, value, cause) -> logger.info("Consumer缓存移除: {} 原因: {}", key, cause)) .build(); } private static final Object producerLock = new Object(); private static final Object consumerLock = new Object(); // 新增消费者锁 // 私有构造函数 private KafkaHelper() {} /** * 发送 Kafka 消息 * * @param kafkaConfig Kafka生产者配置参数,包含broker地址、序列化器等必要配置 * @param topic 目标Topic名称,消息将被发布到该主题 * @param data 待发送的消息数据,支持单条对象或对象列表形式 * @return 操作结果封装对象,成功时返回发送的消息数量,失败时包含错误信息 */ public static Result send(Map kafkaConfig, String topic, Object data) { try { // 校验基础参数有效性 Result checkBaseParamResult = checkBaseParam(kafkaConfig,"bfkj", topic, data); if(!checkBaseParamResult.isSuccess()){ return Result.fail(checkBaseParamResult.getError()); } String cacheKey = checkBaseParamResult.getData(); // 获取或创建Kafka生产者实例 Result> producerResult = getOrCreateProducer(kafkaConfig, cacheKey); if(!producerResult.isSuccess()){ return Result.fail(producerResult.getError()); } Producer producer = producerResult.getData(); Result integerResult = null; // 处理不同数据类型的消息发送 if (data instanceof List dataList) { integerResult = batchSend(producer, topic, dataList, cacheKey); } else { integerResult = sendSingleMessage(producer, topic, data, cacheKey); } logger.info("Kafka发送消息返回结果:"); logger.info(integerResult.toString()); return integerResult; } catch (Exception e) { // 记录发送异常日志并返回错误结果 logger.error("Kafka发送异常", e); return Result.fail("Kafka发送异常: " + e.getMessage()); } } /** * 向指定Kafka主题发送单条消息,并通过CountDownLatch同步等待发送结果 * * @param producer Kafka生产者实例,负责实际的消息发送 * @param topic 目标Kafka主题名称 * @param data 待发送的消息数据对象(将转换为JSON字符串) * @return Result 包含发送结果的包装对象,成功返回1,失败返回异常信息 */ private static Result sendSingleMessage(Producer producer, String topic, Object data, String cacheKey) { String value = DataFormatConverter.convertObjectToString(data); ProducerRecord record = new ProducerRecord<>(topic, value); AtomicReference> resultRef = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); // 异步发送Kafka消息并处理回调结果 producer.send(record, (metadata, exception) -> { if (exception != null) { if (exception instanceof InterruptException) { logger.warn("生产者操作被中断,准备关闭资源"); Thread.currentThread().interrupt(); // 保留中断状态 //producerCache.invalidate(generateProducerCacheKey(kafkaConfig, topic)); producer.close(); // 安全关闭生产者 } logger.error("Kafka消息发送失败", exception); resultRef.set(Result.fail(exception.getMessage())); } else { resultRef.set(Result.success(1)); } latch.countDown(); }); // 等待消息发送完成并获取执行结果 try { latch.await(); // 等待发送完成 return resultRef.get(); // 通过原子引用获取结果 } catch (InterruptedException e) { Thread.currentThread().interrupt(); producerCache.invalidate(cacheKey); producer.close(); // 安全关闭生产者 return Result.fail("发送线程被中断" + e.getMessage()); } } /** * 批量发送消息到Kafka主题,支持异步发送并处理结果。 * * @param producer Kafka生产者实例,用于发送消息 * @param topic 目标Kafka主题名称 * @param dataList 待发送的数据列表,元素类型为可序列化对象 * @return 返回包含发送结果的Result对象: * - 成功时返回成功发送的消息数量 * - 失败时返回错误信息(超时/部分失败/异常) */ private static Result batchSend(Producer producer, String topic, List dataList, String cacheKey) { try { //数据预处理阶段: 将输入数据列表转换为Kafka ProducerRecord对象集合 List> records = dataList.stream() .map(item -> new ProducerRecord(topic, DataFormatConverter.convertObjectToString(item))) .toList(); //异步发送初始化 AtomicInteger successCount = new AtomicInteger(0); AtomicReference firstError = new AtomicReference<>(null); CountDownLatch latch = new CountDownLatch(records.size()); //异步发送执行 records.forEach(record -> producer.send(record, (metadata, exception) -> { if (exception != null) { if (firstError.get() == null) { firstError.set(exception); logger.error("批量发送失败", exception); } } else { successCount.incrementAndGet(); } latch.countDown(); })); //发送结果等待与超时处理 try { boolean completed = latch.await(30, TimeUnit.SECONDS); // 捕获返回值 if (!completed) { logger.error("批量发送超时,强制关闭生产者"); Thread.currentThread().interrupt(); producer.close(); // 主动关闭生产者释放资源 return Result.fail("批量发送超时超过30秒"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); producer.close(); // 主动关闭生产者释放资源 return Result.fail("批量发送超时中断: " + e.getMessage()); } //最终结果判定 if (firstError.get() != null) { return Result.fail("部分消息发送失败: " + firstError.get().getMessage()); } return Result.success(successCount.get()); } catch (Exception e) { closeProducer(cacheKey,producer); logger.error("批量处理异常", e); return Result.fail("批量处理失败: " + e.getMessage()); } } /** * 获取或创建指定主题的Kafka生产者实例。 * 通过配置参数和主题名称构建缓存键,优先从生产者缓存中获取现有实例。 * 若缓存中不存在,则在同步锁保护下创建新生产者,并存入缓存供后续复用。 * * @param config 包含Kafka生产者配置参数的Map,必须包含"bootstrap.servers"键 * @return Result对象,包含以下两种情况: * - 成功时返回可用的Producer实例 * - 失败时返回包含错误信息的Result.fail对象 */ private static Result> getOrCreateProducer(Map config, String cacheKey) { Producer existingProducer = producerCache.getIfPresent(cacheKey); if (existingProducer == null) { // 进入线程安全区域,确保生产者创建过程的原子性操作 synchronized (producerLock) { // 二次检查缓存状态,防止并发创建重复实例 existingProducer = producerCache.getIfPresent(cacheKey); if (existingProducer == null) { // 执行实际生产者创建流程 Result> producerResult = createProducer(config); if(!producerResult.isSuccess()){ return producerResult; } Producer newProducer = producerResult.getData(); // 验证生产者创建结果有效性 if(newProducer == null){ return Result.fail(producerResult.getError()); } // 将新建生产者存入缓存供后续复用 producerCache.put(cacheKey, newProducer); return producerResult; } } } return Result.success(existingProducer); } /** * 创建Kafka生产者实例并进行基础配置优化 * * @param config 用户提供的基础配置参数,用于初始化Kafka生产者 * @return Result对象包含生产者实例创建结果: * - 成功时返回包含KafkaProducer的Result实例 * - 失败时返回包含错误信息的Result实例 * 本方法主要完成以下配置处理: * 1. 确保消息确认机制配置(acks)存在,默认使用-1(所有副本确认) * 2. 启用幂等性保障机制,默认开启 * 3. 设置关键超时参数优化生产者性能 * 4. 固化键值序列化器为StringSerializer */ private static Result> createProducer(Map config) { try { //检查并设置消息确认机制配置 if (!config.containsKey(ProducerConfig.ACKS_CONFIG)) { logger.warn("未配置acks参数,默认使用-1(all)"); config.put(ProducerConfig.ACKS_CONFIG, "-1"); } //检查并启用幂等性保障 if (!config.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) { logger.info("启用幂等性保障"); config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); } Properties props = new Properties(); props.putAll(config); // 优化配置 props.putIfAbsent(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");//最大阻塞时间60秒 props.putIfAbsent(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");//消息投递超时时间120秒 props.putIfAbsent(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");//请求超时时间30秒 //固定使用String序列化器 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return Result.success(new KafkaProducer<>(props)); } catch (Exception e) { logger.error("创建Kafka生产者失败", e); return Result.fail("创建Kafka生产者失败: " + e.getMessage()); } } //--------------------------------------------------------------------------------------------- /** * 消费 Kafka 消息 * 功能说明: * 1. 校验基础参数并获取Kafka消费者实例 * 2. 订阅指定主题并拉取消息 * 3. 处理消息后同步提交偏移量 * 4. 包含连接健康检测机制 * * @param kafkaConfig Kafka基础配置参数,包含bootstrap.servers等必要配置项 * @param topic 需要消费的主题名称 * @param groupId 消费者组ID(可为空,单机模式可不传) * @param maxRecords 单次拉取最大记录数限制 * @return Result> 返回消费结果对象,成功时包含消息列表,失败时包含错误信息 * - 成功状态:包含消费到的消息字符串列表 * - 失败状态:包含具体错误原因(参数校验失败/连接异常/消费异常等) */ public static Result> consume(Map kafkaConfig, String topic, String groupId, Integer maxRecords) { // 参数校验 Result checkBaseParamResult = checkBaseParam(kafkaConfig, groupId,topic, "data"); if(!checkBaseParamResult.isSuccess()){ return Result.fail(checkBaseParamResult.getError()); } //生成消费者缓存键 String cacheKey = checkBaseParamResult.getData(); Result> consumerResult = getOrCreateConsumer(kafkaConfig, cacheKey, groupId, maxRecords); if(!consumerResult.isSuccess()){ return Result.fail(consumerResult.getError()); } Consumer consumer = consumerResult.getData(); try { consumer.subscribe(Collections.singletonList(topic)); ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); List messages = new ArrayList<>(); for (ConsumerRecord record : records) { messages.add(record.value()); logger.info("Kafka消费数据: {}", record.value()); } if (!messages.isEmpty()) { consumer.commitSync(); // 同步提交偏移量 }else {// 2. 主动连接检测 if (!testKafkaConnection(consumer, Duration.ofSeconds(5)) || !isConsumerHealthy(consumer)) { closeConsumer(cacheKey,consumer); return Result.fail("Kafka消费者{".concat(cacheKey).concat("}异常")); } } logger.info("消费数量:"); logger.info(String.valueOf(messages.size())); if (messages.size()>0) logger.info(messages.get(0)); return Result.success(messages); }catch (Exception e) { closeConsumer(cacheKey,consumer); logger.error("Kafka消费异常{}",cacheKey, e); return Result.fail("Kafka消费异常: " + e.getMessage()); } } /** * 获取或创建Kafka消费者实例,采用双重检查锁机制保证线程安全。 * * @param kafkaConfig Kafka基础配置参数映射表 * @param cacheKey 消费者实例的唯一标识键 * @param groupId 消费者组ID * @param maxRecords 单次拉取最大记录数 * @return 包含Kafka消费者实例的操作结果对象 */ private static Result> getOrCreateConsumer(Map kafkaConfig, String cacheKey, String groupId, Integer maxRecords) { Consumer consumer = consumerCache.getIfPresent(cacheKey); if (consumer == null) { // 使用双重检查锁确保线程安全 synchronized (consumerLock) { // 二次检查缓存是否存在有效消费者 consumer = consumerCache.getIfPresent(cacheKey); if (consumer == null) { // 构建消费者配置属性 Properties props = buildConsumerProps(kafkaConfig, groupId, maxRecords); // 实例化Kafka消费者 consumer = new KafkaConsumer<>(props); // 将新创建的消费者存入缓存 consumerCache.put(cacheKey, consumer); } } } return Result.success(consumer); } /** * 构建Kafka消费者配置属性 * * @param kafkaConfig 基础Kafka配置映射,包含原始配置键值对 * @param groupId 消费者组ID,用于标识消费者所属的组 * @param maxRecords 单次poll操作的最大记录数,若为null则使用默认值MAX_POLL_RECORDS * @return 配置好的Properties对象,包含所有消费者配置属性 */ private static Properties buildConsumerProps(Map kafkaConfig, String groupId, Integer maxRecords) { Properties props = new Properties(); props.putAll(kafkaConfig); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536"); // 64KB props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // 1MB props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecords == null?MAX_POLL_RECORDS:maxRecords); props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId); return props; } // ------------------------ 私有方法区 ------------------------ /** * 校验Kafka基础参数合法性并生成配置标识符 * * @param kafkaConfig Kafka配置参数映射表,需包含bootstrap.servers配置项 * @param groupId 消费者组ID,用于标识消费者组 * @param topic Kafka消息主题名称 * @param data 待发送的业务数据对象 * @return Result 校验结果封装对象,成功时返回由bootstrap.servers:groupId:topic组成的配置标识符字符串, * 失败时返回包含具体错误信息的失败结果 */ private static Result checkBaseParam(Map kafkaConfig, String groupId, String topic, Object data){ if (data == null) { return Result.fail("发送数据不能为空"); } if (topic == null || topic.isEmpty()) { return Result.fail("Kafka配置中缺少topic"); } if(kafkaConfig.isEmpty() || !kafkaConfig.containsKey("bootstrap.servers") || kafkaConfig.get("bootstrap.servers").isEmpty()){ return Result.fail("Kafka配置中缺少bootstrap.servers"); } return Result.success(kafkaConfig.get("bootstrap.servers") .concat(":") .concat(groupId) .concat(":") .concat(topic)); } /** * 检查Kafka消费者实例的健康状态 * * @param consumer 需要检查的Kafka消费者实例 * @return 如果消费者健康(成功获取非空分配分区和有效偏移量)返回true,否则返回false */ private static boolean isConsumerHealthy(Consumer consumer) { try { // 检查分配分区 Set assignments = consumer.assignment(); if (assignments.isEmpty()) { logger.warn("消费者未分配任何分区"); return false; } // 检查最新偏移量 Map endOffsets = consumer.endOffsets(assignments); logger.info("消费者最新偏移量: {}", endOffsets); return !endOffsets.isEmpty(); } catch (Exception e) { logger.warn("消费者健康检查失败", e); return false; } } /** * 测试Kafka消费者的连接状态。 * 通过尝试列出主题来验证连接是否正常。 * * @param consumer Kafka消费者实例,用于测试连接 * @param timeout 测试连接的超时时间,单位为毫秒 * @return 如果连接成功返回true,否则返回false */ private static boolean testKafkaConnection(Consumer consumer, Duration timeout) { try { // 通过获取topic列表测试连接 consumer.listTopics(timeout); return true; } catch (Exception e) { // 记录详细的网络异常日志 logger.warn("Kafka连接测试失败", e); return false; } } /** * 关闭指定的Kafka生产者实例并清理相关缓存。 * *

该方法安全关闭生产者实例,捕获并记录关闭过程中发生的异常, * 并确保无论关闭成功与否都强制清理缓存中的生产者实例。

* * @param cacheKey 用于定位缓存中生产者实例的键,不能为空 * @param producer 待关闭的生产者实例,可能为null */ private static void closeProducer(String cacheKey, Producer producer) { try { if (producer != null) { producer.close(); } } catch (Exception e) { logger.error("关闭生产者异常: {}", cacheKey, e); }finally { producerCache.invalidate(cacheKey); } } /** * 关闭指定的Kafka消费者并清理关联的缓存条目。 * *

该方法安全地关闭消费者实例,记录任何关闭过程中发生的异常, * 并确保最终从缓存中移除对应的缓存键。

* * @param cacheKey 用于标识消费者实例的缓存键,不可为空 * @param consumer 需要关闭的Kafka消费者实例,可能为null */ private static void closeConsumer(String cacheKey, Consumer consumer) { try { if (consumer != null) { consumer.close(); } } catch (Exception e) { logger.error("关闭生产者异常: {}", cacheKey, e); }finally { consumerCache.invalidate(cacheKey); } } //销毁前清除缓存 @PreDestroy public void destroy() { producerCache.asMap().forEach(KafkaHelper::closeProducer); producerCache.invalidateAll(); logger.info("生产者缓存清除"); consumerCache.asMap().forEach(KafkaHelper::closeConsumer); consumerCache.invalidateAll(); logger.info("消费者缓存清除"); } }