JdbcExecutor.java 39 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788
  1. package com.bfkj.unidia.DataBaseUtils;
  2. import com.bfkj.unidia.Result;
  3. import com.github.benmanes.caffeine.cache.Cache;
  4. import com.github.benmanes.caffeine.cache.Caffeine;
  5. import com.zaxxer.hikari.HikariDataSource;
  6. import com.zaxxer.hikari.HikariPoolMXBean;
  7. import lombok.extern.slf4j.Slf4j;
  8. import net.sf.jsqlparser.JSQLParserException;
  9. import net.sf.jsqlparser.parser.CCJSqlParserUtil;
  10. import net.sf.jsqlparser.statement.Statement;
  11. import net.sf.jsqlparser.statement.alter.Alter;
  12. import net.sf.jsqlparser.statement.create.table.CreateTable;
  13. import net.sf.jsqlparser.statement.delete.Delete;
  14. import net.sf.jsqlparser.statement.insert.Insert;
  15. import net.sf.jsqlparser.statement.select.Select;
  16. import net.sf.jsqlparser.statement.update.Update;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.jdbc.core.JdbcTemplate;
  21. import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
  22. import org.springframework.jdbc.core.namedparam.SqlParameterSource;
  23. import org.springframework.stereotype.Component;
  24. import org.springframework.stereotype.Service;
  25. import org.springframework.util.StringUtils;
  26. import java.sql.*;
  27. import java.util.*;
  28. import java.util.concurrent.*;
  29. import java.util.function.Function;
  30. import java.util.regex.Matcher;
  31. import java.util.regex.Pattern;
  32. import java.util.stream.Collectors;
  33. import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
  34. /**
  35. * 高性能 JDBC 执行器,支持:
  36. * - SQL 注入检测(基于 JSqlParser)
  37. * - 批量插入自动降级
  38. * - 分批次插入
  39. * - 自动事务控制
  40. */
  41. @Service
  42. @Slf4j
  43. public class JdbcExecutor {
  44. private static final Logger logger = LoggerFactory.getLogger(JdbcExecutor.class);
  45. private final ConnectionPoolManager poolManager;
  46. private final SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(1000); // 默认每秒 1000 请求
  47. // 线程池
  48. private final ExecutorService executor = ForkJoinPool.commonPool();
  49. // 安全 SQL 缓存(避免重复检测)
  50. private final Cache<String, Boolean> safeSqlCache = buildCaffeineCache();
  51. @Autowired
  52. public JdbcExecutor(ConnectionPoolManager poolManager) {
  53. this.poolManager = poolManager;
  54. }
  55. private static final Pattern SQL_INJECTION_PATTERN = Pattern.compile(
  56. "(?i)('|\\b(?:exec|execute|unhex|declare|drop|truncate|table|database|if|exists|script|xp_cmdshell|sp_executesql)\\b)|" +
  57. "(\\b(?:and|or)\\b\\s+.*(?:=|like).*'.*)"
  58. );
  59. /**
  60. * 执行查询操作(统一入口,含 SQL 注入检测)
  61. *
  62. * @param dbConfig 数据库配置信息,包含连接数据库所需的信息
  63. * @param sql 查询的SQL语句
  64. * @param params SQL语句中的参数列表
  65. * @param rowMapper 一个函数接口,用于将结果集中的每一行映射为一个对象
  66. * @param <T> 泛型参数,表示结果集中每一行数据映射后的类型
  67. * @return 返回一个Result对象,包含查询结果列表如果查询失败,返回失败的Result对象
  68. */
  69. public <T> Result<List<T>> query(Map<String, String> dbConfig,
  70. String sql,
  71. List<Object> params,
  72. Function<ResultSet, T> rowMapper) {
  73. //复用标准查询即可
  74. return query(dbConfig, sql, params, 0, 0, rowMapper);
  75. }
  76. /**
  77. * 执行查询操作并返回结果列表
  78. * 该方法负责根据提供的SQL查询语句、数据库配置、参数以及分页信息来执行数据库查询操作
  79. * 它使用了泛型来允许调用者指定返回类型,并利用函数式接口来处理结果集映射
  80. *
  81. * @param dbConfig 数据库配置信息,包括URL、用户名、密码等
  82. * @param sql 查询语句
  83. * @param params 查询语句的参数
  84. * @param page 当前页码
  85. * @param pageSize 每页记录数
  86. * @param rowMapper 函数式接口,用于将结果集一行映射为一个对象
  87. * @param <T> 泛型,表示返回的对象类型
  88. * @return 查询结果,包含一个对象列表
  89. */
  90. public <T> Result<List<T>> query(Map<String, String> dbConfig,
  91. String sql,
  92. List<Object> params,
  93. int page,
  94. int pageSize,
  95. Function<ResultSet, T> rowMapper) {
  96. String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
  97. return queryWithDatasource(dbConfig, paginatedSql, dataSource -> {
  98. try(Connection connection = dataSource.getConnection()) {
  99. try(PreparedStatement ps = connection.prepareStatement(paginatedSql)){
  100. // 设置参数
  101. if (params != null && !params.isEmpty()) {
  102. for (int i = 0; i < params.size(); i++) {
  103. ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
  104. }
  105. }
  106. logger.info("ps.toString():{}", ps.toString());
  107. // 执行查询
  108. ResultSet rs = ps.executeQuery();
  109. // 映射结果
  110. List<T> result = new ArrayList<>();
  111. while (rs.next()) {
  112. result.add(rowMapper.apply(rs));
  113. }
  114. return Result.success(result);
  115. }catch (SQLException e){
  116. return Result.fail("查询失败: " + e.getMessage());
  117. }
  118. } catch (Exception e) {
  119. logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), paginatedSql, params);
  120. return Result.fail(e.getMessage());
  121. }
  122. });
  123. }
  124. public Result<List<Map<String, Object>>> queryForSql(Map<String, String> dbConfig,
  125. String sql,
  126. List<Object> params ) {
  127. if (!StringUtils.hasText(sql)){
  128. return Result.fail("sql 不存在或为空");
  129. }
  130. return customQueryWithDatasource(dbConfig, sql, dataSource -> {
  131. try(Connection connection = dataSource.getConnection()) {
  132. try(PreparedStatement ps = connection.prepareStatement(sql)){
  133. // 设置参数
  134. if (params != null && !params.isEmpty()) {
  135. for (int i = 0; i < params.size(); i++) {
  136. ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
  137. }
  138. }
  139. logger.info("执行 SQL: {}", sql);
  140. List<Map<String, Object>> resultList = new ArrayList<>();
  141. try (ResultSet rs = ps.executeQuery()) {
  142. ResultSetMetaData metaData = rs.getMetaData();
  143. int columnCount = metaData.getColumnCount();
  144. // 获取所有列名
  145. List<String> columnNames = new ArrayList<>();
  146. for (int i = 1; i <= columnCount; i++) {
  147. columnNames.add(metaData.getColumnLabel(i)); // 使用别名
  148. }
  149. // 遍历结果集
  150. while (rs.next()) {
  151. Map<String, Object> row = new LinkedHashMap<>();
  152. for (int i = 0; i < columnCount; i++) {
  153. String columnName = columnNames.get(i);
  154. Object value = rs.getObject(i + 1);
  155. row.put(columnName, value);
  156. }
  157. resultList.add(row);
  158. }
  159. }
  160. logger.info("Query returned {} rows", resultList.size());
  161. return Result.success(resultList);
  162. }catch (SQLException e){
  163. return Result.fail("查询失败: " + e.getMessage());
  164. }
  165. } catch (Exception e) {
  166. logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), sql, params);
  167. return Result.fail(e.getMessage());
  168. }
  169. });
  170. }
  171. public Result<List<Map<String, Object>>> jdbcSearch(Map<String, String> dbConfig,
  172. String sql,
  173. List<Object> params,
  174. int page,
  175. int pageSize ) {
  176. String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
  177. return queryWithDatasource(dbConfig, paginatedSql, dataSource -> {
  178. try(Connection connection = dataSource.getConnection()) {
  179. try(PreparedStatement ps = connection.prepareStatement(paginatedSql)){
  180. // 设置参数
  181. if (params != null && !params.isEmpty()) {
  182. for (int i = 0; i < params.size(); i++) {
  183. ps.setObject(i + 1, params.get(i)); // 参数从 1 开始
  184. }
  185. }
  186. logger.info("ps.toString():{}", ps.toString());
  187. logger.info("Executing paginated SQL: {}", paginatedSql);
  188. List<Map<String, Object>> resultList = new ArrayList<>();
  189. try (ResultSet rs = ps.executeQuery()) {
  190. ResultSetMetaData metaData = rs.getMetaData();
  191. int columnCount = metaData.getColumnCount();
  192. // 获取所有列名
  193. List<String> columnNames = new ArrayList<>();
  194. for (int i = 1; i <= columnCount; i++) {
  195. columnNames.add(metaData.getColumnLabel(i)); // 使用别名
  196. }
  197. // 遍历结果集
  198. while (rs.next()) {
  199. Map<String, Object> row = new LinkedHashMap<>();
  200. for (int i = 0; i < columnCount; i++) {
  201. String columnName = columnNames.get(i);
  202. Object value = rs.getObject(i + 1);
  203. row.put(columnName, value);
  204. }
  205. resultList.add(row);
  206. }
  207. }
  208. logger.info("Query returned {} rows", resultList.size());
  209. return Result.success(resultList);
  210. }catch (SQLException e){
  211. return Result.fail("查询失败: " + e.getMessage());
  212. }
  213. } catch (Exception e) {
  214. logger.error("查询失败:{}; SQL: {}, params: {}", e.getMessage(), paginatedSql, params);
  215. return Result.fail(e.getMessage());
  216. }
  217. });
  218. }
  219. /**
  220. * 执行SQL查询并返回结果列表
  221. * 此方法重载了另一个queryForList方法,为查询数据库提供了便利
  222. * 它使用了默认的分页参数(0,0),表示不分页
  223. *
  224. * @param dbConfig 数据库配置信息,包含连接数据库所需的信息
  225. * @param sql 要执行的SQL查询语句
  226. * @param params SQL查询语句中的参数列表
  227. * @return 包含查询结果的列表,每个结果是一个键值对映射,表示一行数据
  228. */
  229. public Result<List<Map<String,Object>>> queryForList(Map<String, String> dbConfig,
  230. String sql,
  231. List<Object> params) {
  232. //复用标准查询即可
  233. return queryForList(dbConfig, sql, params, 0, 0);
  234. }
  235. /**
  236. * 根据提供的数据库配置、SQL查询语句、参数列表以及分页信息来查询数据库并返回结果列表
  237. *
  238. * @param dbConfig 数据库配置信息,包含URL、用户名、密码等连接数据库所需的信息
  239. * @param sql 要执行的SQL查询语句
  240. * @param params SQL查询语句中的参数列表
  241. * @param page 当前查询的页码
  242. * @param pageSize 每页显示的记录数
  243. * @return 返回一个Result对象,其中包含查询结果列表如果查询失败,返回失败结果及错误信息
  244. */
  245. public Result<List<Map<String,Object>>> queryForList(Map<String, String> dbConfig,
  246. String sql,
  247. List<Object> params,
  248. int page,
  249. int pageSize) {
  250. String paginatedSql = applyPagination(sql, dbConfig.get("url"), page, pageSize);
  251. return queryWithJdbcTemplate(dbConfig, paginatedSql, jdbcTemplate -> {
  252. try {
  253. List<Object> test = new ArrayList<>();
  254. test.add(1);
  255. List<Map<String, Object>> queryForList = jdbcTemplate.queryForList(paginatedSql, test);
  256. return Result.success(queryForList);
  257. }catch (Exception e){
  258. logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), paginatedSql, params);
  259. return Result.fail(e.getMessage());
  260. }
  261. });
  262. }
  263. /**
  264. * 根据SQL查询数据库并以Map形式返回结果
  265. *
  266. * @param dbConfig 数据库配置信息,包括连接地址、用户名、密码等
  267. * @param sql 执行的SQL查询语句
  268. * @param params SQL语句中的参数列表
  269. * @return 返回一个Result对象,包含查询结果的Map或错误信息
  270. */
  271. public Result<Map<String,Object>> queryForMap(Map<String, String> dbConfig,
  272. String sql,
  273. List<Object> params) {
  274. return queryWithJdbcTemplate(dbConfig, sql, jdbcTemplate -> {
  275. try {
  276. return Result.success(jdbcTemplate.queryForMap(sql, params));
  277. }catch (Exception e){
  278. logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
  279. return Result.fail(e.getMessage());
  280. }
  281. });
  282. }
  283. /**
  284. * 执行查询并返回单个对象结果
  285. * 该方法用于执行SQL查询,并期望得到一个结果对象如果查询成功,返回一个包含查询结果的Result对象;
  286. * 如果查询失败,则返回一个表示失败的Result对象,并包含错误信息
  287. *
  288. * @param dbConfig 数据库配置信息,包括连接数据库所需的URL、用户名和密码等
  289. * @param sql 要执行的SQL查询语句
  290. * @param params SQL语句的参数列表,用于预编译SQL语句以防止SQL注入
  291. * @return 返回一个Result对象,包含查询结果或错误信息
  292. */
  293. public Result<Object> queryForObject(Map<String, String> dbConfig,
  294. String sql,
  295. List<Object> params) {
  296. return queryWithJdbcTemplate(dbConfig, sql, jdbcTemplate -> {
  297. try {
  298. Object queriedForObject = jdbcTemplate.queryForObject(sql, Object.class, params.toArray());
  299. return Result.success(queriedForObject);
  300. }catch (Exception e){
  301. logger.error("查询失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
  302. return Result.fail(e.getMessage());
  303. }
  304. });
  305. }
  306. /**
  307. * 异步执行查询(适用于大数据量导出等场景)
  308. * 该方法通过CompletableFuture实现异步执行查询操作,适用于需要处理大量数据的场景,例如数据导出
  309. * 使用异步执行可以提高程序的响应性,特别是在处理时间较长的查询时,不会阻塞主线程
  310. *
  311. * @param dbConfig 数据库配置参数,包括数据库URL、用户名、密码等信息
  312. * @param sql SQL查询语句,用于从数据库获取数据
  313. * @param params SQL语句参数,用于预编译SQL语句,提高安全性和性能
  314. * @param rowMapper 行映射函数,用于将查询结果集中的每一行转换为指定类型的对象
  315. * @param <T> 泛型参数,表示查询结果集中每条记录映射成的对象类型
  316. * @return 返回一个CompletableFuture对象,表示异步执行的结果
  317. * 通过这个对象可以进行后续的异步处理,如转换、消费等操作
  318. */
  319. public <T> CompletableFuture<Result<List<T>>> asyncQuery(
  320. Map<String, String> dbConfig,
  321. String sql,
  322. List<Object> params,
  323. Function<ResultSet, T> rowMapper) {
  324. return CompletableFuture.supplyAsync(() -> query(dbConfig, sql, params, rowMapper), executor);
  325. }
  326. /**
  327. * 执行数据库查询操作
  328. *
  329. * @param dbConfig 数据库配置信息,包含数据库连接所需的各种参数
  330. * @param sql 待执行的SQL查询语句
  331. * @param operation 函数式接口,定义了如何执行查询操作并返回结果
  332. * @param <T> 查询结果的类型
  333. * @return 返回一个Result对象,包含查询结果或错误信息
  334. */
  335. private <T> Result<T> queryWithJdbcTemplate(Map<String, String> dbConfig,String sql, Function<JdbcTemplate, Result<T>> operation) {
  336. // 检查SQL语句中是否包含潜在的SQL注入风险
  337. if (isPotentialSqlInjection(sql,"query")) {
  338. logger.warn("检测到潜在 SQL 注入风险: {}", sql);
  339. return Result.fail("SQL 含非法字符");
  340. }
  341. if (!rateLimiter.allowRequest()) {
  342. logger.warn("请求被限流");
  343. return Result.fail("请求被限流,请稍后重试");
  344. }
  345. //查询直接使用JdbcTemplate即可,性能与原生没有区别,代码要简化很多
  346. Result<JdbcTemplate> jdbcTemplateResult = poolManager.getJdbcTemplate(dbConfig);
  347. if(!jdbcTemplateResult.isSuccess()){
  348. return Result.fail(jdbcTemplateResult.getError());
  349. }
  350. return operation.apply(jdbcTemplateResult.getData());
  351. }
  352. private <T> Result<T> queryWithDatasource(Map<String, String> dbConfig,String sql, Function<HikariDataSource, Result<T>> operation) {
  353. // 检查SQL语句中是否包含潜在的SQL注入风险
  354. if (isPotentialSqlInjection(sql,"query")) {
  355. logger.warn("检测到潜在 SQL 注入风险: {}", sql);
  356. return Result.fail("SQL 含非法字符");
  357. }
  358. if (!rateLimiter.allowRequest()) {
  359. logger.warn("请求被限流");
  360. return Result.fail("请求被限流,请稍后重试");
  361. }
  362. //查询直接使用JdbcTemplate即可,性能与原生没有区别,代码要简化很多
  363. Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
  364. if(!dataSourceResult.isSuccess()){
  365. return Result.fail(dataSourceResult.getError());
  366. }
  367. return operation.apply(dataSourceResult.getData());
  368. }
  369. private <T> Result<T> customQueryWithDatasource(Map<String, String> dbConfig,String sql, Function<HikariDataSource, Result<T>> operation) {
  370. // 检查SQL语句中是否包含潜在的SQL注入风险
  371. if (customerHandelSqlInjection(sql,"query")) {
  372. logger.warn("检测到潜在 SQL 注入风险: {}", sql);
  373. return Result.fail("SQL 含非法字符");
  374. }
  375. if (!rateLimiter.allowRequest()) {
  376. logger.warn("请求被限流");
  377. return Result.fail("请求被限流,请稍后重试");
  378. }
  379. //查询直接使用JdbcTemplate即可,性能与原生没有区别,代码要简化很多
  380. Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
  381. if(!dataSourceResult.isSuccess()){
  382. return Result.fail(dataSourceResult.getError());
  383. }
  384. return operation.apply(dataSourceResult.getData());
  385. }
  386. /**
  387. * 根据不同数据库应用分页逻辑
  388. */
  389. private String applyPagination(String sql, String dbUrl, int page, int pageSize) {
  390. if(page < 1 || pageSize < 1){
  391. return sql;
  392. }
  393. int offset = (page - 1) * pageSize;
  394. if (dbUrl.contains(":oracle:") || dbUrl.contains(":kingbase:") ||
  395. dbUrl.contains(":dm:")) {
  396. int start = offset + 1;
  397. int end = offset + pageSize;
  398. return "SELECT * FROM (SELECT a.*, ROWNUM rnum FROM (" + sql + ") a WHERE ROWNUM <= " + end + ") WHERE rnum >= " + start;
  399. } else if (dbUrl.contains(":sqlserver:")) {
  400. return sql + " OFFSET " + offset + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
  401. } else {
  402. return sql + " LIMIT " + pageSize + " OFFSET " + offset;
  403. }
  404. }
  405. //----------------------------------------------------------------------------------
  406. public Result<Integer> update(Map<String, String> dbConfig, String sql, List<Object> params) {
  407. return update(dbConfig, sql, params,"update");
  408. }
  409. /**
  410. * 执行更新操作(统一入口,含 SQL 注入检测)
  411. * 该方法旨在提供一个安全、通用的数据库更新操作接口,通过预编译 SQL 语句和参数设置来防止 SQL 注入攻击
  412. * 同时,它通过事务控制确保数据一致性,在操作失败时进行回滚
  413. *
  414. * @param dbConfig 数据库连接配置,包含连接数据库所需的信息,如 URL、用户名、密码等
  415. * @param sql 待执行的 SQL 更新语句,应包含占位符以安全地插入参数
  416. * @param params SQL 语句中的参数列表,用于替换 SQL 语句中的占位符
  417. * @return 返回一个 Result 对象,包含受影响的行数,表示更新操作的结果
  418. */
  419. private Result<Integer> update(Map<String, String> dbConfig, String sql, List<Object> params,String type) {
  420. if(type == null || type.isEmpty()) type = "update";
  421. return validateParams(dbConfig,sql, params,type,dataSource -> {
  422. try (Connection connection = dataSource.getConnection()){
  423. connection.setAutoCommit(false);
  424. try (PreparedStatement ps = connection.prepareStatement(sql)) {
  425. logger.info("sql:{}",sql);
  426. setParameters(ps, params);// 设置 SQL 语句的参数,防止 SQL 注入
  427. int executedUpdate = ps.executeUpdate();// 执行更新操作
  428. connection.commit();// 提交事务
  429. return Result.success(executedUpdate);// 返回成功结果
  430. } catch (Exception e) {
  431. // 异常处理:回滚事务并记录错误日志
  432. try {
  433. connection.rollback();
  434. } catch (SQLException ex) {
  435. logger.warn("回滚事务失败", ex);
  436. }
  437. logger.error("更新失败:{};SQL:{},params:{}", e.getMessage(), sql, params);
  438. // 返回失败结果
  439. return Result.fail("更新失败: " + e.getMessage());
  440. }
  441. }catch (Exception e){
  442. logger.error("获取数据库连接失败:{}", e.getMessage());
  443. return Result.fail("获取数据库连接失败: " + e.getMessage());
  444. }
  445. });
  446. }
  447. /**
  448. * 批量执行更新操作,并返回总的更新成功条数
  449. *
  450. * @param dbConfig 数据库配置信息
  451. * @param sql SQL 插入/更新语句
  452. * @param batchParams 每一组参数代表一条记录
  453. * @return Result<Integer> 包含成功更新的总记录数
  454. */
  455. public Result<Integer> batchUpdate(Map<String, String> dbConfig,String sql,List<List<Object>> batchParams) {
  456. return validateParams(dbConfig,sql, batchParams,"update",dataSource -> {
  457. int totalSuccess = 0;
  458. // 根据负载情况返回相应的批处理大小
  459. int dynamicBatchSize = calculateDynamicBatchSize(dataSource);
  460. //参数分批次
  461. List<List<List<Object>>> batches = splitBatch(batchParams, dynamicBatchSize);
  462. //循环批次
  463. for (List<List<Object>> batch : batches) {
  464. Result<Integer> result = executeBatchWithRetry(dataSource, sql, batch);
  465. if (!result.isSuccess()) {
  466. logger.warn("整体批量插入失败,尝试单条插入");
  467. // 单条插入并统计成功数量
  468. int successCount = batch.parallelStream()
  469. .map(params -> update(dbConfig, sql, params))
  470. .filter(Result::isSuccess)
  471. .mapToInt(Result::getData)
  472. .sum();
  473. totalSuccess += successCount;
  474. } else {
  475. totalSuccess += result.getData();
  476. }
  477. }
  478. return Result.success(totalSuccess);
  479. });
  480. }
  481. /**
  482. * 批量执行更新操作,并返回失败的记录
  483. *
  484. * @param dbConfig 数据库配置信息
  485. * @param sql SQL 插入/更新语句
  486. * @param batchParams 每一组参数代表一条记录
  487. * @return Result<Integer> 包含成功更新的总记录数
  488. */
  489. public Result<List<List<Object>>> batchUpdateWithFailures(Map<String, String> dbConfig,String sql,List<List<Object>> batchParams) {
  490. return validateParams(dbConfig,sql, batchParams,"update",dataSource -> {
  491. // 根据负载情况返回相应的批处理大小
  492. int dynamicBatchSize = calculateDynamicBatchSize(dataSource);
  493. //参数分批次
  494. List<List<List<Object>>> batches = splitBatch(batchParams, dynamicBatchSize);
  495. List<List<Object>> failedParams = null;
  496. //循环批次
  497. for (List<List<Object>> batch : batches) {
  498. Result<Integer> result = executeBatchWithRetry(dataSource, sql, batch);
  499. if (!result.isSuccess()) {
  500. logger.warn("整体批量插入失败,尝试单条插入");
  501. // 单条插入并统计成功数量
  502. failedParams = batch.parallelStream()
  503. .filter(params -> !update(dbConfig, sql, params).isSuccess())
  504. .toList();
  505. }
  506. }
  507. return Result.success(failedParams);
  508. });
  509. }
  510. /**
  511. * 验证参数并执行数据库操作
  512. * 此方法主要用于验证传入的数据库配置和SQL语句是否符合规范,以防止SQL注入等安全问题
  513. * 同时,它还负责限流,以保护系统在高并发情况下能够稳定运行
  514. *
  515. * @param dbConfig 数据库配置信息,包括数据库地址、用户名、密码等
  516. * @param sql 待执行的SQL语句
  517. * @param operation 数据库操作,这是一个函数式接口,允许传递一个操作数据库的函数
  518. * @param <T> 泛型参数,表示操作结果的具体类型
  519. * @return 返回一个Result对象,包含操作结果或错误信息
  520. */
  521. private <T> Result<T> validateParams(Map<String, String> dbConfig,
  522. String sql,
  523. Object params,
  524. String type,
  525. Function<HikariDataSource, Result<T>> operation) {
  526. if (!rateLimiter.allowRequest()) {
  527. logger.warn("请求被限流");
  528. return Result.fail("请求被限流,请稍后重试");
  529. }
  530. if(type.equalsIgnoreCase("update") && (params == null || sql == null || sql.isEmpty())){
  531. return Result.fail("请检查执行参数");
  532. }
  533. if (isPotentialSqlInjection(sql,type)) {
  534. logger.warn("检测到潜在 SQL 注入风险: {}", sql);
  535. return Result.fail("SQL 含非法字符");
  536. }
  537. Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
  538. if (!dataSourceResult.isSuccess()) {
  539. return Result.fail(dataSourceResult.getError());
  540. }
  541. return operation.apply(dataSourceResult.getData());
  542. }
  543. /**
  544. * 根据数据库配置动态计算批处理大小
  545. * 该方法旨在根据当前数据库连接池的负载情况,动态调整批处理的大小,以优化数据库性能
  546. * 当连接池负载较高时,减小批处理大小以减少数据库压力;当负载较低时,增大批处理大小以提高处理效率
  547. *
  548. * @param dataSource 数据源,用于获取数据源和连接池信息
  549. * @return 根据当前数据库负载动态计算出的批处理大小
  550. */
  551. private int calculateDynamicBatchSize(HikariDataSource dataSource) {
  552. // 获取连接池管理代理对象
  553. HikariPoolMXBean poolProxy = dataSource.getHikariPoolMXBean();
  554. // 计算当前数据库负载
  555. double load = (double) poolProxy.getActiveConnections() / dataSource.getMaximumPoolSize(); // 获取数据库负载(如连接池利用率)
  556. // 根据负载情况返回相应的批处理大小
  557. return load > 0.8 ? Math.max(100, 450)
  558. : load < 0.3 ? Math.min(1000, 550)
  559. : 500;
  560. }
  561. /**
  562. * 分批次处理
  563. * 将一个列表按照指定大小分割成多个子列表
  564. * 这个方法有助于处理大量数据时,分批次进行处理,以提高系统性能和避免内存溢出
  565. *
  566. * @param list 待分割的列表
  567. * @param size 每个子列表的大小
  568. * @return 返回一个包含多个子列表的列表
  569. */
  570. public static <T> List<List<T>> splitBatch(List<T> list, int size) {
  571. // 检查批次大小是否有效,如果小于等于0,则返回空列表
  572. if (size <= 0) return Collections.emptyList();
  573. // 创建一个用于存储分割后子列表的列表
  574. List<List<T>> result = new ArrayList<>();
  575. // 遍历原始列表,每次增加size个元素
  576. for (int i = 0; i < list.size(); i += size) {
  577. // 根据当前索引和批次大小,获取子列表,并添加到结果列表中
  578. // 使用Math.min函数来确保不会超出原始列表的大小
  579. result.add(list.subList(i, Math.min(i + size, list.size())));
  580. }
  581. // 返回分割后的子列表列表
  582. return result;
  583. }
  584. /**
  585. * 执行批量插入或更新操作,并返回总的更新成功条数
  586. *
  587. * @param dataSource 数据源
  588. * @param sql SQL 插入/更新语句
  589. * @param batchParams 批处理参数列表,每个元素是一组参数
  590. * @return 返回一个 Result<Integer>,包含所有批次中成功更新的记录总数
  591. */
  592. private Result<Integer> executeBatchWithRetry(HikariDataSource dataSource,
  593. String sql,
  594. List<List<Object>> batchParams) {
  595. try(Connection connection = dataSource.getConnection()) {
  596. connection.setAutoCommit(false);
  597. try (PreparedStatement ps = connection.prepareStatement(sql)) {
  598. // 添加所有批次
  599. for (List<Object> params : batchParams) {
  600. setParameters(ps, params);
  601. ps.addBatch();
  602. }
  603. // 执行批处理
  604. int[] results = ps.executeBatch();
  605. connection.commit(); // 提交事务
  606. // 计算总成功条数(注意:非 Oracle 等数据库可能返回 -1 表示“成功但不返回数量”)
  607. int totalSuccess = Arrays.stream(results)
  608. .filter(count -> count > 0) // 过滤掉失败或未知的情况(如 Statement.SUCCESS_NO_INFO)
  609. .sum();
  610. return Result.success(totalSuccess);
  611. } catch (Exception e) {
  612. try {
  613. connection.rollback(); // 出现异常时回滚
  614. } catch (SQLException ex) {
  615. logger.warn("事务回滚失败", ex);
  616. }
  617. logger.warn("执行批量插入失败: {}; SQL: {}; params: {}", e.getMessage(), sql, batchParams);
  618. if (e instanceof BatchUpdateException) {
  619. int[] counts = ((BatchUpdateException) e).getUpdateCounts();
  620. return Result.fail("部分数据插入失败: " + Arrays.toString(counts));
  621. }
  622. return Result.fail("批量插入失败: " + e.getMessage());
  623. }
  624. }catch (Exception e){
  625. logger.error("获取数据库连接失败:{}", e.getMessage());
  626. return Result.fail("获取数据库连接失败: " + e.getMessage());
  627. }
  628. }
  629. //----------------------------------------------------------------------------------
  630. /**
  631. * 执行 DDL 语句(如 CREATE TABLE、ALTER TABLE 等)
  632. *
  633. * @param dbConfig 数据库配置信息,包含连接数据库所需的参数
  634. * @param ddlSql DDL 语句,用于操作数据库架构
  635. * @return 返回一个 Result 对象,其中包含受影响的行数
  636. */
  637. public Result<Integer> executeDDL(Map<String, String> dbConfig, String ddlSql) {
  638. // 调用 update 方法执行 DDL 语句,传入数据库配置信息、DDL 语句、空参数列表和执行标识
  639. return update(dbConfig, ddlSql, Collections.emptyList(),"exec");
  640. }
  641. /**
  642. * 设置参数到 SQL
  643. * 该方法将列表中的参数设置到预编译的 SQL 语句中
  644. * 使用 PreparedStatement 对象,通过遍历参数列表,将每个参数按位置设置到 SQL 语句中
  645. *
  646. * @param ps PreparedStatement 对象,用于执行 SQL 语句
  647. * @param params 包含 SQL 语句参数的列表
  648. * @throws SQLException 如果设置参数过程中发生错误,抛出此异常
  649. */
  650. private void setParameters(PreparedStatement ps, List<Object> params) throws SQLException {
  651. for (int i = 0; i < params.size(); i++) {
  652. ps.setObject(i + 1, params.get(i));
  653. }
  654. }
  655. /**
  656. * SQL 注入检测(所有 SQL 必须经过此检测)
  657. * 此方法用于检查给定的SQL语句是否包含潜在的SQL注入操作
  658. * 它通过解析SQL语句并检查其中是否包含危险操作来判断
  659. *
  660. * @param sql 待检测的SQL语句
  661. * @param type SQL语句的类型,用于区分不同数据库的特定语法
  662. * @return 如果SQL语句包含潜在的SQL注入操作,则返回true;否则返回false
  663. */
  664. public boolean isPotentialSqlInjection(String sql,String type) {
  665. if (sql == null || sql.isEmpty()) return false;
  666. if(type == null || type.isEmpty()) {
  667. logger.warn("未指定数据操作类型type");
  668. return true;
  669. }
  670. // 首先尝试从缓存中获取该SQL语句的安全性信息
  671. Boolean cached = safeSqlCache.getIfPresent(sql);
  672. // 如果缓存中存在且值为false,则说明该SQL语句已被标记为不安全,直接返回false
  673. if (Boolean.TRUE.equals(cached)) {
  674. return false;
  675. }
  676. try {
  677. //优先使用正则表达式检测非法SQL关键词
  678. Matcher matcher = SQL_INJECTION_PATTERN.matcher(sql);
  679. if(matcher.find()) {
  680. logger.info("SQL_INJECTION_PATTERN.matcher(sql); 692");
  681. return true;
  682. }
  683. // 使用CCJSqlParser解析SQL语句,以便进行进一步的检查
  684. Statement stmt = CCJSqlParserUtil.parse(sql);
  685. // 检查解析后的SQL语句是否包含危险操作
  686. boolean isInject = containsDangerousOperation(stmt,type);
  687. // 如果包含危险操作,则将该SQL语句添加到白名单缓存中,避免重复检测
  688. if (!isInject) {
  689. logger.info("containsDangerousOperation(stmt,type); 701");
  690. safeSqlCache.put(sql, true); // 白名单缓存
  691. }
  692. // 返回检测结果
  693. return isInject;
  694. } catch (JSQLParserException e) {
  695. // 如果SQL解析失败,记录警告日志并认为该SQL语句疑似注入
  696. logger.error("抛出异常:\n",e);
  697. logger.warn("SQL 解析失败:{};疑似注入: {}",e.getMessage(), sql);
  698. return true;
  699. }
  700. }
  701. /**
  702. * 自定义sql验证
  703. * @param sql
  704. * @param type
  705. * @return
  706. */
  707. public boolean customerHandelSqlInjection(String sql,String type) {
  708. if (sql == null || sql.isEmpty()) return false;
  709. if(type == null || type.isEmpty()) {
  710. logger.warn("未指定数据操作类型type");
  711. return true;
  712. }
  713. // 首先尝试从缓存中获取该SQL语句的安全性信息
  714. Boolean cached = safeSqlCache.getIfPresent(sql);
  715. // 如果缓存中存在且值为false,则说明该SQL语句已被标记为不安全,直接返回false
  716. if (Boolean.TRUE.equals(cached)) {
  717. return false;
  718. }
  719. try {
  720. //优先使用正则表达式检测非法SQL关键词
  721. // todo 暂且注释掉,自定义sql被拦住了
  722. /*Matcher matcher = SQL_INJECTION_PATTERN.matcher(sql);
  723. if(matcher.find()) {
  724. logger.info("SQL_INJECTION_PATTERN.matcher(sql); 692");
  725. return true;
  726. }*/
  727. // 使用CCJSqlParser解析SQL语句,以便进行进一步的检查
  728. Statement stmt = CCJSqlParserUtil.parse(sql);
  729. // 检查解析后的SQL语句是否包含危险操作
  730. boolean isInject = containsDangerousOperation(stmt,type);
  731. // 如果包含危险操作,则将该SQL语句添加到白名单缓存中,避免重复检测
  732. if (!isInject) {
  733. safeSqlCache.put(sql, true); // 白名单缓存
  734. }
  735. // 返回检测结果
  736. return isInject;
  737. } catch (JSQLParserException e) {
  738. // 如果SQL解析失败,记录警告日志并认为该SQL语句疑似注入
  739. logger.error("抛出异常:\n",e);
  740. logger.warn("SQL 解析失败:{};疑似注入: {}",e.getMessage(), sql);
  741. return true;
  742. }
  743. }
  744. /**
  745. * 判断 SQL 是否包含危险操作
  746. */
  747. private boolean containsDangerousOperation(Statement stmt,String type) {
  748. return switch (type.toLowerCase()) {
  749. case "query" -> !(stmt instanceof Select);
  750. case "update" -> !(stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete);
  751. case "exec" -> !(stmt instanceof CreateTable || stmt instanceof Alter);
  752. default ->
  753. // 更复杂的检测逻辑可在此处扩展
  754. false;
  755. };
  756. }
  757. }