内置 I/O 转换

Google BigQuery I/O 连接器

Beam SDK 包含可以从 Google BigQuery 表读取数据和向其写入数据的内置转换。

在开始之前

要使用 BigQueryIO,请将 Maven 工件依赖项添加到你的 pom.xml 文件中。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.60.0</version>
</dependency>

其他资源

要使用 BigQueryIO,您必须通过运行 pip install apache-beam[gcp] 安装 Google Cloud Platform 依赖项。

其他资源

BigQuery 基础

表名

要从 BigQuery 表读取或写入 BigQuery 表,您必须提供一个完全限定的 BigQuery 表名(例如,bigquery-public-data:github_repos.sample_contents)。完全限定的 BigQuery 表名由三个部分组成

如果您使用的是 时间分区表,表名还可以包含 表修饰符

要指定 BigQuery 表,您可以使用表的完全限定名称作为字符串,也可以使用 TableReference TableReference 对象。

使用字符串

要使用字符串指定表,请使用格式 [project_id]:[dataset_id].[table_id] 指定完全限定的 BigQuery 表名。

String tableSpec = "apache-beam-testing.samples.weather_stations";
# project-id:dataset_id.table_id
table_spec = 'apache-beam-testing.samples.weather_stations'

您也可以省略 project_id 并使用 [dataset_id].[table_id] 格式。如果您省略了项目 ID,Beam 将使用您 管道选项. 管道选项. 中的默认项目 ID。

String tableSpec = "samples.weather_stations";
# dataset_id.table_id
table_spec = 'samples.weather_stations'

使用 TableReference

要使用 TableReference 指定表,请使用 BigQuery 表名的三个部分创建一个新的 TableReference

TableReference tableSpec =
    new TableReference()
        .setProjectId("clouddataflow-readonly")
        .setDatasetId("samples")
        .setTableId("weather_stations");
from apache_beam.io.gcp.internal.clients import bigquery

table_spec = bigquery.TableReference(
    projectId='clouddataflow-readonly',
    datasetId='samples',
    tableId='weather_stations')

Beam SDK for Java 还提供了 parseTableSpec 帮助程序方法,该方法使用包含完全限定 BigQuery 表名的字符串构造一个 TableReference 对象。但是,BigQueryIO 转换的静态工厂方法接受表名作为字符串,并为您构造一个 TableReference 对象。

表行

BigQueryIO 读取和写入转换生成并使用字典的 PCollection,其中 PCollection 中的每个元素表示表中的一行。

模式

写入 BigQuery 时,您必须为要写入的目标表提供表模式,除非您指定 CREATE_NEVER创建处置创建表模式 更详细地介绍了模式。

数据类型

BigQuery 支持以下数据类型:STRING、BYTES、INTEGER、FLOAT、NUMERIC、BOOLEAN、TIMESTAMP、DATE、TIME、DATETIME 和 GEOGRAPHY。有关 Google Standard SQL 数据类型的概述,请参阅 数据类型。BigQueryIO 允许您使用所有这些数据类型。以下示例显示了从 BigQuery 读取和写入 BigQuery 时使用的数据类型的正确格式

