package com.bfkj.unidia.DataBaseUtils;

import com.bfkj.unidia.Result;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.sql.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;

/**
 * Table structure manager: reads table metadata through JDBC, caches it per data source,
 * and issues the DDL needed to create tables or add missing columns (multi-database version).
 */
@Service
public class TableStructureManager {

    private static final Logger logger = LoggerFactory.getLogger(TableStructureManager.class);

    // Names of the columns read from DatabaseMetaData#getColumns result sets.
    private static final String COLUMN_NAME = "COLUMN_NAME";
    private static final String TYPE_NAME = "TYPE_NAME";
    private static final String DATA_TYPE = "DATA_TYPE";
    private static final String COLUMN_SIZE = "COLUMN_SIZE";
    private static final String NULLABLE = "NULLABLE";
    private static final String REMARKS = "REMARKS";

    // Database-type groups used when mapping Java types to SQL column types.
    private static final Set<String> ORACLE_TYPES = Set.of("oracle", "dm", "kingbase", "oceanbase");
    private static final Set<String> POSTGRESQL_TYPES = Set.of("postgresql", "gaussdb", "opengauss");
    private static final Set<String> SQLSERVER_TYPES = Set.of("sqlserver");
    private static final Set<String> SQLITE_TYPES = Set.of("sqlite");
    private static final Set<String> GOLDEN_DB_TYPES = Set.of("goldendb");

    // Database types for which LocalDateTime maps to TIMESTAMP.
    private static final Set<String> TIMESTAMP_TYPES = Set.of(
            "oracle", "dm", "kingbase", "oceanbase", "postgresql", "gaussdb", "opengauss", "polardb", "tidb");

    // Database types for which Connection#getCatalog() is passed to the metadata lookup.
    private static final Set<String> CATALOG_AWARE_DB_TYPES = Set.of(
            "mysql", "polardb", "tidb", "goldendb", "tdsql", "sqlserver", "kingbase",
            "gaussdb", "opengauss", "gbase");

    // Database types whose metadata lookup uses the "public" schema.
    private static final Set<String> PUBLIC_SCHEMA_TYPES = Set.of("kingbase", "gaussdb", "opengauss", "gbase");

    // Database types that accept several ADD COLUMN clauses in one ALTER TABLE statement.
    // SQLite is deliberately absent: its ALTER TABLE adds exactly one column per statement.
    private static final Set<String> MULTI_COLUMN_ALTER_TYPES =
            Set.of("mysql", "mariadb", "polardb", "tidb", "gaussdb");

    // Database types whose ALTER TABLE syntax is "ADD <column>" without the COLUMN keyword.
    private static final Set<String> ADD_WITHOUT_COLUMN_KEYWORD_TYPES = Set.of("oracle", "sqlserver");

    // Schema cache: data source key -> table name -> table schema.
    private final Cache<String, ConcurrentMap<String, TableSchema>> schemaCache = buildCaffeineCache();

    // Connection pool manager.
    private final ConnectionPoolManager poolManager;

    // Database helper utilities.
    private final DbUtils dbUtils;

    // Statement executor (used here to create tables and alter their structure).
    private final JdbcExecutor jdbcExecutor;

    // Per-table locks: only one thread at a time may run DDL against a given table name.
    private final ConcurrentMap<String, ReentrantLock> tableLocks = new ConcurrentHashMap<>();

    @Autowired
    public TableStructureManager(ConnectionPoolManager poolManager, DbUtils dbUtils, JdbcExecutor jdbcExecutor) {
        this.poolManager = poolManager;
        this.dbUtils = dbUtils;
        this.jdbcExecutor = jdbcExecutor;
    }

    /**
     * Returns the lock for the given table, creating it on first use.
     *
     * @param tableName table name used as the lock key (must not be null)
     * @return the lock associated with {@code tableName}
     */
    public ReentrantLock getTableLock(String tableName) {
        return tableLocks.computeIfAbsent(tableName, k -> new ReentrantLock());
    }

