使用 Beam YAML 和 Protobuf 进行高效的流式数据处理

使用 Beam YAML 和 Protobuf 进行高效的流式数据处理

随着流式数据处理的增长,其维护、复杂性和成本也在不断增加。本文将解释如何通过使用 Protobuf 来有效地扩展管道,从而确保管道可重用且部署速度快。目标是使用 Beam YAML 使工程师能够轻松地实施此过程。

使用 Beam YAML 简化管道

在 Beam 中创建管道可能有些困难,尤其是对于 Apache Beam 新用户而言。设置项目、管理依赖项等可能具有挑战性。Beam YAML 消除了大部分样板代码,使您可以专注于工作中最重要的部分:数据转换。

Beam YAML 的一些主要优势包括

  • 可读性: 通过使用声明性语言 (YAML),管道配置更易于人类阅读。
  • 可重用性: 在不同管道之间重用相同的组件变得更加简单。
  • 可维护性: 管道维护和更新更加容易。

以下模板展示了从 Kafka 主题读取事件并将其写入 BigQuery 的示例。

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'TOPIC_NAME'
        format: RAW/AVRO/JSON/PROTO
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'SCHEMA'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: 'PROJECT_ID.DATASET.MOVIE_EVENTS_TABLE'
        useAtLeastOnceSemantics: true

options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]

完整的流程

本节演示了此管道的完整流程。

创建一个简单的协议事件

以下代码创建了一个简单的电影事件。

// events/v1/movie_event.proto

syntax = "proto3";

package event.v1;

import "bq_field.proto";
import "bq_table.proto";
import "buf/validate/validate.proto";
import "google/protobuf/wrappers.proto";

message MovieEvent {
  option (gen_bq_schema.bigquery_opts).table_name = "movie_table";
  google.protobuf.StringValue event_id = 1 [(gen_bq_schema.bigquery).description = "Unique Event ID"];
  google.protobuf.StringValue user_id = 2 [(gen_bq_schema.bigquery).description = "Unique User ID"];
  google.protobuf.StringValue movie_id = 3 [(gen_bq_schema.bigquery).description = "Unique Movie ID"];
  google.protobuf.Int32Value rating = 4 [(buf.validate.field).int32 = {
    // validates the average rating is at least 0
    gte: 0,
    // validates the average rating is at most 100
    lte: 100
  }, (gen_bq_schema.bigquery).description = "Movie rating"];
  string event_dt = 5 [
    (gen_bq_schema.bigquery).type_override = "DATETIME",
    (gen_bq_schema.bigquery).description = "UTC Datetime representing when we received this event. Format: YYYY-MM-DDTHH:MM:SS",
    (buf.validate.field) = {
      string: {
        pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
      },
      ignore_empty: false,
    }
  ];
}

由于这些事件被写入 BigQuery,因此导入了 bq_field 协议和 bq_table 协议。这些协议文件有助于生成 BigQuery JSON 架构。本示例还演示了左移方法,该方法将测试、质量和性能尽早地移入开发过程。例如,为了确保仅从源代码生成有效的事件,包含了 buf.validate 元素。

events/v1 文件夹中创建 movie_event.proto 协议后,您可以生成必要的 文件描述符。文件描述符是架构的编译表示,允许各种工具和系统动态地理解和处理 Protobuf 数据。为了简化流程,本示例使用了 Buf,它需要以下配置文件。

Buf 配置

# buf.yaml

version: v2
deps:
  - buf.build/googlecloudplatform/bq-schema-api
  - buf.build/bufbuild/protovalidate
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
# buf.gen.yaml

version: v2
managed:
  enabled: true
plugins:
  # Python Plugins
  - remote: buf.build/protocolbuffers/python
    out: gen/python
  - remote: buf.build/grpc/python
    out: gen/python

  # Java Plugins
  - remote: buf.build/protocolbuffers/java:v25.2
    out: gen/maven/src/main/java
  - remote: buf.build/grpc/java
    out: gen/maven/src/main/java

  # BQ Schemas
  - remote: buf.build/googlecloudplatform/bq-schema:v1.1.0
    out: protoc-gen/bq_schema

运行以下两个命令以生成必要的 Java、Python、BigQuery 架构和描述符文件

// Generate the buf.lock file
buf deps update

// It generates the descriptor in descriptor.binp.
buf build . -o descriptor.binp --exclude-imports

// It generates the Java, Python and BigQuery schema as described in buf.gen.yaml
buf generate --include-imports

使 Beam YAML 读取协议

对 YAML 文件进行以下修改

# movie_events_pipeline.yml

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'movie_proto'
        format: PROTO
        bootstrap_servers: '<BOOTSTRAP_SERVERS>'
        file_descriptor_path: 'gs://my_proto_bucket/movie/v1.0.0/descriptor.binp'
        message_name: 'event.v1.MovieEvent'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: '<PROJECT_ID>.raw.movie_table'
        useAtLeastOnceSemantics: true
options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]

此步骤将格式更改为 PROTO 并添加了 file_descriptor_pathmessage_name

使用 Terraform 部署管道

您可以使用 Terraform 使用 Dataflow 作为运行器来部署 Beam YAML 管道。以下 Terraform 代码示例演示了如何实现此目的

// Enable Dataflow API.
resource "google_project_service" "enable_dataflow_api" {
  project = var.gcp_project_id
  service = "dataflow.googleapis.com"
}

// DF Beam YAML
resource "google_dataflow_flex_template_job" "data_movie_job" {
  provider                     = google-beta
  project                      = var.gcp_project_id
  name                         = "movie-proto-events"
  container_spec_gcs_path      = "gs://dataflow-templates-${var.gcp_region}/latest/flex/Yaml_Template"
  region                       = var.gcp_region
  on_delete                    = "drain"
  machine_type                 = "n2d-standard-4"
  enable_streaming_engine      = true
  subnetwork                   = var.subnetwork
  skip_wait_on_job_termination = true
  parameters = {
    yaml_pipeline_file = "gs://${var.bucket_name}/yamls/${var.package_version}/movie_events_pipeline.yml"
    max_num_workers    = 40
    worker_zone        = var.gcp_zone
  }
  depends_on = [google_project_service.enable_dataflow_api]
}

假设 BigQuery 表存在(您可以使用 Terraform 和 Proto 完成此操作),此代码使用从 Kafka 读取协议事件并将其写入 BigQuery 的 Beam YAML 代码创建了一个 Dataflow 作业。

改进和结论

以下社区贡献可以改进本示例中的 Beam YAML 代码

  • 支持架构注册表: 与架构注册表(例如 Buf Registry 或 Apicurio)集成以实现更好的架构管理。当前工作流程使用 Buf 生成描述符并将其存储在 Google Cloud Storage 中。描述符可以存储在架构注册表中而不是存储在 Google Cloud Storage 中。

  • 增强监控: 实施高级监控和警报机制,以便快速识别和解决数据管道中的问题。

利用 Beam YAML 和 Protobuf 使我们能够简化数据处理管道的创建和维护,从而显著降低复杂性。这种方法确保工程师能够更有效地实施和扩展健壮、可重用的管道,而无需手动编写 Beam 代码。

贡献

希望帮助构建和添加功能的开发人员欢迎开始为 Beam YAML 模块 中的努力做出贡献。

GitHub 存储库中还列出了 已发现的 bug - 现在标记为 yaml 标签。

虽然 Beam YAML 从 Beam 2.52 开始被标记为稳定,但它仍在积极开发中,每个版本都会添加新功能。希望参与设计决策并提供有关框架使用方式的见解的人员,强烈建议您加入 开发者邮件列表,这些讨论正在进行中。