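A sharding framework that makes the underlying databases and tables transparent to the application
Subprojects
- sharding-jdbc: decentralized; a library through which the client connects directly to the database; Java only
- sharding-proxy: centralized; runs as a service acting as a database proxy; can support multiple languages and gives DBAs a static operational entry point
- sharding-sidecar (planned): decentralized; cloud native in the Service Mesh style, with a separate service running on every node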
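Concepts
- Logical table: the table as it appears logically, as if it were not sharded
- Actual table: a table that physically exists in the underlying database, usually named with a suffix
- Data node: database + table, the smallest unit of data sharding
- Binding table: the main table and child table of a join share the same sharding rule, so the join only runs between matching shards; for example, shard 0 of the main table joins shard 0 of the child table and never shard 1
- Broadcast table: a table holding identical data in every database, i.e. replicated redundantly in each one; it is usually small and convenient to join against, such as a dictionary table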
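To make the relationship between a logical table, its actual tables and data nodes concrete, here is a minimal sketch. The helper class, the `ds_N` database names and the `_N` table suffix are assumptions for illustration and are not taken from the framework.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the framework: expands a logical table into its data nodes.
final class DataNodeSketch {

    // Builds names such as "ds_0.t_order_1"; the naming scheme is an assumption.
    static List<String> dataNodes(String logicTable, int databaseCount, int tablesPerDatabase) {
        List<String> result = new ArrayList<>();
        for (int db = 0; db < databaseCount; db++) {
            for (int table = 0; table < tablesPerDatabase; table++) {
                result.add("ds_" + db + "." + logicTable + "_" + table);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One logical table t_order spread over 2 databases with 2 actual tables each.
        System.out.println(dataNodes("t_order", 2, 2));
    }
}
```

Each element of the returned list is one data node: a database name plus an actual table name.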
Flow
Normal flow
After sharding
SQL parsing -> query optimization -> SQL routing -> SQL rewriting -> SQL execution -> result merging
Corresponding code modules
parse -> optimize -> route -> rewrite -> execute -> merge
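Routing
With a sharding key, the SQL is routed to specific shards
- Direct route: uses Hint; the value is passed in from outside rather than taken from the SQL, and it locates one shard directly
- Standard route: a query on a single logical table, where values parsed from the SQL decide the shard, or a multi-table query over binding tables
- Cartesian route: the relationship between the tables cannot be determined, so the tables' shards are combined with each other
Without a sharding key, the SQL is broadcast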
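The routing cases above can be sketched as follows. This is only an illustration under stated assumptions: the modulo sharding rule, the fixed shard count and all class and method names are hypothetical, and the framework's real routing engine works on parsed SQL rather than on plain arguments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not framework code: modulo sharding over a fixed number of shards.
final class RouteSketch {

    static final int SHARD_COUNT = 2;

    // Standard route: the sharding key value picks one shard.
    // Broadcast: with no sharding key (null), every shard becomes a target.
    static List<String> route(String logicTable, Long shardingKey) {
        List<String> targets = new ArrayList<>();
        if (shardingKey != null) {
            targets.add(logicTable + "_" + Math.floorMod(shardingKey, (long) SHARD_COUNT));
            return targets;
        }
        for (int i = 0; i < SHARD_COUNT; i++) {
            targets.add(logicTable + "_" + i);
        }
        return targets;
    }

    // Join targets when no sharding key narrows the query.
    // Binding tables pair shard i only with shard i; otherwise every combination is needed (Cartesian route).
    static List<String> joinTargets(String left, String right, boolean binding) {
        List<String> targets = new ArrayList<>();
        for (int i = 0; i < SHARD_COUNT; i++) {
            for (int j = 0; j < SHARD_COUNT; j++) {
                if (!binding || i == j) {
                    targets.add(left + "_" + i + " JOIN " + right + "_" + j);
                }
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(route("t_order", 5L));
        System.out.println(route("t_order", null));
        System.out.println(joinTargets("t_order", "t_order_item", true));
        System.out.println(joinTargets("t_order", "t_order_item", false));
    }
}
```

With two shards, the binding-table join touches two shard pairs while the Cartesian join touches four, which is why binding tables are the cheaper choice whenever the sharding rules match.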
SPI
- Registry center
  - Interface: RegistryCenter
  - Implementation: Zookeeper
- Distributed primary key
  - Interface: ShardingKeyGenerator
  - Implementations: UUIDShardingKeyGenerator, SnowflakeShardingKeyGenerator
- Data masking algorithm
  - Interface: ShardingEncryptor
  - Implementations: AESShardingEncryptor, MD5ShardingEncryptor
- SQL parsing
  - Interface: SQLParserEntry
  - Implementations: MySQLParserEntry, PostgreSQLParserEntry, SQLServerParserEntry, OracleParserEntry
- Database protocol
  - Interface: DatabaseProtocolFrontendEngine
  - Implementations: MySQLProtocolFrontendEngine, PostgreSQLProtocolFrontendEngine
- Distributed transaction
  - Interface: ShardingTransactionManager
  - Implementations: XAShardingTransactionManager, SeataATShardingTransactionManager
- XA transaction manager
  - Interface: XATransactionManager
  - Implementations: AtomikosTransactionManager, NarayanaXATransactionManager, BitronixXATransactionManager
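Data masking
Application code works against a logical column; underneath there are a plaintext column (optional), a cipher column, and an assisted query column (optional)
- Every operation on the logical column is rewritten into operations on the plaintext column and the cipher column
- Whether queries use the plaintext column or the cipher column is configurable
- For security, a strategy can generate different ciphertexts for the same plaintext, but a query then cannot tell that the two are equal. The assisted query column covers this case: identical plaintexts always produce identical assisted query column values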
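The last point can be illustrated with a small self-contained sketch. It is not the framework's encryptor: the choice of AES-GCM with a random IV for the cipher column and SHA-256 for the assisted query column is an assumption made for illustration, as are all class and method names.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical sketch: cipher column value plus assisted query column value for one plaintext.
final class AssistedQuerySketch {

    private static final SecureRandom RANDOM = new SecureRandom();
    private static final int IV_LENGTH = 12;

    // Cipher column: a fresh random IV per call, so equal plaintexts yield different ciphertexts.
    static String encrypt(byte[] key, String plaintext) {
        try {
            byte[] iv = new byte[IV_LENGTH];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(128, iv));
            byte[] encrypted = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            byte[] out = new byte[IV_LENGTH + encrypted.length];
            System.arraycopy(iv, 0, out, 0, IV_LENGTH);
            System.arraycopy(encrypted, 0, out, IV_LENGTH, encrypted.length);
            return Base64.getEncoder().encodeToString(out);
        } catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Reads the IV back from the first bytes of the stored value, then decrypts the rest.
    static String decrypt(byte[] key, String ciphertext) {
        try {
            byte[] in = Base64.getDecoder().decode(ciphertext);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(128, in, 0, IV_LENGTH));
            return new String(cipher.doFinal(in, IV_LENGTH, in.length - IV_LENGTH), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Assisted query column: a deterministic digest, so equal plaintexts always match in a WHERE clause.
    static String assistedQueryValue(String plaintext) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            return Base64.getEncoder().encodeToString(digest.digest(plaintext.getBytes(StandardCharsets.UTF_8)));
        } catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        byte[] key = "0123456789abcdef".getBytes(StandardCharsets.UTF_8); // demo key only
        String first = encrypt(key, "alice");
        String second = encrypt(key, "alice");
        System.out.println("ciphertexts equal: " + first.equals(second));
        System.out.println("assisted values equal: " + assistedQueryValue("alice").equals(assistedQueryValue("alice")));
        System.out.println("decrypted: " + decrypt(key, first));
    }
}
```

An equality query on the logical column would then compare against the assisted query column, while the cipher column is used only to recover the value. An unkeyed digest of low-entropy data can be guessed by brute force, so a keyed construction would likely be preferable in practice.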
Configuration
The top-level configuration of the encryption module has two parts: encryptor configuration and table configuration
EncryptRuleConfiguration.java
```java
@RequiredArgsConstructor
@Getter
public final class EncryptRuleConfiguration implements RuleConfiguration {
    
    // Encryptor name -> encryptor configuration
    private final Map<String, EncryptorRuleConfiguration> encryptors;
    
    // Table name -> table configuration
    private final Map<String, EncryptTableRuleConfiguration> tables;
    
    public EncryptRuleConfiguration() {
        this(new LinkedHashMap<String, EncryptorRuleConfiguration>(), new LinkedHashMap<String, EncryptTableRuleConfiguration>());
    }
}
```
A table configuration contains a set of column configurations
EncryptTableRuleConfiguration.java
```java
@NoArgsConstructor
@Getter
public final class EncryptTableRuleConfiguration {
    
    private final Map<String, EncryptColumnRuleConfiguration> columns = new LinkedHashMap<>();
    
    public EncryptTableRuleConfiguration(final Map<String, EncryptColumnRuleConfiguration> columns) {
        this.columns.putAll(columns);
    }
}
```
A column configuration contains the plaintext column, the cipher column, the assisted query column, and the encryptor name
EncryptColumnRuleConfiguration.java
```java
@RequiredArgsConstructor
@Getter
public final class EncryptColumnRuleConfiguration {
    
    private final String plainColumn;
    
    private final String cipherColumn;
    
    private final String assistedQueryColumn;
    
    private final String encryptor;
}
```
An encryptor configuration contains a type and properties
EncryptorRuleConfiguration.java
```java
@Getter
public final class EncryptorRuleConfiguration extends TypeBasedSPIConfiguration {
    
    public EncryptorRuleConfiguration(final String type, final Properties properties) {
        super(type, properties);
    }
}
```
Encryptor SPI
Encryptors can be registered through SPI; registration is triggered by a static initializer block
ShardingEncryptorServiceLoader.java
```java
public final class ShardingEncryptorServiceLoader extends TypeBasedSPIServiceLoader<ShardingEncryptor> {
    
    static {
        NewInstanceServiceLoader.register(ShardingEncryptor.class);
    }
    
    public ShardingEncryptorServiceLoader() {
        super(ShardingEncryptor.class);
    }
}
```
The registration mechanism is the JDK's native ServiceLoader
NewInstanceServiceLoader.java
```java
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NewInstanceServiceLoader {
    
    private static final Map<Class, Collection<Class<?>>> SERVICE_MAP = new HashMap<>();
    
    public static <T> void register(final Class<T> service) {
        for (T each : ServiceLoader.load(service)) {
            registerServiceClass(service, each);
        }
    }
    
    // Excerpt: registerServiceClass and newServiceInstances are not shown here.
}
```
TypeBasedSPIServiceLoader introduces a type mechanism
Several encryptors may share one type and map to the same class, differing only in configuration, for example several AES encryptors with different keys
TypeBasedSPIServiceLoader.java
```java
@RequiredArgsConstructor
public abstract class TypeBasedSPIServiceLoader<T extends TypeBasedSPI> {
    
    private final Class<T> classType;
    
    public final T newService(final String type, final Properties props) {
        Collection<T> typeBasedServices = loadTypeBasedServices(type);
        if (typeBasedServices.isEmpty()) {
            throw new ShardingConfigurationException("Invalid `%s` SPI type `%s`.", classType.getName(), type);
        }
        T result = typeBasedServices.iterator().next();
        result.setProperties(props);
        return result;
    }
    
    private Collection<T> loadTypeBasedServices(final String type) {
        return Collections2.filter(NewInstanceServiceLoader.newServiceInstances(classType), new Predicate<T>() {
            
            @Override
            public boolean apply(final T input) {
                return type.equalsIgnoreCase(input.getType());
            }
        });
    }
}
```
The META-INF/services folder contains a file named org.apache.shardingsphere.spi.encrypt.ShardingEncryptor
It predefines the AES encryptor and the MD5 encryptor
```
org.apache.shardingsphere.core.strategy.encrypt.impl.MD5ShardingEncryptor
org.apache.shardingsphere.core.strategy.encrypt.impl.AESShardingEncryptor
```
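The type-based lookup can be tried out in isolation with the sketch below. It assumes an in-memory list of suppliers in place of what `ServiceLoader` would read from `META-INF/services`, and every name in it (`TypedSpiSketch`, `Encryptor`, `ReverseEncryptor`, the `prefix` property) is hypothetical. The point it shows is that one type can yield several independently configured instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical stand-in for the type-based SPI lookup; none of these names come from the framework.
final class TypedSpiSketch {

    // Every implementation reports a type name and accepts properties.
    interface Encryptor {
        String getType();

        void setProperties(Properties props);

        String encrypt(String plaintext);
    }

    // Toy implementation: reversing text is NOT encryption, it only keeps the sketch short.
    static final class ReverseEncryptor implements Encryptor {
        private Properties props = new Properties();

        @Override
        public String getType() {
            return "REVERSE";
        }

        @Override
        public void setProperties(Properties props) {
            this.props = props;
        }

        @Override
        public String encrypt(String plaintext) {
            return props.getProperty("prefix", "") + new StringBuilder(plaintext).reverse();
        }
    }

    // Assumption: this list replaces the implementations ServiceLoader would discover.
    private static final List<Supplier<Encryptor>> REGISTERED = new ArrayList<>();

    static {
        REGISTERED.add(ReverseEncryptor::new);
    }

    // Creates a new instance whose type matches (case-insensitively) and applies the properties.
    static Encryptor newService(String type, Properties props) {
        for (Supplier<Encryptor> each : REGISTERED) {
            Encryptor candidate = each.get();
            if (type.equalsIgnoreCase(candidate.getType())) {
                candidate.setProperties(props);
                return candidate;
            }
        }
        throw new IllegalArgumentException("Invalid SPI type `" + type + "`.");
    }

    public static void main(String[] args) {
        Properties first = new Properties();
        first.setProperty("prefix", "a:");
        Properties second = new Properties();
        second.setProperty("prefix", "b:");
        System.out.println(newService("reverse", first).encrypt("abc"));  // a:cba
        System.out.println(newService("REVERSE", second).encrypt("abc")); // b:cba
    }
}
```

Because each call creates a fresh instance before applying the properties, two encryptors of the same type never share configuration.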
Encryption and decryption flow
EncryptDataSourceFactory provides a static method for obtaining a data source that supports data masking
EncryptDataSourceFactory.java
```java
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class EncryptDataSourceFactory {
    
    public static DataSource createDataSource(final DataSource dataSource, final EncryptRuleConfiguration encryptRuleConfiguration, final Properties props) throws SQLException {
        return new EncryptDataSource(dataSource, new EncryptRule(encryptRuleConfiguration), props);
    }
}
```
A runtime context is built from the encryption configuration
EncryptDataSource.java
```java
@Getter
public class EncryptDataSource extends AbstractDataSourceAdapter {
    
    private final EncryptRuntimeContext runtimeContext;
    
    public EncryptDataSource(final DataSource dataSource, final EncryptRule encryptRule, final Properties props) throws SQLException {
        super(dataSource);
        runtimeContext = new EncryptRuntimeContext(dataSource, encryptRule, props, getDatabaseType());
    }
    
    @Override
    public final EncryptConnection getConnection() throws SQLException {
        return new EncryptConnection(getDataSource().getConnection(), runtimeContext);
    }
    
    public DataSource getDataSource() {
        return getDataSourceMap().values().iterator().next();
    }
}
```
On the encrypting connection, the Statement-related methods are all wrapped
EncryptConnection.java
```java
@RequiredArgsConstructor
@Getter
public final class EncryptConnection extends AbstractUnsupportedOperationConnection {
    
    private final Connection connection;
    
    private final EncryptRuntimeContext runtimeContext;
    
    @Override
    public Statement createStatement() throws SQLException {
        return new EncryptStatement(this);
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql) {
        return new EncryptPreparedStatement(this, sql);
    }
}
```
When a query executes, the SQL is rewritten and the result set is wrapped
EncryptStatement.java
```java
public final class EncryptStatement extends AbstractUnsupportedOperationStatement {
    
    private final EncryptConnection connection;
    
    private final Statement statement;
    
    private OptimizedStatement optimizedStatement;
    
    private EncryptResultSet resultSet;
    
    // Excerpt: constructors and the remaining overrides are not shown here.
    
    @Override
    public int executeUpdate(final String sql) throws SQLException {
        return statement.executeUpdate(getRewriteSQL(sql));
    }
    
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        ResultSet resultSet = statement.executeQuery(getRewriteSQL(sql));
        this.resultSet = new EncryptResultSet(connection.getRuntimeContext(), optimizedStatement, this, resultSet);
        return this.resultSet;
    }
    
    // parse -> optimize -> rewrite, then return the SQL that is actually executed
    @SuppressWarnings("unchecked")
    private String getRewriteSQL(final String sql) {
        SQLStatement sqlStatement = connection.getRuntimeContext().getParseEngine().parse(sql, false);
        optimizedStatement = EncryptOptimizeEngineFactory.newInstance(sqlStatement)
                .optimize(connection.getRuntimeContext().getRule(), connection.getRuntimeContext().getMetaData(), sql, Collections.emptyList(), sqlStatement);
        SQLRewriteEngine encryptSQLRewriteEngine = new SQLRewriteEngine(connection.getRuntimeContext().getRule(), optimizedStatement, sql, Collections.emptyList(),
                connection.getRuntimeContext().getProps().<Boolean>getValue(ShardingPropertiesConstant.QUERY_WITH_CIPHER_COLUMN));
        String result = encryptSQLRewriteEngine.generateSQL().getSql();
        showSQL(result);
        return result;
    }
    
    private void showSQL(final String sql) {
        boolean showSQL = connection.getRuntimeContext().getProps().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW);
        if (showSQL) {
            SQLLogger.logSQL(sql);
        }
    }
}
```
The result set translates between logical columns and actual columns
EncryptResultSet.java
```java
public final class EncryptResultSet extends AbstractUnsupportedOperationResultSet {
    
    private final EncryptRule encryptRule;
    
    private final OptimizedStatement optimizedStatement;
    
    private final Statement encryptStatement;
    
    private ResultSet originalResultSet;
    
    private IteratorStreamMergedResult resultSet;
    
    private final Map<String, String> logicAndActualColumns;
    
    public EncryptResultSet(final EncryptRuntimeContext encryptRuntimeContext,
                            final OptimizedStatement optimizedStatement, final Statement encryptStatement, final ResultSet resultSet) throws SQLException {
        this.encryptRule = encryptRuntimeContext.getRule();
        this.optimizedStatement = optimizedStatement;
        this.encryptStatement = encryptStatement;
        originalResultSet = resultSet;
        QueryResult queryResult = new StreamQueryResult(resultSet, encryptRule);
        this.resultSet = new IteratorStreamMergedResult(Collections.singletonList(queryResult));
        logicAndActualColumns = createLogicAndActualColumns(encryptRuntimeContext.getProps().<Boolean>getValue(ShardingPropertiesConstant.QUERY_WITH_CIPHER_COLUMN));
    }
    
    // Excerpt: createLogicAndActualColumns and getActualColumnLabel are not shown here.
    
    @Override
    public String getString(final String columnLabel) throws SQLException {
        return (String) ResultSetUtil.convertValue(resultSet.getValue(getActualColumnLabel(columnLabel), String.class), String.class);
    }
    
    @Override
    public boolean next() throws SQLException {
        return resultSet.next();
    }
}
```
Values are decrypted as they are read from the result set
StreamQueryResult.java
```java
public final class StreamQueryResult implements QueryResult {
    
    @Getter
    private final QueryResultMetaData queryResultMetaData;
    
    private final ResultSet resultSet;
    
    // Excerpt: constructors and getCiphertext are not shown here.
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return decrypt(columnIndex, QueryResultUtil.getValue(resultSet, columnIndex));
    }
    
    // Decrypts only when an encryptor is configured for the column; otherwise returns the raw value.
    private Object decrypt(final int columnIndex, final Object value) throws SQLException {
        Optional<ShardingEncryptor> shardingEncryptor = queryResultMetaData.getShardingEncryptor(columnIndex);
        return shardingEncryptor.isPresent() ? shardingEncryptor.get().decrypt(getCiphertext(value)) : value;
    }
}
```
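Flow summary
Overall, the design extends by composition: every JDBC component is wrapped
The SQL is obtained at the JDBC layer, independent of upper-layer frameworks such as Hibernate
Encryption rewrites the SQL before the Statement executes; decryption converts values after they are read from the ResultSet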
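The wrap-and-delegate idea behind this summary can be reduced to a few lines. The sketch below is a simplification under loud assumptions: `RawStatement` is a hypothetical one-method stand-in for a JDBC Statement, the rewrite is a naive string replacement rather than a parser-based rewrite, and the "decryptor" is whatever function the caller passes in.

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch of the decorator flow; it does not use the framework or real JDBC types.
final class EncryptFlowSketch {

    // Stand-in for the wrapped JDBC Statement: takes SQL, returns the raw stored value.
    interface RawStatement {
        String executeQuery(String sql);
    }

    // Decorator: rewrites the SQL before delegating and decodes the value after fetching it.
    static final class EncryptingStatement implements RawStatement {
        private final RawStatement delegate;
        private final String logicColumn;
        private final String cipherColumn;
        private final UnaryOperator<String> decryptor;

        EncryptingStatement(RawStatement delegate, String logicColumn, String cipherColumn, UnaryOperator<String> decryptor) {
            this.delegate = delegate;
            this.logicColumn = logicColumn;
            this.cipherColumn = cipherColumn;
            this.decryptor = decryptor;
        }

        @Override
        public String executeQuery(String sql) {
            // Naive rewrite for illustration only; a real engine rewrites based on the parsed SQL.
            String rewritten = sql.replace(logicColumn, cipherColumn);
            return decryptor.apply(delegate.executeQuery(rewritten));
        }
    }

    public static void main(String[] args) {
        // Fake database: prints the SQL it receives and returns a "stored" value.
        RawStatement raw = sql -> {
            System.out.println("executed: " + sql);
            return "ecila";
        };
        // Toy decryptor: reverses the stored text; this is not real decryption.
        RawStatement wrapped = new EncryptingStatement(raw, "pwd", "pwd_cipher",
                stored -> new StringBuilder(stored).reverse().toString());
        System.out.println(wrapped.executeQuery("SELECT pwd FROM t_user"));
    }
}
```

The caller only ever sees the logical column name and the decoded value, which is the property that keeps upper-layer frameworks unaware of the masking.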