Apache Beam 词频统计示例

词频统计示例演示了如何设置一个处理管道,该管道可以读取文本、将文本行分解为单个词,并对每个词执行频率统计。Beam SDK 包含一系列四个逐渐更加详细的词频统计示例,这些示例相互建立。所有示例的输入文本都是莎士比亚的文本集。

每个词频统计示例都介绍了 Beam 编程模型中的不同概念。从理解 MinimalWordCount 开始,它是所有示例中最简单的。一旦您对构建管道的基本原理感到满意,就可以继续学习其他示例中的更多概念。

MinimalWordCount 示例

MinimalWordCount 演示了一个简单的管道,该管道使用 Direct Runner 从文本文件读取数据,应用转换将词标记化并统计词频,并将数据写入输出文本文件。

此示例对输入和输出文件的路径进行了硬编码,并且没有执行任何错误检查;它只用于向您展示创建 Beam 管道的“基本骨架”。这种缺乏参数化使得此特定管道不如标准 Beam 管道那样在不同运行器之间可移植。在后面的示例中,我们将参数化管道的输入和输出源,并展示其他最佳实践。

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount
python -m apache_beam.examples.wordcount_minimal --input YOUR_INPUT_FILE --output counts
$ go install github.com/apache/beam/sdks/v2/go/examples/minimal_wordcount
$ minimal_wordcount

要查看 Java 中的完整代码,请参见 **MinimalWordCount。**

要查看 Python 中的完整代码,请参见 **wordcount_minimal.py。**

要查看 Go 中的完整代码,请参见 **minimal_wordcount.go。**

关键概念

以下部分将详细解释这些概念,并使用 MinimalWordCount 管道的相关代码片段。

创建管道

在此示例中,代码首先创建一个 PipelineOptions 对象。此对象允许我们为管道设置各种选项,例如执行管道的管道运行器以及所选运行器所需的任何运行器特定配置。在此示例中,我们以编程方式设置了这些选项,但在大多数情况下,命令行参数用于设置 PipelineOptions

您可以指定一个运行器来执行您的管道,例如 DataflowRunnerSparkRunner。如果您省略指定运行器,如本示例所示,您的管道将在本地使用 DirectRunner 执行。在接下来的部分中,我们将指定管道的运行器。

 // Create a PipelineOptions object. This object lets us set various execution
 // options for our pipeline, such as the runner you wish to use. This example
 // will run with the DirectRunner by default, based on the class path configured
 // in its dependencies.
 PipelineOptions options = PipelineOptionsFactory.create();
from apache_beam.options.pipeline_options import PipelineOptions

input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://my-bucket/counts.txt'

beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
)

下一步是使用我们刚刚构建的选项创建一个 Pipeline 对象。Pipeline 对象构建要执行的转换图,与该特定管道相关联。

第一步是创建一个 Pipeline 对象。它构建要执行的转换图,与该特定管道相关联。作用域允许将转换分组为复合转换。

Pipeline p = Pipeline.create(options);
pipeline = beam.Pipeline(options=beam_options)
p := beam.NewPipeline()
s := p.Root()

应用管道转换

MinimalWordCount 管道包含几个转换,用于将数据读入管道、操作或转换数据以及写出结果。转换可以包含单个操作,也可以包含多个嵌套转换(即 复合转换)。

每个转换都接受某种输入数据,并生成某种输出数据。输入和输出数据通常由 SDK 类 PCollection 表示。PCollection 是 Beam SDK 提供的一个特殊类,您可以使用它来表示几乎任何大小的数据集,包括无界数据集。

The MinimalWordCount pipeline data flow.

图 1:MinimalWordCount 管道数据流。

