内置 I/O 转换

Snowflake I/O

有关使用和运行 Snowflake IO 的管道选项和一般信息。

开始之前

要使用 SnowflakeIO,请将 Maven 工件依赖项添加到你的 pom.xml 文件中。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-snowflake</artifactId>
    <version>2.60.0</version>
</dependency>

其他资源

身份验证

读取和批处理写入支持以下身份验证方法

流式写入仅支持密钥对身份验证。有关详细信息,请参阅:BEAM-3304.

通过用于实例化 SnowflakeIO.DataSourceConfiguration 类的管道选项传递凭据。每个身份验证方法都有不同的方法来配置此类。

用户名和密码

要在 SnowflakeIO 中使用用户名/密码身份验证,请使用以下管道选项调用你的管道

--username=<USERNAME> --password=<PASSWORD>

通过用于实例化 SnowflakeIO.DataSourceConfiguration 类的管道选项传递凭据。

SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
        .withUsernamePasswordAuth(
                options.getUsername(),
                options.getPassword())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withRole(options.getRole())
        .withWarehouse(options.getWarehouse())
        .withSchema(options.getSchema());

密钥对

要使用此身份验证方法,你必须首先生成一个密钥对,并将公钥与将使用 IO 转换进行连接的 Snowflake 用户关联。有关说明,请参阅 Snowflake 文档中的 密钥对身份验证和密钥对轮换

要使用 SnowflakeIO 的密钥对身份验证,请使用以下管道选项集之一调用你的管道

OAuth 令牌

SnowflakeIO 还支持 OAuth 令牌。

重要提示:SnowflakeIO 需要有效的 OAuth 访问令牌。它既不能刷新令牌,也不能使用基于 Web 的流程获取令牌。有关配置 OAuth 集成和获取令牌的信息,请参阅 Snowflake 文档

获得令牌后,请使用以下管道选项调用你的管道

--oauthToken=<TOKEN>
SnowflakeIO.DataSourceConfiguration 类的初始化可能如下所示
 SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());

数据源配置

数据源配置在读取和写入对象中都需要,用于为 IO 目的配置 Snowflake 连接属性。

一般用法

创建数据源配置

 SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());
其中参数可以是

注意 - .withUrl(...).withServerName(...) 必须有一个

管道选项

使用 Beam 的 管道选项 通过命令行设置选项。

Snowflake 管道选项

Snowflake IO 库支持以下选项,这些选项默认情况下可以通过 命令行 传递,前提是管道使用它们

--url Snowflake 的类似 JDBC 的 URL,包括帐户名称和区域,不包含任何参数。

--serverName 包含帐户、区域和域的完整服务器名称。

--username 用户名/密码和私钥身份验证必需。

--oauthToken 仅 OAuth 身份验证必需。

--password 用户名/密码身份验证必需。

--privateKeyPath 私钥文件路径。仅私钥身份验证必需。

--rawPrivateKey 私钥。仅私钥身份验证必需。

--privateKeyPassphrase 私钥的密码。仅私钥身份验证必需。

--stagingBucketName/ 结尾的外部存储桶路径。例如 {gs,s3}://bucket/。允许使用子目录。

--storageIntegrationName 存储集成名称

--warehouse 要使用的仓库。可选。

--database 要连接的数据库名称。可选。

--schema 要使用的模式。可选。

--table 要使用的表。可选。

--query 要使用的查询。可选。

--role 要使用的角色。可选。

--authenticator 要使用的身份验证器。可选。

--portNumber 端口号。可选。

--loginTimeout 登录超时。可选。

--snowPipe SnowPipe 名称。可选。

使用管道选项运行主命令

要通过命令行传递管道选项,请在 gradle 命令中使用 --args,如下所示

