Snowflake I/O
有关使用和运行 Snowflake IO 的管道选项和一般信息。
开始之前
要使用 SnowflakeIO,请将 Maven 工件依赖项添加到你的 pom.xml
文件中。
其他资源
身份验证
读取和批处理写入支持以下身份验证方法
- 用户名和密码
- 密钥对
- OAuth 令牌
流式写入仅支持密钥对身份验证。有关详细信息,请参阅:BEAM-3304.
通过用于实例化 SnowflakeIO.DataSourceConfiguration
类的管道选项传递凭据。每个身份验证方法都有不同的方法来配置此类。
用户名和密码
要在 SnowflakeIO 中使用用户名/密码身份验证,请使用以下管道选项调用你的管道
通过用于实例化 SnowflakeIO.DataSourceConfiguration
类的管道选项传递凭据。
SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
.withUsernamePasswordAuth(
options.getUsername(),
options.getPassword())
.withServerName(options.getServerName())
.withDatabase(options.getDatabase())
.withRole(options.getRole())
.withWarehouse(options.getWarehouse())
.withSchema(options.getSchema());
密钥对
要使用此身份验证方法,你必须首先生成一个密钥对,并将公钥与将使用 IO 转换进行连接的 Snowflake 用户关联。有关说明,请参阅 Snowflake 文档中的 密钥对身份验证和密钥对轮换。
要使用 SnowflakeIO 的密钥对身份验证,请使用以下管道选项集之一调用你的管道
- 将密钥作为路径传递
SnowflakeIO.DataSourceConfiguration
类的初始化可能如下所示SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create() .withKeyPairPathAuth( options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase()) .withServerName(options.getServerName()) .withDatabase(options.getDatabase()) .withRole(options.getRole()) .withWarehouse(options.getWarehouse()) .withSchema(options.getSchema());
- 将密钥作为值传递
SnowflakeIO.DataSourceConfiguration
类的初始化可能如下所示SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create() .withKeyPairRawAuth( options.getUsername(), options.getRawPrivateKey(), options.getPrivateKeyPassphrase()) .withServerName(options.getServerName()) .withDatabase(options.getDatabase()) .withRole(options.getRole()) .withWarehouse(options.getWarehouse()) .withSchema(options.getSchema());
OAuth 令牌
SnowflakeIO 还支持 OAuth 令牌。
重要提示:SnowflakeIO 需要有效的 OAuth 访问令牌。它既不能刷新令牌,也不能使用基于 Web 的流程获取令牌。有关配置 OAuth 集成和获取令牌的信息,请参阅 Snowflake 文档。
获得令牌后,请使用以下管道选项调用你的管道
SnowflakeIO.DataSourceConfiguration
类的初始化可能如下所示数据源配置
数据源配置在读取和写入对象中都需要,用于为 IO 目的配置 Snowflake 连接属性。
一般用法
创建数据源配置
.withUrl(...)
- 用于你的 Snowflake 帐户的类似 JDBC 的 URL,包括帐户名称和区域,不包含任何参数。
- 例如:
.withUrl("jdbc:snowflake://account.snowflakecomputing.com")
.withServerName(...)
- 服务器名称 - 包含帐户、区域和域的完整服务器名称。
- 例如:
.withServerName("account.snowflakecomputing.com")
.withDatabase(...)
- 要使用的 Snowflake 数据库的名称。
- 例如:
.withDatabase("MY_DATABASE")
.withWarehouse(...)
- 要使用的 Snowflake 仓库的名称。此参数是可选的。如果未指定仓库名称,则使用用户的默认仓库。
- 例如:
.withWarehouse("MY_WAREHOUSE")
.withSchema(...)
- 要使用的数据库中模式的名称。此参数是可选的。
- 例如:
.withSchema("PUBLIC")
.withUsernamePasswordAuth(username, password)
- 设置用户名/密码身份验证。
- 例如:
.withUsernamePasswordAuth("USERNAME", "PASSWORD")
.withOAuth(token)
- 设置 OAuth 身份验证。
- 例如:
.withOAuth("TOKEN")
.withKeyPairAuth(username, privateKey)
- 使用用户名和 PrivateKey 设置密钥对身份验证
- 例如:
.withKeyPairAuth("USERNAME",
PrivateKey)
.withKeyPairPathAuth(username, privateKeyPath, privateKeyPassphrase)
- 使用用户名、私钥文件路径和密码设置密钥对身份验证。
- 例如:
.withKeyPairPathAuth("USERNAME", "PATH/TO/KEY.P8", "PASSPHRASE")
.withKeyPairRawAuth(username, rawPrivateKey, privateKeyPassphrase)
- 使用用户名、私钥和密码设置密钥对身份验证。
- 例如:
.withKeyPairRawAuth("USERNAME", "PRIVATE_KEY", "PASSPHRASE")
注意 - .withUrl(...)
或 .withServerName(...)
必须有一个。
管道选项
使用 Beam 的 管道选项 通过命令行设置选项。
Snowflake 管道选项
Snowflake IO 库支持以下选项,这些选项默认情况下可以通过 命令行 传递,前提是管道使用它们
--url
Snowflake 的类似 JDBC 的 URL,包括帐户名称和区域,不包含任何参数。
--serverName
包含帐户、区域和域的完整服务器名称。
--username
用户名/密码和私钥身份验证必需。
--oauthToken
仅 OAuth 身份验证必需。
--password
用户名/密码身份验证必需。
--privateKeyPath
私钥文件路径。仅私钥身份验证必需。
--rawPrivateKey
私钥。仅私钥身份验证必需。
--privateKeyPassphrase
私钥的密码。仅私钥身份验证必需。
--stagingBucketName
以 /
结尾的外部存储桶路径。例如 {gs,s3}://bucket/
。允许使用子目录。
--storageIntegrationName
存储集成名称
--warehouse
要使用的仓库。可选。
--database
要连接的数据库名称。可选。
--schema
要使用的模式。可选。
--table
要使用的表。可选。
--query
要使用的查询。可选。
--role
要使用的角色。可选。
--authenticator
要使用的身份验证器。可选。
--portNumber
端口号。可选。
--loginTimeout
登录超时。可选。
--snowPipe
SnowPipe 名称。可选。
使用管道选项运行主命令
要通过命令行传递管道选项,请在 gradle 命令中使用 --args
,如下所示
./gradle run
--args="
--serverName=<SNOWFLAKE SERVER NAME>
Example: --serverName=account.region.gcp.snowflakecomputing.com
--username=<SNOWFLAKE USERNAME>
Example: --username=testuser
--password=<SNOWFLAKE PASSWORD>
Example: --password=mypassword
--database=<SNOWFLAKE DATABASE>
Example: --database=TEST_DATABASE
--schema=<SNOWFLAKE SCHEMA>
Example: --schema=public
--table=<SNOWFLAKE TABLE IN DATABASE>
Example: --table=TEST_TABLE
--query=<IF NOT TABLE THEN QUERY>
Example: --query=‘SELECT column FROM TABLE’
--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
Example: --storageIntegrationName=my_integration
--stagingBucketName=<GCS OR S3 BUCKET>
Example: --stagingBucketName={gs,s3}://bucket/
--runner=<DirectRunner/DataflowRunner>
Example: --runner=DataflowRunner
--project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
Example: --project=my_project
--tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING
WITH gs://…>
Example: --tempLocation=gs://bucket/temp/
--region=<FOR DATAFLOW RUNNER: GCP REGION>
Example: --region=us-east-1
--appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
Example: --appName=my_job"
options.getStagingBucketName()
命令访问参数。使用管道选项运行测试命令
要通过命令行传递管道选项,请在 gradle 命令中使用 -DintegrationTestPipelineOptions
,如下所示
./gradlew test --tests nameOfTest
-DintegrationTestPipelineOptions='[
"--serverName=<SNOWFLAKE SERVER NAME>",
Example: --serverName=account.region.gcp.snowflakecomputing.com
"--username=<SNOWFLAKE USERNAME>",
Example: --username=testuser
"--password=<SNOWFLAKE PASSWORD>",
Example: --password=mypassword
"--schema=<SNOWFLAKE SCHEMA>",
Example: --schema=PUBLIC
"--table=<SNOWFLAKE TABLE IN DATABASE>",
Example: --table=TEST_TABLE
"--database=<SNOWFLAKE DATABASE>",
Example: --database=TEST_DATABASE
"--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
Example: --storageIntegrationName=my_integration
"--stagingBucketName=<GCS OR S3 BUCKET>",
Example: --stagingBucketName={gs,s3}://bucket
"--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
Example: --tempLocation=gs://bucket/temp/
]' --no-build-cache
所有参数都以“–”开头,用双引号括起来,并用逗号分隔
--serverName=<SNOWFLAKE SERVER NAME>
- 指定您的帐户的完整名称(由 Snowflake 提供)。请注意,您的完整帐户名称可能包含标识帐户所在的区域和云平台的其他段。
- 示例:
--serverName=xy12345.eu-west-1.gcp..snowflakecomputing.com
--username=<SNOWFLAKE USERNAME>
- 指定用户的登录名。
- 示例:
--username=my_username
--password=<SNOWFLAKE PASSWORD>
- 指定指定用户的密码。
- 示例:
--password=my_secret
--schema=<SNOWFLAKE SCHEMA>
- 指定连接后要使用的模式。指定的模式应该是指定用户角色具有权限的现有模式。
- 示例:
--schema=PUBLIC
--table=<SNOWFLAKE TABLE IN DATABASE>
- 示例:
--table=MY_TABLE
- 示例:
--database=<SNOWFLAKE DATABASE>
- 指定连接后要使用的数据库。指定的数据库应该是指定用户角色具有权限的现有数据库。
- 示例:
--database=MY_DATABASE
--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
- 在 Snowflake 中为选择的云存储创建的存储集成名称。
- 示例:
--storageIntegrationName=my_google_integration
在 Dataflow 上运行管道
默认情况下,管道在您本地机器上的 Direct Runner 上运行。要在 Google Dataflow 上运行管道,您必须提供以下管道选项
--runner=DataflowRunner
- Dataflow 的特定运行器。
--project=<GCS PROJECT>
- Google Cloud Platform 项目的名称。
--stagingBucketName=<GCS OR S3 BUCKET>
- 将暂存 Beam 文件的 Google Cloud Services 存储桶或 AWS S3 存储桶。
--maxNumWorkers=5
- (可选) 工作人员的最大数量。
--appName=<JOB NAME>
- (可选) Dataflow 仪表板中作业名称的前缀。
可在 此处 找到更多 Dataflow 的管道选项。
注意:要正确地使用 Google Cloud 进行身份验证,请使用 gcloud 或遵循 Google Cloud 文档。
重要:请确认 Google Dataflow 定价
在 Dataflow 上运行管道模板
Google Dataflow 支持 模板 创建,这意味着将管道暂存到 Cloud Storage 并运行它们,并能够传递在管道执行期间才可用的运行时参数。
创建您自己的 Dataflow 模板的过程如下
- 创建您自己的管道。
- 创建 Dataflow 模板,并检查 SnowflakeIO 在运行时支持哪些选项。
- 使用 Cloud Console、REST API 或 gcloud 运行 Dataflow 模板。
目前,SnowflakeIO 在运行时支持以下选项
--serverName
包含帐户、区域和域的完整服务器名称。--username
用户名/密码和私钥身份验证必需。--password
用户名/密码身份验证必需。--rawPrivateKey
私钥文件。仅在使用私钥身份验证时需要。--privateKeyPassphrase
私钥的密码。仅私钥身份验证必需。--stagingBucketName
以/
结尾的外部存储桶路径。例如{gs,s3}://bucket/
。允许使用子目录。--storageIntegrationName
存储集成名称。--warehouse
要使用的仓库。可选。--database
要连接的数据库名称。可选。--schema
要使用的模式。可选。--table
要使用的表。可选。注意:table 不在默认管道选项中。--query
要使用的查询。可选。注意:query 不在默认管道选项中。--role
要使用的角色。可选。--snowPipe
SnowPipe 名称。可选。
目前,SnowflakeIO 不支持在运行时使用以下选项
--url
Snowflake 的类似 JDBC 的 URL,包括帐户名称和区域,不包含任何参数。--oauthToken
仅 OAuth 身份验证必需。--privateKeyPath
私钥文件路径。仅私钥身份验证必需。--authenticator
要使用的身份验证器。可选。--portNumber
端口号。可选。--loginTimeout
登录超时。可选。
写入 Snowflake 表
SnowflakeIO 的功能之一是写入 Snowflake 表。此转换使您能够使用将用户 PCollection 发送到 Snowflake 数据库的输出操作来完成 Beam 管道。
批处理写入(来自有界源)
基本 .write()
操作的使用方法如下
PCollection
对象的数据类型替换 type;例如,对于字符串的输入 PCollection
,使用 SnowflakeIO.<String>
。所有以下参数都是必需的
.withDataSourceConfiguration()
接受 DatasourceConfiguration 对象。.to()
接受目标 Snowflake 表名称。.withStagingBucketName()
接受以斜杠结尾的云存储桶路径。- 示例:.withStagingBucketName("{gs,s3}://bucket/my/dir/")
.withStorageIntegrationName()
接受根据 Snowflake 文档创建的 Snowflake 存储集成对象的名称。示例然后.withUserDataMapper()
接受 UserDataMapper 函数,该函数将把用户的 PCollection 映射到字符串值数组(String[])
。
注意:SnowflakeIO 在后台使用 COPY
语句进行写入(使用 COPY 到表)。StagingBucketName 将用于保存最终将存储在 Snowflake 中的 CSV 文件。这些 CSV 文件将保存在“stagingBucketName”路径下。
可选 用于批处理
.withQuotationMark()
- 默认值:
‘
(单引号)。 - 接受包含一个字符的字符串。它将包围保存到 CSV 中的所有文本(字符串)字段。它应该是 Snowflake 的 FIELD_OPTIONALLY_ENCLOSED_BY 参数(双引号、单引号或无)接受的字符之一。
- 示例:
.withQuotationMark("'")
- 默认值:
流式写入(来自无界源)
需要在 Snowflake 控制台中创建一个 SnowPipe。SnowPipe 应使用与 .withStagingBucketName
和 .withStorageIntegrationName
方法指定的集成和存储桶相同的集成和存储桶。写入操作可能如下所示
data.apply(
SnowflakeIO.<type>write()
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withDataSourceConfiguration(dc)
.withUserDataMapper(mapper)
.withSnowPipe("MY_SNOW_PIPE")
.withFlushTimeLimit(Duration.millis(time))
.withFlushRowLimit(rowsNumber)
.withShardsNumber(shardsNumber)
)
参数
必需 用于流式传输
.withDataSourceConfiguration()
- 接受 DatasourceConfiguration 对象。
.to()
- 接受目标 Snowflake 表名称。
- 示例:
.to("MY_TABLE")
.withStagingBucketName()
- 接受以斜杠结尾的云存储桶路径。
- 示例:
.withStagingBucketName("{gs,s3}://bucket/my/dir/")
.withStorageIntegrationName()
- 接受根据 Snowflake 文档创建的 Snowflake 存储集成对象的名称。
- 示例然后
.withSnowPipe()
接受目标 SnowPipe 名称。
.withSnowPipe()
接受 snowpipe 的确切名称。示例然后
注意:提供模式和数据库名称非常重要。
.withUserDataMapper()
- 接受 UserDataMapper 函数,该函数将把用户的 PCollection 映射到字符串值数组
(String[]).
- 接受 UserDataMapper 函数,该函数将把用户的 PCollection 映射到字符串值数组
注意:
如前所述,SnowflakeIO 在后台使用 SnowPipe REST 调用 从无界源进行写入。StagingBucketName 将用于保存最终将存储在 Snowflake 中的 CSV 文件。SnowflakeIO 不会在流式传输期间或完成后从“stagingBucketName”下的路径删除创建的 CSV 文件。
可选 用于流式传输
.withFlushTimeLimit()
- 默认值:30 秒
- 接受指定时间后的持续时间对象,之后将重复流式传输写入
- 示例:
.withFlushTimeLimit(Duration.millis(180000))
.withFlushRowLimit()
- 默认值:10,000 行
- 写入每个暂存文件的行数限制
- 示例:
.withFlushRowLimit(500000)
.withShardNumber()
- 默认值:1 个分片
- 每次刷新(用于并行写入)将保存的文件数量。
- 示例:
.withShardNumber(5)
.withQuotationMark()
- 默认值:
‘
(单引号)。 - 接受包含一个字符的字符串。它将包围保存到 CSV 中的所有文本(字符串)字段。它应该是 Snowflake 的 FIELD_OPTIONALLY_ENCLOSED_BY 参数(双引号、单引号或无)接受的字符之一。示例:.withQuotationMark("")(无引号)
- 默认值:
.withDebugMode()
- 接受
SnowflakeIO.StreamingLogLevel.INFO
- 显示有关加载文件的全部信息SnowflakeIO.StreamingLogLevel.ERROR
- 仅显示错误。
- 显示有关流式传输到 Snowflake 的文件的日志,类似于 insertReport。启用调试模式可能会影响性能。
- 示例:
.withDebugMode(SnowflakeIO.StreamingLogLevel.INFO)
- 接受
重要提示:
- 流式传输仅接受密钥对身份验证。有关详细信息,请参见:问题 21287。
- 在
SnowflakeIO.DataSourceConfiguration
对象中配置的角色参数将被忽略,以进行流式写入。有关详细信息,请参见:问题 21365
刷新时间:持续时间和行数
持续时间:流式写入将根据在刷新时间限制中指定的持续时间定期在阶段上写入文件(例如,每 1 分钟一次)。
行数:用于写入的阶段文件将具有在刷新行限制中指定的行数,除非达到刷新时间限制(例如,如果限制是 1000 行,缓冲区收集了 99 行,并且 1 分钟刷新时间过去,则这些行将发送到 SnowPipe 以进行插入)。
阶段文件的尺寸将取决于行的大小和使用的压缩(GZIP)。
UserDataMapper 函数
UserDataMapper
函数是必需的,用于将数据从 PCollection
映射到字符串值数组,然后 write()
操作将数据保存到临时 .csv
文件中。例如
其他写入选项
转换查询
.withQueryTransformation()
选项用于 write()
操作,它接受一个 SQL 查询作为字符串值,该查询将在将数据从暂存的 CSV 文件直接传输到目标 Snowflake 表时执行。有关转换 SQL 语法的更多信息,请参阅 Snowflake 文档。
用法
写入处置
通过为 write()
操作指定 .withWriteDisposition(...)
选项来定义基于将写入数据的表的行为。支持以下值
APPEND
- 默认行为。写入的数据将添加到表中现有的行中,EMPTY
- 目标表必须为空;否则,写入操作将失败,TRUNCATE
- 写入操作将在写入目标表之前删除表中的所有行。
用法示例
创建处置
.withCreateDisposition()
选项定义如果目标表不存在,写入操作的行为。支持以下值
CREATE_IF_NEEDED
- 默认行为。写入操作检查指定的目标表是否存在;如果不存在,写入操作将尝试创建表。使用.withTableSchema()
选项指定目标表的模式。CREATE_NEVER
- 如果目标表不存在,写入操作将失败。
用法
表模式处置
当 .withCreateDisposition()
选项设置为 CREATE_IF_NEEDED
时,.withTableSchema()
选项使您能够指定创建的目标表的模式。表模式是 SnowflakeColumn
对象的列表,其中名称和类型对应于表中每列的列类型。
用法
SnowflakeTableSchema tableSchema =
new SnowflakeTableSchema(
SnowflakeColumn.of("my_date", new SnowflakeDate(), true),
new SnowflakeColumn("id", new SnowflakeNumber()),
SnowflakeColumn.of("name", new SnowflakeText(), true));
data.apply(
SnowflakeIO.<~>write()
.withDataSourceConfiguration(dc)
.to("MY_TABLE")
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withUserDataMapper(mapper)
.withTableSchema(tableSchema)
)
从 Snowflake 读取
SnowflakeIO 的功能之一是读取 Snowflake 表 - 可以通过表名称读取完整表,也可以通过查询读取自定义数据。读取转换的输出是用户定义数据类型的 PCollection。
一般用法
基本 .read()
操作的使用方法
PCollection<USER_DATA_TYPE> items = pipeline.apply(
SnowflakeIO.<USER_DATA_TYPE>read()
.withDataSourceConfiguration(dc)
.fromTable("MY_TABLE") // or .fromQuery("QUERY")
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withCsvMapper(mapper)
.withCoder(coder));
)
.withDataSourceConfiguration(...)
- 接受 DataSourceConfiguration 对象。
.fromTable(...) 或 .fromQuery(...)
- 指定 Snowflake 表名称或自定义 SQL 查询。
.withStagingBucketName()
- 接受云存储桶名称。
.withStorageIntegrationName()
接受根据 Snowflake 文档创建的 Snowflake 存储集成对象的名称。例如
然后.withCsvMapper(mapper)
- 接受一个 CSVMapper 实例,用于将 String[] 映射到 USER_DATA_TYPE。
.withCoder(coder)
- 接受 Coder 用于 USER_DATA_TYPE。
注意:SnowflakeIO 在幕后使用 COPY
语句读取(使用 COPY to location)云存储中暂存的文件。StagingBucketName 将用作存储 CSV 文件的临时位置。这些临时目录将被命名为 sf_copy_csv_DATE_TIME_RANDOMSUFFIX
,并且一旦读取操作完成,它们将被自动删除。
CSVMapper
SnowflakeIO 使用 COPY INTO
CSVMapper 的作用是为用户提供将字符串数组转换为用户定义类型的可能性,例如 Avro 或 Parquet 文件的 GenericRecord,或自定义 POJO。
用于 GenericRecord 的 CsvMapper 的示例实现
将 SnowflakeIO 与 AWS S3 一起使用
为了能够使用 AWS S3 存储桶作为 stagingBucketName
,需要
- 创建
PipelineOptions
接口,该接口 扩展SnowflakePipelineOptions
和 S3Options,并带有AwsAccessKey
和AwsSecretKey
选项。例如
public interface AwsPipelineOptions extends SnowflakePipelineOptions, S3Options {
@Description("AWS Access Key")
@Default.String("access_key")
String getAwsAccessKey();
void setAwsAccessKey(String awsAccessKey);
@Description("AWS secret key")
@Default.String("secret_key")
String getAwsSecretKey();
void setAwsSecretKey(String awsSecretKey);
}
AwsAccessKey
和 AwsSecretKey
选项设置 AwsCredentialsProvider
选项。注意:请记住从 S3Options 中设置 awsRegion
。
在 Python SDK 中使用 SnowflakeIO
简介
Snowflake 跨语言实现支持 Python 编程语言的读写操作,这得益于跨语言,它是 可移植性框架路线图 的一部分,其目标是在整个 Beam 生态系统中提供完全的互操作性。从开发人员的角度来看,这意味着可以组合用不同语言编写的转换(Java/Python/Go)。
有关跨语言的更多信息,请参见 多 SDK 工作 和 跨语言转换 API 和扩展服务 文章。
其他资源
从 Snowflake 读取
SnowflakeIO 的功能之一是读取 Snowflake 表 - 既可以通过表名读取完整表,也可以通过查询读取自定义数据。读取转换的输出是用户定义数据类型的 PCollection。
一般用法
必需参数
server_name
包含帐户、区域和域的完整 Snowflake 服务器名称。schema
要使用的数据库中 Snowflake 架构的名称。database
要使用的 Snowflake 数据库的名称。staging_bucket_name
Google Cloud Storage 存储桶或 AWS S3 存储桶的名称。存储桶将用作存储 CSV 文件的临时位置。这些临时目录将被命名为sf_copy_csv_DATE_TIME_RANDOMSUFFIX
,并且一旦读取操作完成,它们将被自动删除。storage_integration_name
是根据 Snowflake 文档 创建的 Snowflake 存储集成对象的名称。csv_mapper
指定一个函数,该函数必须将用户定义的对象转换为字符串数组。SnowflakeIO 使用 COPY INTO语句将数据从 Snowflake 表移动到 GCS/S3 作为 CSV 文件。然后,这些文件通过 FileIO 下载并逐行处理。每行使用 OpenCSV 库拆分为字符串数组。csv_mapper 函数的作用是为用户提供将字符串数组转换为用户定义类型的可能性,例如 Avro 或 Parquet 文件的 GenericRecord,或自定义对象。例如 table
或query
指定 Snowflake 表名或自定义 SQL 查询
身份验证参数
需要传递以下有效的参数组合之一来进行身份验证
username
和password
指定用于用户名/密码身份验证方法的用户名和密码。private_key_path
和private_key_passphrase
指定用于密钥/对身份验证方法的私钥路径和密码。raw_private_key
和private_key_passphrase
指定用于密钥/对身份验证方法的私钥和密码。o_auth_token
指定用于 OAuth 身份验证方法的访问令牌。
其他参数
role
指定 Snowflake 角色。如果未指定,将使用用户的默认角色。warehouse
指定 Snowflake 仓库名称。如果未指定,将使用用户的默认仓库。expansion_service
指定扩展服务的 URL。
写入 Snowflake
SnowflakeIO 的功能之一是写入 Snowflake 表。此转换使您能够使用将用户 PCollection 发送到 Snowflake 数据库的输出操作来完成 Beam 管道。
一般用法
OPTIONS = ["--runner=FlinkRunner"]
with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
(p
| <SOURCE OF DATA>
| WriteToSnowflake(
server_name=<SNOWFLAKE SERVER NAME>,
username=<SNOWFLAKE USERNAME>,
password=<SNOWFLAKE PASSWORD>,
o_auth_token=<OAUTH TOKEN>,
private_key_path=<PATH TO P8 FILE>,
raw_private_key=<PRIVATE_KEY>
private_key_passphrase=<PASSWORD FOR KEY>,
schema=<SNOWFLAKE SCHEMA>,
database=<SNOWFLAKE DATABASE>,
staging_bucket_name=<GCS OR S3 BUCKET>,
storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
create_disposition=<CREATE DISPOSITION>,
write_disposition=<WRITE DISPOSITION>,
table_schema=<SNOWFLAKE TABLE SCHEMA>,
user_data_mapper=<USER DATA MAPPER FUNCTION>,
table=<SNOWFLAKE TABLE>,
query=<IF NOT TABLE THEN QUERY>,
role=<SNOWFLAKE ROLE>,
warehouse=<SNOWFLAKE WAREHOUSE>,
expansion_service=<EXPANSION SERVICE ADDRESS>))
必需参数
server_name
包含帐户、区域和域的完整 Snowflake 服务器名称。schema
要使用的数据库中 Snowflake 架构的名称。database
要使用的 Snowflake 数据库的名称。staging_bucket_name
Google Cloud Storage 存储桶或 AWS S3 存储桶的路径,以斜杠结尾。存储桶将用于保存最终将进入 Snowflake 的 CSV 文件。这些 CSV 文件将保存在“staging_bucket_name”路径下。storage_integration_name
是根据 Snowflake 文档 创建的 Snowflake 存储集成对象的名称。user_data_mapper
指定一个函数,该函数在写入操作将数据保存到临时 .csv 文件之前,将 PCollection 中的数据映射到字符串值数组。例如table
或query
指定 Snowflake 表名或自定义 SQL 查询
身份验证参数
需要传递以下有效的参数组合之一来进行身份验证
username
和password
指定用户名/密码身份验证方法。private_key_path
和private_key_passphrase
指定用于密钥/对身份验证方法的私钥路径和密码。raw_private_key
和private_key_passphrase
指定用于密钥/对身份验证方法的私钥和密码。o_auth_token
指定用于 OAuth 身份验证方法的访问令牌。
其他参数
role
指定 Snowflake 角色。如果未指定,将使用用户的默认角色。warehouse
指定 Snowflake 仓库名称。如果未指定,将使用用户的默认仓库。create_disposition
定义目标表不存在时写入操作的行为。支持以下值CREATE_IF_NEEDED
- 默认行为。写入操作检查指定的 target 表是否存在;如果不存在,则写入操作尝试创建该表。使用 table_schema 参数指定目标表的架构。CREATE_NEVER
- 如果目标表不存在,写入操作将失败。
write_disposition
根据将要写入数据的表定义写入行为。支持以下值APPEND
- 默认行为。写入的数据将添加到表中现有的行中,EMPTY
- 目标表必须为空;否则,写入操作将失败,TRUNCATE
- 写入操作将在写入目标表之前删除表中的所有行。
table_schema
当create_disposition
参数设置为 CREATE_IF_NEEDED 时,table_schema 参数使您可以指定创建的目标表的架构。表架构是一个 JSON 数组,具有以下结构所有支持的数据类型您可以在 Snowflake 数据类型 中了解有关 Snowflake 数据类型的更多信息。{"type":"date"}, {"type":"datetime"}, {"type":"time"}, {"type":"timestamp"}, {"type":"timestamp_ltz"}, {"type":"timestamp_ntz"}, {"type":"timestamp_tz"}, {"type":"boolean"}, {"type":"decimal","precision":38,"scale":1}, {"type":"double"}, {"type":"float"}, {"type":"integer","precision":38,"scale":0}, {"type":"number","precision":38,"scale":1}, {"type":"numeric","precision":38,"scale":2}, {"type":"real"}, {"type":"array"}, {"type":"object"}, {"type":"variant"}, {"type":"binary","size":null}, {"type":"char","length":1}, {"type":"string","length":null}, {"type":"text","length":null}, {"type":"varbinary","size":null}, {"type":"varchar","length":100}]
expansion_service
指定扩展服务的 URL。
限制
SnowflakeIO 目前存在以下限制。
流式写入仅支持密钥对身份验证。有关详细信息,请参见:问题 21287。
在
SnowflakeIO.DataSourceConfiguration
对象中配置的角色参数将被忽略,以进行流式写入。有关详细信息,请参见:问题 21365
最后更新时间:2024/10/31
您是否找到了您要查找的所有内容?
所有内容是否都实用且清晰?您想更改任何内容吗?请告诉我们!