123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- package com.bfkj.unidia.IOUtils;
- import com.github.benmanes.caffeine.cache.Cache;
- import com.github.benmanes.caffeine.cache.Caffeine;
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.errors.InterruptException;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.time.Duration;
- import java.util.*;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicReference;
- import com.bfkj.unidia.Result;
- import com.bfkj.unidia.DataUtils.DataFormatConverter;
- import org.springframework.stereotype.Component;
- import javax.annotation.PreDestroy;
- @Component
- public class KafkaHelper {
- private static final Logger logger = LoggerFactory.getLogger(KafkaHelper.class);
- private static final int MAX_CACHE_SIZE = 100;
- private static final int MAX_POLL_RECORDS = 500;
- private static final Cache<String, Producer<String, String>> producerCache;
- static {
- producerCache = Caffeine.newBuilder()
- .maximumSize(MAX_CACHE_SIZE)
- .expireAfterAccess(Duration.ofMinutes(16))
- .removalListener((key, value, cause) -> logger.info("Producer缓存移除: {} 原因: {}", key, cause))
- .build();
- }
- private static final Cache<String, Consumer<String, String>> consumerCache;
- static {
- consumerCache = Caffeine.newBuilder()
- .maximumSize(100)
- .expireAfterAccess(Duration.ofMinutes(16))
- .removalListener((key, value, cause) -> logger.info("Consumer缓存移除: {} 原因: {}", key, cause))
- .build();
- }
- private static final Object producerLock = new Object();
- private static final Object consumerLock = new Object(); // 新增消费者锁
- // 私有构造函数
- private KafkaHelper() {}
- /**
- * 发送 Kafka 消息
- *
- * @param kafkaConfig Kafka生产者配置参数,包含broker地址、序列化器等必要配置
- * @param topic 目标Topic名称,消息将被发布到该主题
- * @param data 待发送的消息数据,支持单条对象或对象列表形式
- * @return 操作结果封装对象,成功时返回发送的消息数量,失败时包含错误信息
- */
- public static Result<Integer> send(Map<String, String> kafkaConfig, String topic, Object data) {
- try {
- // 校验基础参数有效性
- Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig,"bfkj", topic, data);
- if(!checkBaseParamResult.isSuccess()){
- return Result.fail(checkBaseParamResult.getError());
- }
- String cacheKey = checkBaseParamResult.getData();
- // 获取或创建Kafka生产者实例
- Result<Producer<String, String>> producerResult = getOrCreateProducer(kafkaConfig, cacheKey);
- if(!producerResult.isSuccess()){
- return Result.fail(producerResult.getError());
- }
- Producer<String, String> producer = producerResult.getData();
- Result<Integer> integerResult = null;
- // 处理不同数据类型的消息发送
- if (data instanceof List<?> dataList) {
- integerResult = batchSend(producer, topic, dataList, cacheKey);
- } else {
- integerResult = sendSingleMessage(producer, topic, data, cacheKey);
- }
- logger.info("Kafka发送消息返回结果:");
- logger.info(integerResult.toString());
- return integerResult;
- } catch (Exception e) {
- // 记录发送异常日志并返回错误结果
- logger.error("Kafka发送异常", e);
- return Result.fail("Kafka发送异常: " + e.getMessage());
- }
- }
- /**
- * 向指定Kafka主题发送单条消息,并通过CountDownLatch同步等待发送结果
- *
- * @param producer Kafka生产者实例,负责实际的消息发送
- * @param topic 目标Kafka主题名称
- * @param data 待发送的消息数据对象(将转换为JSON字符串)
- * @return Result<Integer> 包含发送结果的包装对象,成功返回1,失败返回异常信息
- */
- private static Result<Integer> sendSingleMessage(Producer<String, String> producer,
- String topic,
- Object data,
- String cacheKey) {
- String value = DataFormatConverter.convertObjectToString(data);
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
- AtomicReference<Result<Integer>> resultRef = new AtomicReference<>();
- CountDownLatch latch = new CountDownLatch(1);
- // 异步发送Kafka消息并处理回调结果
- producer.send(record, (metadata, exception) -> {
- if (exception != null) {
- if (exception instanceof InterruptException) {
- logger.warn("生产者操作被中断,准备关闭资源");
- Thread.currentThread().interrupt(); // 保留中断状态
- //producerCache.invalidate(generateProducerCacheKey(kafkaConfig, topic));
- producer.close(); // 安全关闭生产者
- }
- logger.error("Kafka消息发送失败", exception);
- resultRef.set(Result.fail(exception.getMessage()));
- } else {
- resultRef.set(Result.success(1));
- }
- latch.countDown();
- });
- // 等待消息发送完成并获取执行结果
- try {
- latch.await(); // 等待发送完成
- return resultRef.get(); // 通过原子引用获取结果
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- producerCache.invalidate(cacheKey);
- producer.close(); // 安全关闭生产者
- return Result.fail("发送线程被中断" + e.getMessage());
- }
- }
- /**
- * 批量发送消息到Kafka主题,支持异步发送并处理结果。
- *
- * @param producer Kafka生产者实例,用于发送消息
- * @param topic 目标Kafka主题名称
- * @param dataList 待发送的数据列表,元素类型为可序列化对象
- * @return 返回包含发送结果的Result对象:
- * - 成功时返回成功发送的消息数量
- * - 失败时返回错误信息(超时/部分失败/异常)
- */
- private static Result<Integer> batchSend(Producer<String, String> producer,
- String topic,
- List<?> dataList,
- String cacheKey) {
- try {
- //数据预处理阶段: 将输入数据列表转换为Kafka ProducerRecord对象集合
- List<ProducerRecord<String, String>> records = dataList.stream()
- .map(item -> new ProducerRecord<String, String>(topic, DataFormatConverter.convertObjectToString(item)))
- .toList();
- //异步发送初始化
- AtomicInteger successCount = new AtomicInteger(0);
- AtomicReference<Exception> firstError = new AtomicReference<>(null);
- CountDownLatch latch = new CountDownLatch(records.size());
- //异步发送执行
- records.forEach(record -> producer.send(record, (metadata, exception) -> {
- if (exception != null) {
- if (firstError.get() == null) {
- firstError.set(exception);
- logger.error("批量发送失败", exception);
- }
- } else {
- successCount.incrementAndGet();
- }
- latch.countDown();
- }));
- //发送结果等待与超时处理
- try {
- boolean completed = latch.await(30, TimeUnit.SECONDS); // 捕获返回值
- if (!completed) {
- logger.error("批量发送超时,强制关闭生产者");
- Thread.currentThread().interrupt();
- producer.close(); // 主动关闭生产者释放资源
- return Result.fail("批量发送超时超过30秒");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- producer.close(); // 主动关闭生产者释放资源
- return Result.fail("批量发送超时中断: " + e.getMessage());
- }
- //最终结果判定
- if (firstError.get() != null) {
- return Result.fail("部分消息发送失败: " + firstError.get().getMessage());
- }
- return Result.success(successCount.get());
- } catch (Exception e) {
- closeProducer(cacheKey,producer);
- logger.error("批量处理异常", e);
- return Result.fail("批量处理失败: " + e.getMessage());
- }
- }
- /**
- * 获取或创建指定主题的Kafka生产者实例。
- * 通过配置参数和主题名称构建缓存键,优先从生产者缓存中获取现有实例。
- * 若缓存中不存在,则在同步锁保护下创建新生产者,并存入缓存供后续复用。
- *
- * @param config 包含Kafka生产者配置参数的Map,必须包含"bootstrap.servers"键
- * @return Result对象,包含以下两种情况:
- * - 成功时返回可用的Producer实例
- * - 失败时返回包含错误信息的Result.fail对象
- */
- private static Result<Producer<String, String>> getOrCreateProducer(Map<String, String> config, String cacheKey) {
- Producer<String, String> existingProducer = producerCache.getIfPresent(cacheKey);
- if (existingProducer == null) {
- // 进入线程安全区域,确保生产者创建过程的原子性操作
- synchronized (producerLock) {
- // 二次检查缓存状态,防止并发创建重复实例
- existingProducer = producerCache.getIfPresent(cacheKey);
- if (existingProducer == null) {
- // 执行实际生产者创建流程
- Result<Producer<String, String>> producerResult = createProducer(config);
- if(!producerResult.isSuccess()){
- return producerResult;
- }
- Producer<String, String> newProducer = producerResult.getData();
- // 验证生产者创建结果有效性
- if(newProducer == null){
- return Result.fail(producerResult.getError());
- }
- // 将新建生产者存入缓存供后续复用
- producerCache.put(cacheKey, newProducer);
- return producerResult;
- }
- }
- }
- return Result.success(existingProducer);
- }
- /**
- * 创建Kafka生产者实例并进行基础配置优化
- *
- * @param config 用户提供的基础配置参数,用于初始化Kafka生产者
- * @return Result对象包含生产者实例创建结果:
- * - 成功时返回包含KafkaProducer的Result实例
- * - 失败时返回包含错误信息的Result实例
- * 本方法主要完成以下配置处理:
- * 1. 确保消息确认机制配置(acks)存在,默认使用-1(所有副本确认)
- * 2. 启用幂等性保障机制,默认开启
- * 3. 设置关键超时参数优化生产者性能
- * 4. 固化键值序列化器为StringSerializer
- */
- private static Result<Producer<String, String>> createProducer(Map<String, String> config) {
- try {
- //检查并设置消息确认机制配置
- if (!config.containsKey(ProducerConfig.ACKS_CONFIG)) {
- logger.warn("未配置acks参数,默认使用-1(all)");
- config.put(ProducerConfig.ACKS_CONFIG, "-1");
- }
- //检查并启用幂等性保障
- if (!config.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
- logger.info("启用幂等性保障");
- config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
- }
- Properties props = new Properties();
- props.putAll(config);
- // 优化配置
- props.putIfAbsent(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");//最大阻塞时间60秒
- props.putIfAbsent(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");//消息投递超时时间120秒
- props.putIfAbsent(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");//请求超时时间30秒
- //固定使用String序列化器
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- return Result.success(new KafkaProducer<>(props));
- } catch (Exception e) {
- logger.error("创建Kafka生产者失败", e);
- return Result.fail("创建Kafka生产者失败: " + e.getMessage());
- }
- }
- //---------------------------------------------------------------------------------------------
- /**
- * 消费 Kafka 消息
- * 功能说明:
- * 1. 校验基础参数并获取Kafka消费者实例
- * 2. 订阅指定主题并拉取消息
- * 3. 处理消息后同步提交偏移量
- * 4. 包含连接健康检测机制
- *
- * @param kafkaConfig Kafka基础配置参数,包含bootstrap.servers等必要配置项
- * @param topic 需要消费的主题名称
- * @param groupId 消费者组ID(可为空,单机模式可不传)
- * @param maxRecords 单次拉取最大记录数限制
- * @return Result<List<String>> 返回消费结果对象,成功时包含消息列表,失败时包含错误信息
- * - 成功状态:包含消费到的消息字符串列表
- * - 失败状态:包含具体错误原因(参数校验失败/连接异常/消费异常等)
- */
- public static Result<List<String>> consume(Map<String, String> kafkaConfig, String topic, String groupId, Integer maxRecords) {
- // 参数校验
- Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig, groupId,topic, "data");
- if(!checkBaseParamResult.isSuccess()){
- return Result.fail(checkBaseParamResult.getError());
- }
- //生成消费者缓存键
- String cacheKey = checkBaseParamResult.getData();
- Result<Consumer<String, String>> consumerResult = getOrCreateConsumer(kafkaConfig, cacheKey, groupId, maxRecords);
- if(!consumerResult.isSuccess()){
- return Result.fail(consumerResult.getError());
- }
- Consumer<String, String> consumer = consumerResult.getData();
- try {
- consumer.subscribe(Collections.singletonList(topic));
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
- List<String> messages = new ArrayList<>();
- for (ConsumerRecord<String, String> record : records) {
- messages.add(record.value());
- logger.info("Kafka消费数据: {}", record.value());
- }
- if (!messages.isEmpty()) {
- consumer.commitSync(); // 同步提交偏移量
- }else {// 2. 主动连接检测
- if (!testKafkaConnection(consumer, Duration.ofSeconds(5)) || !isConsumerHealthy(consumer)) {
- closeConsumer(cacheKey,consumer);
- return Result.fail("Kafka消费者{".concat(cacheKey).concat("}异常"));
- }
- }
- logger.info("消费数量:");
- logger.info(String.valueOf(messages.size()));
- if (messages.size()>0) logger.info(messages.get(0));
- return Result.success(messages);
- }catch (Exception e) {
- closeConsumer(cacheKey,consumer);
- logger.error("Kafka消费异常{}",cacheKey, e);
- return Result.fail("Kafka消费异常: " + e.getMessage());
- }
- }
- /**
- * 获取或创建Kafka消费者实例,采用双重检查锁机制保证线程安全。
- *
- * @param kafkaConfig Kafka基础配置参数映射表
- * @param cacheKey 消费者实例的唯一标识键
- * @param groupId 消费者组ID
- * @param maxRecords 单次拉取最大记录数
- * @return 包含Kafka消费者实例的操作结果对象
- */
- private static Result<Consumer<String, String>> getOrCreateConsumer(Map<String, String> kafkaConfig,
- String cacheKey,
- String groupId,
- Integer maxRecords) {
- Consumer<String, String> consumer = consumerCache.getIfPresent(cacheKey);
- if (consumer == null) {
- // 使用双重检查锁确保线程安全
- synchronized (consumerLock) {
- // 二次检查缓存是否存在有效消费者
- consumer = consumerCache.getIfPresent(cacheKey);
- if (consumer == null) {
- // 构建消费者配置属性
- Properties props = buildConsumerProps(kafkaConfig, groupId, maxRecords);
- // 实例化Kafka消费者
- consumer = new KafkaConsumer<>(props);
- // 将新创建的消费者存入缓存
- consumerCache.put(cacheKey, consumer);
- }
- }
- }
- return Result.success(consumer);
- }
- /**
- * 构建Kafka消费者配置属性
- *
- * @param kafkaConfig 基础Kafka配置映射,包含原始配置键值对
- * @param groupId 消费者组ID,用于标识消费者所属的组
- * @param maxRecords 单次poll操作的最大记录数,若为null则使用默认值MAX_POLL_RECORDS
- * @return 配置好的Properties对象,包含所有消费者配置属性
- */
- private static Properties buildConsumerProps(Map<String, String> kafkaConfig, String groupId, Integer maxRecords) {
- Properties props = new Properties();
- props.putAll(kafkaConfig);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536"); // 64KB
- props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
- props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // 1MB
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecords == null?MAX_POLL_RECORDS:maxRecords);
- props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
- return props;
- }
- // ------------------------ 私有方法区 ------------------------
- /**
- * 校验Kafka基础参数合法性并生成配置标识符
- *
- * @param kafkaConfig Kafka配置参数映射表,需包含bootstrap.servers配置项
- * @param groupId 消费者组ID,用于标识消费者组
- * @param topic Kafka消息主题名称
- * @param data 待发送的业务数据对象
- * @return Result<String> 校验结果封装对象,成功时返回由bootstrap.servers:groupId:topic组成的配置标识符字符串,
- * 失败时返回包含具体错误信息的失败结果
- */
- private static Result<String> checkBaseParam(Map<String, String> kafkaConfig,
- String groupId,
- String topic,
- Object data){
- if (data == null) {
- return Result.fail("发送数据不能为空");
- }
- if (topic == null || topic.isEmpty()) {
- return Result.fail("Kafka配置中缺少topic");
- }
- if(kafkaConfig.isEmpty() || !kafkaConfig.containsKey("bootstrap.servers") ||
- kafkaConfig.get("bootstrap.servers").isEmpty()){
- return Result.fail("Kafka配置中缺少bootstrap.servers");
- }
- return Result.success(kafkaConfig.get("bootstrap.servers")
- .concat(":")
- .concat(groupId)
- .concat(":")
- .concat(topic));
- }
- /**
- * 检查Kafka消费者实例的健康状态
- *
- * @param consumer 需要检查的Kafka消费者实例
- * @return 如果消费者健康(成功获取非空分配分区和有效偏移量)返回true,否则返回false
- */
- private static boolean isConsumerHealthy(Consumer<String, String> consumer) {
- try {
- // 检查分配分区
- Set<TopicPartition> assignments = consumer.assignment();
- if (assignments.isEmpty()) {
- logger.warn("消费者未分配任何分区");
- return false;
- }
- // 检查最新偏移量
- Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
- logger.info("消费者最新偏移量: {}", endOffsets);
- return !endOffsets.isEmpty();
- } catch (Exception e) {
- logger.warn("消费者健康检查失败", e);
- return false;
- }
- }
- /**
- * 测试Kafka消费者的连接状态。
- * 通过尝试列出主题来验证连接是否正常。
- *
- * @param consumer Kafka消费者实例,用于测试连接
- * @param timeout 测试连接的超时时间,单位为毫秒
- * @return 如果连接成功返回true,否则返回false
- */
- private static boolean testKafkaConnection(Consumer<String, String> consumer, Duration timeout) {
- try {
- // 通过获取topic列表测试连接
- consumer.listTopics(timeout);
- return true;
- } catch (Exception e) {
- // 记录详细的网络异常日志
- logger.warn("Kafka连接测试失败", e);
- return false;
- }
- }
- /**
- * 关闭指定的Kafka生产者实例并清理相关缓存。
- *
- * <p>该方法安全关闭生产者实例,捕获并记录关闭过程中发生的异常,
- * 并确保无论关闭成功与否都强制清理缓存中的生产者实例。</p>
- *
- * @param cacheKey 用于定位缓存中生产者实例的键,不能为空
- * @param producer 待关闭的生产者实例,可能为null
- */
- private static void closeProducer(String cacheKey, Producer<String, String> producer) {
- try {
- if (producer != null) {
- producer.close();
- }
- } catch (Exception e) {
- logger.error("关闭生产者异常: {}", cacheKey, e);
- }finally {
- producerCache.invalidate(cacheKey);
- }
- }
- /**
- * 关闭指定的Kafka消费者并清理关联的缓存条目。
- *
- * <p>该方法安全地关闭消费者实例,记录任何关闭过程中发生的异常,
- * 并确保最终从缓存中移除对应的缓存键。</p>
- *
- * @param cacheKey 用于标识消费者实例的缓存键,不可为空
- * @param consumer 需要关闭的Kafka消费者实例,可能为null
- */
- private static void closeConsumer(String cacheKey, Consumer<String, String> consumer) {
- try {
- if (consumer != null) {
- consumer.close();
- }
- } catch (Exception e) {
- logger.error("关闭生产者异常: {}", cacheKey, e);
- }finally {
- consumerCache.invalidate(cacheKey);
- }
- }
- //销毁前清除缓存
- @PreDestroy
- public void destroy() {
- producerCache.asMap().forEach(KafkaHelper::closeProducer);
- producerCache.invalidateAll();
- logger.info("生产者缓存清除");
- consumerCache.asMap().forEach(KafkaHelper::closeConsumer);
- consumerCache.invalidateAll();
- logger.info("消费者缓存清除");
- }
- }
|