./gradle run
    --args="
        --serverName=<SNOWFLAKE SERVER NAME>
           Example: --serverName=account.region.gcp.snowflakecomputing.com
        --username=<SNOWFLAKE USERNAME>
           Example: --username=testuser
        --password=<SNOWFLAKE PASSWORD>
           Example: --password=mypassword
        --database=<SNOWFLAKE DATABASE>
           Example: --database=TEST_DATABASE
        --schema=<SNOWFLAKE SCHEMA>
           Example: --schema=public
        --table=<SNOWFLAKE TABLE IN DATABASE>
           Example: --table=TEST_TABLE
        --query=<IF NOT TABLE THEN QUERY>
           Example: --query=‘SELECT column FROM TABLE’
        --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
           Example: --storageIntegrationName=my_integration
        --stagingBucketName=<GCS OR S3 BUCKET>
           Example: --stagingBucketName={gs,s3}://bucket/
        --runner=<DirectRunner/DataflowRunner>
           Example: --runner=DataflowRunner
        --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
           Example: --project=my_project
        --tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING
                        WITH gs://…>
           Example: --tempLocation=gs://bucket/temp/
        --region=<FOR DATAFLOW RUNNER: GCP REGION>
           Example: --region=us-east-1
        --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
           Example: --appName=my_job"
然后,在代码中,可以使用 options.getStagingBucketName() 命令访问参数。

使用管道选项运行测试命令

要通过命令行传递管道选项,请在 gradle 命令中使用 -DintegrationTestPipelineOptions,如下所示

./gradlew test --tests nameOfTest
-DintegrationTestPipelineOptions='[
  "--serverName=<SNOWFLAKE SERVER NAME>",
      Example: --serverName=account.region.gcp.snowflakecomputing.com
  "--username=<SNOWFLAKE USERNAME>",
      Example: --username=testuser
  "--password=<SNOWFLAKE PASSWORD>",
      Example: --password=mypassword
  "--schema=<SNOWFLAKE SCHEMA>",
      Example: --schema=PUBLIC
  "--table=<SNOWFLAKE TABLE IN DATABASE>",
      Example: --table=TEST_TABLE
  "--database=<SNOWFLAKE DATABASE>",
      Example: --database=TEST_DATABASE
  "--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
      Example: --storageIntegrationName=my_integration
  "--stagingBucketName=<GCS OR S3 BUCKET>",
      Example: --stagingBucketName={gs,s3}://bucket
  "--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
      Example: --tempLocation=gs://bucket/temp/
]' --no-build-cache

所有参数都以“–”开头,用双引号括起来,并用逗号分隔

在 Dataflow 上运行管道

默认情况下,管道在您本地机器上的 Direct Runner 上运行。要在 Google Dataflow 上运行管道,您必须提供以下管道选项

可在 此处 找到更多 Dataflow 的管道选项。

注意:要正确地使用 Google Cloud 进行身份验证,请使用 gcloud 或遵循 Google Cloud 文档

重要:请确认 Google Dataflow 定价

在 Dataflow 上运行管道模板

Google Dataflow 支持 模板 创建,这意味着将管道暂存到 Cloud Storage 并运行它们,并能够传递在管道执行期间才可用的运行时参数。

创建您自己的 Dataflow 模板的过程如下

  1. 创建您自己的管道。
  2. 创建 Dataflow 模板,并检查 SnowflakeIO 在运行时支持哪些选项。
  3. 使用 Cloud ConsoleREST APIgcloud 运行 Dataflow 模板。

目前,SnowflakeIO 在运行时支持以下选项

目前,SnowflakeIO 不支持在运行时使用以下选项

写入 Snowflake 表

SnowflakeIO 的功能之一是写入 Snowflake 表。此转换使您能够使用将用户 PCollection 发送到 Snowflake 数据库的输出操作来完成 Beam 管道。

批处理写入(来自有界源)

基本 .write() 操作的使用方法如下

data.apply(
   SnowflakeIO.<type>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
)
用要写入的 PCollection 对象的数据类型替换 type;例如,对于字符串的输入 PCollection,使用 SnowflakeIO.<String>

所有以下参数都是必需的

注意:SnowflakeIO 在后台使用 COPY 语句进行写入(使用 COPY 到表)。StagingBucketName 将用于保存最终将存储在 Snowflake 中的 CSV 文件。这些 CSV 文件将保存在“stagingBucketName”路径下。

可选 用于批处理

流式写入(来自无界源)

需要在 Snowflake 控制台中创建一个 SnowPipe。SnowPipe 应使用与 .withStagingBucketName.withStorageIntegrationName 方法指定的集成和存储桶相同的集成和存储桶。写入操作可能如下所示

data.apply(
   SnowflakeIO.<type>write()
      .withStagingBucketName("BUCKET")
      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
      .withDataSourceConfiguration(dc)
      .withUserDataMapper(mapper)
      .withSnowPipe("MY_SNOW_PIPE")
      .withFlushTimeLimit(Duration.millis(time))
      .withFlushRowLimit(rowsNumber)
      .withShardsNumber(shardsNumber)
)

