Nexmark 基准套件

它是什么

Nexmark 是一套受 Nexmark 研究论文 中的“连续数据流”查询启发的管道。

这些是在表示在线拍卖系统的三个实体模型上的多个查询。

查询

查询练习了 Beam 模型的许多方面。

我们已经用五个查询补充了原始查询。

基准工作负载配置

以下是基准工作负载的一些旋钮(请参阅 NexmarkConfiguration.java)。

这些配置项可以传递到启动命令行。

事件生成(默认值)

窗口(默认值)

事件比例(默认值)

技术

Nexmark 输出

以下是在 (本地) Direct 运行器上使用 SMOKE 套件以流式模式运行 Nexmark 基准的示例输出。

Performance:
  Conf       Runtime(sec)         Events(/sec)         Results
  0000                5,5              18138,9          100000
  0001                4,2              23657,4           92000
  0002                2,2              45683,0             351
  0003                3,9              25348,5             444
  0004                1,6               6207,3              40
  0005                5,0              20173,5              12
  0006                0,9              11376,6             401
  0007              121,4                823,5               1
  0008                2,5              40273,9            6000
  0009                0,9              10695,2             298
  0010                4,0              25025,0               1
  0011                4,4              22655,2            1919
  0012                3,5              28208,7            1919

基准启动配置

Nexmark 启动程序接受 --runner 参数,与通常使用 Beam PipelineOptions 管理其命令行参数的程序一样。此外,必须配置必要的依赖项。

通过 Gradle 运行时,以下两个参数控制执行

-P nexmark.args
    The command line to pass to the Nexmark main program.

-P nexmark.runner
The Gradle project name of the runner, such as ":runners:direct-java" or
":runners:flink:1.13. The project names can be found in the root
    `settings.gradle.kts`.

测试数据根据需要确定性地合成。测试数据可以在与查询本身相同的管道中合成,也可以发布到 Pub/Sub 或 Kafka。

查询结果可以

常见配置参数

决定是批处理还是流式

--streaming=true

事件生成器的数量

--numEventGenerators=4

查询可以通过名称或编号运行(编号仍然存在以确保向后兼容性,只有查询 0 到 12 具有编号)

运行查询 N

--query=N

运行名为 PASSTHROUGH 的查询

--query=PASSTHROUGH

可用套件

可以使用此配置参数选择要运行的套件

--suite=SUITE

可用套件为

Google Cloud Dataflow 运行器特定配置

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--stagingLocation=gs://<a gs path for staging> \
--runner=DataflowRunner \
--tempLocation=gs://<a gs path for temporary files> \
--filesToStage=target/beam-sdks-java-nexmark-2.60.0.jar

Direct 运行器特定配置

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--manageResources=false --monitorJobs=true \
--flinkMaster=[local] --parallelism=#numcores

Spark 运行器特定配置

--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true

Kafka 源/接收器配置参数

设置 Kafka 主机/IP(例如,“localhost:9092”)

--bootstrapServers=<kafka host/ip>

将结果写入 Kafka 主题

--sinkType=KAFKA

设置将用于基准结果的主题名称

--kafkaResultsTopic=<topic name>

将事件写入或读取到/从 Kafka 主题

--sourceType=KAFKA

设置将用于基准事件的主题名称

--kafkaTopic=<topic name>

当前状态

这些表包含查询在不同运行器中运行状态。Google Cloud Dataflow 状态尚未公布。

批处理/合成/本地

查询DirectSparkFlink
0正常正常正常
1正常正常正常
2正常正常正常
3正常正常正常
4正常正常正常
5正常正常正常
6正常正常正常
7正常正常正常
8正常正常正常
9正常正常正常
10正常正常正常
11正常正常正常
12正常正常正常
BOUNDED_SIDE_INPUT_JOIN正常正常正常

流式/合成/本地

查询DirectSpark Issue 18416Flink
0正常正常正常
1正常正常正常
2正常正常正常
3正常Issue 18074BEAM-3961正常
4正常正常正常
5正常正常正常
6正常正常正常
7正常BEAM-2112正常
8正常正常正常
9正常正常正常
10正常正常正常
11正常正常正常
12正常正常正常
BOUNDED_SIDE_INPUT_JOIN正常BEAM-2112正常

