123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571 |
- package com.bfkj.unidia.DataBaseUtils;
- import com.bfkj.unidia.Result;
- import com.github.benmanes.caffeine.cache.Cache;
- import com.github.benmanes.caffeine.cache.Caffeine;
- import com.zaxxer.hikari.HikariDataSource;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.stereotype.Service;
- import java.math.BigDecimal;
- import java.sql.*;
- import java.time.LocalDate;
- import java.time.LocalDateTime;
- import java.util.*;
- import java.util.concurrent.*;
- import java.util.concurrent.locks.ReentrantLock;
- import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
- /**
- * 表结构管理器,用于获取和缓存数据库表结构信息(多数据库兼容版)
- */
- @Service
- public class TableStructureManager {
- private static final Logger logger = LoggerFactory.getLogger(TableStructureManager.class);
- // 表结构缓存:数据源标识 -> 表名 -> 表结构信息
- private final Cache<String, ConcurrentMap<String, TableSchema>> schemaCache = buildCaffeineCache();
- // 数据库连接池管理器
- private final ConnectionPoolManager poolManager;
- // 数据库工具类
- private final DbUtils dbUtils;
- //数据命令执行工具(在此主要用于创建表和更新表结构)
- private final JdbcExecutor jdbcExecutor;
- @Autowired
- public TableStructureManager(ConnectionPoolManager poolManager,DbUtils dbUtils,
- JdbcExecutor jdbcExecutor){
- this.poolManager = poolManager;
- this.dbUtils = dbUtils;
- this.jdbcExecutor = jdbcExecutor;
- }
- // 表级锁管理器:保证同一时间只能有一个线程操作一张表
- private final ConcurrentMap<String, ReentrantLock> tableLocks = new ConcurrentHashMap<>();
- /**
- * 获取指定表的锁对象(如果不存在则创建)
- */
- public ReentrantLock getTableLock(String tableName) {
- return tableLocks.computeIfAbsent(tableName, k -> new ReentrantLock());
- }
- // 定义 ResultSet 列名常量
- private static final String COLUMN_NAME = "COLUMN_NAME";
- private static final String TYPE_NAME = "TYPE_NAME";
- private static final String DATA_TYPE = "DATA_TYPE";
- private static final String COLUMN_SIZE = "COLUMN_SIZE";
- private static final String NULLABLE = "NULLABLE";
- private static final String REMARKS = "REMARKS";
- private static final Set<String> ORACLE_TYPES = Set.of("oracle", "dm", "kingbase", "oceanbase");
- private static final Set<String> POSTGRESQL_TYPES = Set.of("postgresql", "gaussdb", "opengauss");
- private static final Set<String> SQLSERVER_TYPES = Set.of("sqlserver");
- private static final Set<String> SQLITE_TYPES = Set.of("sqlite");
- private static final Set<String> GOLDEN_DB_TYPES = Set.of("goldendb");
- /**
- * 获取表结构信息(线程安全、高性能)
- * @param dbConfig 数据库配置
- * @param tableName 表名
- * @return 表结构结果
- */
- public Result<TableSchema> getTableStructure(Map<String, String> dbConfig, String tableName) {
- Result<Map<String, String>> validation = dbUtils.validateConnection(dbConfig);
- if (!validation.isSuccess()) {
- return Result.fail(validation.getError());
- }
- Map<String, String> validationData = validation.getData();
- String dbKey = validationData.get("dbKey");
- String dbType = validationData.get("dbType");
- return getTableStructure(dbKey,dbType, dbConfig,tableName);
- }
- /**
- * 获取表结构信息(线程安全、高性能)
- *
- * @param dbConfig 数据库配置
- * @param tableName 表名
- * @return 表结构结果
- */
- public Result<TableSchema> getTableStructure(String dbKey,String dbType, Map<String, String> dbConfig, String tableName) {
- try {
- // 第一次尝试从缓存中获取
- ConcurrentMap<String, TableSchema> tableSchemas = schemaCache.getIfPresent(dbKey);
- if (tableSchemas != null) {
- TableSchema schema = tableSchemas.get(tableName);
- if (schema != null) {
- return Result.success(schema);
- }
- }
- // 缓存未命中,同步加载并更新缓存
- synchronized (this) {
- // 再次检查缓存(双重检查锁)
- tableSchemas = schemaCache.getIfPresent(dbKey);
- if (tableSchemas == null) {
- tableSchemas = new ConcurrentHashMap<>();
- schemaCache.put(dbKey, tableSchemas);
- }
- TableSchema schema = tableSchemas.get(tableName);
- if (schema != null) {
- return Result.success(schema);
- }
- // 获取数据源
- Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
- if (!dataSourceResult.isSuccess()) {
- return Result.fail(dataSourceResult.getError());
- }
- HikariDataSource dataSource = dataSourceResult.getData();
- // 真正加载
- Result<TableSchema> result = loadTableStructure(dataSource, tableName, dbType);
- if (result.isSuccess()) {
- tableSchemas.put(tableName, result.getData());
- }
- return result;
- }
- } catch (Exception e) {
- logger.error("获取表结构失败,dbKey: {}, tableName: {}", dbKey, tableName, e);
- return Result.fail("获取表结构失败: 系统错误");
- }
- }
- /**
- * 实际获取表结构的核心方法
- *
- * @param dataSource 数据库配置参数集合,包含连接池所需所有配置项
- * @param tableName 需要获取结构的数据库表名称
- * @return 包含表结构信息的Result对象,失败时返回错误信息
- */
- private Result<TableSchema> loadTableStructure(HikariDataSource dataSource, String tableName, String dbType) {
- try (Connection connection = dataSource.getConnection()) {
- DatabaseMetaData metaData = connection.getMetaData();
- String catalog = getCatalog(dbType, connection);
- String schema = getSchema(dbType, connection);
- try (ResultSet columns = metaData.getColumns(catalog, schema, tableName, "%")) {
- LinkedHashMap<String, ColumnInfo> columnMap = new LinkedHashMap<>();
- while (columns.next()) {
- String columnName = columns.getString(COLUMN_NAME);
- String columnType = columns.getString(TYPE_NAME);
- int dataType = columns.getInt(DATA_TYPE);
- int columnSize = columns.getInt(COLUMN_SIZE);
- boolean nullable = columns.getBoolean(NULLABLE);
- String remarks = columns.getString(REMARKS);
- columnMap.put(columnName, createColumnInfo(columnName, columnType, dataType, columnSize, nullable, remarks));
- }
- return Result.success(new TableSchema(tableName, columnMap));
- } catch (SQLException e) {
- // 记录详细日志(示例)
- logger.error("获取表结构失败: {}", e.getMessage(), e);
- return Result.fail("获取表结构失败: ", e);
- }
- } catch (Exception e) {
- // 记录非 SQL 异常日志(示例)
- logger.error("非预期异常: {}", e.getMessage(), e);
- return Result.fail("获取表结构失败: ", e);
- }
- }
- public List<String> getColumns(Map<String, String> dbConfig,String tableName){
- Result<TableSchema> tableSchemaResult = getTableStructure(dbConfig, tableName);
- if(!tableSchemaResult.isSuccess()){
- return null;
- }
- return tableSchemaResult.getData().getColumns().keySet().stream().toList();
- }
- // 提取为独立方法
- private ColumnInfo createColumnInfo(String columnName, String columnType, int dataType, int columnSize, boolean nullable, String remarks) {
- return new ColumnInfo(columnName, columnType, dataType, columnSize, nullable, remarks);
- }
- private static final Set<String> SUPPORTED_DB_TYPES = Set.of(
- "mysql", "polardb", "tidb", "goldendb", "tdsql",
- "sqlserver", "kingbase", "gaussdb", "opengauss", "gbase"
- );
- private String getCatalog(String dbType, Connection connection) {
- if (dbType == null) {
- return null;
- }
- try {
- String catalog = SUPPORTED_DB_TYPES.contains(dbType) ? connection.getCatalog() : null;
- if ("oceanbase".equalsIgnoreCase(dbType)) {
- try (Statement stmt = connection.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT tenant_name FROM oceanbase.__all_virtual_tenant LIMIT 1")) {
- if (rs.next()) {
- catalog = rs.getString(1);
- } else {
- logger.warn("OceanBase查询成功但未找到租户信息,使用默认租户sys");
- catalog = "sys";
- }
- } catch (SQLException e) {
- logger.warn("OceanBase查询异常,使用默认租户sys", e);
- catalog = "sys";
- }
- }
- return catalog;
- } catch (SQLException e) {
- return null;
- }
- }
- private static final Set<String> PUBLIC_SCHEMA_TYPES = Set.of("kingbase", "gaussdb", "opengauss", "gbase");
- private String getSchema(String dbType, Connection connection) {
- if (dbType == null || connection == null) {
- // 可选:抛出IllegalArgumentException以强制约束输入
- return null;
- }
- try {
- if (dbType.equals("sqlserver")) {
- return "dbo";
- } else if ("oracle".equals(dbType) || "dm".equals(dbType)) {
- return connection.getMetaData().getUserName().toUpperCase();
- } else if (PUBLIC_SCHEMA_TYPES.contains(dbType)) {
- return "public";
- } else if (dbType.equals("oceanbase")) {
- return connection.getCatalog();
- } else {
- // 可选:记录未知dbType以辅助调试
- return null;
- }
- } catch (SQLException e) {
- // 建议记录日志,避免静默失败
- logger.info("Failed to get schema: " + e.getMessage());
- return null;
- }
- }
- /**
- * 创建表(如果不存在)并自动添加缺失字段
- */
- public Result<Integer> createTableIfNotExists(Map<String, String> dbConfig,
- String tableName,
- Map<String, Object> executeParam) {
- Result<TableSchema> schemaResult = getTableStructure(dbConfig, tableName);
- if (schemaResult.isSuccess() && !schemaResult.getData().getColumns().isEmpty()) {
- // 表已存在,尝试添加缺失字段
- return addMissingColumns(dbConfig, tableName, executeParam);
- }
- // 表不存在,创建新表
- Result<Integer> createResult = createTable(dbConfig, tableName, executeParam);
- if (createResult.isSuccess()) {
- return createResult;
- }
- // 创建失败可能是并发创建导致,检查是否存在后重试
- Result<TableSchema> retrySchema = getTableStructure(dbConfig, tableName);
- if (retrySchema.isSuccess() && !retrySchema.getData().getColumns().isEmpty()) {
- return addMissingColumns(dbConfig, tableName, executeParam);
- }
- return createResult;
- }
- public Result<Integer> createTable(Map<String, String> dbConfig,
- String tableName,
- Map<String, Object> executeParam) {
- ReentrantLock lock = getTableLock(tableName);
- lock.lock();
- try {
- Result<Map<String, String>> validateResult = dbUtils.validateConnection(dbConfig);
- if (!validateResult.isSuccess()) {
- return Result.fail(validateResult.getError());
- }
- String dbType = validateResult.getData().get("dbType");
- // 构建字段列表
- List<String> columns = buildColumnDefinitions(executeParam, dbType,tableName);
- // 构建并执行建表语句
- String sqlTemplate = getCreateTableTemplate(dbType);
- String sql = String.format(sqlTemplate, tableName, String.join(", ", columns));
- Result<Integer> result = jdbcExecutor.executeDDL(dbConfig, sql);
- if (result.isSuccess()) {
- clearTableCache(dbConfig, tableName);
- }
- return result;
- } catch (Exception e) {
- logger.error("创建表失败", e);
- return Result.fail("创建表失败: " + e.getMessage());
- } finally {
- lock.unlock();
- }
- }
- /**
- * 添加缺失字段(适用于表已存在的情况)
- */
- public Result<Integer> addMissingColumns(Map<String, String> dbConfig,
- String tableName,
- Map<String, Object> executeParam) {
- Result<TableSchema> schemaResult = getTableStructure(dbConfig, tableName);
- if (!schemaResult.isSuccess()) {
- return Result.fail(schemaResult.getError());
- }
- // 筛选新字段
- Map<String, Object> newColumns = filterNewColumns(executeParam, schemaResult.getData());
- if (newColumns.isEmpty()) {
- return Result.fail("所有字段已存在");
- }
- // 添加字段
- return addColumns(dbConfig, tableName, newColumns);
- }
- /**
- * 构建字段定义列表
- */
- private List<String> buildColumnDefinitions(Map<String, Object> executeParam,
- String dbType,String tableName) {
- List<String> columns = new ArrayList<>();
- for (Map.Entry<String, Object> entry : executeParam.entrySet()) {
- String colName = entry.getKey();
- String colType = convertJavaTypeToSqlType(entry.getValue().getClass(), dbType);
- // 自动添加主键定义
- if (isPrimaryKey(colName, tableName)) {
- colType += " PRIMARY KEY ";
- }
- columns.add(colName + " " + colType);
- }
- return columns;
- }
- private String getCreateTableTemplate(String dbType) {
- return switch (dbType) {
- case "sqlserver" ->
- "IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'%s') AND type in (N'U')) CREATE TABLE %s (%s)";
- case "oracle" ->
- "BEGIN EXECUTE IMMEDIATE 'CREATE TABLE %s (%s)'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;";
- case "dm" ->
- "DECLARE e_table_not_exist EXCEPTION; PRAGMA EXCEPTION_INIT(e_table_not_exist, -20071); BEGIN EXECUTE IMMEDIATE 'CREATE TABLE %s (%s)'; EXCEPTION WHEN e_table_not_exist THEN NULL; END;";
- default -> "CREATE TABLE IF NOT EXISTS %s (%s)";
- };
- }
- private boolean isPrimaryKey(String colName, String tableName) {
- return colName.equalsIgnoreCase(tableName + "_code") ||
- colName.equalsIgnoreCase("_id") ||
- colName.equalsIgnoreCase("id");
- }
- /**
- * 添加多个字段(支持批量执行以提升性能)
- */
- public Result<Integer> addColumns(Map<String, String> dbConfig, String tableName, Map<String, Object> columns) {
- ReentrantLock lock = getTableLock(tableName);
- lock.lock();
- try {
- // 2. 获取当前表结构
- Result<TableStructureManager.TableSchema> schemaResult = getTableStructure(dbConfig, tableName);
- if (!schemaResult.isSuccess()) {
- return Result.fail(schemaResult.getError());
- }
- // 3. 筛选新字段
- Map<String, Object> newColumns = filterNewColumns(columns, schemaResult.getData());
- if (newColumns.isEmpty()) {
- return Result.fail("所有字段已存在");
- }
- String dbType = dbUtils.getDatabaseType(dbConfig.get("url"));
- // 4. 构建ALTER语句
- List<String> alterStatements = buildAlterStatements(dbType, tableName, newColumns);
- // 5. 根据数据库类型选择执行方式
- int totalAffectedRows;
- if (Set.of("mysql","mariadb","polardb","tidb","gaussdb","sqlite").contains(dbType)) {
- String combinedSql = combineAlterStatements(tableName, alterStatements, dbType);
- Result<Integer> result = jdbcExecutor.executeDDL(dbConfig, combinedSql);
- if (!result.isSuccess()) {
- return result;
- }
- totalAffectedRows = result.getData();
- } else {
- Result<Integer> singleResult = executeSingleAlters(dbConfig, alterStatements);
- if (singleResult.isSuccess()) {
- totalAffectedRows = singleResult.getData();
- } else {
- return Result.fail(singleResult.getError());
- }
- }
- // 6. 清除缓存以便下次重新加载表结构
- clearTableCache(dbConfig, tableName);
- return Result.success(totalAffectedRows);
- } catch (Exception e) {
- logger.error("添加字段失败", e);
- return Result.fail("添加字段失败: " + e.getMessage());
- }finally {
- lock.unlock();
- }
- }
- /**
- * 筛选出不存在的新字段
- */
- public Map<String, Object> filterNewColumns(
- Map<String, Object> columns,
- TableStructureManager.TableSchema schema) {
- Map<String, Object> newColumns = new HashMap<>();
- for (Map.Entry<String, Object> entry : columns.entrySet()) {
- String columnName = entry.getKey();
- if (!schema.getColumns().containsKey(columnName.toLowerCase())) {
- newColumns.put(columnName, entry.getValue());
- } else {
- logger.warn("字段 [{}] 已存在,跳过添加", columnName);
- }
- }
- return newColumns;
- }
- /**
- * 构建 ALTER TABLE ADD COLUMN 语句列表
- */
- private List<String> buildAlterStatements(String dbType, String tableName, Map<String, Object> columns) {
- List<String> statements = new ArrayList<>();
- for (Map.Entry<String, Object> entry : columns.entrySet()) {
- String quotedTableName = dbUtils.quoteIdentifier(tableName, dbType);
- String quotedColumnName = dbUtils.quoteIdentifier(entry.getKey(), dbType);
- String sql = "ALTER TABLE " + quotedTableName + " ADD COLUMN " + quotedColumnName + " " + entry.getValue();
- statements.add(sql);
- }
- return statements;
- }
- /**
- * 合并多条 ALTER TABLE 语句为一条 SQL
- */
- private String combineAlterStatements(String tableName, List<String> alterStatements, String dbType) {
- StringBuilder combined = new StringBuilder();
- String quotedTableName = dbUtils.quoteIdentifier(tableName, dbType);
- combined.append("ALTER TABLE ").append(quotedTableName).append(" ");
- // 提取每个语句中的 ADD COLUMN 子句
- List<String> addColumnClauses = new ArrayList<>();
- for (String stmt : alterStatements) {
- String clause = stmt.substring(stmt.indexOf("ADD COLUMN"));
- addColumnClauses.add(clause);
- }
- combined.append(String.join(", ", addColumnClauses));
- return combined.toString();
- }
- /**
- * 逐条执行 ADD COLUMN 操作
- */
- private Result<Integer> executeSingleAlters(Map<String, String> dbConfig, List<String> alterStatements) {
- int totalAffectedRows = 0;
- for (String sql : alterStatements) {
- Result<Integer> result = jdbcExecutor.executeDDL(dbConfig, sql);
- if (!result.isSuccess()) {
- logger.error("执行DDL失败: " + result.getError());
- //return Result.fail(result.getError()).getData(); // 返回错误结果
- continue;
- }
- totalAffectedRows += result.getData();
- }
- if(totalAffectedRows == 0){
- return Result.fail("所有字段已存在");
- }
- return Result.success(totalAffectedRows);
- }
- /**
- * 清除特定表的缓存
- */
- public void clearTableCache(Map<String, String> dbConfig, String tableName) {
- try {
- String cacheKey = dbUtils.generateCacheKey(dbConfig, null);
- // 第一次尝试从缓存中获取
- ConcurrentMap<String, TableSchema> tableSchemas = schemaCache.getIfPresent(cacheKey);
- if (tableSchemas != null) {
- tableSchemas.remove(tableName);
- }
- // 缓存未命中,同步加载并更新缓存
- synchronized (this) {
- // 再次检查缓存(双重检查锁)
- tableSchemas = schemaCache.getIfPresent(cacheKey);
- if (tableSchemas != null) {
- tableSchemas.remove(tableName);
- }
- }
- } catch (Exception e) {
- logger.error("清除表结构失败", e);
- }
- }
- /**
- * 将Java类型转换为数据库支持的字段类型
- *
- * @param javaType Java类型(如String.class, Integer.class等)
- * @param dbType 数据库连接URL(用于识别数据库类型)
- * @return 数据库字段类型字符串(如VARCHAR, INTEGER等)
- * @throws UnsupportedOperationException 不支持的Java类型或数据库类型
- */
- public String convertJavaTypeToSqlType(Class<?> javaType, String dbType) {
- if (javaType == null || dbType == null) {
- throw new IllegalArgumentException("参数不能为空");
- }
- if (javaType == String.class) {
- if (ORACLE_TYPES.contains(dbType)) return "VARCHAR2(255)";
- if (SQLITE_TYPES.contains(dbType)) return "TEXT";
- if (SQLSERVER_TYPES.contains(dbType)) return "NVARCHAR(MAX)";
- if (GOLDEN_DB_TYPES.contains(dbType)) return "VARCHAR(4096)";
- return "VARCHAR(255)";
- } else if (javaType == Integer.class || javaType == int.class) {
- if (ORACLE_TYPES.contains(dbType)) return "NUMBER(11)";
- if (POSTGRESQL_TYPES.contains(dbType) || SQLITE_TYPES.contains(dbType)) return "INTEGER";
- return "INT";
- } else if (javaType == Long.class || javaType == long.class) {
- if (ORACLE_TYPES.contains(dbType)) return "NUMBER(19)";
- return "BIGINT";
- } else if (javaType == Double.class || javaType == double.class) {
- if (ORACLE_TYPES.contains(dbType)) return "NUMBER(38,10)";
- if (SQLSERVER_TYPES.contains(dbType)) return "DECIMAL(38, 10)";
- if (SQLITE_TYPES.contains(dbType)) return "REAL";
- return "DECIMAL";
- } else if (javaType == LocalDate.class) {
- return "DATE";
- } else if (javaType == LocalDateTime.class) {
- if (Set.of("oracle", "dm", "kingbase", "oceanbase",
- "postgresql", "gaussdb", "opengauss",
- "polardb", "tidb").contains(dbType)) {
- return "TIMESTAMP";
- } else if (SQLSERVER_TYPES.contains(dbType)) {
- return "DATETIME2";
- }
- return "DATETIME";
- } else if (javaType == BigDecimal.class) {
- if (ORACLE_TYPES.contains(dbType)) return "NUMBER(38,10)";
- if (SQLSERVER_TYPES.contains(dbType)) return "DECIMAL(38, 10)";
- if (SQLITE_TYPES.contains(dbType)) return "REAL";
- return "DECIMAL";
- } else if (javaType == byte[].class) {
- if (ORACLE_TYPES.contains(dbType) || SQLITE_TYPES.contains(dbType)) {
- return "BLOB";
- } else if (POSTGRESQL_TYPES.contains(dbType)) {
- return "BYTEA";
- } else if (SQLSERVER_TYPES.contains(dbType)) {
- return "VARBINARY(MAX)";
- }
- return "LONGBLOB";
- }
- throw new UnsupportedOperationException("不支持的Java类型: " + javaType.getName());
- }
- /**
- * 表结构封装类
- */
- public record TableSchema(String name, Map<String, ColumnInfo> columns) {
- public Map<String, ColumnInfo> getColumns() {
- return Collections.unmodifiableMap(columns);
- }
- }
- /**
- * 列信息封装类
- */
- public record ColumnInfo(String name, String type,
- int dataType, int size,
- boolean nullable, String remarks) {
- }
- }
|