内置 I/O 转换

SingleStoreDB I/O

有关使用和运行 SingleStoreDB I/O 的管道选项和一般信息。

开始之前

要使用 SingleStoreDB I/O,请将 Maven 工件依赖项添加到您的 pom.xml 文件中。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>2.60.0</version>
</dependency>

附加资源

身份验证

DataSource 配置是为配置 SingleStoreIO 连接属性所必需的。

创建 DataSource 配置

SingleStoreIO.DataSourceConfiguration
    .create("myHost:3306")
    .withDatabase("db")
    .withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE")
    .withPassword("password")
    .withUsername("admin");

其中参数可以是

注意 - .withDatabase(...) 对于 .readWithPartitions() 来说是必需的

从 SingleStoreDB 读取

SingleStoreIO 的功能之一是从 SingleStoreDB 表中读取数据。SingleStoreIO 支持两种类型的读取

在很多情况下,由于性能原因,并行数据读取比顺序数据读取更受欢迎。

顺序数据读取

基本 .read() 操作的使用方式如下

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE") // or .withQuery("QUERY")
        .withStatementPreparator(statementPreparator)
        .withOutputParallelization(true)
        .withRowMapper(mapper)
);

其中参数可以是

注意 - .withTable(...).withQuery(...) 是必需的

并行数据读取

基本 .readWithPartitions() 操作的使用方式如下

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE") // or .withQuery("QUERY")
        .withRowMapper(mapper)
);

其中参数可以是

注意 - .withTable(...).withQuery(...) 是必需的

StatementPreparator

StatementPreparatorread() 用于设置 PreparedStatement 的参数。例如

public static class MyStatmentPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }
}

RowMapper

RowMapperread()readWithPartitions() 用于将 ResultSet 的每一行转换为结果 PCollection 的元素。例如

public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}

写入 SingleStoreDB 表

SingleStoreIO 的功能之一是写入 SingleStoreDB 表。此转换使您可以将用户的 PCollection 发送到您的 SingleStoreDB 数据库。它返回每个元素批次写入的行数。

基本 .write() 操作的使用方式如下

data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE")
        .withUserDataMapper(mapper)
        .withBatchSize(100000)
);

其中参数可以是

UserDataMapper

UserDataMapper 是必需的,用于将数据从 PCollection 映射到 String 值数组,然后 write() 操作保存数据。例如

public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}