Python 流式管道

Python 流式管道执行从 Beam SDK 版本 2.5.0 开始可用(有一些 限制)。

为什么要使用流式执行?

如果您的管道从流式或持续更新的数据源(如 Cloud Pub/Sub)读取数据,Beam 会创建一个无界 PCollection。运行器必须使用持续运行的流式作业来处理无界 PCollection,因为整个集合在任何时间点都不可用以进行处理。 大小和有界性 提供了有关有界和无界集合的更多信息。

修改管道以使用流式处理

要修改批处理管道以支持流式处理,您必须进行以下代码更改

Beam SDK for Python 包含两个支持无界 PCollection 的 I/O 连接器:Google Cloud Pub/Sub(读取和写入)和 Google BigQuery(写入)。

以下代码片段显示了修改批处理 WordCount 示例以支持流式处理所需的代码更改

这些批处理 WordCount 代码片段来自 wordcount.py。此代码使用 TextIO I/O 连接器从有界集合读取和写入。

  lines = p | 'read' >> ReadFromText(known_args.input)
  ...

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))
  ...

  output = counts | 'format' >> beam.Map(format_result)

  # Write the output using a "Write" transform that has side effects.
  output | 'write' >> WriteToText(known_args.output)

这些流式 WordCount 代码片段来自 streaming_wordcount.py。此代码使用从无界源(Cloud Pub/Sub)读取和写入的 I/O 连接器,并指定了固定窗口策略。

  lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
  ...

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | beam.WindowInto(window.FixedWindows(15, 0))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))

  ...

  output = counts | 'format' >> beam.Map(format_result)

  # Write to Pub/Sub
  output | beam.io.WriteStringsToPubSub(known_args.output_topic)

运行流式管道

要运行示例流式 WordCount 管道,您必须拥有 Cloud Pub/Sub 输入主题和输出主题。要创建、订阅和拉取主题以进行测试目的,您可以使用 Cloud Pub/Sub 快速入门 中的命令。

以下简单的 bash 脚本将输入文本文件中的行馈送到您的输入主题

cat <YOUR_LOCAL_TEXT_FILE> | while read line; do gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> --message "$line"; done

或者,您可以从公开可用的 Cloud Pub/Sub 流读取,例如 projects/pubsub-public-data/topics/taxirides-realtime。但是,您必须创建自己的输出主题以测试写入。

以下命令运行 streaming_wordcount.py 示例流式管道。指定您的 Cloud Pub/Sub 项目和输入主题 (--input_topic),输出 Cloud Pub/Sub 项目和主题 (--output_topic)。

# DirectRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
See /documentation/runners/spark/ for more information.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]

# DataflowRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --region YOUR_GCP_REGION \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming

有关执行流式管道的任何其他特定于运行器的信息,请查看您运行器的文档

不支持的功能

Python 流式执行目前不支持以下功能