ShardingSphere

A framework that makes database and table sharding transparent to the application.

Subprojects

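  • sharding-jdbc: decentralized; shipped as a library, the client connects directly to the databases; Java only
  • sharding-proxy: centralized; deployed as a standalone service that acts as a database proxy, usable from any language, and provides a static entry point that is convenient for DBAs
  • sharding-sidecar (planned): decentralized; cloud native in the Service Mesh style, with a separate service running on each node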

Concepts


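  • Logical table: the table as seen by the application, before sharding
  • Actual table: the table that physically exists in the underlying database, usually with a suffix in its name
  • Data node: a data source plus a table; the smallest unit of sharding
  • Binding tables: a parent table and a child table in a join that share the same sharding rule, so the join is only performed between corresponding shards; e.g. shard 0 of the parent is joined with shard 0 of the child and never with shard 1
  • Broadcast table: a table whose data is identical in every database, i.e. copied redundantly into each one; usually small tables that are often joined, such as dictionary tables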
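To make data nodes and binding tables concrete, here is a minimal Java sketch. It assumes a hypothetical layout of two data sources (ds_0, ds_1) with two actual tables each, the database chosen by user_id % 2 and the table by order_id % 2; none of these names or rules come from a real configuration. Because t_order and t_order_item share the same rule, matching rows always land on matching suffixes, which is what lets a binding-table join stay inside one shard pair.

```java
import java.util.ArrayList;
import java.util.List;

final class DataNodes {
    // Hypothetical layout: 2 data sources x 2 actual tables per logical table.
    static final int DB_COUNT = 2;
    static final int TABLE_COUNT = 2;

    // Expand a logical table into all of its data nodes ("dataSource.actualTable").
    static List<String> expand(String logicTable) {
        List<String> nodes = new ArrayList<>();
        for (int db = 0; db < DB_COUNT; db++) {
            for (int t = 0; t < TABLE_COUNT; t++) {
                nodes.add("ds_" + db + "." + logicTable + "_" + t);
            }
        }
        return nodes;
    }

    // One rule shared by both binding tables: same keys always give the same suffixes.
    static String route(String logicTable, long userId, long orderId) {
        return "ds_" + (userId % DB_COUNT) + "." + logicTable + "_" + (orderId % TABLE_COUNT);
    }
}
```

For example, route("t_order", 10, 7) gives ds_0.t_order_1 and route("t_order_item", 10, 7) gives ds_0.t_order_item_1, so the join for that order only touches that pair of tables.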

Workflow


Standard flow (no sharding)

SQL parsing -> query optimization -> SQL execution

With database and table sharding

SQL parsing -> query optimization -> SQL routing -> SQL rewriting -> SQL execution -> result merging

Corresponding code modules:
parse -> optimize -> route -> rewrite -> execute -> merge
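A toy Java sketch of the sharded flow above, assuming one logical table split into two actual tables by value % 2. Parsing and optimization are skipped (the caller passes the logical table and the sharding value directly), rewriting is a naive string replacement, and "execution" reads from an in-memory map; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Toy route -> rewrite -> execute -> merge; parse/optimize are skipped because
// the caller passes the logical table and the sharding value directly.
final class ToyPipeline {
    static final int TABLE_COUNT = 2; // hypothetical: t_order_0, t_order_1

    // route: a sharding value selects one actual table; without one, every table is hit.
    static List<String> route(String logicTable, Long shardingValue) {
        List<String> targets = new ArrayList<>();
        if (shardingValue != null) {
            targets.add(logicTable + "_" + (shardingValue % TABLE_COUNT));
        } else {
            for (int i = 0; i < TABLE_COUNT; i++) {
                targets.add(logicTable + "_" + i);
            }
        }
        return targets;
    }

    // rewrite: naive text replacement of the logical table name (a real engine rewrites parsed tokens).
    static String rewrite(String logicSql, String logicTable, String actualTable) {
        return logicSql.replace(logicTable, actualTable);
    }

    // execute + merge: "storage" stands in for the databases; partial results are concatenated, then sorted.
    static List<Integer> run(String logicSql, String logicTable, Long shardingValue,
                             Map<String, List<Integer>> storage, List<String> executedSql) {
        List<Integer> merged = new ArrayList<>();
        for (String actualTable : route(logicTable, shardingValue)) {
            executedSql.add(rewrite(logicSql, logicTable, actualTable));
            merged.addAll(storage.getOrDefault(actualTable, Collections.emptyList()));
        }
        Collections.sort(merged);
        return merged;
    }
}
```

With a sharding value only one rewritten statement runs; without one, the statement goes to every actual table and the merge step combines the partial results.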

Routing

When the SQL carries a sharding key, shard routing is used:

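  • Direct routing: uses a Hint; the sharding value is supplied from outside rather than derived from the SQL, and the statement goes straight to one shard
  • Standard routing: a single logical table whose sharding value is parsed from the SQL, or a multi-table query over binding tables
  • Cartesian routing: the relationship between the tables cannot be determined, so every combination of their shards is queried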

When the SQL carries no sharding key, it is broadcast to all shards.
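The routing choices above can be summarized as a decision order. The sketch below is a simplification under stated assumptions: the RouteKind enum and the boolean inputs are hypothetical, and a real router derives them from the parsed SQL and the sharding rules instead of receiving them as flags.

```java
// Hypothetical classification of the routing types described above.
enum RouteKind { DIRECT, STANDARD, CARTESIAN, BROADCAST }

final class RouteDecision {
    static RouteKind decide(boolean hintPresent, boolean shardingKeyPresent,
                            int logicTableCount, boolean allTablesBound) {
        if (hintPresent) {
            return RouteKind.DIRECT;      // value comes from outside the SQL
        }
        if (!shardingKeyPresent) {
            return RouteKind.BROADCAST;   // no key: send to every shard
        }
        if (logicTableCount == 1 || allTablesBound) {
            return RouteKind.STANDARD;    // single table, or binding tables
        }
        return RouteKind.CARTESIAN;       // unrelated tables: combine their shards
    }
}
```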

SPI


  • Registry center
    • Interface: RegistryCenter
    • Implementation: Zookeeper
  • Distributed primary key
    • Interface: ShardingKeyGenerator
    • Implementations: UUIDShardingKeyGenerator, SnowflakeShardingKeyGenerator
  • Data masking (encryption) algorithm
    • Interface: ShardingEncryptor
    • Implementations: AESShardingEncryptor, MD5ShardingEncryptor
  • SQL parsing
    • Interface: SQLParserEntry
    • Implementations: MySQLParserEntry, PostgreSQLParserEntry, SQLServerParserEntry, OracleParserEntry
  • Database protocol
    • Interface: DatabaseProtocolFrontendEngine
    • Implementations: MySQLProtocolFrontendEngine, PostgreSQLProtocolFrontendEngine
  • Distributed transactions
    • Interface: ShardingTransactionManager
    • Implementations: XAShardingTransactionManager, SeataATShardingTransactionManager
  • XA transaction manager
    • Interface: XATransactionManager
    • Implementations: AtomikosTransactionManager, NarayanaXATransactionManager, BitronixXATransactionManager

Data masking


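Code is written against logical columns; underneath there is a plaintext column (optional), a ciphertext column, and an assisted query column (optional).

  • Every operation on a logical column is rewritten into operations on the plaintext and ciphertext columns
  • Whether queries read the plaintext column or the ciphertext column is configurable
  • For better security, an encryptor may produce different ciphertexts for the same plaintext, but then equality can no longer be tested on the ciphertext. The assisted query column solves this: identical plaintexts always produce identical assisted query values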
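A minimal Java sketch of why the assisted query column exists, using only the JDK. As an assumption, it swaps in AES-GCM with a random IV for the ciphertext column (this is not the built-in AESShardingEncryptor) and HMAC-SHA256 for the assisted query column; the class name ColumnCrypto is hypothetical. Encrypting the same value twice gives two different ciphertexts, while the assisted value stays identical, so equality lookups can go through the assisted column.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

final class ColumnCrypto {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final int IV_LENGTH = 12;
    private static final int TAG_BITS = 128;
    private final SecretKeySpec aesKey;
    private final SecretKeySpec macKey;

    ColumnCrypto(byte[] aesKey, byte[] macKey) {
        this.aesKey = new SecretKeySpec(aesKey, "AES");        // 16, 24 or 32 bytes
        this.macKey = new SecretKeySpec(macKey, "HmacSHA256");
    }

    // Ciphertext column: AES-GCM with a fresh random IV, so the same plaintext
    // yields a different value each time. Stored as Base64(iv || ciphertext || tag).
    String encrypt(String plain) {
        try {
            byte[] iv = new byte[IV_LENGTH];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, aesKey, new GCMParameterSpec(TAG_BITS, iv));
            byte[] body = cipher.doFinal(plain.getBytes(StandardCharsets.UTF_8));
            byte[] out = new byte[IV_LENGTH + body.length];
            System.arraycopy(iv, 0, out, 0, IV_LENGTH);
            System.arraycopy(body, 0, out, IV_LENGTH, body.length);
            return Base64.getEncoder().encodeToString(out);
        } catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Split off the IV, then decrypt and verify the tag.
    String decrypt(String stored) {
        try {
            byte[] in = Base64.getDecoder().decode(stored);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, aesKey, new GCMParameterSpec(TAG_BITS, in, 0, IV_LENGTH));
            byte[] plain = cipher.doFinal(in, IV_LENGTH, in.length - IV_LENGTH);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    // Assisted query column: a keyed, deterministic HMAC, so equal plaintexts give
    // equal values and "WHERE assisted_col = ?" still finds matches.
    String assistedQuery(String plain) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(macKey);
            return Base64.getEncoder().encodeToString(mac.doFinal(plain.getBytes(StandardCharsets.UTF_8)));
        } catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```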

Configuration

The top-level configuration of the encrypt module has two parts: encryptor configurations and table configurations.

EncryptRuleConfiguration.java
```java
@RequiredArgsConstructor
@Getter
public final class EncryptRuleConfiguration implements RuleConfiguration {

    // encryptor name -> encryptor configuration
    private final Map<String, EncryptorRuleConfiguration> encryptors;

    // logical table name -> table configuration
    private final Map<String, EncryptTableRuleConfiguration> tables;

    public EncryptRuleConfiguration() {
        this(new LinkedHashMap<String, EncryptorRuleConfiguration>(), new LinkedHashMap<String, EncryptTableRuleConfiguration>());
    }
}
```

A table configuration holds a set of column configurations, keyed by logical column name.

EncryptTableRuleConfiguration.java
```java
@NoArgsConstructor
@Getter
public final class EncryptTableRuleConfiguration {

    // logical column name -> column configuration
    private final Map<String, EncryptColumnRuleConfiguration> columns = new LinkedHashMap<>();

    public EncryptTableRuleConfiguration(final Map<String, EncryptColumnRuleConfiguration> columns) {
        this.columns.putAll(columns);
    }
}
```

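A column configuration names the plaintext column, the ciphertext column, the assisted query column, and the encryptor to use.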

EncryptColumnRuleConfiguration.java
```java
@RequiredArgsConstructor
@Getter
public final class EncryptColumnRuleConfiguration {

    private final String plainColumn;

    private final String cipherColumn;

    private final String assistedQueryColumn;

    // name of an entry in EncryptRuleConfiguration.encryptors
    private final String encryptor;
}
```

An encryptor configuration consists of a type and properties.

EncryptorRuleConfiguration.java
```java
@Getter
public final class EncryptorRuleConfiguration extends TypeBasedSPIConfiguration {

    public EncryptorRuleConfiguration(final String type, final Properties properties) {
        super(type, properties);
    }
}
```

Encryptor SPI

Encryptors can be registered through SPI; registration is triggered by a static initializer block.

ShardingEncryptorServiceLoader.java
```java
public final class ShardingEncryptorServiceLoader extends TypeBasedSPIServiceLoader<ShardingEncryptor> {

    // Runs once when the class is loaded: registers every ShardingEncryptor implementation.
    static {
        NewInstanceServiceLoader.register(ShardingEncryptor.class);
    }

    public ShardingEncryptorServiceLoader() {
        super(ShardingEncryptor.class);
    }
}
```

Registration relies on the JDK's native ServiceLoader.

NewInstanceServiceLoader.java
```java
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NewInstanceServiceLoader {

    // Each interface maps to a set of implementation classes.
    private static final Map<Class, Collection<Class<?>>> SERVICE_MAP = new HashMap<>();

    public static <T> void register(final Class<T> service) {
        for (T each : ServiceLoader.load(service)) {
            registerServiceClass(service, each);
        }
    }

    //...
}
```
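The same registration idea can be sketched with the JDK alone. NewInstanceRegistry below is hypothetical, not the ShardingSphere class: it keeps the implementation classes discovered by ServiceLoader and creates new instances by reflection on every request, which is what allows several encryptors of one class to carry different settings. registerServiceClass is exposed so the idea can be tried without a META-INF/services file.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical re-implementation of the idea behind NewInstanceServiceLoader:
// remember implementation classes, and build new instances on demand.
final class NewInstanceRegistry {
    // Each service interface maps to the implementation classes registered for it.
    private static final Map<Class<?>, Collection<Class<?>>> SERVICE_MAP = new HashMap<>();

    private NewInstanceRegistry() {
    }

    // Discover providers listed in META-INF/services/<interface name> and keep their classes.
    static synchronized <T> void register(Class<T> service) {
        for (T each : ServiceLoader.load(service)) {
            registerServiceClass(service, each.getClass());
        }
    }

    static synchronized void registerServiceClass(Class<?> service, Class<?> implementation) {
        SERVICE_MAP.computeIfAbsent(service, key -> new LinkedHashSet<>()).add(implementation);
    }

    // Reflectively create a fresh instance of every registered class on each call,
    // so two encryptors of the same class never share state (e.g. different keys).
    static synchronized <T> Collection<T> newServiceInstances(Class<T> service) {
        Collection<T> result = new ArrayList<>();
        for (Class<?> each : SERVICE_MAP.getOrDefault(service, Collections.emptyList())) {
            try {
                result.add(service.cast(each.getDeclaredConstructor().newInstance()));
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Cannot instantiate " + each.getName(), ex);
            }
        }
        return result;
    }
}
```

For example, after registerServiceClass(CharSequence.class, StringBuilder.class), two calls to newServiceInstances(CharSequence.class) return two distinct StringBuilder objects.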

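TypeBasedSPIServiceLoader adds a notion of type on top of this.
Several encryptors may share the same type, and therefore the same class, and differ only in their configuration, e.g. multiple AES encryptors with different keys.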

TypeBasedSPIServiceLoader.java
```java
@RequiredArgsConstructor
public abstract class TypeBasedSPIServiceLoader<T extends TypeBasedSPI> {

    private final Class<T> classType;

    public final T newService(final String type, final Properties props) {
        Collection<T> typeBasedServices = loadTypeBasedServices(type);
        if (typeBasedServices.isEmpty()) {
            throw new ShardingConfigurationException("Invalid `%s` SPI type `%s`.", classType.getName(), type);
        }
        T result = typeBasedServices.iterator().next();
        result.setProperties(props);
        return result;
    }

    // Filtering instantiates every registered implementation, then keeps those whose type matches.
    private Collection<T> loadTypeBasedServices(final String type) {
        return Collections2.filter(NewInstanceServiceLoader.newServiceInstances(classType), new Predicate<T>() {

            @Override
            public boolean apply(final T input) {
                return type.equalsIgnoreCase(input.getType());
            }
        });
    }

    //...
}
```
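A self-contained Java sketch of type-based selection. TypedService, TypedServiceLoader and the two toy encryptors are hypothetical stand-ins for TypeBasedSPI, TypeBasedSPIServiceLoader and the real encryptors, and suppliers replace NewInstanceServiceLoader. Like the original, it instantiates a candidate before checking its type; unlike it, it stops at the first match instead of instantiating every implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical stand-in for TypeBasedSPI: an implementation reports its type name
// and receives its Properties after construction.
interface TypedService {
    String getType();

    void setProperties(Properties props);

    Properties getProperties();
}

final class TypedServiceLoader<T extends TypedService> {
    // Suppliers play the role of NewInstanceServiceLoader: each get() builds a new object.
    private final List<Supplier<? extends T>> factories = new ArrayList<>();

    TypedServiceLoader<T> add(Supplier<? extends T> factory) {
        factories.add(factory);
        return this;
    }

    // Instantiate candidates, keep the first whose type matches (case-insensitively),
    // then give it this particular configuration's Properties.
    T newService(String type, Properties props) {
        for (Supplier<? extends T> factory : factories) {
            T candidate = factory.get();
            if (type.equalsIgnoreCase(candidate.getType())) {
                candidate.setProperties(props);
                return candidate;
            }
        }
        throw new IllegalArgumentException("Invalid SPI type `" + type + "`.");
    }
}

// Toy implementations: they only store their Properties.
abstract class PropertyHolder implements TypedService {
    private Properties props = new Properties();

    @Override
    public void setProperties(Properties props) {
        this.props = props;
    }

    @Override
    public Properties getProperties() {
        return props;
    }
}

final class ToyAesEncryptor extends PropertyHolder {
    @Override
    public String getType() {
        return "AES";
    }
}

final class ToyMd5Encryptor extends PropertyHolder {
    @Override
    public String getType() {
        return "MD5";
    }
}
```

Requesting type "aes" twice with different Properties yields two separate objects, each holding its own key: the situation described above of several AES encryptors with different keys.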

The META-INF/services folder contains a file named org.apache.shardingsphere.spi.encrypt.ShardingEncryptor,
which registers the two built-in encryptors, MD5 and AES:

```
org.apache.shardingsphere.core.strategy.encrypt.impl.MD5ShardingEncryptor
org.apache.shardingsphere.core.strategy.encrypt.impl.AESShardingEncryptor
```

Encryption and decryption flow

EncryptDataSourceFactory is a static factory that returns a data source with data masking enabled.

EncryptDataSourceFactory.java
```java
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class EncryptDataSourceFactory {

    public static DataSource createDataSource(final DataSource dataSource, final EncryptRuleConfiguration encryptRuleConfiguration, final Properties props) throws SQLException {
        return new EncryptDataSource(dataSource, new EncryptRule(encryptRuleConfiguration), props);
    }
}
```

EncryptDataSource builds a runtime context from the encrypt configuration:

EncryptDataSource.java
```java
@Getter
public class EncryptDataSource extends AbstractDataSourceAdapter {

    private final EncryptRuntimeContext runtimeContext;

    public EncryptDataSource(final DataSource dataSource, final EncryptRule encryptRule, final Properties props) throws SQLException {
        super(dataSource);
        runtimeContext = new EncryptRuntimeContext(dataSource, encryptRule, props, getDatabaseType());
    }

    // Every connection handed out is wrapped together with the shared runtime context.
    @Override
    public final EncryptConnection getConnection() throws SQLException {
        return new EncryptConnection(getDataSource().getConnection(), runtimeContext);
    }

    public DataSource getDataSource() {
        return getDataSourceMap().values().iterator().next();
    }
}
```

On the encrypting connection, all Statement-related methods are wrapped:

EncryptConnection.java
```java
@RequiredArgsConstructor
@Getter
public final class EncryptConnection extends AbstractUnsupportedOperationConnection {

    private final Connection connection;

    private final EncryptRuntimeContext runtimeContext;

    @Override
    public Statement createStatement() throws SQLException {
        return new EncryptStatement(this);
    }

    @Override
    public PreparedStatement prepareStatement(final String sql) {
        return new EncryptPreparedStatement(this, sql);
    }

    //...
}
```
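The wrapping pattern can be sketched with JDK dynamic proxies instead of hand-written adapter classes like AbstractUnsupportedOperationConnection. In this hypothetical RewritingJdbc helper, the wrapped Connection hands out Statements whose execute* methods pass the SQL through a caller-supplied rewrite function before delegating; recordingConnection is a test double that only records the SQL it receives. Other paths such as prepareStatement and addBatch are deliberately not covered.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.Statement;
import java.util.List;
import java.util.function.UnaryOperator;

final class RewritingJdbc {
    // Wrap a Connection so every Statement it creates rewrites SQL before delegating
    // (same composition idea as EncryptConnection -> EncryptStatement).
    static Connection wrap(Connection target, UnaryOperator<String> rewriter) {
        return proxy(Connection.class, (p, m, args) -> {
            Object result = invoke(target, m, args);
            if (m.getName().equals("createStatement")) {
                return wrapStatement((Statement) result, rewriter);
            }
            return result;
        });
    }

    static Statement wrapStatement(Statement target, UnaryOperator<String> rewriter) {
        return proxy(Statement.class, (p, m, args) -> {
            // execute(String...), executeQuery, executeUpdate: the first argument is the SQL text.
            if (m.getName().startsWith("execute") && args != null && args.length > 0 && args[0] instanceof String) {
                args[0] = rewriter.apply((String) args[0]);
            }
            return invoke(target, m, args);
        });
    }

    private static Object invoke(Object target, Method m, Object[] args) throws Throwable {
        try {
            return m.invoke(target, args);
        } catch (InvocationTargetException ex) {
            throw ex.getCause(); // surface the delegate's real exception (e.g. SQLException)
        }
    }

    private static <T> T proxy(Class<T> type, InvocationHandler handler) {
        return type.cast(Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    // Test double: a Connection whose Statements only record the SQL they receive.
    static Connection recordingConnection(List<String> log) {
        Statement statement = proxy(Statement.class, (p, m, args) -> {
            if (m.getName().startsWith("execute") && args != null && args.length > 0 && args[0] instanceof String) {
                log.add((String) args[0]);
            }
            return defaultValue(m.getReturnType());
        });
        return proxy(Connection.class, (p, m, args) ->
                m.getName().equals("createStatement") ? statement : defaultValue(m.getReturnType()));
    }

    // Proxies must not return null for primitive return types.
    private static Object defaultValue(Class<?> type) {
        if (type == int.class) {
            return 0;
        }
        if (type == long.class) {
            return 0L;
        }
        if (type == boolean.class) {
            return false;
        }
        return null;
    }
}
```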

On execution, the SQL is rewritten and, for queries, the result set is wrapped:

EncryptStatement.java
```java
public final class EncryptStatement extends AbstractUnsupportedOperationStatement {

    private final EncryptConnection connection;

    private final Statement statement;

    private OptimizedStatement optimizedStatement;

    private EncryptResultSet resultSet;

    // Updates: rewrite the SQL before executing it.
    @Override
    public int executeUpdate(final String sql) throws SQLException {
        return statement.executeUpdate(getRewriteSQL(sql));
    }

    // Queries: rewrite the SQL and also wrap the returned result set.
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        ResultSet resultSet = statement.executeQuery(getRewriteSQL(sql));
        this.resultSet = new EncryptResultSet(connection.getRuntimeContext(), optimizedStatement, this, resultSet);
        return this.resultSet;
    }

    @SuppressWarnings("unchecked")
    private String getRewriteSQL(final String sql) {
        SQLStatement sqlStatement = connection.getRuntimeContext().getParseEngine().parse(sql, false);
        optimizedStatement = EncryptOptimizeEngineFactory.newInstance(sqlStatement)
                .optimize(connection.getRuntimeContext().getRule(), connection.getRuntimeContext().getMetaData(), sql, Collections.emptyList(), sqlStatement);
        // query.with.cipher.column decides whether queries use the ciphertext column.
        SQLRewriteEngine encryptSQLRewriteEngine = new SQLRewriteEngine(connection.getRuntimeContext().getRule(), optimizedStatement, sql, Collections.emptyList(),
                connection.getRuntimeContext().getProps().<Boolean>getValue(ShardingPropertiesConstant.QUERY_WITH_CIPHER_COLUMN));
        String result = encryptSQLRewriteEngine.generateSQL().getSql();
        showSQL(result);
        return result;
    }

    private void showSQL(final String sql) {
        // sql.show decides whether the rewritten SQL is logged.
        boolean showSQL = connection.getRuntimeContext().getProps().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW);
        if (showSQL) {
            SQLLogger.logSQL(sql);
        }
    }

    //...
}
```
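The rewrite step for encrypted columns can be sketched without a SQL parser by working on already-structured input. EncryptRewriteSketch and its ColumnRule are hypothetical; ColumnRule mirrors the fields of EncryptColumnRuleConfiguration but holds the encrypt functions directly. An INSERT fans one logical column out to the ciphertext, assisted query and plaintext columns; an equality condition uses the plaintext column when query.with.cipher.column is false, and otherwise the assisted query column if configured, else the ciphertext column. That precedence is an assumption based on the behaviour described above, not a transcription of SQLRewriteEngine.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class EncryptRewriteSketch {

    // Hypothetical rule for one logical column; mirrors EncryptColumnRuleConfiguration,
    // but holds the encrypt functions directly instead of an encryptor name.
    static final class ColumnRule {
        final String plainColumn;          // optional, may be null
        final String cipherColumn;
        final String assistedQueryColumn;  // optional, may be null
        final UnaryOperator<String> encryptor;
        final UnaryOperator<String> assistedEncryptor;

        ColumnRule(String plainColumn, String cipherColumn, String assistedQueryColumn,
                   UnaryOperator<String> encryptor, UnaryOperator<String> assistedEncryptor) {
            this.plainColumn = plainColumn;
            this.cipherColumn = cipherColumn;
            this.assistedQueryColumn = assistedQueryColumn;
            this.encryptor = encryptor;
            this.assistedEncryptor = assistedEncryptor;
        }
    }

    // INSERT: each encrypted logical column becomes cipher (+ assisted) (+ plain) columns.
    // Returns SQL with "?" placeholders and appends the bound values to params in order.
    static String rewriteInsert(String table, Map<String, String> logicalRow,
                                Map<String, ColumnRule> rules, List<String> params) {
        Map<String, String> actualRow = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : logicalRow.entrySet()) {
            ColumnRule rule = rules.get(entry.getKey());
            if (rule == null) {                       // not encrypted: keep as-is
                actualRow.put(entry.getKey(), entry.getValue());
                continue;
            }
            actualRow.put(rule.cipherColumn, rule.encryptor.apply(entry.getValue()));
            if (rule.assistedQueryColumn != null) {
                actualRow.put(rule.assistedQueryColumn, rule.assistedEncryptor.apply(entry.getValue()));
            }
            if (rule.plainColumn != null) {
                actualRow.put(rule.plainColumn, entry.getValue());
            }
        }
        params.addAll(actualRow.values());
        List<String> placeholders = new ArrayList<>();
        for (int i = 0; i < actualRow.size(); i++) {
            placeholders.add("?");
        }
        return "INSERT INTO " + table + " (" + String.join(", ", actualRow.keySet())
                + ") VALUES (" + String.join(", ", placeholders) + ")";
    }

    // WHERE <column> = ?: picks the plain, assisted query or cipher column.
    static String rewriteEquals(String column, String value, Map<String, ColumnRule> rules,
                                boolean queryWithCipherColumn, List<String> params) {
        ColumnRule rule = rules.get(column);
        if (rule == null) {
            params.add(value);
            return column + " = ?";
        }
        if (!queryWithCipherColumn && rule.plainColumn != null) {
            params.add(value);
            return rule.plainColumn + " = ?";
        }
        if (rule.assistedQueryColumn != null) {
            params.add(rule.assistedEncryptor.apply(value));
            return rule.assistedQueryColumn + " = ?";
        }
        params.add(rule.encryptor.apply(value));
        return rule.cipherColumn + " = ?";
    }
}
```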

The result set maps logical column labels to the actual columns:

EncryptResultSet.java
```java
public final class EncryptResultSet extends AbstractUnsupportedOperationResultSet {

    private final EncryptRule encryptRule;

    private final OptimizedStatement optimizedStatement;

    private final Statement encryptStatement;

    private ResultSet originalResultSet;

    private IteratorStreamMergedResult resultSet;

    private final Map<String, String> logicAndActualColumns;

    public EncryptResultSet(final EncryptRuntimeContext encryptRuntimeContext, final OptimizedStatement optimizedStatement, final Statement encryptStatement, final ResultSet resultSet) throws SQLException {
        this.encryptRule = encryptRuntimeContext.getRule();
        this.optimizedStatement = optimizedStatement;
        this.encryptStatement = encryptStatement;
        originalResultSet = resultSet;
        // Wrap the raw result set together with the encrypt rule.
        QueryResult queryResult = new StreamQueryResult(resultSet, encryptRule);
        this.resultSet = new IteratorStreamMergedResult(Collections.singletonList(queryResult));
        logicAndActualColumns = createLogicAndActualColumns(encryptRuntimeContext.getProps().<Boolean>getValue(ShardingPropertiesConstant.QUERY_WITH_CIPHER_COLUMN));
    }

    // The logical label is translated to the actual column label before the value is read.
    @Override
    public String getString(final String columnLabel) throws SQLException {
        return (String) ResultSetUtil.convertValue(resultSet.getValue(getActualColumnLabel(columnLabel), String.class), String.class);
    }

    @Override
    public boolean next() throws SQLException {
        return resultSet.next();
    }

    //...
}
```

Values are decrypted when they are read from the result set:

StreamQueryResult.java
```java
public final class StreamQueryResult implements QueryResult {

    @Getter
    private final QueryResultMetaData queryResultMetaData;

    private final ResultSet resultSet;

    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return decrypt(columnIndex, QueryResultUtil.getValue(resultSet, columnIndex));
    }

    // Decrypt only when an encryptor is configured for this column; otherwise return the raw value.
    private Object decrypt(final int columnIndex, final Object value) throws SQLException {
        Optional<ShardingEncryptor> shardingEncryptor = queryResultMetaData.getShardingEncryptor(columnIndex);
        return shardingEncryptor.isPresent() ? shardingEncryptor.get().decrypt(getCiphertext(value)) : value;
    }

    //...
}
```
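A simplified Java sketch of the read path, with rows modelled as maps from actual column to stored value instead of a JDBC ResultSet. DecryptingRows is hypothetical: its constructor plays the role of createLogicAndActualColumns (choosing the ciphertext or plaintext column according to query.with.cipher.column), and getString combines the label lookup in EncryptResultSet with the decryption in StreamQueryResult.decrypt.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-in for EncryptResultSet + StreamQueryResult.
final class DecryptingRows {
    private final Iterator<Map<String, String>> rows;   // actual column -> stored value
    private final Map<String, String> logicAndActualColumns = new HashMap<>();
    private final Map<String, UnaryOperator<String>> decryptorsByActualColumn = new HashMap<>();
    private Map<String, String> current;

    // cipherColumns / plainColumns: logical column -> actual column (plain is optional).
    DecryptingRows(List<Map<String, String>> rows, Map<String, String> cipherColumns,
                   Map<String, String> plainColumns, Map<String, UnaryOperator<String>> decryptors,
                   boolean queryWithCipherColumn) {
        this.rows = rows.iterator();
        for (Map.Entry<String, String> entry : cipherColumns.entrySet()) {
            String logicColumn = entry.getKey();
            String plainColumn = plainColumns.get(logicColumn);
            if (queryWithCipherColumn || plainColumn == null) {
                // Read the cipher column and decrypt it on access.
                logicAndActualColumns.put(logicColumn, entry.getValue());
                decryptorsByActualColumn.put(entry.getValue(), decryptors.get(logicColumn));
            } else {
                // Read the plain column as-is; no decryption needed.
                logicAndActualColumns.put(logicColumn, plainColumn);
            }
        }
    }

    boolean next() {
        current = rows.hasNext() ? rows.next() : null;
        return current != null;
    }

    // Map the logical label to its actual column, then decrypt if that column is encrypted.
    String getString(String columnLabel) {
        String actualColumn = logicAndActualColumns.getOrDefault(columnLabel, columnLabel);
        String raw = current.get(actualColumn);
        UnaryOperator<String> decryptor = decryptorsByActualColumn.get(actualColumn);
        return decryptor == null || raw == null ? raw : decryptor.apply(raw);
    }
}
```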

Summary

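The design is built on composition throughout: every JDBC component is wrapped.
SQL is intercepted at the JDBC layer, so the feature works regardless of higher-level frameworks such as Hibernate.
Encryption rewrites the SQL before the Statement executes it; decryption converts values after they are read from the ResultSet.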