import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BigQueryTableRowCreate {
  public static TableRow createTableRow() {
    TableRow row =
        new TableRow()
            // To learn more about BigQuery data types:
            // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
            .set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
            .set("int64_field", 432)
            .set("float64_field", 3.141592653589793)
            .set("numeric_field", new BigDecimal("1234.56").toString())
            .set("bool_field", true)
            .set(
                "bytes_field",
                Base64.getEncoder()
                    .encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))

            // To learn more about date formatting:
            // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
            .set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
            .set(
                "datetime_field",
                LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
            .set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
            .set(
                "timestamp_field",
                Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT

            // To learn more about the geography Well-Known Text (WKT) format:
            // https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
            .set("geography_field", "POINT(30 10)")

            // An array has its mode set to REPEATED.
            .set("array_field", Arrays.asList(1, 2, 3, 4))

            // Any class can be written as a STRUCT as long as all the fields in the
            // schema are present and they are encoded correctly as BigQuery types.
            .set(
                "struct_field",
                Stream.of(
                        new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
                        new SimpleEntry<>("int64_value", "42"))
                    .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
    return row;
  }
}
bigquery_data = [{
    'string': 'abc',
    'bytes': base64.b64encode(b'\xab\xac'),
    'integer': 5,
    'float': 0.5,
    'numeric': Decimal('5'),
    'boolean': True,
    'timestamp': '2018-12-31 12:44:31.744957 UTC',
    'date': '2018-12-31',
    'time': '12:44:31',
    'datetime': '2018-12-31T12:44:31',
    'geography': 'POINT(30 10)'
}]

从 Beam 2.7.0 开始,支持 NUMERIC 数据类型。此数据类型支持高精度小数(精度为 38 位,小数位数为 9 位)。GEOGRAPHY 数据类型与 Well-Known Text 结合使用(请参阅 https://en.wikipedia.org/wiki/Well-known_text 格式,用于读取和写入 BigQuery。BigQuery IO 要求 BYTES 数据类型的值在写入 BigQuery 时使用 base64 编码。从 BigQuery 读取字节时,它们将作为 base64 编码的字符串返回。

从 Beam 2.7.0 开始,支持 NUMERIC 数据类型。此数据类型支持高精度小数(精度为 38 位,小数位数为 9 位)。GEOGRAPHY 数据类型与 Well-Known Text 结合使用(请参阅 https://en.wikipedia.org/wiki/Well-known_text 格式,用于读取和写入 BigQuery。BigQuery IO 要求 BYTES 数据类型的值在写入 BigQuery 时使用 base64 编码。从 BigQuery 读取字节时,它们将作为 base64 编码的字节返回。

从 BigQuery 读取

BigQueryIO 允许您从 BigQuery 表读取,或执行 SQL 查询并读取结果。默认情况下,当您应用 BigQueryIO 读取转换时,Beam 会调用 BigQuery 导出请求。但是,Beam SDK for Java 还支持使用 BigQuery 存储读取 API 直接从 BigQuery 存储读取。有关更多信息,请参阅 使用存储读取 API

Beam 使用 BigQuery API 受 BigQuery 的 配额定价 策略约束。

Java 版 Beam SDK 有两种 BigQueryIO 读取方法。两种方法都允许您从表格读取数据,或者使用查询字符串读取字段。

  1. read(SerializableFunction) 读取 Avro 格式的记录,并使用指定的解析函数将它们解析为自定义类型对象的 PCollectionPCollection 中的每个元素代表表格中的一行。有关使用查询字符串读取的 示例代码 演示了如何使用 read(SerializableFunction)

  2. readTableRows 返回包含 BigQuery TableRow 对象的 PCollectionPCollection 中的每个元素代表表格中的一行。TableRow 对象中的整数值被编码为字符串,以匹配 BigQuery 导出的 JSON 格式。这种方法很方便,但性能比 read(SerializableFunction) 慢 2-3 倍。有关从表格读取的 示例代码 演示了如何使用 readTableRows

注意: 从 Beam SDK 2.2.0 开始,BigQueryIO.read() 已弃用。请改用 read(SerializableFunction<SchemaAndRecord, T>) 将 BigQuery 行从 Avro GenericRecord 解析为您的自定义类型,或者使用 readTableRows() 将它们解析为 JSON TableRow 对象。

要使用 Python 版 Beam SDK 从 BigQuery 表格读取数据,请应用 ReadFromBigQuery 变换。ReadFromBigQuery 返回包含字典的 PCollection,其中 PCollection 中的每个元素代表表格中的一行。TableRow 对象中的整数值被编码为字符串,以匹配 BigQuery 导出的 JSON 格式。

注意: 从 Beam SDK 2.25.0 开始,BigQuerySource() 已弃用。在 2.25.0 之前,要使用 Beam SDK 从 BigQuery 表格读取数据,请在 BigQuerySource 上应用 Read 变换。例如,beam.io.Read(beam.io.BigQuerySource(table_spec))

从表中读取

要读取整个 BigQuery 表格,请使用带有 BigQuery 表格名称的 from 方法。此示例使用 readTableRows

要读取整个 BigQuery 表格,请使用带有 BigQuery 表格名称的 table 参数。

以下代码读取包含气象站数据的整个表格,然后提取 max_temperature 列。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTable {
  public static PCollection<MyData> readFromTable(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

使用查询字符串读取

如果您不想读取整个表格,可以使用 fromQuery 方法提供查询字符串。

如果您不想读取整个表格,可以使用 query 参数向 ReadFromBigQuery 提供查询字符串。

以下代码使用 SQL 查询仅读取 max_temperature 列。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQuery {
  public static PCollection<MyData> readFromQuery(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows()
                    .fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
                    .usingStandardSql())
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'QueryTable' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '[apache-beam-testing.samples.weather_stations]')
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

您也可以使用 BigQuery 的标准 SQL 方言和查询字符串,如下例所示。

PCollection<Double> maxTemperatures =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
            .fromQuery(
                "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
            .usingStandardSql()
            .withCoder(DoubleCoder.of()));
max_temperatures = (
    pipeline
    | 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '`clouddataflow-readonly.samples.weather_stations`',
        use_standard_sql=True)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

查询执行项目

默认情况下,管道在与管道关联的 Google Cloud 项目中执行查询(对于 Dataflow 运行程序,它是在管道运行的项目)。在某些情况下,查询执行项目应该与管道项目不同。如果您使用 Java SDK,可以通过将管道选项“bigQueryProject”设置为所需的 Google Cloud 项目 ID 来定义查询执行项目。

使用存储读取 API

您可以使用 BigQuery Storage API 直接访问 BigQuery 存储中的表格,并支持诸如列选择和谓词过滤器下推之类的功能,这些功能可以提高管道执行效率。

Java 版 Beam SDK 支持在从 BigQuery 读取数据时使用 BigQuery Storage API。2.25.0 之前的 SDK 版本将 BigQuery Storage API 作为 实验性功能 支持,并使用预发布的 BigQuery Storage API 表面。调用者应该将使用 BigQuery Storage API 的管道迁移到 SDK 版本 2.25.0 或更高版本。

Python 版 Beam SDK 支持 BigQuery Storage API。通过将 method=DIRECT_READ 作为参数传递给 ReadFromBigQuery 来启用它。

更新你的代码

在您从表格读取数据时,请使用以下方法。

以下代码片段从表格读取数据。此示例来自 BigQueryTornadoes 示例。当示例的读取方法选项设置为 DIRECT_READ 时,管道使用 BigQuery Storage API 和列投影从 BigQuery 表格中读取公共天气数据样本。您可以在 GitHub 上查看完整源代码

import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTableWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .from(String.format("%s:%s.%s", project, dataset, table))
                    .withMethod(Method.DIRECT_READ)
                    .withSelectedFields(
                        Arrays.asList(
                            "string_field",
                            "int64_field",
                            "float64_field",
                            "numeric_field",
                            "bool_field",
                            "bytes_field",
                            "date_field",
                            "datetime_field",
                            "time_field",
                            "timestamp_field",
                            "geography_field",
                            "array_field",
                            "struct_field")))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTableWithStorageAPI' >> beam.io.ReadFromBigQuery(
        table=table_spec, method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
    | beam.Map(lambda elem: elem['max_temperature']))

以下代码片段使用查询字符串读取数据。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQueryWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
      String project, String dataset, String table, String query, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    /*
    String query = String.format("SELECT\n" +
        "  string_field,\n" +
        "  int64_field,\n" +
        "  float64_field,\n" +
        "  numeric_field,\n" +
        "  bool_field,\n" +
        "  bytes_field,\n" +
        "  date_field,\n" +
        "  datetime_field,\n" +
        "  time_field,\n" +
        "  timestamp_field,\n" +
        "  geography_field,\n" +
        "  array_field,\n" +
        "  struct_field\n" +
        "FROM\n" +
        "  `%s:%s.%s`", project, dataset, table)
    */

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withMethod(Method.DIRECT_READ))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
# The SDK for Python does not support the BigQuery Storage API.

写入 BigQuery

BigQueryIO 允许您写入 BigQuery 表格。如果您使用的是 Java 版 Beam SDK,您可以将不同的行写入不同的表格。Java 版 Beam SDK 还支持使用 BigQuery Storage Write API 直接写入 BigQuery 存储。有关更多信息,请参阅 使用 Storage Write API

BigQueryIO 写入变换使用受 BigQuery 的 配额定价 策略约束的 API。

当您应用写入变换时,必须为目标表格提供以下信息。

此外,如果您的写入操作创建了新的 BigQuery 表格,您还必须为目标表格提供表格架构。

创建处置

创建处置控制您的 BigQuery 写入操作是否应该在目标表格不存在时创建表格。

使用 .withCreateDisposition 指定创建处置。有效的枚举值为

  • Write.CreateDisposition.CREATE_IF_NEEDED:指定如果不存在表格,写入操作应该创建新的表格。如果您使用此值,则必须使用 withSchema 方法提供表格架构。CREATE_IF_NEEDED 是默认行为。

  • Write.CreateDisposition.CREATE_NEVER:指定绝不创建表格。如果目标表格不存在,写入操作将失败。

使用 create_disposition 参数指定创建处置。有效的枚举值为

  • BigQueryDisposition.CREATE_IF_NEEDED:指定如果不存在表格,写入操作应该创建新的表格。如果您使用此值,则必须提供表格架构。CREATE_IF_NEEDED 是默认行为。

  • BigQueryDisposition.CREATE_NEVER:指定绝不创建表格。如果目标表格不存在,写入操作将失败。

如果您指定 CREATE_IF_NEEDED 作为创建处置,并且您没有提供表格架构,则如果目标表格不存在,变换可能会在运行时失败。

写入处置

写入处置控制您的 BigQuery 写入操作如何应用于现有表格。

使用 .withWriteDisposition 指定写入处置。有效的枚举值为

  • Write.WriteDisposition.WRITE_EMPTY:指定如果目标表格不为空,写入操作应该在运行时失败。WRITE_EMPTY 是默认行为。

  • Write.WriteDisposition.WRITE_TRUNCATE:指定写入操作应该替换现有表格。目标表格中的所有现有行都将被删除,并将新的行添加到表格中。

  • Write.WriteDisposition.WRITE_APPEND:指定写入操作应该将行追加到现有表格的末尾。

使用 write_disposition 参数指定写入处置。有效的枚举值为

  • BigQueryDisposition.WRITE_EMPTY:指定如果目标表格不为空,写入操作应该在运行时失败。WRITE_EMPTY 是默认行为。

  • BigQueryDisposition.WRITE_TRUNCATE:指定写入操作应该替换现有表格。目标表格中的所有现有行都将被删除,并将新的行添加到表格中。

  • BigQueryDisposition.WRITE_APPEND:指定写入操作应该将行追加到现有表格的末尾。

当您使用 WRITE_EMPTY 时,对目标表格是否为空的检查可能会在实际写入操作之前发生。此检查不能保证您的管道将独占访问表格。两个并发的管道将数据写入同一个输出表格,并使用 WRITE_EMPTY 写入处置,可能会成功启动,但两个管道都可能在写入尝试发生后失败。

创建表模式

如果您的 BigQuery 写入操作创建了新的表格,则必须提供架构信息。架构包含有关表格中每个字段的信息。当使用新的架构更新管道时,现有架构字段必须保持相同的顺序,否则管道将中断,无法写入 BigQuery。

要在 Java 中创建表格架构,您可以使用 TableSchema 对象,或者使用包含 JSON 序列化 TableSchema 对象的字符串。

要在 Python 中创建表格架构,您可以使用 TableSchema 对象,或者使用定义字段列表的字符串。基于单个字符串的架构不支持嵌套字段、重复字段或为字段指定 BigQuery 模式(模式始终设置为 NULLABLE)。

使用 TableSchema

要创建并使用表格架构作为 TableSchema 对象,请执行以下步骤。

  1. 创建 TableFieldSchema 对象列表。每个 TableFieldSchema 对象代表表格中的一个字段。

  2. 创建 TableSchema 对象,并使用 setFields 方法指定您的字段列表。

  3. 当您应用写入变换时,使用 withSchema 方法提供表格架构。

  1. 创建 TableSchema 对象。

  2. 为表格中的每个字段创建并追加 TableFieldSchema 对象。

  3. 当您应用写入变换时,使用 schema 参数提供表格架构。将参数的值设置为 TableSchema 对象。

以下示例代码演示了如何为包含两个字符串类型字段(源和引用)的表格创建 TableSchema

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

class BigQuerySchemaCreate {
  public static TableSchema createSchema() {
    // To learn more about BigQuery schemas:
    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema()
                        .setName("string_field")
                        .setType("STRING")
                        .setMode("REQUIRED"),
                    new TableFieldSchema()
                        .setName("int64_field")
                        .setType("INT64")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("float64_field")
                        .setType("FLOAT64"), // default mode is "NULLABLE"
                    new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                    new TableFieldSchema().setName("bool_field").setType("BOOL"),
                    new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                    new TableFieldSchema().setName("date_field").setType("DATE"),
                    new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                    new TableFieldSchema().setName("time_field").setType("TIME"),
                    new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                    new TableFieldSchema()
                        .setName("array_field")
                        .setType("INT64")
                        .setMode("REPEATED")
                        .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                    new TableFieldSchema()
                        .setName("struct_field")
                        .setType("STRUCT")
                        .setDescription(
                            "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                        .setFields(
                            Arrays.asList(
                                new TableFieldSchema().setName("string_value").setType("STRING"),
                                new TableFieldSchema().setName("int64_value").setType("INT64")))));
    return schema;
  }
}
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

使用字符串

要创建并使用表格架构作为包含 JSON 序列化 TableSchema 对象的字符串,请执行以下步骤。

  1. 创建包含 JSON 序列化 TableSchema 对象的字符串。

  2. 当您应用写入变换时,使用 withJsonSchema 方法提供表格架构。

要创建并使用表格架构作为字符串,请执行以下步骤。

  1. 创建形式为“field1:type1,field2:type2,field3:type3” 的单一逗号分隔字符串,该字符串定义字段列表。类型应指定字段的 BigQuery 类型。

  2. 使用schema参数在应用写入转换时提供表架构。将参数的值设置为字符串。

以下示例演示如何使用字符串指定与上一个示例相同的表架构。

String tableSchemaJson =
    ""
        + "{"
        + "  \"fields\": ["
        + "    {"
        + "      \"name\": \"source\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"NULLABLE\""
        + "    },"
        + "    {"
        + "      \"name\": \"quote\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"REQUIRED\""
        + "    }"
        + "  ]"
        + "}";
# column_name:BIGQUERY_TYPE, ...
table_schema = 'source:STRING, quote:STRING'

设置插入方法

BigQueryIO 支持两种将数据插入 BigQuery 的方法:加载作业和流式插入。每种插入方法在成本、配额和数据一致性方面提供了不同的权衡。有关这些权衡的更多信息,请参阅 BigQuery 文档以了解不同的数据摄取选项(具体而言,加载作业流式插入)。

BigQueryIO 根据输入PCollection选择默认的插入方法。可以使用withMethod指定所需的插入方法。有关可用方法及其限制的列表,请参阅Write.Method

BigQueryIO 根据输入PCollection选择默认的插入方法。可以使用method指定所需的插入方法。有关可用方法及其限制的列表,请参阅WriteToBigQuery

BigQueryIO 在以下情况下使用加载作业

  • 当将 BigQueryIO 写入转换应用于有界PCollection时。
  • 当使用BigQueryIO.write().withMethod(FILE_LOADS)指定加载作业作为插入方法时。
  • 当将 BigQueryIO 写入转换应用于有界PCollection时。
  • 当使用WriteToBigQuery(method='FILE_LOADS')指定加载作业作为插入方法时。

注意: 如果在流式管道中使用批次加载

必须使用withTriggeringFrequency指定用于启动加载作业的触发频率。注意设置频率,以确保管道不会超过 BigQuery 加载作业的配额限制

可以使用withNumFileShards明确设置写入的文件分片数,或者使用withAutoSharding启用动态分片(从 2.29.0 版本开始),分片数可能在运行时确定和更改。分片行为取决于运行程序。

必须使用triggering_frequency指定用于启动加载作业的触发频率。注意设置频率,以确保管道不会超过 BigQuery 加载作业的配额限制

可以设置with_auto_sharding=True启用动态分片(从 2.29.0 版本开始)。分片数可能在运行时确定和更改。分片行为取决于运行程序。

BigQueryIO 在以下情况下使用流式插入

  • 当将 BigQueryIO 写入转换应用于无界PCollection时。
  • 当使用BigQueryIO.write().withMethod(STREAMING_INSERTS)指定流式插入作为插入方法时。
  • 当将 BigQueryIO 写入转换应用于无界PCollection时。
  • 当使用WriteToBigQuery(method='STREAMING_INSERTS')指定流式插入作为插入方法时。

注意: 默认情况下,流式插入会启用 BigQuery 的尽力而为的重复数据删除机制。可以通过设置ignoreInsertIds来禁用该机制。配额限制在启用重复数据删除与禁用重复数据删除时有所不同。

流式插入会为每个表目标应用默认的分片。可以使用withAutoSharding(从 2.28.0 版本开始)启用动态分片,分片数可能在运行时确定和更改。分片行为取决于运行程序。

注意: 默认情况下,流式插入会启用 BigQuery 的尽力而为的重复数据删除机制。可以通过设置ignore_insert_ids=True来禁用该机制。配额限制在启用重复数据删除与禁用重复数据删除时有所不同。

流式插入会为每个表目标应用默认的分片。可以设置with_auto_sharding=True(从 2.29.0 版本开始)启用动态分片。分片数可能在运行时确定和更改。分片行为取决于运行程序。

写入表

要写入 BigQuery 表,请应用writeTableRowswrite转换。

要写入 BigQuery 表,请应用WriteToBigQuery转换。WriteToBigQuery支持批处理模式和流式模式。必须将转换应用于字典的PCollection。通常,需要使用其他转换(例如ParDo)将输出数据格式化为集合。

以下示例使用包含引用的PCollection

writeTableRows方法将 BigQueryTableRow对象的PCollection写入 BigQuery 表。PCollection中的每个元素代表表中的一行。本示例使用writeTableRows将元素写入PCollection<TableRow>。写入操作将在需要时创建表。如果表已存在,则会将其替换。

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;

class BigQueryWriteToTable {
  public static void writeToTable(
      String project,
      String dataset,
      String table,
      TableSchema schema,
      PCollection<TableRow> rows) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // TableSchema schema = new TableSchema().setFields(Arrays.asList(...));

    // Pipeline pipeline = Pipeline.create();
    // PCollection<TableRow> rows = ...

    rows.apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", project, dataset, table))
            .withSchema(schema)
            // For CreateDisposition:
            // - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
            // required
            // - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            // For WriteDisposition:
            // - WRITE_EMPTY (default): raises an error if the table is not empty
            // - WRITE_APPEND: appends new rows to existing rows
            // - WRITE_TRUNCATE: deletes the existing rows before writing
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    // pipeline.run().waitUntilFinish();
  }
}
quotes = pipeline | beam.Create([
    {
        'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'
    },
    {
        'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."
    },
])