MinimalWordCount 管道包含五个转换

  1. 一个文本文件 Read 转换应用于 Pipeline 对象本身,并生成一个 PCollection 作为输出。输出 PCollection 中的每个元素都代表输入文件中的一个文本行。此示例使用存储在公开可访问的 Google Cloud Storage 存储桶(“gs://”)中的输入数据。
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
pipeline
| beam.io.ReadFromText(input_file)
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/*")
  1. 此转换将 PCollection<String> 中的行拆分,其中每个元素都是莎士比亚的文本集中中的一个单独的词。作为替代方案,可以使用一个 ParDo 转换,该转换对每个元素调用一个 DoFn(作为匿名类内联定义),将文本行标记化为单个词。此转换的输入是前一个 TextIO.Read 转换生成的文本行的 PCollectionParDo 转换输出一个新的 PCollection,其中每个元素都代表文本中的一个单独的词。
    .apply("ExtractWords", FlatMapElements
        .into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
# The Flatmap transform is a simplified version of ParDo.

| 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)
  1. SDK 提供的 Count 转换是一个通用转换,它接受任何类型的 PCollection,并返回一个键值对的 PCollection。每个键都代表输入集合中的一个唯一元素,每个值都代表该键在输入集合中出现的次数。

    在此管道中,Count 的输入是前一个 ParDo 生成的单个词的 PCollection,输出是一个键值对的 PCollection,其中每个键都代表文本中的一个唯一词,而关联的值是每个词的出现次数。

.apply(Count.<String>perElement())
| beam.combiners.Count.PerElement()
counted := stats.Count(s, words)
  1. 下一个转换将每个唯一词和出现次数的键值对格式化为可打印的字符串,适合写入输出文件。

    映射转换是一个更高级别的复合转换,它封装了一个简单的 ParDo。对于输入 PCollection 中的每个元素,映射转换应用一个函数,该函数生成正好一个输出元素。

.apply("FormatResults", MapElements
    .into(TypeDescriptors.strings())
    .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
| beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)
  1. 一个文本文件写入转换。此转换接受最终的格式化字符串的 PCollection 作为输入,并将每个元素写入输出文本文件。输入 PCollection 中的每个元素都代表结果输出文件中的一个文本行。
.apply(TextIO.write().to("wordcounts"));
| beam.io.WriteToText(output_path)
textio.Write(s, "wordcounts.txt", formatted)

请注意,Write 转换会生成一个类型为 PDone 的微不足道的结果值,在本例中被忽略了。

请注意,Write 转换不会返回任何 PCollection。

运行管道

通过调用 run 方法运行管道,该方法将您的管道发送到您在 PipelineOptions 中指定的管道运行器执行。

通过将管道传递给运行器运行管道。

p.run().waitUntilFinish();
with beam.Pipeline(...) as p:
  [construction]
# p.run() automatically called
direct.Execute(context.Background(), p)

请注意,run 方法是异步的。对于阻塞执行,请对 run 调用返回的结果对象调用 waitUntilFinish wait_until_finish 方法。

在 Playground 中尝试完整示例

WordCount 示例

此词频统计示例介绍了一些推荐的编程实践,这些实践可以使您的管道更易于阅读、编写和维护。虽然没有明确要求,但它们可以使您的管道执行更加灵活,有助于测试您的管道,并有助于使您的管道代码可重用。

本节假设您已经很好地理解了构建管道的基本概念。如果您认为自己尚未达到这个水平,请阅读上一节 MinimalWordCount

要在 Java 中运行此示例

Java 词频统计快速入门 中所述设置您的开发环境并生成 Maven 原型。然后使用以下运行器之一运行管道

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
                  --project=YOUR_PROJECT --region=GCE_REGION \
                  --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
     --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts

要查看 Java 中的完整代码,请参见 **WordCount。**

要在 Python 中运行此示例

python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts
# Running Beam Python on a distributed Flink cluster requires additional configuration.
# See /documentation/runners/flink/ for more information.
python -m apache_beam.examples.wordcount --input /path/to/inputfile \
                                         --output /path/to/write/counts \
                                         --runner SparkRunner
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://YOUR_GCS_BUCKET/counts \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --region YOUR_GCP_REGION \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

要查看 Python 中的完整代码,请参见 **wordcount.py。**

要在 Go 中运行此示例

$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
$ wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://<your-gcs-bucket>/counts \
            --runner dataflow \
            --project your-gcp-project \
            --region your-gcp-region \
            --temp_location gs://<your-gcs-bucket>/tmp/ \
            --staging_location gs://<your-gcs-bucket>/binaries/ \
            --worker_harness_container_image=apache/beam_go_sdk:latest
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

要查看 Go 中的完整代码,请参见 **wordcount.go。**

新概念

以下部分将详细解释这些关键概念,并将管道代码分解为更小的部分。

指定显式 DoFns

在使用ParDo转换时,您需要指定应用于输入PCollection中每个元素的处理操作。此处理操作是 SDK 类DoFn的子类。您可以为每个ParDo内联创建DoFn子类,作为匿名内部类的实例,就像在前面的示例(MinimalWordCount)中所做的那样。但是,通常最好在全局级别定义DoFn,这使其更易于进行单元测试,并使ParDo代码更易读。

在使用ParDo转换时,您需要指定应用于输入PCollection中每个元素的处理操作。此处理操作要么是命名函数,要么是带有特殊命名方法的结构体。您可以使用匿名函数(但不包括闭包)。但是,通常最好在全局级别定义DoFn,这使其更易于进行单元测试,并使ParDo代码更易读。

// In this example, ExtractWordsFn is a DoFn that is defined as a static class:

static class ExtractWordsFn extends DoFn<String, String> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
        ...
    }
}
# In this example, the DoFns are defined as classes:


class FormatAsTextFn(beam.DoFn):
  def process(self, element):
    word, count = element
    yield '%s: %s' % (word, count)

formatted = counts | beam.ParDo(FormatAsTextFn())
// In this example, extractFn is a DoFn that is defined as a function:

func extractFn(ctx context.Context, line string, emit func(string)) {
   ...
}

创建复合转换

如果您有一个由多个转换或ParDo步骤组成的处理操作,则可以将其创建为PTransform的子类。创建PTransform子类允许您封装复杂的转换,可以使您的管道结构更清晰、更模块化,并使单元测试更容易。

如果您有一个由多个转换或ParDo步骤组成的处理操作,则可以使用普通的 Go 函数来封装它们。此外,您还可以使用命名子范围将它们分组为复合转换,以便用于监控。

在此示例中,两个转换被封装为PTransform子类CountWordsCountWords包含运行ExtractWordsFnParDo以及 SDK 提供的Count转换。

在此示例中,两个转换被封装为CountWords函数。

在定义CountWords时,我们指定其最终的输入和输出;输入是提取操作的PCollection<String>,输出是计数操作生成的PCollection<KV<String, Long>>

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >>
      beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))

      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement())

counts = lines | CountWords()
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")

	// Convert lines of text into individual words.
	col := beam.ParDo(s, extractFn, lines)

	// Count the number of times each word occurs.
	return stats.Count(s, col)
}

使用可参数化的 PipelineOptions

您可以在运行管道时对各种执行选项进行硬编码。但是,更常见的方法是通过命令行参数解析定义您自己的配置选项。通过命令行定义配置选项可以使代码更容易移植到不同的运行程序中。

添加要由命令行解析器处理的参数,并为其指定默认值。然后,您可以在管道代码中访问选项值。

您可以使用标准的flag包来实现此目的。

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = pipeline | beam.io.ReadFromText(args.input_file)
var input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

func main() {
    ...
    p := beam.NewPipeline()
    s := p.Root()

    lines := textio.Read(s, *input)
    ...

在 Playground 中尝试完整示例

DebuggingWordCount 示例

DebuggingWordCount 示例演示了一些有关为管道代码添加工具的最佳实践。

要在 Java 中运行此示例

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=SparkRunner --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
                --project=YOUR_PROJECT --region=GCE_REGION \
                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=SamzaRunner --output=counts" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
     --runner=JetRunner --jetLocalMode=3 --output=counts

要查看 Java 中的完整代码,请参阅 DebuggingWordCount

要在 Python 中运行此示例

python -m apache_beam.examples.wordcount_debugging --input YOUR_INPUT_FILE --output counts
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://YOUR_GCS_BUCKET/counts \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

要查看 Python 中的完整代码,请参阅 wordcount_debugging.py

要在 Go 中运行此示例

$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
$ debugging_wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ debugging_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                      --output gs://<your-gcs-bucket>/counts \
                      --runner dataflow \
                      --project your-gcp-project \
                      --region your-gcp-region \
                      --temp_location gs://<your-gcs-bucket>/tmp/ \
                      --staging_location gs://<your-gcs-bucket>/binaries/ \
                      --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

要查看 Go 中的完整代码,请参阅 debugging_wordcount.go

新概念

以下部分将详细解释这些关键概念,并将管道代码分解为更小的部分。

日志记录

每个运行程序可能会选择以自己的方式处理日志。

// This example uses .trace and .debug:

public class DebuggingWordCount {

  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (...) {
        ...
        LOG.debug("Matched: " + c.element().getKey());
      } else {
        ...
        LOG.trace("Did not match: " + c.element().getKey());
      }
    }
  }
}
# [START example_wordcount_debugging_aggregators]
import logging

class FilterTextFn(beam.DoFn):
  """A DoFn that filters for a specific key based on a regular expression."""
  def __init__(self, pattern):
    self.pattern = pattern
    # A custom metric can track values in your pipeline as it runs. Create
    # custom metrics matched_word and unmatched_words.
    self.matched_words = Metrics.counter(self.__class__, 'matched_words')
    self.umatched_words = Metrics.counter(self.__class__, 'umatched_words')

  def process(self, element):
    word, _ = element
    if re.match(self.pattern, word):
      # Log at INFO level each element we match. When executing this pipeline
      # using the Dataflow service, these log lines will appear in the Cloud
      # Logging UI.
      logging.info('Matched %s', word)

      # Add 1 to the custom metric counter matched_words
      self.matched_words.inc()
      yield element
    else:
      # Log at the "DEBUG" level each element that is not matched. Different
      # log levels can be used to control the verbosity of logging providing
      # an effective mechanism to filter less important information. Note
      # currently only "INFO" and higher level logs are emitted to the Cloud
      # Logger. This log message will not be visible in the Cloud Logger.
      logging.debug('Did not match %s', word)

      # Add 1 to the custom metric counter umatched_words
      self.umatched_words.inc()
type filterFn struct {
    ...
}

func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
    if f.re.MatchString(word) {
         // Log at the "INFO" level each element that we match.
         log.Infof(ctx, "Matched: %v", word)
         emit(word, count)
    } else {
        // Log at the "DEBUG" level each element that is not matched.
        log.Debugf(ctx, "Did not match: %v", word)
    }
}

Direct Runner

在使用DirectRunner执行管道时,您可以将日志消息直接打印到本地控制台。如果您使用的是 Java 的 Beam SDK,则必须将Slf4j添加到您的类路径中。

Cloud Dataflow Runner

在使用DataflowRunner执行管道时,可以使用 Stackdriver Logging。Stackdriver Logging 会将来自所有 Cloud Dataflow 作业工作程序的日志聚合到 Google Cloud Platform 控制台中的单个位置。您可以使用 Stackdriver Logging 搜索和访问来自 Cloud Dataflow 为完成您的作业而启动的所有工作程序的日志。管道中DoFn实例中的日志语句将在管道运行时出现在 Stackdriver Logging 中。

您还可以控制工作程序日志级别。默认情况下,执行用户代码的 Cloud Dataflow 工作程序配置为将日志记录到 Stackdriver Logging,日志级别为“INFO”及更高级别。您可以通过指定以下内容来覆盖特定日志记录命名空间的日志级别:--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}。例如,通过在使用 Cloud Dataflow 服务执行管道时指定--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"},Stackdriver Logging 将除了默认的“INFO”或更高级别日志之外,还包含该包的“DEBUG”或更高级别日志。

默认的 Cloud Dataflow 工作程序日志记录配置可以通过指定--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>来覆盖。例如,通过在使用 Cloud Dataflow 服务执行管道时指定--defaultWorkerLogLevel=DEBUG,Cloud Logging 将包含所有“DEBUG”或更高级别日志。请注意,将默认工作程序日志级别更改为 TRACE 或 DEBUG 会显着增加输出的日志数量。

Apache Spark Runner

注意:此部分尚未添加。对此有一个未解决的问题(问题 18076)。

注意:此部分尚未添加。对此有一个未解决的问题(问题 18075)。

Apache Nemo Runner

在使用NemoRunner执行管道时,大多数日志消息将直接打印到本地控制台。您应该将Slf4j添加到您的类路径中,以充分利用日志。为了观察驱动程序和执行程序两侧的日志,您应该观察 Apache REEF 创建的文件夹。例如,在通过本地运行时运行管道时,将在您的工作目录中创建一个名为REEF_LOCAL_RUNTIME的文件夹,并且日志和指标信息都可以在该目录下找到。

使用断言测试您的管道

PAssertassert_that 是一组方便的 PTransforms,它们类似于 Hamcrest 的集合匹配器,可以在编写管道级测试时用于验证 PCollections 的内容。断言最适合在使用小型数据集的单元测试中使用。

passert 包含方便的 PTransforms,可以在编写管道级测试时用于验证 PCollections 的内容。断言最适合在使用小型数据集的单元测试中使用。

以下示例验证过滤后的单词集是否与我们预期的计数相匹配。断言不会产生任何输出,并且只有在满足所有期望时,管道才会成功。

以下示例验证两个集合是否包含相同的值。断言不会产生任何输出,并且只有在满足所有期望时,管道才会成功。

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

with TestPipeline() as p:
  assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
...
passert.Equals(s, formatted, "Flourish: 3", "stomach: 1")

有关示例单元测试,请参阅 DebuggingWordCountTest

在 Playground 中尝试完整示例

WindowedWordCount 示例

WindowedWordCount 示例像之前的示例一样计算文本中的单词,但它介绍了几个高级概念。

新概念

以下部分将详细解释这些关键概念,并将管道代码分解为更小的部分。

要在 Java 中运行此示例

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
                --project=YOUR_PROJECT --region=GCE_REGION \
                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=SamzaRunner --inputFile=pom.xml --output=counts" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
     --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts

要查看 Java 中的完整代码,请参阅 WindowedWordCount

要在 Python 中运行此示例

此管道使用--output_table参数将其结果写入 BigQuery 表中。格式为PROJECT:DATASET.TABLEDATASET.TABLE

python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \
                                         --output_table PROJECT:DATASET.TABLE \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

要查看 Python 中的完整代码,请参阅 windowed_wordcount.py

要在 Go 中运行此示例

$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
$ windowed_wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ windowed_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://<your-gcs-bucket>/counts \
            --runner dataflow \
            --project your-gcp-project \
            --temp_location gs://<your-gcs-bucket>/tmp/ \
            --staging_location gs://<your-gcs-bucket>/binaries/ \
            --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

要查看 Go 中的完整代码,请参阅 windowed_wordcount.go

无界和有界数据集

Beam 允许您创建一个可以处理有界和无界数据集的单个管道。如果您的数据集具有固定数量的元素,则它是有界数据集,并且可以将所有数据一起处理。对于有界数据集,要问的问题是“我是否拥有所有数据?”如果数据不断到达(例如,移动游戏示例中无尽的游戏得分流),则它是一个无界数据集。无界数据集在任何时间都不可用于处理,因此必须使用持续运行的流式管道来处理数据。数据集只会在某个特定时间点之前完成,因此要问的问题是“我是否拥有直到某个时间点的所有数据?”Beam 使用窗口将不断更新的数据集划分为有限大小的逻辑窗口。如果您的输入是无界的,则必须使用支持流式传输的运行程序。

如果您的管道输入是有界的,那么所有下游 PCollections 也是有界的。类似地,如果输入是无界的,那么管道的下游 PCollections 都是无界的,尽管独立的分支可能是独立有界的。

回想一下,此示例的输入是莎士比亚作品集,它是一组有限的数据。因此,此示例从文本文件读取有界数据

public static void main(String[] args) throws IOException {
    Options options = ...
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> input = pipeline
      .apply(TextIO.read().from(options.getInputFile()))
def main(arvg=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input-file',
                      dest='input_file',
                      default='/Users/home/words-example.txt')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)
  lines  = p | 'read' >> ReadFromText(known_args.input_file)
func main() {
   ...
   p := beam.NewPipeline()
   s := p.Root()

   lines := textio.Read(s, *input)
   ...
}

向数据添加时间戳

PCollection 中的每个元素都具有关联的时间戳。每个元素的时间戳最初由创建PCollection 的源分配。一些创建无界 PCollections 的源可以为每个新元素分配对应于读取或添加元素的时间的时间戳。您可以使用DoFn手动分配或调整时间戳;但是,您只能将时间戳向前移动。

在此示例中,输入是有界的。为了示例的目的,名为AddTimestampsFnDoFn 方法(由ParDo 调用)将为PCollection 中的每个元素设置时间戳。

.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
beam.Map(AddTimestampFn(min_timestamp, max_timestamp))
timestampedLines := beam.ParDo(s, &addTimestampFn{Min: mtime.Now()}, lines)

以下是AddTimestampFn 的代码,它是一个由ParDo 调用的DoFn,它根据元素本身设置时间戳的数据元素。例如,如果元素是日志行,则此ParDo 可以从日志字符串中解析时间并将其设置为元素的时间戳。莎士比亚作品中没有内在的时间戳,因此在这种情况下,我们只是编造了随机时间戳来说明这个概念。输入文本的每一行都将在 2 小时内获得一个随机关联的时间戳。

static class AddTimestampFn extends DoFn<String, String> {
  private final Instant minTimestamp;
  private final Instant maxTimestamp;

  AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
    this.minTimestamp = minTimestamp;
    this.maxTimestamp = maxTimestamp;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Instant randomTimestamp =
      new Instant(
          ThreadLocalRandom.current()
          .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));

    /**
     * Concept #2: Set the data element with that timestamp.
     */
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}
class AddTimestampFn(beam.DoFn):

  def __init__(self, min_timestamp, max_timestamp):
     self.min_timestamp = min_timestamp
     self.max_timestamp = max_timestamp

  def process(self, element):
    return window.TimestampedValue(
       element,
       random.randint(self.min_timestamp, self.max_timestamp))