    /**
     * Returns the structure of a table, validating the connection configuration first.
     *
     * @param dbConfig  database configuration
     * @param tableName table name
     * @return the table schema, or a failure result when validation or loading fails
     */
    public Result<TableSchema> getTableStructure(Map<String, String> dbConfig, String tableName) {
        Result<Map<String, String>> validation = dbUtils.validateConnection(dbConfig);
        if (!validation.isSuccess()) {
            return Result.fail(validation.getError());
        }
        Map<String, String> validationData = validation.getData();
        String dbKey = validationData.get("dbKey");
        String dbType = validationData.get("dbType");
        return getTableStructure(dbKey, dbType, dbConfig, tableName);
    }

    /**
     * Returns the structure of a table, serving it from the cache when possible.
     *
     * <p>The cache is consulted without locking first; on a miss the load happens inside a block
     * synchronized on this instance, after re-checking the cache. A schema without columns
     * (which is what the metadata lookup yields for a table that does not exist) is returned
     * but not cached, so a table created afterwards is seen on the next call.
     *
     * @param dbKey     cache key identifying the data source
     * @param dbType    database type identifier (for example {@code "mysql"})
     * @param dbConfig  database configuration
     * @param tableName table name
     * @return the table schema, or a failure result when the data source or metadata is unavailable
     */
    public Result<TableSchema> getTableStructure(String dbKey, String dbType,
                                                 Map<String, String> dbConfig, String tableName) {
        try {
            // Fast path: lock-free cache lookup.
            ConcurrentMap<String, TableSchema> tableSchemas = schemaCache.getIfPresent(dbKey);
            if (tableSchemas != null) {
                TableSchema cached = tableSchemas.get(tableName);
                if (cached != null) {
                    return Result.success(cached);
                }
            }

            // Cache miss: load under the instance lock and publish the result.
            synchronized (this) {
                // Re-check: another thread may have loaded the schema while we waited.
                tableSchemas = schemaCache.getIfPresent(dbKey);
                if (tableSchemas == null) {
                    tableSchemas = new ConcurrentHashMap<>();
                    schemaCache.put(dbKey, tableSchemas);
                }
                TableSchema cached = tableSchemas.get(tableName);
                if (cached != null) {
                    return Result.success(cached);
                }

                Result<HikariDataSource> dataSourceResult = poolManager.getDataSource(dbConfig);
                if (!dataSourceResult.isSuccess()) {
                    return Result.fail(dataSourceResult.getError());
                }

                Result<TableSchema> result = loadTableStructure(dataSourceResult.getData(), tableName, dbType);
                // Never cache an empty schema: it would hide a table created later on.
                if (result.isSuccess() && !result.getData().getColumns().isEmpty()) {
                    tableSchemas.put(tableName, result.getData());
                }
                return result;
            }
        } catch (Exception e) {
            logger.error("获取表结构失败,dbKey: {}, tableName: {}", dbKey, tableName, e);
            return Result.fail("获取表结构失败: 系统错误");
        }
    }

    /**
     * Reads the column metadata of a table through {@link DatabaseMetaData#getColumns}.
     *
     * @param dataSource data source to borrow a connection from
     * @param tableName  name of the table whose structure is read
     * @param dbType     database type identifier, used to pick the catalog and schema
     * @return the table schema (with no columns when the lookup matches nothing),
     *         or a failure result carrying the exception
     */
    private Result<TableSchema> loadTableStructure(HikariDataSource dataSource, String tableName, String dbType) {
        try (Connection connection = dataSource.getConnection()) {
            DatabaseMetaData metaData = connection.getMetaData();
            String catalog = getCatalog(dbType, connection);
            String schema = getSchema(dbType, connection);

            try (ResultSet columns = metaData.getColumns(catalog, schema, tableName, "%")) {
                // LinkedHashMap keeps the columns in the order the driver reports them.
                Map<String, ColumnInfo> columnMap = new LinkedHashMap<>();
                while (columns.next()) {
                    String columnName = columns.getString(COLUMN_NAME);
                    String columnType = columns.getString(TYPE_NAME);
                    int dataType = columns.getInt(DATA_TYPE);
                    int columnSize = columns.getInt(COLUMN_SIZE);
                    // NULLABLE is an int code, not a boolean; "unknown" is treated as nullable.
                    boolean nullable = columns.getInt(NULLABLE) != DatabaseMetaData.columnNoNulls;
                    String remarks = columns.getString(REMARKS);
                    columnMap.put(columnName,
                            new ColumnInfo(columnName, columnType, dataType, columnSize, nullable, remarks));
                }
                return Result.success(new TableSchema(tableName, columnMap));
            } catch (SQLException e) {
                logger.error("获取表结构失败: {}", e.getMessage(), e);
                return Result.fail("获取表结构失败: ", e);
            }
        } catch (Exception e) {
            logger.error("非预期异常: {}", e.getMessage(), e);
            return Result.fail("获取表结构失败: ", e);
        }
    }