以下示例代码展示了如何应用WriteToBigQuery转换将字典的PCollection写入 BigQuery 表。写入操作将在需要时创建表。如果表已存在,则会将其替换。

quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

write转换将自定义类型对象的PCollection写入 BigQuery 表。使用.withFormatFunction(SerializableFunction)提供格式化函数,将PCollection中的每个输入元素转换为TableRow。本示例使用write写入PCollection<String>。写入操作将在需要时创建表。如果表已存在,则会将其替换。

quotes.apply(
    BigQueryIO.<Quote>write()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withFormatFunction(
            (Quote elem) ->
                new TableRow().set("source", elem.source).set("quote", elem.quote))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

使用流式插入时,可以决定如何处理失败的记录。可以继续重试,或者使用WriteResult.getFailedInserts()方法将失败的记录返回到单独的PCollection中。

使用存储写入 API

从 Beam SDK for Java 的 2.36.0 版本开始,可以使用 BigQueryIO 连接器中的BigQuery 存储写入 API

从 Beam SDK for Python 的 2.47.0 版本开始,SDK 也支持 BigQuery 存储写入 API。

Python SDK 的 BigQuery 存储写入 API 目前对支持的数据类型有一些限制。由于此方法使用了跨语言转换,因此我们仅限于跨语言边界支持的类型。例如,需要apache_beam.utils.timestamp.Timestamp才能写入TIMESTAMP BigQuery 类型。此外,某些类型(例如DATETIME)尚不支持。有关更多详细信息,请参阅完整的类型映射

注意: 如果要从源代码运行使用存储写入 API 的 WriteToBigQuery,需要运行./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build来构建 expansion-service jar。如果从发布的 Beam SDK 运行,则 jar 已包含在内。

恰好一次语义

要使用存储写入 API 写入 BigQuery,请将withMethod设置为Method.STORAGE_WRITE_API。以下示例转换使用存储写入 API 和恰好一次语义写入 BigQuery

WriteResult writeResult = rows.apply("Save Rows to BigQuery",
BigQueryIO.writeTableRows()
        .to(options.getFullyQualifiedTableName())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withMethod(Method.STORAGE_WRITE_API)
);
quotes | "WriteTableWithStorageAPI" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)

