博客
2024/09/20
使用 Beam YAML 和 Protobuf 进行高效的流式数据处理
使用 Beam YAML 和 Protobuf 进行高效的流式数据处理
随着流式数据处理的增长,其维护、复杂性和成本也在不断增加。本文将解释如何通过使用 Protobuf 来有效地扩展管道,从而确保管道可重用且部署速度快。目标是使用 Beam YAML 使工程师能够轻松地实施此过程。
使用 Beam YAML 简化管道
在 Beam 中创建管道可能有些困难,尤其是对于 Apache Beam 新用户而言。设置项目、管理依赖项等可能具有挑战性。Beam YAML 消除了大部分样板代码,使您可以专注于工作中最重要的部分:数据转换。
Beam YAML 的一些主要优势包括
- 可读性: 通过使用声明性语言 (YAML),管道配置更易于人类阅读。
- 可重用性: 在不同管道之间重用相同的组件变得更加简单。
- 可维护性: 管道维护和更新更加容易。
以下模板展示了从 Kafka 主题读取事件并将其写入 BigQuery 的示例。
pipeline:
transforms:
- type: ReadFromKafka
name: ReadProtoMovieEvents
config:
topic: 'TOPIC_NAME'
format: RAW/AVRO/JSON/PROTO
bootstrap_servers: 'BOOTSTRAP_SERVERS'
schema: 'SCHEMA'
- type: WriteToBigQuery
name: WriteMovieEvents
input: ReadProtoMovieEvents
config:
table: 'PROJECT_ID.DATASET.MOVIE_EVENTS_TABLE'
useAtLeastOnceSemantics: true
options:
streaming: true
dataflow_service_options: [streaming_mode_at_least_once]
完整的流程
本节演示了此管道的完整流程。
创建一个简单的协议事件
以下代码创建了一个简单的电影事件。
// events/v1/movie_event.proto
syntax = "proto3";
package event.v1;
import "bq_field.proto";
import "bq_table.proto";
import "buf/validate/validate.proto";
import "google/protobuf/wrappers.proto";
message MovieEvent {
option (gen_bq_schema.bigquery_opts).table_name = "movie_table";
google.protobuf.StringValue event_id = 1 [(gen_bq_schema.bigquery).description = "Unique Event ID"];
google.protobuf.StringValue user_id = 2 [(gen_bq_schema.bigquery).description = "Unique User ID"];
google.protobuf.StringValue movie_id = 3 [(gen_bq_schema.bigquery).description = "Unique Movie ID"];
google.protobuf.Int32Value rating = 4 [(buf.validate.field).int32 = {
// validates the average rating is at least 0
gte: 0,
// validates the average rating is at most 100
lte: 100
}, (gen_bq_schema.bigquery).description = "Movie rating"];
string event_dt = 5 [
(gen_bq_schema.bigquery).type_override = "DATETIME",
(gen_bq_schema.bigquery).description = "UTC Datetime representing when we received this event. Format: YYYY-MM-DDTHH:MM:SS",
(buf.validate.field) = {
string: {
pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
},
ignore_empty: false,
}
];
}
由于这些事件被写入 BigQuery,因此导入了 bq_field
协议和 bq_table
协议。这些协议文件有助于生成 BigQuery JSON 架构。本示例还演示了左移方法,该方法将测试、质量和性能尽早地移入开发过程。例如,为了确保仅从源代码生成有效的事件,包含了 buf.validate
元素。
在 events/v1
文件夹中创建 movie_event.proto
协议后,您可以生成必要的 文件描述符。文件描述符是架构的编译表示,允许各种工具和系统动态地理解和处理 Protobuf 数据。为了简化流程,本示例使用了 Buf,它需要以下配置文件。
Buf 配置
# buf.yaml
version: v2
deps:
- buf.build/googlecloudplatform/bq-schema-api
- buf.build/bufbuild/protovalidate
breaking:
use:
- FILE
lint:
use:
- DEFAULT
# buf.gen.yaml
version: v2
managed:
enabled: true
plugins:
# Python Plugins
- remote: buf.build/protocolbuffers/python
out: gen/python
- remote: buf.build/grpc/python
out: gen/python
# Java Plugins
- remote: buf.build/protocolbuffers/java:v25.2
out: gen/maven/src/main/java
- remote: buf.build/grpc/java
out: gen/maven/src/main/java
# BQ Schemas
- remote: buf.build/googlecloudplatform/bq-schema:v1.1.0
out: protoc-gen/bq_schema
运行以下两个命令以生成必要的 Java、Python、BigQuery 架构和描述符文件
// Generate the buf.lock file
buf deps update
// It generates the descriptor in descriptor.binp.
buf build . -o descriptor.binp --exclude-imports
// It generates the Java, Python and BigQuery schema as described in buf.gen.yaml
buf generate --include-imports
使 Beam YAML 读取协议
对 YAML 文件进行以下修改
# movie_events_pipeline.yml
pipeline:
transforms:
- type: ReadFromKafka
name: ReadProtoMovieEvents
config:
topic: 'movie_proto'
format: PROTO
bootstrap_servers: '<BOOTSTRAP_SERVERS>'
file_descriptor_path: 'gs://my_proto_bucket/movie/v1.0.0/descriptor.binp'
message_name: 'event.v1.MovieEvent'
- type: WriteToBigQuery
name: WriteMovieEvents
input: ReadProtoMovieEvents
config:
table: '<PROJECT_ID>.raw.movie_table'
useAtLeastOnceSemantics: true
options:
streaming: true
dataflow_service_options: [streaming_mode_at_least_once]
此步骤将格式更改为 PROTO
并添加了 file_descriptor_path
和 message_name
。
使用 Terraform 部署管道
您可以使用 Terraform 使用 Dataflow 作为运行器来部署 Beam YAML 管道。以下 Terraform 代码示例演示了如何实现此目的
// Enable Dataflow API.
resource "google_project_service" "enable_dataflow_api" {
project = var.gcp_project_id
service = "dataflow.googleapis.com"
}
// DF Beam YAML
resource "google_dataflow_flex_template_job" "data_movie_job" {
provider = google-beta
project = var.gcp_project_id
name = "movie-proto-events"
container_spec_gcs_path = "gs://dataflow-templates-${var.gcp_region}/latest/flex/Yaml_Template"
region = var.gcp_region
on_delete = "drain"
machine_type = "n2d-standard-4"
enable_streaming_engine = true
subnetwork = var.subnetwork
skip_wait_on_job_termination = true
parameters = {
yaml_pipeline_file = "gs://${var.bucket_name}/yamls/${var.package_version}/movie_events_pipeline.yml"
max_num_workers = 40
worker_zone = var.gcp_zone
}
depends_on = [google_project_service.enable_dataflow_api]
}
假设 BigQuery 表存在(您可以使用 Terraform 和 Proto 完成此操作),此代码使用从 Kafka 读取协议事件并将其写入 BigQuery 的 Beam YAML 代码创建了一个 Dataflow 作业。
改进和结论
以下社区贡献可以改进本示例中的 Beam YAML 代码
支持架构注册表: 与架构注册表(例如 Buf Registry 或 Apicurio)集成以实现更好的架构管理。当前工作流程使用 Buf 生成描述符并将其存储在 Google Cloud Storage 中。描述符可以存储在架构注册表中而不是存储在 Google Cloud Storage 中。
增强监控: 实施高级监控和警报机制,以便快速识别和解决数据管道中的问题。
利用 Beam YAML 和 Protobuf 使我们能够简化数据处理管道的创建和维护,从而显著降低复杂性。这种方法确保工程师能够更有效地实施和扩展健壮、可重用的管道,而无需手动编写 Beam 代码。
贡献
希望帮助构建和添加功能的开发人员欢迎开始为 Beam YAML 模块 中的努力做出贡献。
GitHub 存储库中还列出了 已发现的 bug - 现在标记为 yaml
标签。
虽然 Beam YAML 从 Beam 2.52 开始被标记为稳定,但它仍在积极开发中,每个版本都会添加新功能。希望参与设计决策并提供有关框架使用方式的见解的人员,强烈建议您加入 开发者邮件列表,这些讨论正在进行中。