尝试 Apache Beam

您可以使用我们的交互式笔记本尝试 Apache Beam 管道。

Colab 中的交互式 WordCount

此交互式笔记本向您展示了 WordCount 的简单最小版本。

package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class WordCount {
  public static void main(String[] args) {
    String inputsDir = "data/*";
    String outputsPrefix = "outputs/part";

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read lines", TextIO.read().from(inputsDir))
        .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
        .apply("Count words", Count.perElement())
        .apply("Write results", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                  wordCount.getKey() + ": " + wordCount.getValue()))
        .apply(TextIO.write().to(outputsPrefix));
    pipeline.run();
  }
}

在 Colab 中运行 在 GitHub 上查看

要了解如何在自己的计算机上安装和运行 Apache Beam Java SDK,请按照 Java 快速入门中的说明操作。

import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      | 'Group and sum' >> beam.CombinePerKey(sum)
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

在 Colab 中运行 在 GitHub 上查看

要了解如何在自己的计算机上安装和运行 Apache Beam Python SDK,请按照 Python 快速入门中的说明操作。

package main

import (
	"context"
	"flag"
	"fmt"
	"regexp"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
)

var (
	input = flag.String("input", "data/*", "File(s) to read.")
	output = flag.String("output", "outputs/wordcounts.txt", "Output filename.")
)

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
  flag.Parse()

	beam.Init()

	pipeline := beam.NewPipeline()
	root := pipeline.Root()

	lines := textio.Read(root, *input)
	words := beam.ParDo(root, func(line string, emit func(string)) {
		for _, word := range wordRE.FindAllString(line, -1) {
			emit(word)
		}
	}, lines)
	counted := stats.Count(root, words)
	formatted := beam.ParDo(root, func(word string, count int) string {
		return fmt.Sprintf("%s: %v", word, count)
	}, counted)
	textio.Write(root, *output, formatted)

	direct.Execute(context.Background(), pipeline)
}

在 Colab 中运行 在 GitHub 上查看

要了解如何在自己的计算机上安装和运行 Apache Beam Go SDK,请按照 Go 快速入门中的说明操作。

有关 WordCount 工作原理的更详细说明,请参阅 WordCount 示例演练

下一步

如果您遇到任何问题,请不要犹豫 与我们联系