如果要更改 BigQueryIO 的行为,以便管道的所有 BigQuery 接收器默认使用存储写入 API,请设置UseStorageWriteApi选项

如果管道需要创建表(如果表不存在且已将创建处置指定为CREATE_IF_NEEDED),则必须提供表架构。API 使用架构验证数据并将其转换为二进制协议。

TableSchema schema = new TableSchema().setFields(
        List.of(
            new TableFieldSchema()
                .setName("request_ts")
                .setType("TIMESTAMP")
                .setMode("REQUIRED"),
            new TableFieldSchema()
                .setName("user_name")
                .setType("STRING")
                .setMode("REQUIRED")));
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

对于流式管道,需要设置两个附加参数:流数和触发频率。

BigQueryIO.writeTableRows()
        // ...
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withNumStorageWriteApiStreams(3)
);
# The Python SDK doesn't currently support setting the number of write streams
quotes | "StorageWriteAPIWithFrequency" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    triggering_frequency=5)

流数定义了 BigQueryIO 写入转换的并行性,大致对应于管道使用的存储写入 API 流数。可以通过withNumStorageWriteApiStreams在转换中明确设置,或者在管道中提供numStorageWriteApiStreams选项,如BigQueryOptions中所定义。请注意,这仅受流式管道支持。

触发频率决定数据在 BigQuery 中可用于查询的时间。可以通过withTriggeringFrequency明确设置,或者通过设置storageWriteApiTriggeringFrequencySec选项指定秒数。