参数

必需 用于流式传输

注意:提供模式数据库名称非常重要。

注意:

如前所述,SnowflakeIO 在后台使用 SnowPipe REST 调用 从无界源进行写入。StagingBucketName 将用于保存最终将存储在 Snowflake 中的 CSV 文件。SnowflakeIO 不会在流式传输期间或完成后从“stagingBucketName”下的路径删除创建的 CSV 文件。

可选 用于流式传输

重要提示:

  1. 流式传输仅接受密钥对身份验证。有关详细信息,请参见:问题 21287
  2. SnowflakeIO.DataSourceConfiguration 对象中配置的角色参数将被忽略,以进行流式写入。有关详细信息,请参见:问题 21365

刷新时间:持续时间和行数

持续时间:流式写入将根据在刷新时间限制中指定的持续时间定期在阶段上写入文件(例如,每 1 分钟一次)。

行数:用于写入的阶段文件将具有在刷新行限制中指定的行数,除非达到刷新时间限制(例如,如果限制是 1000 行,缓冲区收集了 99 行,并且 1 分钟刷新时间过去,则这些行将发送到 SnowPipe 以进行插入)。

阶段文件的尺寸将取决于行的大小和使用的压缩(GZIP)。

UserDataMapper 函数

UserDataMapper 函数是必需的,用于将数据从 PCollection 映射到字符串值数组,然后 write() 操作将数据保存到临时 .csv 文件中。例如

public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
    return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
}

其他写入选项

转换查询

.withQueryTransformation() 选项用于 write() 操作,它接受一个 SQL 查询作为字符串值,该查询将在将数据从暂存的 CSV 文件直接传输到目标 Snowflake 表时执行。有关转换 SQL 语法的更多信息,请参阅 Snowflake 文档

用法

String query = "SELECT t.$1 from YOUR_TABLE;";
data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withQueryTransformation(query)
)

写入处置

通过为 write() 操作指定 .withWriteDisposition(...) 选项来定义基于将写入数据的表的行为。支持以下值

用法示例

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withWriteDisposition(TRUNCATE)
)

创建处置

.withCreateDisposition() 选项定义如果目标表不存在,写入操作的行为。支持以下值

用法

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withCreateDisposition(CREATE_NEVER)
)

表模式处置

.withCreateDisposition() 选项设置为 CREATE_IF_NEEDED 时,.withTableSchema() 选项使您能够指定创建的目标表的模式。表模式是 SnowflakeColumn 对象的列表,其中名称和类型对应于表中每列的列类型。

用法

SnowflakeTableSchema tableSchema =
    new SnowflakeTableSchema(
        SnowflakeColumn.of("my_date", new SnowflakeDate(), true),
        new SnowflakeColumn("id", new SnowflakeNumber()),
        SnowflakeColumn.of("name", new SnowflakeText(), true));

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withTableSchema(tableSchema)
)

从 Snowflake 读取

SnowflakeIO 的功能之一是读取 Snowflake 表 - 可以通过表名称读取完整表,也可以通过查询读取自定义数据。读取转换的输出是用户定义数据类型的 PCollection

一般用法

基本 .read() 操作的使用方法

PCollection<USER_DATA_TYPE> items = pipeline.apply(
   SnowflakeIO.<USER_DATA_TYPE>read()
       .withDataSourceConfiguration(dc)
       .fromTable("MY_TABLE") // or .fromQuery("QUERY")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withCsvMapper(mapper)
       .withCoder(coder));
)
所有以下参数都是必需的

注意:SnowflakeIO 在幕后使用 COPY 语句读取(使用 COPY to location)云存储中暂存的文件。StagingBucketName 将用作存储 CSV 文件的临时位置。这些临时目录将被命名为 sf_copy_csv_DATE_TIME_RANDOMSUFFIX,并且一旦读取操作完成,它们将被自动删除。

CSVMapper

SnowflakeIO 使用 COPY INTO 语句将数据从 Snowflake 表移动到 GCS/S3 作为 CSV 文件。然后,这些文件通过 FileIO 下载并逐行处理。每行使用 OpenCSV 库拆分为字符串数组。

