Java 版 WordCount 快速入门

本快速入门向您展示如何设置 Java 开发环境并运行一个使用 Apache Beam Java SDK 编写的 示例管道,使用您选择的 运行器

如果您有兴趣为 Apache Beam Java 代码库做出贡献,请参阅 贡献指南

本页内容

设置您的开发环境

  1. 下载并安装 Java 开发工具包 (JDK) 版本 8、11 或 17。验证 JAVA_HOME 环境变量是否已设置并指向您的 JDK 安装目录。
  2. 下载并安装 Apache Maven,方法是按照您的操作系统的 安装指南 进行操作。
  3. 可选:如果您想将 Maven 项目转换为 Gradle,请安装 Gradle

获取示例代码

  1. 生成一个针对最新 Beam 版本构建的 Maven 示例项目

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.60.0 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
       
    mvn archetype:generate `
      -D archetypeGroupId=org.apache.beam `
      -D archetypeArtifactId=beam-sdks-java-maven-archetypes-examples `
      -D archetypeVersion=2.60.0 `
      -D groupId=org.example `
      -D artifactId=word-count-beam `
      -D version="0.1" `
      -D package=org.apache.beam.examples `
      -D interactiveMode=false
       

    Maven 在 word-count-beam 目录中创建一个新项目。

  2. 进入 word-count-beam

    cd word-count-beam/
       
    cd .\word-count-beam
       
    该目录包含一个 pom.xml 和一个包含示例管道的 src 目录。

  3. 列出示例管道

    ls src/main/java/org/apache/beam/examples/
       
    dir .\src\main\java\org\apache\beam\examples
       
    您应该看到以下示例

    本教程中使用的示例 WordCount.java 定义了一个 Beam 管道,该管道从输入文件(默认情况下,一个包含莎士比亚的“李尔王”的 .txt 文件)中统计单词。要了解有关这些示例的更多信息,请参阅 WordCount 示例演练

可选:从 Maven 转换为 Gradle

以下步骤说明了如何为以下运行器将构建从 Maven 转换为 Gradle

其他运行器的转换过程类似。有关更多指导,请参阅 从 Apache Maven 迁移构建

  1. 在包含 pom.xml 文件的目录中,运行自动 Maven 到 Gradle 转换
    gradle init
       
    您将被问及是否要生成一个 Gradle 构建。输入 yes。您还将被提示选择 DSL(Groovy 或 Kotlin)。对于本教程,请输入 2 选择 Kotlin。
  2. 打开生成的 build.gradle.kts 文件并进行以下更改
    1. repositories 中,将 mavenLocal() 替换为 mavenCentral()
    2. repositories 中,声明一个 Confluent Kafka 依赖项的存储库
      maven {
          url = uri("https://packages.confluent.io/maven/")
      }
            
    3. 在构建脚本的末尾,添加以下条件依赖项
      if (project.hasProperty("dataflow-runner")) {
          dependencies {
              runtimeOnly("org.apache.beam:beam-runners-google-cloud-dataflow-java:2.60.0")
          }
      }
            
    4. 在构建脚本的末尾,添加以下任务
      tasks.register<JavaExec>("execute") {
        mainClass.set(System.getProperty("mainClass"))
        classpath = sourceSets.main.get().runtimeClasspath
      }
            
  3. 构建您的项目
    gradle build
       

获取示例文本

如果您计划使用 DataflowRunner,可以跳过此步骤。运行器将直接从 Google Cloud Storage 中提取文本。

  1. word-count-beam 目录中,创建一个名为 sample.txt 的文件。
  2. 在文件中添加一些文本。对于本示例,请使用莎士比亚的 李尔王 的文本。

运行管道

单个 Beam 管道可以在多个 Beam 运行器 上运行。 DirectRunner 对于入门非常有用,因为它在您的机器上运行,并且不需要任何特定设置。如果您只是尝试使用 Beam 并且不确定使用什么,请使用 DirectRunner