这两个参数的组合会影响 BigQueryIO 在调用存储写入 API 之前创建的行批次的大小。将频率设置得太高会导致批次更小,从而影响性能。一般而言,单个流应该能够处理至少每秒 1Mb 的吞吐量。创建独占流对于 BigQuery 服务来说是一个昂贵的操作,因此应该仅根据用例的需要使用流。对于大多数管道来说,触发频率在个位秒是不错的选择。

与流式插入类似,STORAGE_WRITE_API 支持动态确定写入 BigQuery 的并行流数(从 2.42.0 版本开始)。可以使用withAutoSharding明确启用此功能。

numStorageWriteApiStreams设置为 0 或未指定时,STORAGE_WRITE_API 默认使用动态分片。

使用STORAGE_WRITE_API时,WriteResult.getFailedStorageApiInserts返回的PCollection包含未能写入存储写入 API 接收器的行。

至少一次语义

如果用例允许目标表中存在潜在的重复记录,可以使用STORAGE_API_AT_LEAST_ONCE方法。此方法不会将要写入 BigQuery 的记录持久化到其混洗存储中,这是提供STORAGE_WRITE_API方法的恰好一次语义所必需的。因此,对于大多数管道来说,使用此方法通常更便宜,并且延迟更低。如果使用STORAGE_API_AT_LEAST_ONCE,则不需要指定流数,并且不能指定触发频率。