CSVMapper 的作用是为用户提供将字符串数组转换为用户定义类型的可能性,例如 Avro 或 Parquet 文件的 GenericRecord,或自定义 POJO。

用于 GenericRecord 的 CsvMapper 的示例实现

static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
   return (SnowflakeIO.CsvMapper<GenericRecord>)
           parts -> {
               return new GenericRecordBuilder(PARQUET_SCHEMA)
                       .set("ID", Long.valueOf(parts[0]))
                       .set("NAME", parts[1])
                       [...]
                       .build();
           };
}

将 SnowflakeIO 与 AWS S3 一起使用

为了能够使用 AWS S3 存储桶作为 stagingBucketName,需要

  1. 创建 PipelineOptions 接口,该接口 扩展 SnowflakePipelineOptionsS3Options,并带有 AwsAccessKeyAwsSecretKey 选项。例如

public interface AwsPipelineOptions extends SnowflakePipelineOptions, S3Options {

    @Description("AWS Access Key")
    @Default.String("access_key")
    String getAwsAccessKey();

    void setAwsAccessKey(String awsAccessKey);

    @Description("AWS secret key")
    @Default.String("secret_key")
    String getAwsSecretKey();

    void setAwsSecretKey(String awsSecretKey);
}
2. 使用 AwsAccessKeyAwsSecretKey 选项设置 AwsCredentialsProvider 选项。

options.setAwsCredentialsProvider(
    new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())
    )
);
3. 创建管道

Pipeline p = Pipeline.create(options);

注意:请记住从 S3Options 中设置 awsRegion

在 Python SDK 中使用 SnowflakeIO

简介

Snowflake 跨语言实现支持 Python 编程语言的读写操作,这得益于跨语言,它是 可移植性框架路线图 的一部分,其目标是在整个 Beam 生态系统中提供完全的互操作性。从开发人员的角度来看,这意味着可以组合用不同语言编写的转换(Java/Python/Go)。

有关跨语言的更多信息,请参见 多 SDK 工作跨语言转换 API 和扩展服务 文章。

其他资源

从 Snowflake 读取

SnowflakeIO 的功能之一是读取 Snowflake 表 - 既可以通过表名读取完整表,也可以通过查询读取自定义数据。读取转换的输出是用户定义数据类型的 PCollection

一般用法

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | ReadFromSnowflake(...)
       | <FURTHER TRANSFORMS>)

必需参数

身份验证参数

需要传递以下有效的参数组合之一来进行身份验证

其他参数

写入 Snowflake

SnowflakeIO 的功能之一是写入 Snowflake 表。此转换使您能够使用将用户 PCollection 发送到 Snowflake 数据库的输出操作来完成 Beam 管道。

一般用法

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | <SOURCE OF DATA>
       | WriteToSnowflake(
           server_name=<SNOWFLAKE SERVER NAME>,
           username=<SNOWFLAKE USERNAME>,
           password=<SNOWFLAKE PASSWORD>,
           o_auth_token=<OAUTH TOKEN>,
           private_key_path=<PATH TO P8 FILE>,
           raw_private_key=<PRIVATE_KEY>
           private_key_passphrase=<PASSWORD FOR KEY>,
           schema=<SNOWFLAKE SCHEMA>,
           database=<SNOWFLAKE DATABASE>,
           staging_bucket_name=<GCS OR S3 BUCKET>,
           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
           create_disposition=<CREATE DISPOSITION>,
           write_disposition=<WRITE DISPOSITION>,
           table_schema=<SNOWFLAKE TABLE SCHEMA>,
           user_data_mapper=<USER DATA MAPPER FUNCTION>,
           table=<SNOWFLAKE TABLE>,
           query=<IF NOT TABLE THEN QUERY>,
           role=<SNOWFLAKE ROLE>,
           warehouse=<SNOWFLAKE WAREHOUSE>,
           expansion_service=<EXPANSION SERVICE ADDRESS>))

必需参数

身份验证参数

需要传递以下有效的参数组合之一来进行身份验证

其他参数

限制

SnowflakeIO 目前存在以下限制。

  1. 流式写入仅支持密钥对身份验证。有关详细信息,请参见:问题 21287

  2. SnowflakeIO.DataSourceConfiguration 对象中配置的角色参数将被忽略,以进行流式写入。有关详细信息,请参见:问题 21365