JdbcExecutor.java 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717
  1. package com.bfkj.unidia.DataBaseUtils;
  2. import com.bfkj.unidia.Result;
  3. import com.github.benmanes.caffeine.cache.Cache;
  4. import com.github.benmanes.caffeine.cache.Caffeine;
  5. import com.zaxxer.hikari.HikariDataSource;
  6. import com.zaxxer.hikari.HikariPoolMXBean;
  7. import lombok.extern.slf4j.Slf4j;
  8. import net.sf.jsqlparser.JSQLParserException;
  9. import net.sf.jsqlparser.parser.CCJSqlParserUtil;
  10. import net.sf.jsqlparser.statement.Statement;
  11. import net.sf.jsqlparser.statement.alter.Alter;
  12. import net.sf.jsqlparser.statement.create.table.CreateTable;
  13. import net.sf.jsqlparser.statement.delete.Delete;
  14. import net.sf.jsqlparser.statement.insert.Insert;
  15. import net.sf.jsqlparser.statement.select.Select;
  16. import net.sf.jsqlparser.statement.update.Update;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.jdbc.core.JdbcTemplate;
  21. import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
  22. import org.springframework.jdbc.core.namedparam.SqlParameterSource;
  23. import org.springframework.stereotype.Component;
  24. import org.springframework.stereotype.Service;
  25. import java.sql.*;
  26. import java.util.*;
  27. import java.util.concurrent.*;
  28. import java.util.function.Function;
  29. import java.util.regex.Matcher;
  30. import java.util.regex.Pattern;
  31. import java.util.stream.Collectors;
  32. import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
  33. /**
  34. * 高性能 JDBC 执行器,支持:
  35. * - SQL 注入检测(基于 JSqlParser)
  36. * - 批量插入自动降级
  37. * - 分批次插入
  38. * - 自动事务控制
  39. */
  40. @Service
  41. @Slf4j
  42. public class JdbcExecutor {
  43. private static final Logger logger = LoggerFactory.getLogger(JdbcExecutor.class);
  44. private final ConnectionPoolManager poolManager;
  45. private final SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(1000); // 默认每秒 1000 请求
  46. // 线程池
  47. private final ExecutorService executor = ForkJoinPool.commonPool();
  48. // 安全 SQL 缓存(避免重复检测)
  49. private final Cache<String, Boolean> safeSqlCache = buildCaffeineCache();
  50. @Autowired
  51. public JdbcExecutor(ConnectionPoolManager poolManager) {
  52. this.poolManager = poolManager;
  53. }
  54. private static final Pattern SQL_INJECTION_PATTERN = Pattern.compile(
  55. "(?i)('|\\b(?:exec|execute|unhex|declare|drop|truncate|table|database|if|exists|script|xp_cmdshell|sp_executesql)\\b)|" +
  56. "(\\b(?:and|or)\\b\\s+.*(?:=|like).*'.*)"
  57. );
  58. /**
  59. * 执行查询操作(统一入口,含 SQL 注入检测)
  60. *
  61. * @param dbConfig 数据库配置信息,包含连接数据库所需的信息
  62. * @param sql 查询的SQL语句
  63. * @param params SQL语句中的参数列表
  64. * @param rowMapper 一个函数接口,用于将结果集中的每一行映射为一个对象
  65. * @param <T> 泛型参数,表示结果集中每一行数据映射后的类型
  66. * @return 返回一个Result对象,包含查询结果列表如果查询失败,返回失败的Result对象
  67. */
  68. public <T> Result<List<T>> query(Map<String, String> dbConfig,
  69. String sql,
  70. List<Object> params,
  71. Function<ResultSet, T> rowMapper) {
  72. //复用标准查询即可
  73. return query(dbConfig, sql, params, 0, 0, rowMapper);
  74. }
  75. /**
  76. * 执行查询操作并返回结果列表
  77. * 该方法负责根据提供的SQL查询语句、数据库配置、参数以及分页信息来执行数据库查询操作
  78. * 它使用了泛型来允许调用者指定返回类型,并利用函数式接口来处理结果集映射
  79. *
  80. * @param dbConfig 数据库配置信息,包括URL、用户名、密码等
  81. * @param sql 查询语句
  82. * @param params 查询语句的参数
  83. * @param page 当前页码
  84. * @param pageSize 每页记录数
  85. * @param rowMapper 函数式接口,用于将结果集一行映射为一个对象
  86. * @param <T> 泛型,表示返回的对象类型
  87. * @return 查询结果,包含一个对象列表
  88. */
  89. public <T> Result<List<T>> query(Map<String, String> dbConfig,
  90. String sql,
  91. List<Object> params,
  92. int page,
  93. int pageSize,
  94. Function<ResultSet, T> rowMapper) {
  95. String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
  96. return queryWithDatasource(dbConfig, paginatedSql, dataSource -> {
  97. try(Connection connection = dataSource.getConnection()) {
  98. try(PreparedStatement ps = connection.prepareStatement(paginatedSql)){
  99. // 设置参数
  100. if (params != null && !params.isEmpty()) {
  101. for (int i = 0; i < params.size(); i++) {
  102. ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
  103. }
  104. }
  105. logger.info("ps.toString():{}", ps.toString());
  106. // 执行查询
  107. ResultSet rs = ps.executeQuery();
  108. // 映射结果
  109. List<T> result = new ArrayList<>();
  110. while (rs.next()) {
  111. result.add(rowMapper.apply(rs));
  112. }
  113. return Result.success(result);
  114. }catch (SQLException e){
  115. return Result.fail("查询失败: " + e.getMessage());
  116. }
  117. } catch (Exception e) {
  118. logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), paginatedSql, params);
  119. return Result.fail(e.getMessage());
  120. }
  121. });
  122. }
  123. public Result<List<Map<String, Object>>> queryForSql(Map<String, String> dbConfig,
  124. String sql,
  125. List<Object> params ) {
  126. return queryWithDatasource(dbConfig, sql, dataSource -> {
  127. try(Connection connection = dataSource.getConnection()) {
  128. try(PreparedStatement ps = connection.prepareStatement(sql)){
  129. // 设置参数
  130. if (params != null && !params.isEmpty()) {
  131. for (int i = 0; i < params.size(); i++) {
  132. ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
  133. }
  134. }
  135. logger.info("ps.toString():{}", ps.toString());
  136. logger.info("Executing paginated SQL: {}", sql);
  137. List<Map<String, Object>> resultList = new ArrayList<>();
  138. try (ResultSet rs = ps.executeQuery()) {
  139. ResultSetMetaData metaData = rs.getMetaData();
  140. int columnCount = metaData.getColumnCount();
  141. // 获取所有列名
  142. List<String> columnNames = new ArrayList<>();
  143. for (int i = 1; i <= columnCount; i++) {
  144. columnNames.add(metaData.getColumnLabel(i)); // 使用别名
  145. }
  146. // 遍历结果集
  147. while (rs.next()) {
  148. Map<String, Object> row = new LinkedHashMap<>();
  149. for (int i = 0; i < columnCount; i++) {
  150. String columnName = columnNames.get(i);
  151. Object value = rs.getObject(i + 1);
  152. row.put(columnName, value);
  153. }
  154. resultList.add(row);
  155. }
  156. }
  157. logger.info("Query returned {} rows", resultList.size());
  158. return Result.success(resultList);
  159. }catch (SQLException e){
  160. return Result.fail("查询失败: " + e.getMessage());
  161. }
  162. } catch (Exception e) {
  163. logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), sql, params);
  164. return Result.fail(e.getMessage());
  165. }
  166. });
  167. }
  168. public Result<List<Map<String, Object>>> jdbcSearch(Map<String, String> dbConfig,
  169. String sql,
  170. List<Object> params,
  171. int page,
  172. int pageSize ) {
  173. String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
  174. return queryWithDatasource(dbConfig, paginatedSql, dataSource -> {
  175. try(Connection connection = dataSource.getConnection()) {
  176. try(PreparedStatement ps = connection.prepareStatement(paginatedSql)){
  177. // 设置参数
  178. if (params != null && !params.isEmpty()) {
  179. for (int i = 0; i < params.size(); i++) {
  180. ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
  181. }
  182. }
  183. logger.info("ps.toString():{}", ps.toString());
  184. logger.info("Executing paginated SQL: {}", paginatedSql);
  185. List<Map<String, Object>> resultList = new ArrayList<>();
  186. try (ResultSet rs = ps.executeQuery()) {
  187. ResultSetMetaData metaData = rs.getMetaData();
  188. int columnCount = metaData.getColumnCount();
  189. // 获取所有列名
  190. List<String> columnNames = new ArrayList<>();
  191. for (int i = 1; i <= columnCount; i++) {
  192. columnNames.add(metaData.getColumnLabel(i)); // 使用别名
  193. }
  194. // 遍历结果集
  195. while (rs.next()) {
  196. Map<String, Object> row = new LinkedHashMap<>();
  197. for (int i = 0; i < columnCount; i++) {
  198. String columnName = columnNames.get(i);
  199. Object value = rs.getObject(i + 1);
  200. row.put(columnName, value);
  201. }
  202. resultList.add(row);
  203. }
  204. }
  205. logger.info("Query returned {} rows", resultList.size());
  206. return Result.success(resultList);
  207. }catch (SQLException e){
  208. return Result.fail("查询失败: " + e.getMessage());
  209. }
  210. } catch (Exception e) {
  211. logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), paginatedSql, params);
  212. return Result.fail(e.getMessage());
  213. }
  214. });
  215. }
  216. /**
  217. * 执行SQL查询并返回结果列表
  218. * 此方法重载了另一个queryForList方法,为查询数据库提供了便利
  219. * 它使用了默认的分页参数(0,0),表示不分页
  220. *
  221. * @param dbConfig 数据库配置信息,包含连接数据库所需的信息
  222. * @param sql 要执行的SQL查询语句
  223. * @param params SQL查询语句中的参数列表
  224. * @return 包含查询结果的列表,每个结果是一个键值对映射,表示一行数据
  225. */
  226. public Result<List<Map<String,Object>>> queryForList(Map<String, String> dbConfig,
  227. String sql,
  228. List<Object> params) {
  229. //复用标准查询即可
  230. return queryForList(dbConfig, sql, params, 0, 0);
  231. }
  232. /**
  233. * 根据提供的数据库配置、SQL查询语句、参数列表以及分页信息来查询数据库并返回结果列表
  234. *
  235. * @param dbConfig 数据库配置信息,包含URL、用户名、密码等连接数据库所需的信息
  236. * @param sql 要执行的SQL查询语句
  237. * @param params SQL查询语句中的参数列表
  238. * @param page 当前查询的页码
  239. * @param pageSize 每页显示的记录数
  240. * @return 返回一个Result对象,其中包含查询结果列表如果查询失败,返回失败结果及错误信息
  241. */
  242. public Result<List<Map<String,Object>>> queryForList(Map<String, String> dbConfig,
  243. String sql,
  244. List<Object> params,
  245. int page,
  246. int pageSize) {
  247. String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
  248. return queryWithJdbcTemplate(dbConfig, paginatedSql, jdbcTemplate -> {
  249. try {
  250. List<Object> test = new ArrayList<>();
  251. test.add(1);
  252. List<Map<String, Object>> queryForList = jdbcTemplate.queryForList(paginatedSql, test);
  253. return Result.success(queryForList);
  254. }catch (Exception e){
  255. logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), paginatedSql, params);
  256. return Result.fail(e.getMessage());
  257. }
  258. });
  259. }
  260. /**
  261. * 根据SQL查询数据库并以Map形式返回结果
  262. *
  263. * @param dbConfig 数据库配置信息,包括连接地址、用户名、密码等
  264. * @param sql 执行的SQL查询语句
  265. * @param params SQL语句中的参数列表
  266. * @return 返回一个Result对象,包含查询结果的Map或错误信息
  267. */
  268. public Result<Map<String,Object>> queryForMap(Map<String, String> dbConfig,
  269. String sql,
  270. List<Object> params) {
  271. return queryWithJdbcTemplate(dbConfig, sql, jdbcTemplate -> {
  272. try {
  273. return Result.success(jdbcTemplate.queryForMap(sql, params));
  274. }catch (Exception e){
  275. logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
  276. return Result.fail(e.getMessage());
  277. }
  278. });
  279. }
  280. /**
  281. * 执行查询并返回单个对象结果
  282. * 该方法用于执行SQL查询,并期望得到一个结果对象如果查询成功,返回一个包含查询结果的Result对象;
  283. * 如果查询失败,则返回一个表示失败的Result对象,并包含错误信息
  284. *
  285. * @param dbConfig 数据库配置信息,包括连接数据库所需的URL、用户名和密码等
  286. * @param sql 要执行的SQL查询语句
  287. * @param params SQL语句的参数列表,用于预编译SQL语句以防止SQL注入
  288. * @return 返回一个Result对象,包含查询结果或错误信息
  289. */
  290. public Result<Object> queryForObject(Map<String, String> dbConfig,
  291. String sql,
  292. List<Object> params) {
  293. return queryWithJdbcTemplate(dbConfig, sql, jdbcTemplate -> {
  294. try {
  295. Object queriedForObject = jdbcTemplate.queryForObject(sql, Object.class, params.toArray());
  296. return Result.success(queriedForObject);
  297. }catch (Exception e){
  298. logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
  299. return Result.fail(e.getMessage());
  300. }
  301. });
  302. }
  303. /**
  304. * 异步执行查询(适用于大数据量导出等场景)
  305. * 该方法通过CompletableFuture实现异步执行查询操作,适用于需要处理大量数据的场景,例如数据导出
  306. * 使用异步执行可以提高程序的响应性,特别是在处理时间较长的查询时,不会阻塞主线程
  307. *
  308. * @param dbConfig 数据库配置参数,包括数据库URL、用户名、密码等信息
  309. * @param sql SQL查询语句,用于从数据库获取数据
  310. * @param params SQL语句参数,用于预编译SQL语句,提高安全性和性能
  311. * @param rowMapper 行映射函数,用于将查询结果集中的每一行转换为指定类型的对象
  312. * @param <T> 泛型参数,表示查询结果集中每条记录映射成的对象类型
  313. * @return 返回一个CompletableFuture对象,表示异步执行的结果
  314. * 通过这个对象可以进行后续的异步处理,如转换、消费等操作
  315. */
  316. public <T> CompletableFuture<Result<List<T>>> asyncQuery(
  317. Map<String, String> dbConfig,
  318. String sql,
  319. List<Object> params,
  320. Function<ResultSet, T> rowMapper) {
  321. return CompletableFuture.supplyAsync(() -> query(dbConfig, sql, params, rowMapper), executor);
  322. }
  323. /**
  324. * 执行数据库查询操作
  325. *
  326. * @param dbConfig 数据库配置信息,包含数据库连接所需的各种参数
  327. * @param sql 待执行的SQL查询语句
  328. * @param operation 函数式接口,定义了如何执行查询操作并返回结果
  329. * @param <T> 查询结果的类型
  330. * @return 返回一个Result对象,包含查询结果或错误信息
  331. */
  332. private <T> Result<T> queryWithJdbcTemplate(Map<String, String> dbConfig,String sql, Function<JdbcTemplate, Result<T>> operation) {
  333. // 检查SQL语句中是否包含潜在的SQL注入风险
  334. if (isPotentialSqlInjection(sql,"query")) {
  335. logger.warn("检测到潜在 SQL 注入风险: {}", sql);
  336. return Result.fail("SQL 含非法字符");
  337. }
  338. if (!rateLimiter.allowRequest()) {
  339. logger.warn("请求被限流");
  340. return Result.fail("请求被限流,请稍后重试");
  341. }
  342. //查询直接使用JdbcTemplate即可,性能与原生没有区别,代码要简化很多
  343. Result<JdbcTemplate> jdbcTemplateResult = poolManager.getJdbcTemplate(dbConfig);
  344. if(!jdbcTemplateResult.isSuccess()){
  345. return Result.fail(jdbcTemplateResult.getError());
  346. }
  347. return operation.apply(jdbcTemplateResult.getData());
  348. }
  349. private <T> Result<T> queryWithDatasource(Map<String, String> dbConfig,String sql, Function<HikariDataSource, Result<T>> operation) {
  350. // 检查SQL语句中是否包含潜在的SQL注入风险
  351. if (isPotentialSqlInjection(sql,"query")) {
  352. logger.warn("检测到潜在 SQL 注入风险: {}", sql);
  353. return Result.fail("SQL 含非法字符");
  354. }
  355. if (!rateLimiter.allowRequest()) {
  356. logger.warn("请求被限流");
  357. return Result.fail("请求被限流,请稍后重试");
  358. }
  359. //查询直接使用JdbcTemplate即可,性能与原生没有区别,代码要简化很多
  360. Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
  361. if(!dataSourceResult.isSuccess()){
  362. return Result.fail(dataSourceResult.getError());
  363. }
  364. return operation.apply(dataSourceResult.getData());
  365. }
  366. /**
  367. * 根据不同数据库应用分页逻辑
  368. */
  369. private String applyPagination(String sql, String dbUrl, int page, int pageSize) {
  370. if(page < 1 || pageSize < 1){
  371. return sql;
  372. }
  373. int offset = (page - 1) * pageSize;
  374. if (dbUrl.contains(":oracle:") || dbUrl.contains(":kingbase:") ||
  375. dbUrl.contains(":dm:")) {
  376. int start = offset + 1;
  377. int end = offset + pageSize;
  378. return "SELECT * FROM (SELECT a.*, ROWNUM rnum FROM (" + sql + ") a WHERE ROWNUM <= " + end + ") WHERE rnum >= " + start;
  379. } else if (dbUrl.contains(":sqlserver:")) {
  380. return sql + " OFFSET " + offset + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
  381. } else {
  382. return sql + " LIMIT " + pageSize + " OFFSET " + offset;
  383. }
  384. }
  385. //----------------------------------------------------------------------------------
  386. public Result<Integer> update(Map<String, String> dbConfig, String sql, List<Object> params) {
  387. return update(dbConfig, sql, params,"update");
  388. }
  389. /**
  390. * 执行更新操作(统一入口,含 SQL 注入检测)
  391. * 该方法旨在提供一个安全、通用的数据库更新操作接口,通过预编译 SQL 语句和参数设置来防止 SQL 注入攻击
  392. * 同时,它通过事务控制确保数据一致性,在操作失败时进行回滚
  393. *
  394. * @param dbConfig 数据库连接配置,包含连接数据库所需的信息,如 URL、用户名、密码等
  395. * @param sql 待执行的 SQL 更新语句,应包含占位符以安全地插入参数
  396. * @param params SQL 语句中的参数列表,用于替换 SQL 语句中的占位符
  397. * @return 返回一个 Result 对象,包含受影响的行数,表示更新操作的结果
  398. */
  399. private Result<Integer> update(Map<String, String> dbConfig, String sql, List<Object> params,String type) {
  400. if(type == null || type.isEmpty()) type = "update";
  401. return validateParams(dbConfig,sql, params,type,dataSource -> {
  402. try (Connection connection = dataSource.getConnection()){
  403. connection.setAutoCommit(false);
  404. try (PreparedStatement ps = connection.prepareStatement(sql)) {
  405. setParameters(ps, params);// 设置 SQL 语句的参数,防止 SQL 注入
  406. int executedUpdate = ps.executeUpdate();// 执行更新操作
  407. connection.commit();// 提交事务
  408. return Result.success(executedUpdate);// 返回成功结果
  409. } catch (Exception e) {
  410. // 异常处理:回滚事务并记录错误日志
  411. try {
  412. connection.rollback();
  413. } catch (SQLException ex) {
  414. logger.warn("回滚事务失败", ex);
  415. }
  416. logger.error("更新失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
  417. // 返回失败结果
  418. return Result.fail("更新失败: " + e.getMessage());
  419. }
  420. }catch (Exception e){
  421. logger.error("获取数据库连接失败:{}", e.getMessage());
  422. return Result.fail("获取数据库连接失败: " + e.getMessage());
  423. }
  424. });
  425. }
  426. /**
  427. * 批量执行更新操作,并返回总的更新成功条数
  428. *
  429. * @param dbConfig 数据库配置信息
  430. * @param sql SQL 插入/更新语句
  431. * @param batchParams 每一组参数代表一条记录
  432. * @return Result<Integer> 包含成功更新的总记录数
  433. */
  434. public Result<Integer> batchUpdate(Map<String, String> dbConfig,String sql,List<List<Object>> batchParams) {
  435. return validateParams(dbConfig,sql, batchParams,"update",dataSource -> {
  436. int totalSuccess = 0;
  437. // 根据负载情况返回相应的批处理大小
  438. int dynamicBatchSize = calculateDynamicBatchSize(dataSource);
  439. //参数分批次
  440. List<List<List<Object>>> batches = splitBatch(batchParams, dynamicBatchSize);
  441. //循环批次
  442. for (List<List<Object>> batch : batches) {
  443. Result<Integer> result = executeBatchWithRetry(dataSource, sql, batch);
  444. if (!result.isSuccess()) {
  445. logger.warn("整体批量插入失败,尝试单条插入");
  446. // 单条插入并统计成功数量
  447. int successCount = batch.parallelStream()
  448. .map(params -> update(dbConfig, sql, params))
  449. .filter(Result::isSuccess)
  450. .mapToInt(Result::getData)
  451. .sum();
  452. totalSuccess += successCount;
  453. } else {
  454. totalSuccess += result.getData();
  455. }
  456. }
  457. return Result.success(totalSuccess);
  458. });
  459. }
  460. /**
  461. * 批量执行更新操作,并返回失败的记录
  462. *
  463. * @param dbConfig 数据库配置信息
  464. * @param sql SQL 插入/更新语句
  465. * @param batchParams 每一组参数代表一条记录
  466. * @return Result<Integer> 包含成功更新的总记录数
  467. */
  468. public Result<List<List<Object>>> batchUpdateWithFailures(Map<String, String> dbConfig,String sql,List<List<Object>> batchParams) {
  469. return validateParams(dbConfig,sql, batchParams,"update",dataSource -> {
  470. // 根据负载情况返回相应的批处理大小
  471. int dynamicBatchSize = calculateDynamicBatchSize(dataSource);
  472. //参数分批次
  473. List<List<List<Object>>> batches = splitBatch(batchParams, dynamicBatchSize);
  474. List<List<Object>> failedParams = null;
  475. //循环批次
  476. for (List<List<Object>> batch : batches) {
  477. Result<Integer> result = executeBatchWithRetry(dataSource, sql, batch);
  478. if (!result.isSuccess()) {
  479. logger.warn("整体批量插入失败,尝试单条插入");
  480. // 单条插入并统计成功数量
  481. failedParams = batch.parallelStream()
  482. .filter(params -> !update(dbConfig, sql, params).isSuccess())
  483. .toList();
  484. }
  485. }
  486. return Result.success(failedParams);
  487. });
  488. }
  489. /**
  490. * 验证参数并执行数据库操作
  491. * 此方法主要用于验证传入的数据库配置和SQL语句是否符合规范,以防止SQL注入等安全问题
  492. * 同时,它还负责限流,以保护系统在高并发情况下能够稳定运行
  493. *
  494. * @param dbConfig 数据库配置信息,包括数据库地址、用户名、密码等
  495. * @param sql 待执行的SQL语句
  496. * @param operation 数据库操作,这是一个函数式接口,允许传递一个操作数据库的函数
  497. * @param <T> 泛型参数,表示操作结果的具体类型
  498. * @return 返回一个Result对象,包含操作结果或错误信息
  499. */
  500. private <T> Result<T> validateParams(Map<String, String> dbConfig,
  501. String sql,
  502. Object params,
  503. String type,
  504. Function<HikariDataSource, Result<T>> operation) {
  505. if (!rateLimiter.allowRequest()) {
  506. logger.warn("请求被限流");
  507. return Result.fail("请求被限流,请稍后重试");
  508. }
  509. if(type.equalsIgnoreCase("update") && (params == null || sql == null || sql.isEmpty())){
  510. return Result.fail("请检查执行参数");
  511. }
  512. if (isPotentialSqlInjection(sql,type)) {
  513. logger.warn("检测到潜在 SQL 注入风险: {}", sql);
  514. return Result.fail("SQL 含非法字符");
  515. }
  516. Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
  517. if (!dataSourceResult.isSuccess()) {
  518. return Result.fail(dataSourceResult.getError());
  519. }
  520. return operation.apply(dataSourceResult.getData());
  521. }
  522. /**
  523. * 根据数据库配置动态计算批处理大小
  524. * 该方法旨在根据当前数据库连接池的负载情况,动态调整批处理的大小,以优化数据库性能
  525. * 当连接池负载较高时,减小批处理大小以减少数据库压力;当负载较低时,增大批处理大小以提高处理效率
  526. *
  527. * @param dataSource 数据源,用于获取数据源和连接池信息
  528. * @return 根据当前数据库负载动态计算出的批处理大小
  529. */
  530. private int calculateDynamicBatchSize(HikariDataSource dataSource) {
  531. // 获取连接池管理代理对象
  532. HikariPoolMXBean poolProxy = dataSource.getHikariPoolMXBean();
  533. // 计算当前数据库负载
  534. double load = (double) poolProxy.getActiveConnections() / dataSource.getMaximumPoolSize(); // 获取数据库负载(如连接池利用率)
  535. // 根据负载情况返回相应的批处理大小
  536. return load > 0.8 ? Math.max(100, 450)
  537. : load < 0.3 ? Math.min(1000, 550)
  538. : 500;
  539. }
  540. /**
  541. * 分批次处理
  542. * 将一个列表按照指定大小分割成多个子列表
  543. * 这个方法有助于处理大量数据时,分批次进行处理,以提高系统性能和避免内存溢出
  544. *
  545. * @param list 待分割的列表
  546. * @param size 每个子列表的大小
  547. * @return 返回一个包含多个子列表的列表
  548. */
  549. public static <T> List<List<T>> splitBatch(List<T> list, int size) {
  550. // 检查批次大小是否有效,如果小于等于0,则返回空列表
  551. if (size <= 0) return Collections.emptyList();
  552. // 创建一个用于存储分割后子列表的列表
  553. List<List<T>> result = new ArrayList<>();
  554. // 遍历原始列表,每次增加size个元素
  555. for (int i = 0; i < list.size(); i += size) {
  556. // 根据当前索引和批次大小,获取子列表,并添加到结果列表中
  557. // 使用Math.min函数来确保不会超出原始列表的大小
  558. result.add(list.subList(i, Math.min(i + size, list.size())));
  559. }
  560. // 返回分割后的子列表列表
  561. return result;
  562. }
  563. /**
  564. * 执行批量插入或更新操作,并返回总的更新成功条数
  565. *
  566. * @param dataSource 数据源
  567. * @param sql SQL 插入/更新语句
  568. * @param batchParams 批处理参数列表,每个元素是一组参数
  569. * @return 返回一个 Result<Integer>,包含所有批次中成功更新的记录总数
  570. */
  571. private Result<Integer> executeBatchWithRetry(HikariDataSource dataSource,
  572. String sql,
  573. List<List<Object>> batchParams) {
  574. try(Connection connection = dataSource.getConnection()) {
  575. connection.setAutoCommit(false);
  576. try (PreparedStatement ps = connection.prepareStatement(sql)) {
  577. // 添加所有批次
  578. for (List<Object> params : batchParams) {
  579. setParameters(ps, params);
  580. ps.addBatch();
  581. }
  582. // 执行批处理
  583. int[] results = ps.executeBatch();
  584. connection.commit(); // 提交事务
  585. // 计算总成功条数(注意:非 Oracle 等数据库可能返回 -1 表示“成功但不返回数量”)
  586. int totalSuccess = Arrays.stream(results)
  587. .filter(count -> count > 0) // 过滤掉失败或未知的情况(如 Statement.SUCCESS_NO_INFO)
  588. .sum();
  589. return Result.success(totalSuccess);
  590. } catch (Exception e) {
  591. try {
  592. connection.rollback(); // 出现异常时回滚
  593. } catch (SQLException ex) {
  594. logger.warn("事务回滚失败", ex);
  595. }
  596. logger.warn("执行批量插入失败: {}; SQL: {}; params: {}", e.getMessage(), sql, batchParams);
  597. if (e instanceof BatchUpdateException) {
  598. int[] counts = ((BatchUpdateException) e).getUpdateCounts();
  599. return Result.fail("部分数据插入失败: " + Arrays.toString(counts));
  600. }
  601. return Result.fail("批量插入失败: " + e.getMessage());
  602. }
  603. }catch (Exception e){
  604. logger.error("获取数据库连接失败:{}", e.getMessage());
  605. return Result.fail("获取数据库连接失败: " + e.getMessage());
  606. }
  607. }
  608. //----------------------------------------------------------------------------------
  609. /**
  610. * 执行 DDL 语句(如 CREATE TABLE、ALTER TABLE 等)
  611. *
  612. * @param dbConfig 数据库配置信息,包含连接数据库所需的参数
  613. * @param ddlSql DDL 语句,用于操作数据库架构
  614. * @return 返回一个 Result 对象,其中包含受影响的行数
  615. */
  616. public Result<Integer> executeDDL(Map<String, String> dbConfig, String ddlSql) {
  617. // 调用 update 方法执行 DDL 语句,传入数据库配置信息、DDL 语句、空参数列表和执行标识
  618. return update(dbConfig, ddlSql, Collections.emptyList(),"exec");
  619. }
  620. /**
  621. * 设置参数到 SQL
  622. * 该方法将列表中的参数设置到预编译的 SQL 语句中
  623. * 使用 PreparedStatement 对象,通过遍历参数列表,将每个参数按位置设置到 SQL 语句中
  624. *
  625. * @param ps PreparedStatement 对象,用于执行 SQL 语句
  626. * @param params 包含 SQL 语句参数的列表
  627. * @throws SQLException 如果设置参数过程中发生错误,抛出此异常
  628. */
  629. private void setParameters(PreparedStatement ps, List<Object> params) throws SQLException {
  630. for (int i = 0; i < params.size(); i++) {
  631. ps.setObject(i + 1, params.get(i));
  632. }
  633. }
  634. /**
  635. * SQL 注入检测(所有 SQL 必须经过此检测)
  636. * 此方法用于检查给定的SQL语句是否包含潜在的SQL注入操作
  637. * 它通过解析SQL语句并检查其中是否包含危险操作来判断
  638. *
  639. * @param sql 待检测的SQL语句
  640. * @param type SQL语句的类型,用于区分不同数据库的特定语法
  641. * @return 如果SQL语句包含潜在的SQL注入操作,则返回true;否则返回false
  642. */
  643. public boolean isPotentialSqlInjection(String sql,String type) {
  644. if (sql == null || sql.isEmpty()) return false;
  645. if(type == null || type.isEmpty()) {
  646. logger.warn("未指定数据操作类型type");
  647. return true;
  648. }
  649. // 首先尝试从缓存中获取该SQL语句的安全性信息
  650. Boolean cached = safeSqlCache.getIfPresent(sql);
  651. // 如果缓存中存在且值为false,则说明该SQL语句已被标记为不安全,直接返回false
  652. if (Boolean.TRUE.equals(cached)) {
  653. return false;
  654. }
  655. try {
  656. //优先使用正则表达式检测非法SQL关键词
  657. Matcher matcher = SQL_INJECTION_PATTERN.matcher(sql);
  658. if(matcher.find()) return true;
  659. // 使用CCJSqlParser解析SQL语句,以便进行进一步的检查
  660. Statement stmt = CCJSqlParserUtil.parse(sql);
  661. // 检查解析后的SQL语句是否包含危险操作
  662. boolean isInject = containsDangerousOperation(stmt,type);
  663. // 如果包含危险操作,则将该SQL语句添加到白名单缓存中,避免重复检测
  664. if (!isInject) {
  665. safeSqlCache.put(sql, true); // 白名单缓存
  666. }
  667. // 返回检测结果
  668. return isInject;
  669. } catch (JSQLParserException e) {
  670. // 如果SQL解析失败,记录警告日志并认为该SQL语句疑似注入
  671. logger.warn("SQL 解析失败:{};疑似注入: {}",e.getMessage(), sql);
  672. return true;
  673. }
  674. }
  675. /**
  676. * 判断 SQL 是否包含危险操作
  677. */
  678. private boolean containsDangerousOperation(Statement stmt,String type) {
  679. return switch (type.toLowerCase()) {
  680. case "query" -> !(stmt instanceof Select);
  681. case "update" -> !(stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete);
  682. case "exec" -> !(stmt instanceof CreateTable || stmt instanceof Alter);
  683. default ->
  684. // 更复杂的检测逻辑可在此处扩展
  685. false;
  686. };
  687. }
  688. }