type addTimestampFn struct {
	Min beam.EventTime `json:"min"`
}

func (f *addTimestampFn) ProcessElement(x beam.X) (beam.EventTime, beam.X) {
	timestamp := f.Min.Add(time.Duration(rand.Int63n(2 * time.Hour.Nanoseconds())))
	return timestamp, x
}

请注意,使用beam.X“类型变量”允许转换用于任何类型。

窗口化

Beam 使用称为窗口的概念来将PCollection细分为有界元素集。聚合多个元素的 PTransforms 将每个PCollection 处理为多个有限窗口的连续序列,即使整个集合本身可能具有无限大小(无界)。

WindowedWordCount 示例应用了固定时间窗口,其中每个窗口代表一个固定时间间隔。此示例的固定窗口大小默认为 1 分钟(您可以使用命令行选项更改此大小)。

PCollection<String> windowedWords = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
windowed_words = input | beam.WindowInto(window.FixedWindows(60 * window_size_minutes))
windowedLines := beam.WindowInto(s, window.NewFixedWindows(time.Minute), timestampedLines)

在窗口化 PCollection 上重用 PTransforms

您还可以重用为操作简单 PCollections 而创建的现有 PTransforms 来操作窗口化 PCollections。

PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
word_counts = windowed_words | CountWords()
counted := wordcount.CountWords(s, windowedLines)

