123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788 |
- package com.bfkj.unidia.DataBaseUtils;
- import com.bfkj.unidia.Result;
- import com.github.benmanes.caffeine.cache.Cache;
- import com.github.benmanes.caffeine.cache.Caffeine;
- import com.zaxxer.hikari.HikariDataSource;
- import com.zaxxer.hikari.HikariPoolMXBean;
- import lombok.extern.slf4j.Slf4j;
- import net.sf.jsqlparser.JSQLParserException;
- import net.sf.jsqlparser.parser.CCJSqlParserUtil;
- import net.sf.jsqlparser.statement.Statement;
- import net.sf.jsqlparser.statement.alter.Alter;
- import net.sf.jsqlparser.statement.create.table.CreateTable;
- import net.sf.jsqlparser.statement.delete.Delete;
- import net.sf.jsqlparser.statement.insert.Insert;
- import net.sf.jsqlparser.statement.select.Select;
- import net.sf.jsqlparser.statement.update.Update;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
- import org.springframework.jdbc.core.namedparam.SqlParameterSource;
- import org.springframework.stereotype.Component;
- import org.springframework.stereotype.Service;
- import org.springframework.util.StringUtils;
- import java.sql.*;
- import java.util.*;
- import java.util.concurrent.*;
- import java.util.function.Function;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
- import java.util.stream.Collectors;
- import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
- /**
- * 高性能 JDBC 执行器,支持:
- * - SQL 注入检测(基于 JSqlParser)
- * - 批量插入自动降级
- * - 分批次插入
- * - 自动事务控制
- */
- @Service
- @Slf4j
- public class JdbcExecutor {
- private static final Logger logger = LoggerFactory.getLogger(JdbcExecutor.class);
- private final ConnectionPoolManager poolManager;
- private final SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(1000); // 默认每秒 1000 请求
- // 线程池
- private final ExecutorService executor = ForkJoinPool.commonPool();
- // 安全 SQL 缓存(避免重复检测)
- private final Cache<String, Boolean> safeSqlCache = buildCaffeineCache();
- @Autowired
- public JdbcExecutor(ConnectionPoolManager poolManager) {
- this.poolManager = poolManager;
- }
- private static final Pattern SQL_INJECTION_PATTERN = Pattern.compile(
- "(?i)('|\\b(?:exec|execute|unhex|declare|drop|truncate|table|database|if|exists|script|xp_cmdshell|sp_executesql)\\b)|" +
- "(\\b(?:and|or)\\b\\s+.*(?:=|like).*'.*)"
- );
- /**
- * 执行查询操作(统一入口,含 SQL 注入检测)
- *
- * @param dbConfig 数据库配置信息,包含连接数据库所需的信息
- * @param sql 查询的SQL语句
- * @param params SQL语句中的参数列表
- * @param rowMapper 一个函数接口,用于将结果集中的每一行映射为一个对象
- * @param <T> 泛型参数,表示结果集中每一行数据映射后的类型
- * @return 返回一个Result对象,包含查询结果列表如果查询失败,返回失败的Result对象
- */
- public <T> Result<List<T>> query(Map<String, String> dbConfig,
- String sql,
- List<Object> params,
- Function<ResultSet, T> rowMapper) {
- //复用标准查询即可
- return query(dbConfig, sql, params, 0, 0, rowMapper);
- }
- /**
- * 执行查询操作并返回结果列表
- * 该方法负责根据提供的SQL查询语句、数据库配置、参数以及分页信息来执行数据库查询操作
- * 它使用了泛型来允许调用者指定返回类型,并利用函数式接口来处理结果集映射
- *
- * @param dbConfig 数据库配置信息,包括URL、用户名、密码等
- * @param sql 查询语句
- * @param params 查询语句的参数
- * @param page 当前页码
- * @param pageSize 每页记录数
- * @param rowMapper 函数式接口,用于将结果集一行映射为一个对象
- * @param <T> 泛型,表示返回的对象类型
- * @return 查询结果,包含一个对象列表
- */
- public <T> Result<List<T>> query(Map<String, String> dbConfig,
- String sql,
- List<Object> params,
- int page,
- int pageSize,
- Function<ResultSet, T> rowMapper) {
- String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
- return queryWithDatasource(dbConfig, paginatedSql, dataSource -> {
- try(Connection connection = dataSource.getConnection()) {
- try(PreparedStatement ps = connection.prepareStatement(paginatedSql)){
- // 设置参数
- if (params != null && !params.isEmpty()) {
- for (int i = 0; i < params.size(); i++) {
- ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
- }
- }
- logger.info("ps.toString():{}", ps.toString());
- // 执行查询
- ResultSet rs = ps.executeQuery();
- // 映射结果
- List<T> result = new ArrayList<>();
- while (rs.next()) {
- result.add(rowMapper.apply(rs));
- }
- return Result.success(result);
- }catch (SQLException e){
- return Result.fail("查询失败: " + e.getMessage());
- }
- } catch (Exception e) {
- logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), paginatedSql, params);
- return Result.fail(e.getMessage());
- }
- });
- }
- public Result<List<Map<String, Object>>> queryForSql(Map<String, String> dbConfig,
- String sql,
- List<Object> params ) {
- if (!StringUtils.hasText(sql)){
- return Result.fail("sql 不存在或为空");
- }
- return customQueryWithDatasource(dbConfig, sql, dataSource -> {
- try(Connection connection = dataSource.getConnection()) {
- try(PreparedStatement ps = connection.prepareStatement(sql)){
- // 设置参数
- if (params != null && !params.isEmpty()) {
- for (int i = 0; i < params.size(); i++) {
- ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
- }
- }
- logger.info("执行 SQL: {}", sql);
- List<Map<String, Object>> resultList = new ArrayList<>();
- try (ResultSet rs = ps.executeQuery()) {
- ResultSetMetaData metaData = rs.getMetaData();
- int columnCount = metaData.getColumnCount();
- // 获取所有列名
- List<String> columnNames = new ArrayList<>();
- for (int i = 1; i <= columnCount; i++) {
- columnNames.add(metaData.getColumnLabel(i)); // 使用别名
- }
- // 遍历结果集
- while (rs.next()) {
- Map<String, Object> row = new LinkedHashMap<>();
- for (int i = 0; i < columnCount; i++) {
- String columnName = columnNames.get(i);
- Object value = rs.getObject(i + 1);
- row.put(columnName, value);
- }
- resultList.add(row);
- }
- }
- logger.info("Query returned {} rows", resultList.size());
- return Result.success(resultList);
- }catch (SQLException e){
- return Result.fail("查询失败: " + e.getMessage());
- }
- } catch (Exception e) {
- logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), sql, params);
- return Result.fail(e.getMessage());
- }
- });
- }
- public Result<List<Map<String, Object>>> jdbcSearch(Map<String, String> dbConfig,
- String sql,
- List<Object> params,
- int page,
- int pageSize ) {
- String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
- return queryWithDatasource(dbConfig, paginatedSql, dataSource -> {
- try(Connection connection = dataSource.getConnection()) {
- try(PreparedStatement ps = connection.prepareStatement(paginatedSql)){
- // 设置参数
- if (params != null && !params.isEmpty()) {
- for (int i = 0; i < params.size(); i++) {
- ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
- }
- }
- logger.info("ps.toString():{}", ps.toString());
- logger.info("Executing paginated SQL: {}", paginatedSql);
- List<Map<String, Object>> resultList = new ArrayList<>();
- try (ResultSet rs = ps.executeQuery()) {
- ResultSetMetaData metaData = rs.getMetaData();
- int columnCount = metaData.getColumnCount();
- // 获取所有列名
- List<String> columnNames = new ArrayList<>();
- for (int i = 1; i <= columnCount; i++) {
- columnNames.add(metaData.getColumnLabel(i)); // 使用别名
- }
- // 遍历结果集
- while (rs.next()) {
- Map<String, Object> row = new LinkedHashMap<>();
- for (int i = 0; i < columnCount; i++) {
- String columnName = columnNames.get(i);
- Object value = rs.getObject(i + 1);
- row.put(columnName, value);
- }
- resultList.add(row);
- }
- }
- logger.info("Query returned {} rows", resultList.size());
- return Result.success(resultList);
- }catch (SQLException e){
- return Result.fail("查询失败: " + e.getMessage());
- }
- } catch (Exception e) {
- logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), paginatedSql, params);
- return Result.fail(e.getMessage());
- }
- });
- }
- /**
- * 执行SQL查询并返回结果列表
- * 此方法重载了另一个queryForList方法,为查询数据库提供了便利
- * 它使用了默认的分页参数(0,0),表示不分页
- *
- * @param dbConfig 数据库配置信息,包含连接数据库所需的信息
- * @param sql 要执行的SQL查询语句
- * @param params SQL查询语句中的参数列表
- * @return 包含查询结果的列表,每个结果是一个键值对映射,表示一行数据
- */
- public Result<List<Map<String,Object>>> queryForList(Map<String, String> dbConfig,
- String sql,
- List<Object> params) {
- //复用标准查询即可
- return queryForList(dbConfig, sql, params, 0, 0);
- }
- /**
- * 根据提供的数据库配置、SQL查询语句、参数列表以及分页信息来查询数据库并返回结果列表
- *
- * @param dbConfig 数据库配置信息,包含URL、用户名、密码等连接数据库所需的信息
- * @param sql 要执行的SQL查询语句
- * @param params SQL查询语句中的参数列表
- * @param page 当前查询的页码
- * @param pageSize 每页显示的记录数
- * @return 返回一个Result对象,其中包含查询结果列表如果查询失败,返回失败结果及错误信息
- */
- public Result<List<Map<String,Object>>> queryForList(Map<String, String> dbConfig,
- String sql,
- List<Object> params,
- int page,
- int pageSize) {
- String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
- return queryWithJdbcTemplate(dbConfig, paginatedSql, jdbcTemplate -> {
- try {
- List<Object> test = new ArrayList<>();
- test.add(1);
- List<Map<String, Object>> queryForList = jdbcTemplate.queryForList(paginatedSql, test);
- return Result.success(queryForList);
- }catch (Exception e){
- logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), paginatedSql, params);
- return Result.fail(e.getMessage());
- }
- });
- }
- /**
- * 根据SQL查询数据库并以Map形式返回结果
- *
- * @param dbConfig 数据库配置信息,包括连接地址、用户名、密码等
- * @param sql 执行的SQL查询语句
- * @param params SQL语句中的参数列表
- * @return 返回一个Result对象,包含查询结果的Map或错误信息
- */
- public Result<Map<String,Object>> queryForMap(Map<String, String> dbConfig,
- String sql,
- List<Object> params) {
- return queryWithJdbcTemplate(dbConfig, sql, jdbcTemplate -> {
- try {
- return Result.success(jdbcTemplate.queryForMap(sql, params));
- }catch (Exception e){
- logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
- return Result.fail(e.getMessage());
- }
- });
- }
- /**
- * 执行查询并返回单个对象结果
- * 该方法用于执行SQL查询,并期望得到一个结果对象如果查询成功,返回一个包含查询结果的Result对象;
- * 如果查询失败,则返回一个表示失败的Result对象,并包含错误信息
- *
- * @param dbConfig 数据库配置信息,包括连接数据库所需的URL、用户名和密码等
- * @param sql 要执行的SQL查询语句
- * @param params SQL语句的参数列表,用于预编译SQL语句以防止SQL注入
- * @return 返回一个Result对象,包含查询结果或错误信息
- */
- public Result<Object> queryForObject(Map<String, String> dbConfig,
- String sql,
- List<Object> params) {
- return queryWithJdbcTemplate(dbConfig, sql, jdbcTemplate -> {
- try {
- Object queriedForObject = jdbcTemplate.queryForObject(sql, Object.class, params.toArray());
- return Result.success(queriedForObject);
- }catch (Exception e){
- logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
- return Result.fail(e.getMessage());
- }
- });
- }
- /**
- * 异步执行查询(适用于大数据量导出等场景)
- * 该方法通过CompletableFuture实现异步执行查询操作,适用于需要处理大量数据的场景,例如数据导出
- * 使用异步执行可以提高程序的响应性,特别是在处理时间较长的查询时,不会阻塞主线程
- *
- * @param dbConfig 数据库配置参数,包括数据库URL、用户名、密码等信息
- * @param sql SQL查询语句,用于从数据库获取数据
- * @param params SQL语句参数,用于预编译SQL语句,提高安全性和性能
- * @param rowMapper 行映射函数,用于将查询结果集中的每一行转换为指定类型的对象
- * @param <T> 泛型参数,表示查询结果集中每条记录映射成的对象类型
- * @return 返回一个CompletableFuture对象,表示异步执行的结果
- * 通过这个对象可以进行后续的异步处理,如转换、消费等操作
- */
- public <T> CompletableFuture<Result<List<T>>> asyncQuery(
- Map<String, String> dbConfig,
- String sql,
- List<Object> params,
- Function<ResultSet, T> rowMapper) {
- return CompletableFuture.supplyAsync(() -> query(dbConfig, sql, params, rowMapper), executor);
- }
- /**
- * 执行数据库查询操作
- *
- * @param dbConfig 数据库配置信息,包含数据库连接所需的各种参数
- * @param sql 待执行的SQL查询语句
- * @param operation 函数式接口,定义了如何执行查询操作并返回结果
- * @param <T> 查询结果的类型
- * @return 返回一个Result对象,包含查询结果或错误信息
- */
- private <T> Result<T> queryWithJdbcTemplate(Map<String, String> dbConfig,String sql, Function<JdbcTemplate, Result<T>> operation) {
- // 检查SQL语句中是否包含潜在的SQL注入风险
- if (isPotentialSqlInjection(sql,"query")) {
- logger.warn("检测到潜在 SQL 注入风险: {}", sql);
- return Result.fail("SQL 含非法字符");
- }
- if (!rateLimiter.allowRequest()) {
- logger.warn("请求被限流");
- return Result.fail("请求被限流,请稍后重试");
- }
- //查询直接使用JdbcTemplate即可,性能与原生没有区别,代码要简化很多
- Result<JdbcTemplate> jdbcTemplateResult = poolManager.getJdbcTemplate(dbConfig);
- if(!jdbcTemplateResult.isSuccess()){
- return Result.fail(jdbcTemplateResult.getError());
- }
- return operation.apply(jdbcTemplateResult.getData());
- }
- private <T> Result<T> queryWithDatasource(Map<String, String> dbConfig,String sql, Function<HikariDataSource, Result<T>> operation) {
- // 检查SQL语句中是否包含潜在的SQL注入风险
- if (isPotentialSqlInjection(sql,"query")) {
- logger.warn("检测到潜在 SQL 注入风险: {}", sql);
- return Result.fail("SQL 含非法字符");
- }
- if (!rateLimiter.allowRequest()) {
- logger.warn("请求被限流");
- return Result.fail("请求被限流,请稍后重试");
- }
- //查询直接使用JdbcTemplate即可,性能与原生没有区别,代码要简化很多
- Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
- if(!dataSourceResult.isSuccess()){
- return Result.fail(dataSourceResult.getError());
- }
- return operation.apply(dataSourceResult.getData());
- }
- private <T> Result<T> customQueryWithDatasource(Map<String, String> dbConfig,String sql, Function<HikariDataSource, Result<T>> operation) {
- // 检查SQL语句中是否包含潜在的SQL注入风险
- if (customerHandelSqlInjection(sql,"query")) {
- logger.warn("检测到潜在 SQL 注入风险: {}", sql);
- return Result.fail("SQL 含非法字符");
- }
- if (!rateLimiter.allowRequest()) {
- logger.warn("请求被限流");
- return Result.fail("请求被限流,请稍后重试");
- }
- //查询直接使用JdbcTemplate即可,性能与原生没有区别,代码要简化很多
- Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
- if(!dataSourceResult.isSuccess()){
- return Result.fail(dataSourceResult.getError());
- }
- return operation.apply(dataSourceResult.getData());
- }
- /**
- * 根据不同数据库应用分页逻辑
- */
- private String applyPagination(String sql, String dbUrl, int page, int pageSize) {
- if(page < 1 || pageSize < 1){
- return sql;
- }
- int offset = (page - 1) * pageSize;
- if (dbUrl.contains(":oracle:") || dbUrl.contains(":kingbase:") ||
- dbUrl.contains(":dm:")) {
- int start = offset + 1;
- int end = offset + pageSize;
- return "SELECT * FROM (SELECT a.*, ROWNUM rnum FROM (" + sql + ") a WHERE ROWNUM <= " + end + ") WHERE rnum >= " + start;
- } else if (dbUrl.contains(":sqlserver:")) {
- return sql + " OFFSET " + offset + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
- } else {
- return sql + " LIMIT " + pageSize + " OFFSET " + offset;
- }
- }
- //----------------------------------------------------------------------------------
- public Result<Integer> update(Map<String, String> dbConfig, String sql, List<Object> params) {
- return update(dbConfig, sql, params,"update");
- }
- /**
- * 执行更新操作(统一入口,含 SQL 注入检测)
- * 该方法旨在提供一个安全、通用的数据库更新操作接口,通过预编译 SQL 语句和参数设置来防止 SQL 注入攻击
- * 同时,它通过事务控制确保数据一致性,在操作失败时进行回滚
- *
- * @param dbConfig 数据库连接配置,包含连接数据库所需的信息,如 URL、用户名、密码等
- * @param sql 待执行的 SQL 更新语句,应包含占位符以安全地插入参数
- * @param params SQL 语句中的参数列表,用于替换 SQL 语句中的占位符
- * @return 返回一个 Result 对象,包含受影响的行数,表示更新操作的结果
- */
- private Result<Integer> update(Map<String, String> dbConfig, String sql, List<Object> params,String type) {
- if(type == null || type.isEmpty()) type = "update";
- return validateParams(dbConfig,sql, params,type,dataSource -> {
- try (Connection connection = dataSource.getConnection()){
- connection.setAutoCommit(false);
- try (PreparedStatement ps = connection.prepareStatement(sql)) {
- logger.info("sql:{}",sql);
- setParameters(ps, params);// 设置 SQL 语句的参数,防止 SQL 注入
- int executedUpdate = ps.executeUpdate();// 执行更新操作
- connection.commit();// 提交事务
- return Result.success(executedUpdate);// 返回成功结果
- } catch (Exception e) {
- // 异常处理:回滚事务并记录错误日志
- try {
- connection.rollback();
- } catch (SQLException ex) {
- logger.warn("回滚事务失败", ex);
- }
- logger.error("更新失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
- // 返回失败结果
- return Result.fail("更新失败: " + e.getMessage());
- }
- }catch (Exception e){
- logger.error("获取数据库连接失败:{}", e.getMessage());
- return Result.fail("获取数据库连接失败: " + e.getMessage());
- }
- });
- }
- /**
- * 批量执行更新操作,并返回总的更新成功条数
- *
- * @param dbConfig 数据库配置信息
- * @param sql SQL 插入/更新语句
- * @param batchParams 每一组参数代表一条记录
- * @return Result<Integer> 包含成功更新的总记录数
- */
- public Result<Integer> batchUpdate(Map<String, String> dbConfig,String sql,List<List<Object>> batchParams) {
- return validateParams(dbConfig,sql, batchParams,"update",dataSource -> {
- int totalSuccess = 0;
- // 根据负载情况返回相应的批处理大小
- int dynamicBatchSize = calculateDynamicBatchSize(dataSource);
- //参数分批次
- List<List<List<Object>>> batches = splitBatch(batchParams, dynamicBatchSize);
- //循环批次
- for (List<List<Object>> batch : batches) {
- Result<Integer> result = executeBatchWithRetry(dataSource, sql, batch);
- if (!result.isSuccess()) {
- logger.warn("整体批量插入失败,尝试单条插入");
- // 单条插入并统计成功数量
- int successCount = batch.parallelStream()
- .map(params -> update(dbConfig, sql, params))
- .filter(Result::isSuccess)
- .mapToInt(Result::getData)
- .sum();
- totalSuccess += successCount;
- } else {
- totalSuccess += result.getData();
- }
- }
- return Result.success(totalSuccess);
- });
- }
- /**
- * 批量执行更新操作,并返回失败的记录
- *
- * @param dbConfig 数据库配置信息
- * @param sql SQL 插入/更新语句
- * @param batchParams 每一组参数代表一条记录
- * @return Result<Integer> 包含成功更新的总记录数
- */
- public Result<List<List<Object>>> batchUpdateWithFailures(Map<String, String> dbConfig,String sql,List<List<Object>> batchParams) {
- return validateParams(dbConfig,sql, batchParams,"update",dataSource -> {
- // 根据负载情况返回相应的批处理大小
- int dynamicBatchSize = calculateDynamicBatchSize(dataSource);
- //参数分批次
- List<List<List<Object>>> batches = splitBatch(batchParams, dynamicBatchSize);
- List<List<Object>> failedParams = null;
- //循环批次
- for (List<List<Object>> batch : batches) {
- Result<Integer> result = executeBatchWithRetry(dataSource, sql, batch);
- if (!result.isSuccess()) {
- logger.warn("整体批量插入失败,尝试单条插入");
- // 单条插入并统计成功数量
- failedParams = batch.parallelStream()
- .filter(params -> !update(dbConfig, sql, params).isSuccess())
- .toList();
- }
- }
- return Result.success(failedParams);
- });
- }
- /**
- * 验证参数并执行数据库操作
- * 此方法主要用于验证传入的数据库配置和SQL语句是否符合规范,以防止SQL注入等安全问题
- * 同时,它还负责限流,以保护系统在高并发情况下能够稳定运行
- *
- * @param dbConfig 数据库配置信息,包括数据库地址、用户名、密码等
- * @param sql 待执行的SQL语句
- * @param operation 数据库操作,这是一个函数式接口,允许传递一个操作数据库的函数
- * @param <T> 泛型参数,表示操作结果的具体类型
- * @return 返回一个Result对象,包含操作结果或错误信息
- */
- private <T> Result<T> validateParams(Map<String, String> dbConfig,
- String sql,
- Object params,
- String type,
- Function<HikariDataSource, Result<T>> operation) {
- if (!rateLimiter.allowRequest()) {
- logger.warn("请求被限流");
- return Result.fail("请求被限流,请稍后重试");
- }
- if(type.equalsIgnoreCase("update") && (params == null || sql == null || sql.isEmpty())){
- return Result.fail("请检查执行参数");
- }
- if (isPotentialSqlInjection(sql,type)) {
- logger.warn("检测到潜在 SQL 注入风险: {}", sql);
- return Result.fail("SQL 含非法字符");
- }
- Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
- if (!dataSourceResult.isSuccess()) {
- return Result.fail(dataSourceResult.getError());
- }
- return operation.apply(dataSourceResult.getData());
- }
- /**
- * 根据数据库配置动态计算批处理大小
- * 该方法旨在根据当前数据库连接池的负载情况,动态调整批处理的大小,以优化数据库性能
- * 当连接池负载较高时,减小批处理大小以减少数据库压力;当负载较低时,增大批处理大小以提高处理效率
- *
- * @param dataSource 数据源,用于获取数据源和连接池信息
- * @return 根据当前数据库负载动态计算出的批处理大小
- */
- private int calculateDynamicBatchSize(HikariDataSource dataSource) {
- // 获取连接池管理代理对象
- HikariPoolMXBean poolProxy = dataSource.getHikariPoolMXBean();
- // 计算当前数据库负载
- double load = (double) poolProxy.getActiveConnections() / dataSource.getMaximumPoolSize(); // 获取数据库负载(如连接池利用率)
- // 根据负载情况返回相应的批处理大小
- return load > 0.8 ? Math.max(100, 450)
- : load < 0.3 ? Math.min(1000, 550)
- : 500;
- }
- /**
- * 分批次处理
- * 将一个列表按照指定大小分割成多个子列表
- * 这个方法有助于处理大量数据时,分批次进行处理,以提高系统性能和避免内存溢出
- *
- * @param list 待分割的列表
- * @param size 每个子列表的大小
- * @return 返回一个包含多个子列表的列表
- */
- public static <T> List<List<T>> splitBatch(List<T> list, int size) {
- // 检查批次大小是否有效,如果小于等于0,则返回空列表
- if (size <= 0) return Collections.emptyList();
- // 创建一个用于存储分割后子列表的列表
- List<List<T>> result = new ArrayList<>();
- // 遍历原始列表,每次增加size个元素
- for (int i = 0; i < list.size(); i += size) {
- // 根据当前索引和批次大小,获取子列表,并添加到结果列表中
- // 使用Math.min函数来确保不会超出原始列表的大小
- result.add(list.subList(i, Math.min(i + size, list.size())));
- }
- // 返回分割后的子列表列表
- return result;
- }
- /**
- * 执行批量插入或更新操作,并返回总的更新成功条数
- *
- * @param dataSource 数据源
- * @param sql SQL 插入/更新语句
- * @param batchParams 批处理参数列表,每个元素是一组参数
- * @return 返回一个 Result<Integer>,包含所有批次中成功更新的记录总数
- */
- private Result<Integer> executeBatchWithRetry(HikariDataSource dataSource,
- String sql,
- List<List<Object>> batchParams) {
- try(Connection connection = dataSource.getConnection()) {
- connection.setAutoCommit(false);
- try (PreparedStatement ps = connection.prepareStatement(sql)) {
- // 添加所有批次
- for (List<Object> params : batchParams) {
- setParameters(ps, params);
- ps.addBatch();
- }
- // 执行批处理
- int[] results = ps.executeBatch();
- connection.commit(); // 提交事务
- // 计算总成功条数(注意:非 Oracle 等数据库可能返回 -1 表示“成功但不返回数量”)
- int totalSuccess = Arrays.stream(results)
- .filter(count -> count > 0) // 过滤掉失败或未知的情况(如 Statement.SUCCESS_NO_INFO)
- .sum();
- return Result.success(totalSuccess);
- } catch (Exception e) {
- try {
- connection.rollback(); // 出现异常时回滚
- } catch (SQLException ex) {
- logger.warn("事务回滚失败", ex);
- }
- logger.warn("执行批量插入失败: {}; SQL: {}; params: {}", e.getMessage(), sql, batchParams);
- if (e instanceof BatchUpdateException) {
- int[] counts = ((BatchUpdateException) e).getUpdateCounts();
- return Result.fail("部分数据插入失败: " + Arrays.toString(counts));
- }
- return Result.fail("批量插入失败: " + e.getMessage());
- }
- }catch (Exception e){
- logger.error("获取数据库连接失败:{}", e.getMessage());
- return Result.fail("获取数据库连接失败: " + e.getMessage());
- }
- }
- //----------------------------------------------------------------------------------
- /**
- * 执行 DDL 语句(如 CREATE TABLE、ALTER TABLE 等)
- *
- * @param dbConfig 数据库配置信息,包含连接数据库所需的参数
- * @param ddlSql DDL 语句,用于操作数据库架构
- * @return 返回一个 Result 对象,其中包含受影响的行数
- */
- public Result<Integer> executeDDL(Map<String, String> dbConfig, String ddlSql) {
- // 调用 update 方法执行 DDL 语句,传入数据库配置信息、DDL 语句、空参数列表和执行标识
- return update(dbConfig, ddlSql, Collections.emptyList(),"exec");
- }
- /**
- * 设置参数到 SQL
- * 该方法将列表中的参数设置到预编译的 SQL 语句中
- * 使用 PreparedStatement 对象,通过遍历参数列表,将每个参数按位置设置到 SQL 语句中
- *
- * @param ps PreparedStatement 对象,用于执行 SQL 语句
- * @param params 包含 SQL 语句参数的列表
- * @throws SQLException 如果设置参数过程中发生错误,抛出此异常
- */
- private void setParameters(PreparedStatement ps, List<Object> params) throws SQLException {
- for (int i = 0; i < params.size(); i++) {
- ps.setObject(i + 1, params.get(i));
- }
- }
- /**
- * SQL 注入检测(所有 SQL 必须经过此检测)
- * 此方法用于检查给定的SQL语句是否包含潜在的SQL注入操作
- * 它通过解析SQL语句并检查其中是否包含危险操作来判断
- *
- * @param sql 待检测的SQL语句
- * @param type SQL语句的类型,用于区分不同数据库的特定语法
- * @return 如果SQL语句包含潜在的SQL注入操作,则返回true;否则返回false
- */
- public boolean isPotentialSqlInjection(String sql,String type) {
- if (sql == null || sql.isEmpty()) return false;
- if(type == null || type.isEmpty()) {
- logger.warn("未指定数据操作类型type");
- return true;
- }
- // 首先尝试从缓存中获取该SQL语句的安全性信息
- Boolean cached = safeSqlCache.getIfPresent(sql);
- // 如果缓存中存在且值为false,则说明该SQL语句已被标记为不安全,直接返回false
- if (Boolean.TRUE.equals(cached)) {
- return false;
- }
- try {
- //优先使用正则表达式检测非法SQL关键词
- Matcher matcher = SQL_INJECTION_PATTERN.matcher(sql);
- if(matcher.find()) {
- logger.info("SQL_INJECTION_PATTERN.matcher(sql); 692");
- return true;
- }
- // 使用CCJSqlParser解析SQL语句,以便进行进一步的检查
- Statement stmt = CCJSqlParserUtil.parse(sql);
- // 检查解析后的SQL语句是否包含危险操作
- boolean isInject = containsDangerousOperation(stmt,type);
- // 如果包含危险操作,则将该SQL语句添加到白名单缓存中,避免重复检测
- if (!isInject) {
- logger.info("containsDangerousOperation(stmt,type); 701");
- safeSqlCache.put(sql, true); // 白名单缓存
- }
- // 返回检测结果
- return isInject;
- } catch (JSQLParserException e) {
- // 如果SQL解析失败,记录警告日志并认为该SQL语句疑似注入
- logger.error("抛出异常:\n",e);
- logger.warn("SQL 解析失败:{};疑似注入: {}",e.getMessage(), sql);
- return true;
- }
- }
- /**
- * 自定义sql验证
- * @param sql
- * @param type
- * @return
- */
- public boolean customerHandelSqlInjection(String sql,String type) {
- if (sql == null || sql.isEmpty()) return false;
- if(type == null || type.isEmpty()) {
- logger.warn("未指定数据操作类型type");
- return true;
- }
- // 首先尝试从缓存中获取该SQL语句的安全性信息
- Boolean cached = safeSqlCache.getIfPresent(sql);
- // 如果缓存中存在且值为false,则说明该SQL语句已被标记为不安全,直接返回false
- if (Boolean.TRUE.equals(cached)) {
- return false;
- }
- try {
- //优先使用正则表达式检测非法SQL关键词
- // todo 暂且注释掉,自定义sql被拦住了
- /*Matcher matcher = SQL_INJECTION_PATTERN.matcher(sql);
- if(matcher.find()) {
- logger.info("SQL_INJECTION_PATTERN.matcher(sql); 692");
- return true;
- }*/
- // 使用CCJSqlParser解析SQL语句,以便进行进一步的检查
- Statement stmt = CCJSqlParserUtil.parse(sql);
- // 检查解析后的SQL语句是否包含危险操作
- boolean isInject = containsDangerousOperation(stmt,type);
- // 如果包含危险操作,则将该SQL语句添加到白名单缓存中,避免重复检测
- if (!isInject) {
- safeSqlCache.put(sql, true); // 白名单缓存
- }
- // 返回检测结果
- return isInject;
- } catch (JSQLParserException e) {
- // 如果SQL解析失败,记录警告日志并认为该SQL语句疑似注入
- logger.error("抛出异常:\n",e);
- logger.warn("SQL 解析失败:{};疑似注入: {}",e.getMessage(), sql);
- return true;
- }
- }
- /**
- * 判断 SQL 是否包含危险操作
- */
- private boolean containsDangerousOperation(Statement stmt,String type) {
- return switch (type.toLowerCase()) {
- case "query" -> !(stmt instanceof Select);
- case "update" -> !(stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete);
- case "exec" -> !(stmt instanceof CreateTable || stmt instanceof Alter);
- default ->
- // 更复杂的检测逻辑可在此处扩展
- false;
- };
- }
- }
|