自动分片不适用于STORAGE_API_AT_LEAST_ONCE

使用STORAGE_API_AT_LEAST_ONCE时,WriteResult.getFailedStorageApiInserts返回的PCollection包含未能写入存储写入 API 接收器的行。

配额

使用存储写入 API 之前,请注意BigQuery 存储写入 API 配额

使用动态目标

可以使用动态目标功能将PCollection中的元素写入不同的 BigQuery 表,这些表可能具有不同的架构。

动态目标功能按用户定义的目标键对用户类型进行分组,使用该键计算目标表和/或架构,并将每组的元素写入计算出的目标。

此外,还可以编写自己的类型,这些类型具有映射到TableRow的映射函数,并且可以在所有DynamicDestinations方法中使用侧输入。

要使用动态目标,必须创建一个DynamicDestinations对象,并实现以下方法

  • getDestination: 返回一个对象,getTablegetSchema可以使用该对象作为目标键来计算目标表和/或架构。

  • getTable: 返回目标键的表(作为 TableDestination 对象)。此方法必须为每个唯一的目标返回唯一的表。

  • getSchema: 返回目标键的表架构(作为 TableSchema 对象)。

然后,使用 write().to 和您的 DynamicDestinations 对象。此示例使用包含天气数据的 PCollection,并将数据写入每个年份的不同表。