在 Playground 中尝试完整示例

StreamingWordCount 示例

StreamingWordCount 示例是一个流式管道,它从 Pub/Sub 订阅或主题读取 Pub/Sub 消息,并对每条消息中的单词执行频率计数。与 WindowedWordCount 类似,此示例应用了固定时间窗口,其中每个窗口代表一个固定时间间隔。此示例的固定窗口大小为 15 秒。管道输出每个 15 秒窗口中看到的单词的频率计数。

新概念

要在 Java 中运行此示例

注意:StreamingWordCount 尚未适用于 Java SDK。

要在 Python 中运行此示例

python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --region YOUR_GCP_REGION \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

要查看 Python 中的完整代码,请参阅 streaming_wordcount.py

要在 Go 中运行此示例

注意:StreamingWordCount 尚未适用于 Go SDK。对此有一个未解决的问题(问题 18879)。

读取无界数据集

此示例使用无界数据集作为输入。代码使用beam.io.ReadFromPubSub从 Pub/Sub 订阅或主题读取 Pub/Sub 消息。

  // This example is not currently available for the Beam SDK for Java.
  # Read from Pub/Sub into a PCollection.
  if known_args.input_subscription:
    data = p | beam.io.ReadFromPubSub(
        subscription=known_args.input_subscription)
  else:
    data = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
  lines = data | 'DecodeString' >> beam.Map(lambda d: d.decode('utf-8'))
  // This example is not currently available for the Beam SDK for Go.

写入无界结果

当输入是无界时,输出 `PCollection` 也是无界的。因此,您必须确保选择适合结果的 I/O。某些 I/O 仅支持有界输出,而另一些则支持有界和无界输出。

此示例使用无界 `PCollection` 并将结果流式传输到 Google Pub/Sub。代码格式化结果并使用 beam.io.WriteToPubSub 将其写入 Pub/Sub 主题。

  // This example is not currently available for the Beam SDK for Java.
  # Write to Pub/Sub
  _ = (output
    | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
    | beam.io.WriteToPubSub(known_args.output_topic))
  // This example is not currently available for the Beam SDK for Go.

后续步骤

如果您遇到任何问题,请随时 联系我们