批处理/合成/集群

即将推出

流式/合成/集群

即将推出

运行 Nexmark

在 DirectRunner(本地)上运行 SMOKE 套件

DirectRunner 是默认的,因此不需要传递 -Pnexmark.runner。这里我们这样做是为了最大程度的清晰度。

DirectRunner 没有单独的批处理和流式模式,但 Nexmark 启动有。

这些参数保留了 DirectRunner 的许多额外的安全检查,以便 SMOKE 套件可以确保 Nexmark 套件中没有错误。

批处理模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:direct-java" \
    -Pnexmark.args="
        --runner=DirectRunner
        --streaming=false
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

流式模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:direct-java" \
    -Pnexmark.args="
        --runner=DirectRunner
        --streaming=true
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

在 SparkRunner(本地)上运行 SMOKE 套件

SparkRunner 在 Nexmark Gradle 启动中是特殊情况。任务将提供 SparkRunner 所构建的 Spark 版本,并配置日志记录。

批处理模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:spark:3" \
    -Pnexmark.args="
        --runner=SparkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=false
        --manageResources=false
        --monitorJobs=true"

流式模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:spark:3" \
    -Pnexmark.args="
        --runner=SparkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true"

在 FlinkRunner(本地)上运行 SMOKE 套件

批处理模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:flink:1.13" \
    -Pnexmark.args="
        --runner=FlinkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=false
        --manageResources=false
        --monitorJobs=true
        --flinkMaster=[local]"

流式模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:flink:1.13" \
    -Pnexmark.args="
        --runner=FlinkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true
        --flinkMaster=[local]"

在 Google Cloud Dataflow 上运行 SMOKE 套件

首先设置这些,以便以下命令有效

PROJECT=<your project>
ZONE=<your zone>
STAGING_LOCATION=gs://<a GCS path for staging>
PUBSUB_TOPCI=<existing pubsub topic>

启动

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:google-cloud-dataflow-java" \
    -Pnexmark.args="
        --runner=DataflowRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true
        --project=${PROJECT}
        --zone=${ZONE}
        --workerMachineType=n1-highmem-8
        --stagingLocation=${STAGING_LOCATION}
        --sourceType=PUBSUB
        --pubSubMode=PUBLISH_ONLY
        --pubsubTopic=${PUBSUB_TOPIC}
        --resourceNameMode=VERBATIM
        --manageResources=false
        --numEventGenerators=64
        --numWorkers=16
        --maxNumWorkers=16
        --firstEventRate=100000
        --nextEventRate=100000
        --ratePeriodSec=3600
        --isRateLimited=true
        --avgPersonByteSize=500
        --avgAuctionByteSize=500
        --avgBidByteSize=500
        --probDelayedEvent=0.000001
        --occasionalDelaySec=3600
        --numEvents=0
        --useWallclockEventTime=true
        --usePubsubPublishTime=true
        --experiments=enable_custom_pubsub_sink"

使用 Apache Hadoop YARN 在 Spark 集群上运行查询 0

构建软件包

./gradlew :sdks:java:testing:nexmark:assemble

提交到集群

spark-submit \
    --class org.apache.beam.sdk.nexmark.Main \
    --master yarn-client \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    sdks/java/testing/nexmark/build/libs/beam-sdks-java-nexmark-2.60.0-spark.jar \
        --runner=SparkRunner \
        --query=0 \
        --streamTimeout=60 \
        --streaming=false \
        --manageResources=false \
        --monitorJobs=true"

Nexmark 仪表板

以下仪表板用作 CI 机制,以检测 Beam 组件的无回归。它们不应是运行器或引擎的基准比较。特别是因为

仪表板内容

在 master 上的每次提交时,Nexmark 套件都会运行,并在图表上创建绘图。所有指标仪表板都托管在 metrics.beam.apache.org 上。

有两种类型的仪表板

这些运行器有仪表板(其他运行器即将推出)

每个仪表盘包含