运行管道的常规过程如下

  1. 完成任何特定于运行器的设置。
  2. 构建您的命令行
    1. 使用 --runner=<runner> 指定运行器(默认值为 DirectRunner)。
    2. 添加任何特定于运行器的必需选项。
    3. 选择运行器可访问的输入文件和输出位置。(例如,如果您在外部集群上运行管道,则无法访问本地文件。)
  3. 运行命令。

要运行 WordCount 管道

  1. 按照运行器的设置步骤进行操作

    DirectRunner 无需额外设置即可运行。

  2. 运行下面相应的 Maven 或 Gradle 命令。

使用 Maven 运行 WordCount

对于 Unix shell

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--inputFile=sample.txt --output=counts" -Pdirect-runner
mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                 --inputFile=sample.txt --output=/tmp/counts" -Pflink-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=SparkRunner --inputFile=sample.txt --output=counts" -Pspark-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> \
                 --region=<your-gcp-region> \
                 --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
                 --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
    -Pdataflow-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--inputFile=sample.txt --output=/tmp/counts --runner=SamzaRunner" -Psamza-runner
mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
    --runner=NemoRunner --inputFile=`pwd`/sample.txt --output=counts
mvn package -Pjet-runner
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
    --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/sample.txt --output=counts

对于 Windows PowerShell

mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--inputFile=sample.txt --output=counts" -P direct-runner
mvn package exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=.\target\word-count-beam-bundled-0.1.jar `
               --inputFile=C:\path\to\quickstart\sample.txt --output=C:\tmp\counts" -P flink-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=SparkRunner --inputFile=sample.txt --output=counts" -P spark-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=DataflowRunner --project=<your-gcp-project> `
               --region=<your-gcp-region> \
               --gcpTempLocation=gs://<your-gcs-bucket>/tmp `
               --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" `
 -P dataflow-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
    -D exec.args="--inputFile=sample.txt --output=/tmp/counts --runner=SamzaRunner" -P samza-runner
mvn package -P nemo-runner -DskipTests
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount `
    --runner=NemoRunner --inputFile=`pwd`/sample.txt --output=counts
mvn package -P jet-runner
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount `
    --runner=JetRunner --jetLocalMode=3 --inputFile=$pwd/sample.txt --output=counts

使用 Gradle 运行 WordCount

对于 Unix shell

gradle clean execute -DmainClass=org.apache.beam.examples.WordCount \
    --args="--inputFile=sample.txt --output=counts"
TODO: document FlinkCluster on Gradle: https://github.com/apache/beam/issues/21499
TODO: document Spark on Gradle: https://github.com/apache/beam/issues/21502
gradle clean execute -DmainClass=org.apache.beam.examples.WordCount \
    --args="--project=<your-gcp-project> --inputFile=gs://apache-beam-samples/shakespeare/* \
    --output=gs://<your-gcs-bucket>/counts --runner=DataflowRunner" -Pdataflow-runner
TODO: document Samza on Gradle: https://github.com/apache/beam/issues/21500
TODO: document Nemo on Gradle: https://github.com/apache/beam/issues/21503
TODO: document Jet on Gradle: https://github.com/apache/beam/issues/21501

检查结果

管道完成后,您可以查看输出。可能会有多个以 count 为前缀的输出文件。输出文件的数量由运行器决定,使其具有灵活地进行高效的分布式执行的灵活性。

  1. 在 Unix shell 中查看输出文件
    ls counts*
       
    ls /tmp/counts*
       
    ls counts*
       
    gsutil ls gs://<your-gcs-bucket>/counts*
       
    ls /tmp/counts*
       
    ls counts*
       
    ls counts*
       
    输出文件包含唯一的单词以及每个单词出现的次数。
  2. 在 Unix shell 中查看输出内容
    more counts*
       
    more /tmp/counts*
       
    more counts*
       
    gsutil cat gs://<your-gcs-bucket>/counts*
       
    more /tmp/counts*
       
    more counts*
       
    more counts*
       
    元素的顺序没有保证,以允许运行器为效率进行优化。但输出应该类似于以下内容
    ...
    Think: 3
    slower: 1
    Having: 1
    revives: 1
    these: 33
    wipe: 1
    arrives: 1
    concluded: 1
    begins: 3
    ...
    

下一步

如果您遇到任何问题,请随时 联系我们