Python 流式管道
Python 流式管道执行从 Beam SDK 版本 2.5.0 开始可用(有一些 限制)。
为什么要使用流式执行?
如果您的管道从流式或持续更新的数据源(如 Cloud Pub/Sub)读取数据,Beam 会创建一个无界 PCollection。运行器必须使用持续运行的流式作业来处理无界 PCollection,因为整个集合在任何时间点都不可用以进行处理。 大小和有界性 提供了有关有界和无界集合的更多信息。
修改管道以使用流式处理
要修改批处理管道以支持流式处理,您必须进行以下代码更改
- 使用支持从无界源读取的 I/O 连接器。
- 使用支持写入无界源的 I/O 连接器。
- 选择 窗口策略。
Beam SDK for Python 包含两个支持无界 PCollection 的 I/O 连接器:Google Cloud Pub/Sub(读取和写入)和 Google BigQuery(写入)。
以下代码片段显示了修改批处理 WordCount 示例以支持流式处理所需的代码更改
这些批处理 WordCount 代码片段来自 wordcount.py。此代码使用 TextIO I/O 连接器从有界集合读取和写入。
lines = p | 'read' >> ReadFromText(known_args.input)
...
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(six.text_type))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
...
output = counts | 'format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
output | 'write' >> WriteToText(known_args.output)
这些流式 WordCount 代码片段来自 streaming_wordcount.py。此代码使用从无界源(Cloud Pub/Sub)读取和写入的 I/O 连接器,并指定了固定窗口策略。
lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
...
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(six.text_type))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
...
output = counts | 'format' >> beam.Map(format_result)
# Write to Pub/Sub
output | beam.io.WriteStringsToPubSub(known_args.output_topic)
运行流式管道
要运行示例流式 WordCount 管道,您必须拥有 Cloud Pub/Sub 输入主题和输出主题。要创建、订阅和拉取主题以进行测试目的,您可以使用 Cloud Pub/Sub 快速入门 中的命令。
以下简单的 bash 脚本将输入文本文件中的行馈送到您的输入主题
cat <YOUR_LOCAL_TEXT_FILE> | while read line; do gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> --message "$line"; done
或者,您可以从公开可用的 Cloud Pub/Sub 流读取,例如 projects/pubsub-public-data/topics/taxirides-realtime
。但是,您必须创建自己的输出主题以测试写入。
以下命令运行 streaming_wordcount.py 示例流式管道。指定您的 Cloud Pub/Sub 项目和输入主题 (--input_topic
),输出 Cloud Pub/Sub 项目和主题 (--output_topic
)。
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
# DataflowRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/ \
--input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
--output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
--streaming
有关执行流式管道的任何其他特定于运行器的信息,请查看您运行器的文档
不支持的功能
Python 流式执行目前不支持以下功能
- 自定义源 API
- 用户定义的自定义合并
WindowFn
(使用 fnapi) - 对于可移植运行器,请参见 可移植性支持表。