/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
  final long year;
  final long month;
  final long day;
  final double maxTemp;

  public WeatherData() {
    this.year = 0;
    this.month = 0;
    this.day = 0;
    this.maxTemp = 0.0f;
  }
  public WeatherData(long year, long month, long day, double maxTemp) {
    this.year = year;
    this.month = month;
    this.day = day;
    this.maxTemp = maxTemp;
  }
}
*/

PCollection<WeatherData> weatherData =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return new WeatherData(
                      (Long) record.get("year"),
                      (Long) record.get("month"),
                      (Long) record.get("day"),
                      (Double) record.get("max_temperature"));
                })
            .fromQuery(
                "SELECT year, month, day, max_temperature "
                    + "FROM [apache-beam-testing.samples.weather_stations] "
                    + "WHERE year BETWEEN 2007 AND 2009")
            .withCoder(AvroCoder.of(WeatherData.class)));

// We will send the weather data into different tables for every year.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(
            new DynamicDestinations<WeatherData, Long>() {
              @Override
              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
                return elem.getValue().year;
              }

              @Override
              public TableDestination getTable(Long destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId(writeProject)
                        .setDatasetId(writeDataset)
                        .setTableId(writeTable + "_" + destination),
                    "Table for year " + destination);
              }

              @Override
              public TableSchema getSchema(Long destination) {
                return new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("year")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("month")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("day")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("maxTemp")
                                .setType("FLOAT")
                                .setMode("NULLABLE")));
              }
            })
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
                                                  ('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  if element in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view, ),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

使用时间分区

BigQuery 时间分区将您的表划分为较小的分区,称为 分区表。分区表使您更轻松地管理和查询数据。

要使用 BigQuery 时间分区,请使用以下两种方法之一

  • withTimePartitioning: 此方法接受 TimePartitioning 类,并且仅在您写入单个表时才可用。

  • withJsonTimePartitioning: 此方法与 withTimePartitioning 相同,但接受 JSON 序列化字符串对象。

此示例每天生成一个分区。

weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_partitioning")
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // NOTE: an existing table without time partitioning set up will not work
        .withTimePartitioning(new TimePartitioning().setType("DAY"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
quotes | 'WriteWithTimePartitioning' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    additional_bq_parameters={'timePartitioning': {
        'type': 'HOUR'
    }})

限制

BigQueryIO 目前有以下限制。

  1. 您无法将 BigQuery 写入的完成与管道中的其他步骤进行排序。

  2. 如果您使用的是 Python 版 Beam SDK,则在写入非常大的数据集时,可能会遇到导入大小配额问题。作为一种解决方法,您可以对数据集进行分区(例如,使用 Beam 的 Partition 变换),并写入多个 BigQuery 表。Java 版 Beam SDK 没有此限制,因为它会为您对数据集进行分区。

  3. 当您 将数据加载 到 BigQuery 时,将应用 这些限制。默认情况下,BigQuery 使用共享的插槽池来加载数据。这意味着无法保证可用容量,并且您的加载可能会排队,直到有插槽可用。如果插槽在 6 小时内没有可用,则加载将因 BigQuery 设置的限制而失败。为避免这种情况,强烈建议您使用 BigQuery 预留,以确保您的加载不会因容量问题而排队并失败。

其他示例

您可以在 Beam 的示例目录中找到使用 BigQuery 的其他示例。

Java 食谱示例

这些示例来自 Java 版 cookbook 示例 目录。

Java 完整示例

这些示例来自 Java 版 完整示例 目录。

Python 食谱示例

这些示例来自 Python 版 cookbook 示例 目录。