Google BigQuery I/O 连接器
- Java SDK
- Python SDK
Beam SDK 包含可以从 Google BigQuery 表读取数据和向其写入数据的内置转换。
在开始之前
要使用 BigQueryIO,请将 Maven 工件依赖项添加到你的 pom.xml
文件中。
其他资源
要使用 BigQueryIO,您必须通过运行 pip install apache-beam[gcp]
安装 Google Cloud Platform 依赖项。
其他资源
BigQuery 基础
表名
要从 BigQuery 表读取或写入 BigQuery 表,您必须提供一个完全限定的 BigQuery 表名(例如,bigquery-public-data:github_repos.sample_contents
)。完全限定的 BigQuery 表名由三个部分组成
- 项目 ID:Google Cloud 项目的 ID。默认值来自您的管道选项对象。
- 数据集 ID:BigQuery 数据集 ID,在给定 Cloud 项目中是唯一的。
- 表 ID:BigQuery 表 ID,在给定数据集中是唯一的。
要指定 BigQuery 表,您可以使用表的完全限定名称作为字符串,也可以使用 TableReference TableReference 对象。
使用字符串
要使用字符串指定表,请使用格式 [project_id]:[dataset_id].[table_id]
指定完全限定的 BigQuery 表名。
您也可以省略 project_id
并使用 [dataset_id].[table_id]
格式。如果您省略了项目 ID,Beam 将使用您 管道选项. 管道选项. 中的默认项目 ID。
使用 TableReference
要使用 TableReference
指定表,请使用 BigQuery 表名的三个部分创建一个新的 TableReference
。
Beam SDK for Java 还提供了 parseTableSpec
帮助程序方法,该方法使用包含完全限定 BigQuery 表名的字符串构造一个 TableReference
对象。但是,BigQueryIO 转换的静态工厂方法接受表名作为字符串,并为您构造一个 TableReference
对象。
表行
BigQueryIO 读取和写入转换生成并使用字典的 PCollection
,其中 PCollection
中的每个元素表示表中的一行。
模式
写入 BigQuery 时,您必须为要写入的目标表提供表模式,除非您指定 CREATE_NEVER
的 创建处置。 创建表模式 更详细地介绍了模式。
数据类型
BigQuery 支持以下数据类型:STRING、BYTES、INTEGER、FLOAT、NUMERIC、BOOLEAN、TIMESTAMP、DATE、TIME、DATETIME 和 GEOGRAPHY。有关 Google Standard SQL 数据类型的概述,请参阅 数据类型。BigQueryIO 允许您使用所有这些数据类型。以下示例显示了从 BigQuery 读取和写入 BigQuery 时使用的数据类型的正确格式
import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class BigQueryTableRowCreate {
public static TableRow createTableRow() {
TableRow row =
new TableRow()
// To learn more about BigQuery data types:
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
.set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
.set("int64_field", 432)
.set("float64_field", 3.141592653589793)
.set("numeric_field", new BigDecimal("1234.56").toString())
.set("bool_field", true)
.set(
"bytes_field",
Base64.getEncoder()
.encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))
// To learn more about date formatting:
// https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
.set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
.set(
"datetime_field",
LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
.set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
.set(
"timestamp_field",
Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT
// To learn more about the geography Well-Known Text (WKT) format:
// https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
.set("geography_field", "POINT(30 10)")
// An array has its mode set to REPEATED.
.set("array_field", Arrays.asList(1, 2, 3, 4))
// Any class can be written as a STRUCT as long as all the fields in the
// schema are present and they are encoded correctly as BigQuery types.
.set(
"struct_field",
Stream.of(
new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
new SimpleEntry<>("int64_value", "42"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
return row;
}
}
bigquery_data = [{
'string': 'abc',
'bytes': base64.b64encode(b'\xab\xac'),
'integer': 5,
'float': 0.5,
'numeric': Decimal('5'),
'boolean': True,
'timestamp': '2018-12-31 12:44:31.744957 UTC',
'date': '2018-12-31',
'time': '12:44:31',
'datetime': '2018-12-31T12:44:31',
'geography': 'POINT(30 10)'
}]
从 Beam 2.7.0 开始,支持 NUMERIC 数据类型。此数据类型支持高精度小数(精度为 38 位,小数位数为 9 位)。GEOGRAPHY 数据类型与 Well-Known Text 结合使用(请参阅 https://en.wikipedia.org/wiki/Well-known_text 格式,用于读取和写入 BigQuery。BigQuery IO 要求 BYTES 数据类型的值在写入 BigQuery 时使用 base64 编码。从 BigQuery 读取字节时,它们将作为 base64 编码的字符串返回。
从 Beam 2.7.0 开始,支持 NUMERIC 数据类型。此数据类型支持高精度小数(精度为 38 位,小数位数为 9 位)。GEOGRAPHY 数据类型与 Well-Known Text 结合使用(请参阅 https://en.wikipedia.org/wiki/Well-known_text 格式,用于读取和写入 BigQuery。BigQuery IO 要求 BYTES 数据类型的值在写入 BigQuery 时使用 base64 编码。从 BigQuery 读取字节时,它们将作为 base64 编码的字节返回。
从 BigQuery 读取
BigQueryIO 允许您从 BigQuery 表读取,或执行 SQL 查询并读取结果。默认情况下,当您应用 BigQueryIO 读取转换时,Beam 会调用 BigQuery 导出请求。但是,Beam SDK for Java 还支持使用 BigQuery 存储读取 API 直接从 BigQuery 存储读取。有关更多信息,请参阅 使用存储读取 API。
Java 版 Beam SDK 有两种 BigQueryIO 读取方法。两种方法都允许您从表格读取数据,或者使用查询字符串读取字段。
read(SerializableFunction)
读取 Avro 格式的记录,并使用指定的解析函数将它们解析为自定义类型对象的PCollection
。PCollection
中的每个元素代表表格中的一行。有关使用查询字符串读取的 示例代码 演示了如何使用read(SerializableFunction)
。readTableRows
返回包含 BigQueryTableRow
对象的PCollection
。PCollection
中的每个元素代表表格中的一行。TableRow
对象中的整数值被编码为字符串,以匹配 BigQuery 导出的 JSON 格式。这种方法很方便,但性能比read(SerializableFunction)
慢 2-3 倍。有关从表格读取的 示例代码 演示了如何使用readTableRows
。
注意: 从 Beam SDK 2.2.0 开始,BigQueryIO.read()
已弃用。请改用 read(SerializableFunction<SchemaAndRecord, T>)
将 BigQuery 行从 Avro GenericRecord
解析为您的自定义类型,或者使用 readTableRows()
将它们解析为 JSON TableRow
对象。
要使用 Python 版 Beam SDK 从 BigQuery 表格读取数据,请应用 ReadFromBigQuery
变换。ReadFromBigQuery
返回包含字典的 PCollection
,其中 PCollection
中的每个元素代表表格中的一行。TableRow
对象中的整数值被编码为字符串,以匹配 BigQuery 导出的 JSON 格式。
注意: 从 Beam SDK 2.25.0 开始,BigQuerySource()
已弃用。在 2.25.0 之前,要使用 Beam SDK 从 BigQuery 表格读取数据,请在 BigQuerySource
上应用 Read
变换。例如,beam.io.Read(beam.io.BigQuerySource(table_spec))
。
从表中读取
要读取整个 BigQuery 表格,请使用带有 BigQuery 表格名称的 from
方法。此示例使用 readTableRows
。
要读取整个 BigQuery 表格,请使用带有 BigQuery 表格名称的 table
参数。
以下代码读取包含气象站数据的整个表格,然后提取 max_temperature
列。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromTable {
public static PCollection<MyData> readFromTable(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery query",
BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
使用查询字符串读取
如果您不想读取整个表格,可以使用 fromQuery
方法提供查询字符串。
如果您不想读取整个表格,可以使用 query
参数向 ReadFromBigQuery
提供查询字符串。
以下代码使用 SQL 查询仅读取 max_temperature
列。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromQuery {
public static PCollection<MyData> readFromQuery(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery query",
BigQueryIO.readTableRows()
.fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
.usingStandardSql())
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
您也可以使用 BigQuery 的标准 SQL 方言和查询字符串,如下例所示。
max_temperatures = (
pipeline
| 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
query='SELECT max_temperature FROM '\
'`clouddataflow-readonly.samples.weather_stations`',
use_standard_sql=True)
# Each row is a dictionary where the keys are the BigQuery columns
| beam.Map(lambda elem: elem['max_temperature']))
查询执行项目
默认情况下,管道在与管道关联的 Google Cloud 项目中执行查询(对于 Dataflow 运行程序,它是在管道运行的项目)。在某些情况下,查询执行项目应该与管道项目不同。如果您使用 Java SDK,可以通过将管道选项“bigQueryProject”设置为所需的 Google Cloud 项目 ID 来定义查询执行项目。
使用存储读取 API
您可以使用 BigQuery Storage API 直接访问 BigQuery 存储中的表格,并支持诸如列选择和谓词过滤器下推之类的功能,这些功能可以提高管道执行效率。
Java 版 Beam SDK 支持在从 BigQuery 读取数据时使用 BigQuery Storage API。2.25.0 之前的 SDK 版本将 BigQuery Storage API 作为 实验性功能 支持,并使用预发布的 BigQuery Storage API 表面。调用者应该将使用 BigQuery Storage API 的管道迁移到 SDK 版本 2.25.0 或更高版本。
Python 版 Beam SDK 支持 BigQuery Storage API。通过将 method=DIRECT_READ
作为参数传递给 ReadFromBigQuery
来启用它。
更新你的代码
在您从表格读取数据时,请使用以下方法。
- 必需:指定 withMethod(Method.DIRECT_READ) 以将 BigQuery Storage API 用于读取操作。
- 可选:要使用诸如 列投影和列过滤 之类的功能,您必须分别指定 withSelectedFields 和 withRowRestriction。
以下代码片段从表格读取数据。此示例来自 BigQueryTornadoes 示例。当示例的读取方法选项设置为 DIRECT_READ
时,管道使用 BigQuery Storage API 和列投影从 BigQuery 表格中读取公共天气数据样本。您可以在 GitHub 上查看完整源代码。
import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromTableWithBigQueryStorageAPI {
public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery table",
BigQueryIO.readTableRows()
.from(String.format("%s:%s.%s", project, dataset, table))
.withMethod(Method.DIRECT_READ)
.withSelectedFields(
Arrays.asList(
"string_field",
"int64_field",
"float64_field",
"numeric_field",
"bool_field",
"bytes_field",
"date_field",
"datetime_field",
"time_field",
"timestamp_field",
"geography_field",
"array_field",
"struct_field")))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
以下代码片段使用查询字符串读取数据。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromQueryWithBigQueryStorageAPI {
public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
String project, String dataset, String table, String query, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
/*
String query = String.format("SELECT\n" +
" string_field,\n" +
" int64_field,\n" +
" float64_field,\n" +
" numeric_field,\n" +
" bool_field,\n" +
" bytes_field,\n" +
" date_field,\n" +
" datetime_field,\n" +
" time_field,\n" +
" timestamp_field,\n" +
" geography_field,\n" +
" array_field,\n" +
" struct_field\n" +
"FROM\n" +
" `%s:%s.%s`", project, dataset, table)
*/
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery table",
BigQueryIO.readTableRows()
.fromQuery(query)
.usingStandardSql()
.withMethod(Method.DIRECT_READ))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
写入 BigQuery
BigQueryIO 允许您写入 BigQuery 表格。如果您使用的是 Java 版 Beam SDK,您可以将不同的行写入不同的表格。Java 版 Beam SDK 还支持使用 BigQuery Storage Write API 直接写入 BigQuery 存储。有关更多信息,请参阅 使用 Storage Write API。
当您应用写入变换时,必须为目标表格提供以下信息。
- 表格名称。
- 目标表格的创建处置。创建处置指定目标表格必须存在还是可以由写入操作创建。
- 目标表格的写入处置。写入处置指定您写入的数据是替换现有表格,将行追加到现有表格,还是仅写入空表格。
此外,如果您的写入操作创建了新的 BigQuery 表格,您还必须为目标表格提供表格架构。
创建处置
创建处置控制您的 BigQuery 写入操作是否应该在目标表格不存在时创建表格。
使用 .withCreateDisposition
指定创建处置。有效的枚举值为
Write.CreateDisposition.CREATE_IF_NEEDED
:指定如果不存在表格,写入操作应该创建新的表格。如果您使用此值,则必须使用withSchema
方法提供表格架构。CREATE_IF_NEEDED
是默认行为。Write.CreateDisposition.CREATE_NEVER
:指定绝不创建表格。如果目标表格不存在,写入操作将失败。
使用 create_disposition
参数指定创建处置。有效的枚举值为
BigQueryDisposition.CREATE_IF_NEEDED
:指定如果不存在表格,写入操作应该创建新的表格。如果您使用此值,则必须提供表格架构。CREATE_IF_NEEDED
是默认行为。BigQueryDisposition.CREATE_NEVER
:指定绝不创建表格。如果目标表格不存在,写入操作将失败。
如果您指定 CREATE_IF_NEEDED
作为创建处置,并且您没有提供表格架构,则如果目标表格不存在,变换可能会在运行时失败。
写入处置
写入处置控制您的 BigQuery 写入操作如何应用于现有表格。
使用 .withWriteDisposition
指定写入处置。有效的枚举值为
Write.WriteDisposition.WRITE_EMPTY
:指定如果目标表格不为空,写入操作应该在运行时失败。WRITE_EMPTY
是默认行为。Write.WriteDisposition.WRITE_TRUNCATE
:指定写入操作应该替换现有表格。目标表格中的所有现有行都将被删除,并将新的行添加到表格中。Write.WriteDisposition.WRITE_APPEND
:指定写入操作应该将行追加到现有表格的末尾。
使用 write_disposition
参数指定写入处置。有效的枚举值为
BigQueryDisposition.WRITE_EMPTY
:指定如果目标表格不为空,写入操作应该在运行时失败。WRITE_EMPTY
是默认行为。BigQueryDisposition.WRITE_TRUNCATE
:指定写入操作应该替换现有表格。目标表格中的所有现有行都将被删除,并将新的行添加到表格中。BigQueryDisposition.WRITE_APPEND
:指定写入操作应该将行追加到现有表格的末尾。
当您使用 WRITE_EMPTY
时,对目标表格是否为空的检查可能会在实际写入操作之前发生。此检查不能保证您的管道将独占访问表格。两个并发的管道将数据写入同一个输出表格,并使用 WRITE_EMPTY
写入处置,可能会成功启动,但两个管道都可能在写入尝试发生后失败。
创建表模式
如果您的 BigQuery 写入操作创建了新的表格,则必须提供架构信息。架构包含有关表格中每个字段的信息。当使用新的架构更新管道时,现有架构字段必须保持相同的顺序,否则管道将中断,无法写入 BigQuery。
要在 Java 中创建表格架构,您可以使用 TableSchema
对象,或者使用包含 JSON 序列化 TableSchema
对象的字符串。
要在 Python 中创建表格架构,您可以使用 TableSchema
对象,或者使用定义字段列表的字符串。基于单个字符串的架构不支持嵌套字段、重复字段或为字段指定 BigQuery 模式(模式始终设置为 NULLABLE
)。
使用 TableSchema
要创建并使用表格架构作为 TableSchema
对象,请执行以下步骤。
创建
TableFieldSchema
对象列表。每个TableFieldSchema
对象代表表格中的一个字段。创建
TableSchema
对象,并使用setFields
方法指定您的字段列表。当您应用写入变换时,使用
withSchema
方法提供表格架构。
创建
TableSchema
对象。为表格中的每个字段创建并追加
TableFieldSchema
对象。当您应用写入变换时,使用
schema
参数提供表格架构。将参数的值设置为TableSchema
对象。
以下示例代码演示了如何为包含两个字符串类型字段(源和引用)的表格创建 TableSchema
。
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
class BigQuerySchemaCreate {
public static TableSchema createSchema() {
// To learn more about BigQuery schemas:
// https://cloud.google.com/bigquery/docs/schemas
TableSchema schema =
new TableSchema()
.setFields(
Arrays.asList(
new TableFieldSchema()
.setName("string_field")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("int64_field")
.setType("INT64")
.setMode("NULLABLE"),
new TableFieldSchema()
.setName("float64_field")
.setType("FLOAT64"), // default mode is "NULLABLE"
new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
new TableFieldSchema().setName("bool_field").setType("BOOL"),
new TableFieldSchema().setName("bytes_field").setType("BYTES"),
new TableFieldSchema().setName("date_field").setType("DATE"),
new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
new TableFieldSchema().setName("time_field").setType("TIME"),
new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
new TableFieldSchema()
.setName("array_field")
.setType("INT64")
.setMode("REPEATED")
.setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
new TableFieldSchema()
.setName("struct_field")
.setType("STRUCT")
.setDescription(
"A STRUCT accepts a custom data class, the fields must match the custom class fields.")
.setFields(
Arrays.asList(
new TableFieldSchema().setName("string_value").setType("STRING"),
new TableFieldSchema().setName("int64_value").setType("INT64")))));
return schema;
}
}
使用字符串
要创建并使用表格架构作为包含 JSON 序列化 TableSchema
对象的字符串,请执行以下步骤。
创建包含 JSON 序列化
TableSchema
对象的字符串。当您应用写入变换时,使用
withJsonSchema
方法提供表格架构。
要创建并使用表格架构作为字符串,请执行以下步骤。
创建形式为“field1:type1,field2:type2,field3:type3” 的单一逗号分隔字符串,该字符串定义字段列表。类型应指定字段的 BigQuery 类型。
使用
schema
参数在应用写入转换时提供表架构。将参数的值设置为字符串。
以下示例演示如何使用字符串指定与上一个示例相同的表架构。
设置插入方法
BigQueryIO 支持两种将数据插入 BigQuery 的方法:加载作业和流式插入。每种插入方法在成本、配额和数据一致性方面提供了不同的权衡。有关这些权衡的更多信息,请参阅 BigQuery 文档以了解不同的数据摄取选项(具体而言,加载作业和流式插入)。
BigQueryIO 根据输入PCollection
选择默认的插入方法。可以使用withMethod
指定所需的插入方法。有关可用方法及其限制的列表,请参阅Write.Method
。
BigQueryIO 根据输入PCollection
选择默认的插入方法。可以使用method
指定所需的插入方法。有关可用方法及其限制的列表,请参阅WriteToBigQuery
。
BigQueryIO 在以下情况下使用加载作业
- 当将 BigQueryIO 写入转换应用于有界
PCollection
时。 - 当使用
BigQueryIO.write().withMethod(FILE_LOADS)
指定加载作业作为插入方法时。
- 当将 BigQueryIO 写入转换应用于有界
PCollection
时。 - 当使用
WriteToBigQuery(method='FILE_LOADS')
指定加载作业作为插入方法时。
注意: 如果在流式管道中使用批次加载
必须使用withTriggeringFrequency
指定用于启动加载作业的触发频率。注意设置频率,以确保管道不会超过 BigQuery 加载作业的配额限制。
可以使用withNumFileShards
明确设置写入的文件分片数,或者使用withAutoSharding
启用动态分片(从 2.29.0 版本开始),分片数可能在运行时确定和更改。分片行为取决于运行程序。
必须使用triggering_frequency
指定用于启动加载作业的触发频率。注意设置频率,以确保管道不会超过 BigQuery 加载作业的配额限制。
可以设置with_auto_sharding=True
启用动态分片(从 2.29.0 版本开始)。分片数可能在运行时确定和更改。分片行为取决于运行程序。
BigQueryIO 在以下情况下使用流式插入
- 当将 BigQueryIO 写入转换应用于无界
PCollection
时。 - 当使用
BigQueryIO.write().withMethod(STREAMING_INSERTS)
指定流式插入作为插入方法时。
- 当将 BigQueryIO 写入转换应用于无界
PCollection
时。 - 当使用
WriteToBigQuery(method='STREAMING_INSERTS')
指定流式插入作为插入方法时。
注意: 默认情况下,流式插入会启用 BigQuery 的尽力而为的重复数据删除机制。可以通过设置ignoreInsertIds
来禁用该机制。配额限制在启用重复数据删除与禁用重复数据删除时有所不同。
流式插入会为每个表目标应用默认的分片。可以使用withAutoSharding
(从 2.28.0 版本开始)启用动态分片,分片数可能在运行时确定和更改。分片行为取决于运行程序。
注意: 默认情况下,流式插入会启用 BigQuery 的尽力而为的重复数据删除机制。可以通过设置ignore_insert_ids=True
来禁用该机制。配额限制在启用重复数据删除与禁用重复数据删除时有所不同。
流式插入会为每个表目标应用默认的分片。可以设置with_auto_sharding=True
(从 2.29.0 版本开始)启用动态分片。分片数可能在运行时确定和更改。分片行为取决于运行程序。
写入表
要写入 BigQuery 表,请应用writeTableRows
或write
转换。
要写入 BigQuery 表,请应用WriteToBigQuery
转换。WriteToBigQuery
支持批处理模式和流式模式。必须将转换应用于字典的PCollection
。通常,需要使用其他转换(例如ParDo
)将输出数据格式化为集合。
以下示例使用包含引用的PCollection
。
writeTableRows
方法将 BigQueryTableRow
对象的PCollection
写入 BigQuery 表。PCollection
中的每个元素代表表中的一行。本示例使用writeTableRows
将元素写入PCollection<TableRow>
。写入操作将在需要时创建表。如果表已存在,则会将其替换。
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;
class BigQueryWriteToTable {
public static void writeToTable(
String project,
String dataset,
String table,
TableSchema schema,
PCollection<TableRow> rows) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// TableSchema schema = new TableSchema().setFields(Arrays.asList(...));
// Pipeline pipeline = Pipeline.create();
// PCollection<TableRow> rows = ...
rows.apply(
"Write to BigQuery",
BigQueryIO.writeTableRows()
.to(String.format("%s:%s.%s", project, dataset, table))
.withSchema(schema)
// For CreateDisposition:
// - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
// required
// - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
// For WriteDisposition:
// - WRITE_EMPTY (default): raises an error if the table is not empty
// - WRITE_APPEND: appends new rows to existing rows
// - WRITE_TRUNCATE: deletes the existing rows before writing
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
// pipeline.run().waitUntilFinish();
}
}
以下示例代码展示了如何应用WriteToBigQuery
转换将字典的PCollection
写入 BigQuery 表。写入操作将在需要时创建表。如果表已存在,则会将其替换。
write
转换将自定义类型对象的PCollection
写入 BigQuery 表。使用.withFormatFunction(SerializableFunction)
提供格式化函数,将PCollection
中的每个输入元素转换为TableRow
。本示例使用write
写入PCollection<String>
。写入操作将在需要时创建表。如果表已存在,则会将其替换。
quotes.apply(
BigQueryIO.<Quote>write()
.to(tableSpec)
.withSchema(tableSchema)
.withFormatFunction(
(Quote elem) ->
new TableRow().set("source", elem.source).set("quote", elem.quote))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
使用流式插入时,可以决定如何处理失败的记录。可以继续重试,或者使用WriteResult.getFailedInserts()
方法将失败的记录返回到单独的PCollection
中。
使用存储写入 API
从 Beam SDK for Java 的 2.36.0 版本开始,可以使用 BigQueryIO 连接器中的BigQuery 存储写入 API。
从 Beam SDK for Python 的 2.47.0 版本开始,SDK 也支持 BigQuery 存储写入 API。
Python SDK 的 BigQuery 存储写入 API 目前对支持的数据类型有一些限制。由于此方法使用了跨语言转换,因此我们仅限于跨语言边界支持的类型。例如,需要apache_beam.utils.timestamp.Timestamp
才能写入TIMESTAMP
BigQuery 类型。此外,某些类型(例如DATETIME
)尚不支持。有关更多详细信息,请参阅完整的类型映射。
注意: 如果要从源代码运行使用存储写入 API 的 WriteToBigQuery,需要运行./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
来构建 expansion-service jar。如果从发布的 Beam SDK 运行,则 jar 已包含在内。
恰好一次语义
要使用存储写入 API 写入 BigQuery,请将withMethod
设置为Method.STORAGE_WRITE_API
。以下示例转换使用存储写入 API 和恰好一次语义写入 BigQuery
如果要更改 BigQueryIO 的行为,以便管道的所有 BigQuery 接收器默认使用存储写入 API,请设置UseStorageWriteApi
选项。
如果管道需要创建表(如果表不存在且已将创建处置指定为CREATE_IF_NEEDED
),则必须提供表架构。API 使用架构验证数据并将其转换为二进制协议。
对于流式管道,需要设置两个附加参数:流数和触发频率。
流数定义了 BigQueryIO 写入转换的并行性,大致对应于管道使用的存储写入 API 流数。可以通过withNumStorageWriteApiStreams
在转换中明确设置,或者在管道中提供numStorageWriteApiStreams
选项,如BigQueryOptions
中所定义。请注意,这仅受流式管道支持。
触发频率决定数据在 BigQuery 中可用于查询的时间。可以通过withTriggeringFrequency
明确设置,或者通过设置storageWriteApiTriggeringFrequencySec
选项指定秒数。
这两个参数的组合会影响 BigQueryIO 在调用存储写入 API 之前创建的行批次的大小。将频率设置得太高会导致批次更小,从而影响性能。一般而言,单个流应该能够处理至少每秒 1Mb 的吞吐量。创建独占流对于 BigQuery 服务来说是一个昂贵的操作,因此应该仅根据用例的需要使用流。对于大多数管道来说,触发频率在个位秒是不错的选择。
与流式插入类似,STORAGE_WRITE_API
支持动态确定写入 BigQuery 的并行流数(从 2.42.0 版本开始)。可以使用withAutoSharding
明确启用此功能。
当numStorageWriteApiStreams
设置为 0 或未指定时,STORAGE_WRITE_API
默认使用动态分片。
使用STORAGE_WRITE_API
时,WriteResult.getFailedStorageApiInserts
返回的PCollection
包含未能写入存储写入 API 接收器的行。
至少一次语义
如果用例允许目标表中存在潜在的重复记录,可以使用STORAGE_API_AT_LEAST_ONCE
方法。此方法不会将要写入 BigQuery 的记录持久化到其混洗存储中,这是提供STORAGE_WRITE_API
方法的恰好一次语义所必需的。因此,对于大多数管道来说,使用此方法通常更便宜,并且延迟更低。如果使用STORAGE_API_AT_LEAST_ONCE
,则不需要指定流数,并且不能指定触发频率。
自动分片不适用于STORAGE_API_AT_LEAST_ONCE
。
使用STORAGE_API_AT_LEAST_ONCE
时,WriteResult.getFailedStorageApiInserts
返回的PCollection
包含未能写入存储写入 API 接收器的行。
配额
使用存储写入 API 之前,请注意BigQuery 存储写入 API 配额。
使用动态目标
可以使用动态目标功能将PCollection
中的元素写入不同的 BigQuery 表,这些表可能具有不同的架构。
动态目标功能按用户定义的目标键对用户类型进行分组,使用该键计算目标表和/或架构,并将每组的元素写入计算出的目标。
此外,还可以编写自己的类型,这些类型具有映射到TableRow
的映射函数,并且可以在所有DynamicDestinations
方法中使用侧输入。
要使用动态目标,必须创建一个DynamicDestinations
对象,并实现以下方法
getDestination
: 返回一个对象,getTable
和getSchema
可以使用该对象作为目标键来计算目标表和/或架构。getTable
: 返回目标键的表(作为TableDestination
对象)。此方法必须为每个唯一的目标返回唯一的表。getSchema
: 返回目标键的表架构(作为TableSchema
对象)。
然后,使用 write().to
和您的 DynamicDestinations
对象。此示例使用包含天气数据的 PCollection
,并将数据写入每个年份的不同表。
/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
final long year;
final long month;
final long day;
final double maxTemp;
public WeatherData() {
this.year = 0;
this.month = 0;
this.day = 0;
this.maxTemp = 0.0f;
}
public WeatherData(long year, long month, long day, double maxTemp) {
this.year = year;
this.month = month;
this.day = day;
this.maxTemp = maxTemp;
}
}
*/
PCollection<WeatherData> weatherData =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> {
GenericRecord record = elem.getRecord();
return new WeatherData(
(Long) record.get("year"),
(Long) record.get("month"),
(Long) record.get("day"),
(Double) record.get("max_temperature"));
})
.fromQuery(
"SELECT year, month, day, max_temperature "
+ "FROM [apache-beam-testing.samples.weather_stations] "
+ "WHERE year BETWEEN 2007 AND 2009")
.withCoder(AvroCoder.of(WeatherData.class)));
// We will send the weather data into different tables for every year.
weatherData.apply(
BigQueryIO.<WeatherData>write()
.to(
new DynamicDestinations<WeatherData, Long>() {
@Override
public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
return elem.getValue().year;
}
@Override
public TableDestination getTable(Long destination) {
return new TableDestination(
new TableReference()
.setProjectId(writeProject)
.setDatasetId(writeDataset)
.setTableId(writeTable + "_" + destination),
"Table for year " + destination);
}
@Override
public TableSchema getSchema(Long destination) {
return new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema()
.setName("year")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("month")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("day")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("maxTemp")
.setType("FLOAT")
.setMode("NULLABLE")));
}
})
.withFormatFunction(
(WeatherData elem) ->
new TableRow()
.set("year", elem.year)
.set("month", elem.month)
.set("day", elem.day)
.set("maxTemp", elem.maxTemp))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
('Obi Wan Kenobi', True)]))
def table_fn(element, fictional_characters):
if element in fictional_characters:
return 'my_dataset.fictional_quotes'
else:
return 'my_dataset.real_quotes'
quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
table_fn,
schema=table_schema,
table_side_inputs=(fictional_characters_view, ),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
使用时间分区
BigQuery 时间分区将您的表划分为较小的分区,称为 分区表。分区表使您更轻松地管理和查询数据。
要使用 BigQuery 时间分区,请使用以下两种方法之一
withTimePartitioning
: 此方法接受TimePartitioning
类,并且仅在您写入单个表时才可用。withJsonTimePartitioning
: 此方法与withTimePartitioning
相同,但接受 JSON 序列化字符串对象。
此示例每天生成一个分区。
weatherData.apply(
BigQueryIO.<WeatherData>write()
.to(tableSpec + "_partitioning")
.withSchema(tableSchema)
.withFormatFunction(
(WeatherData elem) ->
new TableRow()
.set("year", elem.year)
.set("month", elem.month)
.set("day", elem.day)
.set("maxTemp", elem.maxTemp))
// NOTE: an existing table without time partitioning set up will not work
.withTimePartitioning(new TimePartitioning().setType("DAY"))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
限制
BigQueryIO 目前有以下限制。
您无法将 BigQuery 写入的完成与管道中的其他步骤进行排序。
如果您使用的是 Python 版 Beam SDK,则在写入非常大的数据集时,可能会遇到导入大小配额问题。作为一种解决方法,您可以对数据集进行分区(例如,使用 Beam 的
Partition
变换),并写入多个 BigQuery 表。Java 版 Beam SDK 没有此限制,因为它会为您对数据集进行分区。当您 将数据加载 到 BigQuery 时,将应用 这些限制。默认情况下,BigQuery 使用共享的插槽池来加载数据。这意味着无法保证可用容量,并且您的加载可能会排队,直到有插槽可用。如果插槽在 6 小时内没有可用,则加载将因 BigQuery 设置的限制而失败。为避免这种情况,强烈建议您使用 BigQuery 预留,以确保您的加载不会因容量问题而排队并失败。
其他示例
您可以在 Beam 的示例目录中找到使用 BigQuery 的其他示例。
Java 食谱示例
这些示例来自 Java 版 cookbook 示例 目录。
BigQueryTornadoes 从 BigQuery 中读取公开的样本天气数据,统计每个月出现的龙卷风数量,并将结果写入 BigQuery 表。
CombinePerKeyExamples 从 BigQuery 中读取公开的莎士比亚数据,并为数据集中超过给定长度的每个单词生成一个字符串,其中包含该单词出现的剧本名称列表。然后,管道将结果写入 BigQuery 表。
FilterExamples 从 BigQuery 中读取公开的样本天气数据,对数据进行投影,找到温度读数的全局平均值,筛选单个给定月份的读数,并且仅输出(针对该月份的)平均温度低于推导的全局平均值的温度数据。
JoinExamples 从 BigQuery 中读取 GDELT“世界事件” 的样本,并将事件
action
国家代码与将国家代码映射到国家名称的表进行联接。MaxPerKeyExamples 从 BigQuery 中读取公开的样本天气数据,找到每个月的最高温度,并将结果写入 BigQuery 表。
TriggerExample 对来自圣地亚哥高速公路的流量数据进行流式分析。管道查看来自文本文件的传入数据,并将结果写入 BigQuery 表。
Java 完整示例
这些示例来自 Java 版 完整示例 目录。
AutoComplete 计算每个前缀最受欢迎的哈希标签,可用于自动完成。管道可以选择将结果写入 BigQuery 表。
StreamingWordExtract 读取文本行,将每行拆分为单个单词,将这些单词大写,并将输出写入 BigQuery 表。
TrafficMaxLaneFlow 读取交通传感器数据,找到记录流量最高的通道,并将结果写入 BigQuery 表。
TrafficRoutes 读取交通传感器数据,计算每个窗口的平均速度,并查找路线中的减速,并将结果写入 BigQuery 表。
Python 食谱示例
这些示例来自 Python 版 cookbook 示例 目录。
BigQuery 架构 创建具有嵌套和重复字段的
TableSchema
,生成具有嵌套和重复字段的数据,并将数据写入 BigQuery 表。BigQuery 侧输入 使用 BigQuery 源作为侧输入。它说明了如何在三种不同的形式中将侧输入插入转换:作为单例、作为迭代器和作为列表。
BigQuery 龙卷风 从 BigQuery 表中读取包含“月份”和“龙卷风”字段的表架构,计算每个月的龙卷风数量,并将结果输出到 BigQuery 表。
BigQuery 筛选器 从 BigQuery 表中读取气象站数据,在内存中操作 BigQuery 行,并将结果写入 BigQuery 表。
最后更新于 2024/10/31
您是否找到了所有需要的内容?
它们是否有用且清晰?您想更改什么?请告诉我们!