    /**
     * Returns the column names of a table in the order reported by the driver.
     *
     * @param dbConfig  database configuration
     * @param tableName table name
     * @return the column names, or {@code null} when the table structure cannot be obtained
     */
    public List<String> getColumns(Map<String, String> dbConfig, String tableName) {
        Result<TableSchema> tableSchemaResult = getTableStructure(dbConfig, tableName);
        if (!tableSchemaResult.isSuccess()) {
            // Kept as null (not an empty list) so existing callers can still detect failure.
            return null;
        }
        return tableSchemaResult.getData().getColumns().keySet().stream().toList();
    }

    /**
     * Picks the catalog argument for the metadata lookup.
     *
     * @param dbType     database type identifier; {@code null} yields {@code null}
     * @param connection open connection
     * @return the catalog to use, or {@code null} when none applies or it cannot be read
     */
    private String getCatalog(String dbType, Connection connection) {
        if (dbType == null) {
            return null;
        }
        try {
            String catalog = CATALOG_AWARE_DB_TYPES.contains(dbType) ? connection.getCatalog() : null;
            if ("oceanbase".equalsIgnoreCase(dbType)) {
                try (Statement stmt = connection.createStatement();
                     ResultSet rs = stmt.executeQuery(
                             "SELECT tenant_name FROM oceanbase.__all_virtual_tenant LIMIT 1")) {
                    if (rs.next()) {
                        catalog = rs.getString(1);
                    } else {
                        logger.warn("OceanBase查询成功但未找到租户信息,使用默认租户sys");
                        catalog = "sys";
                    }
                } catch (SQLException e) {
                    logger.warn("OceanBase查询异常,使用默认租户sys", e);
                    catalog = "sys";
                }
            }
            return catalog;
        } catch (SQLException e) {
            // Best effort: fall back to "no catalog", but leave a trace instead of failing silently.
            logger.warn("Failed to get catalog: {}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Picks the schema argument for the metadata lookup.
     *
     * @param dbType     database type identifier
     * @param connection open connection
     * @return the schema to use, or {@code null} when none applies or it cannot be read
     */
    private String getSchema(String dbType, Connection connection) {
        if (dbType == null || connection == null) {
            return null;
        }
        try {
            if (dbType.equals("sqlserver")) {
                return "dbo";
            }
            if ("oracle".equals(dbType) || "dm".equals(dbType)) {
                String userName = connection.getMetaData().getUserName();
                // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
                return userName == null ? null : userName.toUpperCase(Locale.ROOT);
            }
            if (PUBLIC_SCHEMA_TYPES.contains(dbType)) {
                return "public";
            }
            if (dbType.equals("oceanbase")) {
                return connection.getCatalog();
            }
            return null;
        } catch (SQLException e) {
            logger.warn("Failed to get schema: {}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Creates the table when it does not exist; otherwise adds the columns it is missing.
     *
     * <p>When creation fails, the table structure is read again (the table may have been created
     * concurrently) and, if the table now exists, missing columns are added instead.
     *
     * @param dbConfig     database configuration
     * @param tableName    table name
     * @param executeParam column name -> value; the value's class determines the column type on creation
     * @return the result of the create or alter operation
     */
    public Result<Integer> createTableIfNotExists(Map<String, String> dbConfig, String tableName,
                                                  Map<String, ?> executeParam) {
        Result<TableSchema> schemaResult = getTableStructure(dbConfig, tableName);
        if (schemaResult.isSuccess() && !schemaResult.getData().getColumns().isEmpty()) {
            // Table already exists: try to add the missing columns.
            return addMissingColumns(dbConfig, tableName, executeParam);
        }

        // Table does not exist: create it.
        Result<Integer> createResult = createTable(dbConfig, tableName, executeParam);
        if (createResult.isSuccess()) {
            return createResult;
        }

        // Creation may have failed because of a concurrent create: check again and retry as an alter.
        Result<TableSchema> retrySchema = getTableStructure(dbConfig, tableName);
        if (retrySchema.isSuccess() && !retrySchema.getData().getColumns().isEmpty()) {
            return addMissingColumns(dbConfig, tableName, executeParam);
        }
        return createResult;
    }

    /**
     * Creates a table whose columns are derived from the given parameters.
     *
     * <p>Table and column names are interpolated into the DDL as-is (identifiers cannot be bound
     * as statement parameters), so callers must only pass trusted names.
     *
     * @param dbConfig     database configuration
     * @param tableName    table name
     * @param executeParam column name -> value; the value's class determines the column type
     * @return the DDL execution result, or a failure result describing the problem
     */
    public Result<Integer> createTable(Map<String, String> dbConfig, String tableName,
                                       Map<String, ?> executeParam) {
        if (tableName == null) {
            return Result.fail("创建表失败: 表名不能为空");
        }
        if (executeParam == null || executeParam.isEmpty()) {
            // Without columns the statement would be "CREATE TABLE t ()", which no database accepts.
            return Result.fail("创建表失败: 字段定义为空");
        }

        ReentrantLock lock = getTableLock(tableName);
        lock.lock();
        try {
            Result<Map<String, String>> validateResult = dbUtils.validateConnection(dbConfig);
            if (!validateResult.isSuccess()) {
                return Result.fail(validateResult.getError());
            }
            String dbType = validateResult.getData().get("dbType");

            List<String> columns = buildColumnDefinitions(executeParam, dbType, tableName);
            String sql = String.format(getCreateTableTemplate(dbType), tableName, String.join(", ", columns));

            Result<Integer> result = jdbcExecutor.executeDDL(dbConfig, sql);
            if (result.isSuccess()) {
                clearTableCache(dbConfig, tableName);
            }
            return result;
        } catch (Exception e) {
            logger.error("创建表失败", e);
            return Result.fail("创建表失败: " + e.getMessage());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the columns that the existing table is missing.
     *
     * <p>NOTE(review): the values of {@code executeParam} are forwarded unchanged to
     * {@link #addColumns}, which splices each value into the DDL as the column's type definition,
     * whereas {@link #createTable} derives the type from the value's class. Confirm which of the
     * two the callers of this method actually pass.
     *
     * @param dbConfig     database configuration
     * @param tableName    table name
     * @param executeParam column name -> value
     * @return the alter result; a failure with message "所有字段已存在" when nothing is missing
     */
    public Result<Integer> addMissingColumns(Map<String, String> dbConfig, String tableName,
                                             Map<String, ?> executeParam) {
        Result<TableSchema> schemaResult = getTableStructure(dbConfig, tableName);
        if (!schemaResult.isSuccess()) {
            return Result.fail(schemaResult.getError());
        }

        Map<String, ?> newColumns = filterNewColumns(executeParam, schemaResult.getData());
        if (newColumns.isEmpty()) {
            return Result.fail("所有字段已存在");
        }
        return addColumns(dbConfig, tableName, newColumns);
    }

    /**
     * Builds the "name type" column definitions of a CREATE TABLE statement.
     *
     * @throws IllegalArgumentException when a value is null, since no column type can be derived from it
     */
    private List<String> buildColumnDefinitions(Map<String, ?> executeParam, String dbType, String tableName) {
        List<String> definitions = new ArrayList<>(executeParam.size());
        boolean primaryKeyAssigned = false;
        for (Map.Entry<String, ?> entry : executeParam.entrySet()) {
            String colName = entry.getKey();
            Object value = entry.getValue();
            if (value == null) {
                throw new IllegalArgumentException("字段 [" + colName + "] 的值为空,无法推断字段类型");
            }
            String colType = convertJavaTypeToSqlType(value.getClass(), dbType);
            // A table can have one primary key only, so just the first matching column is marked.
            if (!primaryKeyAssigned && isPrimaryKey(colName, tableName)) {
                colType += " PRIMARY KEY ";
                primaryKeyAssigned = true;
            }
            definitions.add(colName + " " + colType);
        }
        return definitions;
    }

    /**
     * Returns the CREATE TABLE template for a database type.
     *
     * <p>{@code %1$s} is the table name and {@code %2$s} the column definitions; indexed
     * specifiers are required because the SQL Server template uses the table name twice.
     */
    private String getCreateTableTemplate(String dbType) {
        return switch (dbType) {
            case "sqlserver" ->
                    "IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'%1$s') AND type in (N'U')) CREATE TABLE %1$s (%2$s)";
            case "oracle" ->
                    "BEGIN EXECUTE IMMEDIATE 'CREATE TABLE %1$s (%2$s)'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;";
            case "dm" ->
                    "DECLARE e_table_not_exist EXCEPTION; PRAGMA EXCEPTION_INIT(e_table_not_exist, -20071); BEGIN EXECUTE IMMEDIATE 'CREATE TABLE %1$s (%2$s)'; EXCEPTION WHEN e_table_not_exist THEN NULL; END;";
            default -> "CREATE TABLE IF NOT EXISTS %1$s (%2$s)";
        };
    }

    /** Tells whether a column is treated as the primary key: {@code <table>_code}, {@code _id} or {@code id}. */
    private boolean isPrimaryKey(String colName, String tableName) {
        return colName.equalsIgnoreCase(tableName + "_code")
                || colName.equalsIgnoreCase("_id")
                || colName.equalsIgnoreCase("id");
    }

    /**
     * Adds several columns to an existing table, skipping those that already exist.
     *
     * <p>Databases that support it receive a single ALTER TABLE statement with all columns;
     * the others receive one statement per column. The cached schema of the table is dropped
     * afterwards so that it is reloaded on next access.
     *
     * @param dbConfig  database configuration
     * @param tableName table name
     * @param columns   column name -> column type definition (its string form is appended to the DDL)
     * @return the number of affected rows reported by the executor, or a failure result
     */
    public Result<Integer> addColumns(Map<String, String> dbConfig, String tableName, Map<String, ?> columns) {
        if (tableName == null) {
            return Result.fail("添加字段失败: 表名不能为空");
        }

        ReentrantLock lock = getTableLock(tableName);
        lock.lock();
        try {
            // Read the current structure under the lock so the filter sees the latest columns.
            Result<TableSchema> schemaResult = getTableStructure(dbConfig, tableName);
            if (!schemaResult.isSuccess()) {
                return Result.fail(schemaResult.getError());
            }

            Map<String, ?> newColumns = filterNewColumns(columns, schemaResult.getData());
            if (newColumns.isEmpty()) {
                return Result.fail("所有字段已存在");
            }

            String dbType = dbUtils.getDatabaseType(dbConfig.get("url"));

            int totalAffectedRows;
            if (MULTI_COLUMN_ALTER_TYPES.contains(dbType)) {
                String combinedSql = combineAlterStatements(dbType, tableName, newColumns);
                Result<Integer> result = jdbcExecutor.executeDDL(dbConfig, combinedSql);
                if (!result.isSuccess()) {
                    return result;
                }
                totalAffectedRows = affectedRows(result);
            } else {
                List<String> alterStatements = buildAlterStatements(dbType, tableName, newColumns);
                Result<Integer> singleResult = executeSingleAlters(dbConfig, alterStatements);
                if (!singleResult.isSuccess()) {
                    return Result.fail(singleResult.getError());
                }
                totalAffectedRows = affectedRows(singleResult);
            }

            // Drop the cached schema so the next lookup reloads the new structure.
            clearTableCache(dbConfig, tableName);
            return Result.success(totalAffectedRows);
        } catch (Exception e) {
            logger.error("添加字段失败", e);
            return Result.fail("添加字段失败: " + e.getMessage());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the entries of {@code columns} whose names are not present in the schema.
     *
     * <p>Names are compared case-insensitively, because drivers report column names in their own
     * case (upper case on some databases) regardless of how the caller spells them.
     *
     * @param columns candidate columns, keyed by column name
     * @param schema  current table schema
     * @return the candidates that do not exist yet, in the iteration order of {@code columns}
     */
    public <V> Map<String, V> filterNewColumns(Map<String, V> columns, TableStructureManager.TableSchema schema) {
        Set<String> existingNames = new HashSet<>();
        for (String existing : schema.getColumns().keySet()) {
            if (existing != null) {
                existingNames.add(existing.toLowerCase(Locale.ROOT));
            }
        }

        Map<String, V> newColumns = new LinkedHashMap<>();
        for (Map.Entry<String, V> entry : columns.entrySet()) {
            String columnName = entry.getKey();
            if (existingNames.contains(columnName.toLowerCase(Locale.ROOT))) {
                logger.warn("字段 [{}] 已存在,跳过添加", columnName);
            } else {
                newColumns.put(columnName, entry.getValue());
            }
        }
        return newColumns;
    }

    /** Builds one "ADD [COLUMN] name type" clause per column, using the keyword the database accepts. */
    private List<String> buildAddColumnClauses(String dbType, Map<String, ?> columns) {
        String keyword = ADD_WITHOUT_COLUMN_KEYWORD_TYPES.contains(dbType) ? "ADD " : "ADD COLUMN ";
        List<String> clauses = new ArrayList<>(columns.size());
        for (Map.Entry<String, ?> entry : columns.entrySet()) {
            clauses.add(keyword + dbUtils.quoteIdentifier(entry.getKey(), dbType) + " " + entry.getValue());
        }
        return clauses;
    }

    /** Builds one complete ALTER TABLE statement per column. */
    private List<String> buildAlterStatements(String dbType, String tableName, Map<String, ?> columns) {
        String prefix = "ALTER TABLE " + dbUtils.quoteIdentifier(tableName, dbType) + " ";
        List<String> statements = new ArrayList<>(columns.size());
        for (String clause : buildAddColumnClauses(dbType, columns)) {
            statements.add(prefix + clause);
        }
        return statements;
    }

    /** Builds a single ALTER TABLE statement that adds all columns at once. */
    private String combineAlterStatements(String dbType, String tableName, Map<String, ?> columns) {
        return "ALTER TABLE " + dbUtils.quoteIdentifier(tableName, dbType) + " "
                + String.join(", ", buildAddColumnClauses(dbType, columns));
    }

    /**
     * Executes the ALTER statements one by one; a failing statement is logged and skipped.
     *
     * <p>Success is decided by the number of statements that ran, not by the affected-row total,
     * because DDL statements commonly report zero affected rows.
     *
     * @return the summed affected rows, or a failure result when no statement succeeded
     */
    private Result<Integer> executeSingleAlters(Map<String, String> dbConfig, List<String> alterStatements) {
        int totalAffectedRows = 0;
        int succeeded = 0;
        for (String sql : alterStatements) {
            Result<Integer> result = jdbcExecutor.executeDDL(dbConfig, sql);
            if (!result.isSuccess()) {
                logger.error("执行DDL失败: {}", result.getError());
                continue;
            }
            succeeded++;
            totalAffectedRows += affectedRows(result);
        }
        if (succeeded == 0) {
            return Result.fail("所有字段已存在");
        }
        return Result.success(totalAffectedRows);
    }

    /** Unboxes the affected-row count of a result, treating a missing value as zero. */
    private static int affectedRows(Result<Integer> result) {
        Integer rows = result.getData();
        return rows == null ? 0 : rows;
    }

    /**
     * Removes the cached schema of a table, if any. Failures are logged, never thrown.
     *
     * <p>NOTE(review): the key is derived with {@code dbUtils.generateCacheKey(dbConfig, null)},
     * while the cache is filled under the {@code dbKey} returned by
     * {@code dbUtils.validateConnection}; confirm that both produce the same key.
     *
     * @param dbConfig  database configuration
     * @param tableName table name
     */
    public void clearTableCache(Map<String, String> dbConfig, String tableName) {
        try {
            String cacheKey = dbUtils.generateCacheKey(dbConfig, null);
            // Same monitor as the loader in getTableStructure, so a load in progress finishes first.
            synchronized (this) {
                ConcurrentMap<String, TableSchema> tableSchemas = schemaCache.getIfPresent(cacheKey);
                if (tableSchemas != null) {
                    tableSchemas.remove(tableName);
                }
            }
        } catch (Exception e) {
            logger.error("清除表结构失败", e);
        }
    }

    /**
     * Maps a Java type to the column type used for it on the given database.
     *
     * <p>NOTE(review): fractional types fall back to a bare {@code DECIMAL}, which leaves precision
     * and scale to the database default; confirm that this is acceptable for the supported databases.
     *
     * @param javaType Java type (for example {@code String.class} or {@code Integer.class})
     * @param dbType   database type identifier (for example {@code "mysql"} or {@code "oracle"})
     * @return the SQL column type (for example {@code VARCHAR(255)} or {@code INT})
     * @throws IllegalArgumentException      when either argument is null
     * @throws UnsupportedOperationException when the Java type has no mapping
     */
    public String convertJavaTypeToSqlType(Class<?> javaType, String dbType) {
        if (javaType == null || dbType == null) {
            throw new IllegalArgumentException("参数不能为空");
        }

        boolean oracleLike = ORACLE_TYPES.contains(dbType);
        boolean postgresLike = POSTGRESQL_TYPES.contains(dbType);
        boolean sqlServer = SQLSERVER_TYPES.contains(dbType);
        boolean sqlite = SQLITE_TYPES.contains(dbType);

        if (javaType == String.class) {
            if (oracleLike) return "VARCHAR2(255)";
            if (sqlite) return "TEXT";
            if (sqlServer) return "NVARCHAR(MAX)";
            if (GOLDEN_DB_TYPES.contains(dbType)) return "VARCHAR(4096)";
            return "VARCHAR(255)";
        }
        if (javaType == Integer.class || javaType == int.class) {
            if (oracleLike) return "NUMBER(11)";
            if (postgresLike || sqlite) return "INTEGER";
            return "INT";
        }
        if (javaType == Long.class || javaType == long.class) {
            return oracleLike ? "NUMBER(19)" : "BIGINT";
        }
        // Double and BigDecimal share the same mapping.
        if (javaType == Double.class || javaType == double.class || javaType == BigDecimal.class) {
            if (oracleLike) return "NUMBER(38,10)";
            if (sqlServer) return "DECIMAL(38, 10)";
            if (sqlite) return "REAL";
            return "DECIMAL";
        }
        if (javaType == LocalDate.class) {
            return "DATE";
        }
        if (javaType == LocalDateTime.class) {
            if (TIMESTAMP_TYPES.contains(dbType)) return "TIMESTAMP";
            if (sqlServer) return "DATETIME2";
            return "DATETIME";
        }
        if (javaType == byte[].class) {
            if (oracleLike || sqlite) return "BLOB";
            if (postgresLike) return "BYTEA";
            if (sqlServer) return "VARBINARY(MAX)";
            return "LONGBLOB";
        }
        throw new UnsupportedOperationException("不支持的Java类型: " + javaType.getName());
    }

    /**
     * Table schema: the table name and its columns keyed by column name.
     *
     * <p>The column map is copied on construction (order preserved) and exposed read-only.
     */
    public record TableSchema(String name, Map<String, ColumnInfo> columns) {

        public TableSchema {
            columns = columns == null
                    ? Map.of()
                    : Collections.unmodifiableMap(new LinkedHashMap<>(columns));
        }

        /** Returns the columns as an unmodifiable map. */
        public Map<String, ColumnInfo> getColumns() {
            return columns;
        }
    }

    /**
     * Column metadata as read from {@link DatabaseMetaData#getColumns}.
     */
    public record ColumnInfo(String name, String type, int dataType, int size, boolean nullable, String remarks) {
    }
}