Nexmark 基准套件
它是什么
Nexmark 是一套受 Nexmark 研究论文 中的“连续数据流”查询启发的管道。
这些是在表示在线拍卖系统的三个实体模型上的多个查询。
- Person 代表提交拍卖物品或对拍卖出价的人。
- Auction 代表拍卖中的物品。
- Bid 代表对拍卖中物品的出价。
查询
查询练习了 Beam 模型的许多方面。
- Query1 或 CURRENCY_CONVERSION:欧元的出价值是多少?说明一个简单的映射。
- Query2 或 SELECTION:拍卖号为特定值的拍卖有哪些?说明一个简单的过滤器。
- Query3 或 LOCAL_ITEM_SUGGESTION:谁在美国的特定州销售?说明增量联接(使用每个键状态和计时器)和过滤器。
- Query4 或 AVERAGE_PRICE_FOR_CATEGORY:每个拍卖类别的平均售价是多少?说明复杂联接(使用自定义窗口函数)和聚合。
- Query5 或 HOT_ITEMS:哪些拍卖在过去一段时间内收到的出价最多?说明滑动窗口和组合器。
- Query6 或 AVERAGE_SELLING_PRICE_BY_SELLER:卖家最近 10 次已关闭的拍卖的平均售价是多少?与 Query4 共享相同的“中标出价”核心,并说明专门的组合器。
- Query7 或 HIGHEST_BID:每段时间的最高出价是多少?故意使用侧输入来实现以说明扇出。
- Query8 或 MONITOR_NEW_USERS:谁在过去一段时间内进入系统并创建了拍卖?说明一个简单的联接。
我们已经用五个查询补充了原始查询。
- Query0 或 PASSTHROUGH:直通。让我们测量监控开销。
- Query9 或 WINNING_BIDS:中标出价。Query4 和 Query6 共享的通用子查询。
- Query10 或 LOG_TO_SHARDED_FILES:将所有事件记录到 GCS 文件中。说明带有对触发的大量副作用的窗口。
- Query11 或 USER_SESSIONS:用户在每个活跃会话中进行了多少次出价?说明会话窗口。
- Query12 或 PROCESSING_TIME_WINDOWS:用户在固定的处理时间限制内进行多少次出价?说明在全局窗口中使用处理时间,与所有其他查询在非全局窗口中使用事件时间进行对比。
- BOUNDED_SIDE_INPUT_JOIN:将流联接到有界侧输入,模拟基本的流富集。
基准工作负载配置
以下是基准工作负载的一些旋钮(请参阅 NexmarkConfiguration.java)。
这些配置项可以传递到启动命令行。
事件生成(默认值)
- 生成了 100,000 个事件
- 100 个生成器线程
- SIN 曲线中的事件速率
- 初始事件速率为 10,000
- 事件速率步长为 10,000
- 100 个并发拍卖
- 1,000 个并发出价/创建拍卖的人员
窗口(默认值)
- 大小 10 秒
- 滑动周期 5 秒
- 水印保持 0 秒
事件比例(默认值)
- 热门拍卖 = ½
- 热门出价者 = ¼
- 热门卖家 = ¼
技术
- 人工 CPU 负载
- 人工 I/O 负载
Nexmark 输出
以下是在 (本地) Direct 运行器上使用 SMOKE 套件以流式模式运行 Nexmark 基准的示例输出。
Performance: Conf Runtime(sec) Events(/sec) Results 0000 5,5 18138,9 100000 0001 4,2 23657,4 92000 0002 2,2 45683,0 351 0003 3,9 25348,5 444 0004 1,6 6207,3 40 0005 5,0 20173,5 12 0006 0,9 11376,6 401 0007 121,4 823,5 1 0008 2,5 40273,9 6000 0009 0,9 10695,2 298 0010 4,0 25025,0 1 0011 4,4 22655,2 1919 0012 3,5 28208,7 1919
基准启动配置
Nexmark 启动程序接受 --runner
参数,与通常使用 Beam PipelineOptions 管理其命令行参数的程序一样。此外,必须配置必要的依赖项。
通过 Gradle 运行时,以下两个参数控制执行
-P nexmark.args
The command line to pass to the Nexmark main program.
-P nexmark.runner
The Gradle project name of the runner, such as ":runners:direct-java" or
":runners:flink:1.13. The project names can be found in the root
`settings.gradle.kts`.
测试数据根据需要确定性地合成。测试数据可以在与查询本身相同的管道中合成,也可以发布到 Pub/Sub 或 Kafka。
查询结果可以
- 发布到 Pub/Sub 或 Kafka。
- 写入纯文本的文本文件。
- 使用 Avro 编码写入文本文件。
- 发送到 BigQuery。
- 丢弃。
常见配置参数
决定是批处理还是流式
--streaming=true
事件生成器的数量
--numEventGenerators=4
查询可以通过名称或编号运行(编号仍然存在以确保向后兼容性,只有查询 0 到 12 具有编号)
运行查询 N
--query=N
运行名为 PASSTHROUGH 的查询
--query=PASSTHROUGH
可用套件
可以使用此配置参数选择要运行的套件
--suite=SUITE
可用套件为
- DEFAULT:使用查询 0 测试默认配置。
- SMOKE:使用默认配置运行所有查询。
- STRESS:与 smoke 相似,但适用于 100 万个事件。
- FULL_THROTTLE:与 SMOKE 相似,但适用于 1 亿个事件。
Google Cloud Dataflow 运行器特定配置
--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--stagingLocation=gs://<a gs path for staging> \
--runner=DataflowRunner \
--tempLocation=gs://<a gs path for temporary files> \
--filesToStage=target/beam-sdks-java-nexmark-2.60.0.jar
Direct 运行器特定配置
--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
Flink 运行器特定配置
--manageResources=false --monitorJobs=true \
--flinkMaster=[local] --parallelism=#numcores
Spark 运行器特定配置
--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
Kafka 源/接收器配置参数
设置 Kafka 主机/IP(例如,“localhost:9092”)
--bootstrapServers=<kafka host/ip>
将结果写入 Kafka 主题
--sinkType=KAFKA
设置将用于基准结果的主题名称
--kafkaResultsTopic=<topic name>
将事件写入或读取到/从 Kafka 主题
--sourceType=KAFKA
设置将用于基准事件的主题名称
--kafkaTopic=<topic name>
当前状态
这些表包含查询在不同运行器中运行状态。Google Cloud Dataflow 状态尚未公布。
批处理/合成/本地
查询 | Direct | Spark | Flink |
---|---|---|---|
0 | 正常 | 正常 | 正常 |
1 | 正常 | 正常 | 正常 |
2 | 正常 | 正常 | 正常 |
3 | 正常 | 正常 | 正常 |
4 | 正常 | 正常 | 正常 |
5 | 正常 | 正常 | 正常 |
6 | 正常 | 正常 | 正常 |
7 | 正常 | 正常 | 正常 |
8 | 正常 | 正常 | 正常 |
9 | 正常 | 正常 | 正常 |
10 | 正常 | 正常 | 正常 |
11 | 正常 | 正常 | 正常 |
12 | 正常 | 正常 | 正常 |
BOUNDED_SIDE_INPUT_JOIN | 正常 | 正常 | 正常 |
流式/合成/本地
查询 | Direct | Spark Issue 18416 | Flink |
---|---|---|---|
0 | 正常 | 正常 | 正常 |
1 | 正常 | 正常 | 正常 |
2 | 正常 | 正常 | 正常 |
3 | 正常 | Issue 18074,BEAM-3961 | 正常 |
4 | 正常 | 正常 | 正常 |
5 | 正常 | 正常 | 正常 |
6 | 正常 | 正常 | 正常 |
7 | 正常 | BEAM-2112 | 正常 |
8 | 正常 | 正常 | 正常 |
9 | 正常 | 正常 | 正常 |
10 | 正常 | 正常 | 正常 |
11 | 正常 | 正常 | 正常 |
12 | 正常 | 正常 | 正常 |
BOUNDED_SIDE_INPUT_JOIN | 正常 | BEAM-2112 | 正常 |
批处理/合成/集群
即将推出
流式/合成/集群
即将推出
运行 Nexmark
在 DirectRunner(本地)上运行 SMOKE 套件
DirectRunner 是默认的,因此不需要传递 -Pnexmark.runner
。这里我们这样做是为了最大程度的清晰度。
DirectRunner 没有单独的批处理和流式模式,但 Nexmark 启动有。
这些参数保留了 DirectRunner 的许多额外的安全检查,以便 SMOKE 套件可以确保 Nexmark 套件中没有错误。
批处理模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:direct-java" \
-Pnexmark.args="
--runner=DirectRunner
--streaming=false
--suite=SMOKE
--manageResources=false
--monitorJobs=true
--enforceEncodability=true
--enforceImmutability=true"
流式模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:direct-java" \
-Pnexmark.args="
--runner=DirectRunner
--streaming=true
--suite=SMOKE
--manageResources=false
--monitorJobs=true
--enforceEncodability=true
--enforceImmutability=true"
在 SparkRunner(本地)上运行 SMOKE 套件
SparkRunner 在 Nexmark Gradle 启动中是特殊情况。任务将提供 SparkRunner 所构建的 Spark 版本,并配置日志记录。
批处理模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:spark:3" \
-Pnexmark.args="
--runner=SparkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=false
--manageResources=false
--monitorJobs=true"
流式模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:spark:3" \
-Pnexmark.args="
--runner=SparkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true"
在 FlinkRunner(本地)上运行 SMOKE 套件
批处理模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:flink:1.13" \
-Pnexmark.args="
--runner=FlinkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=false
--manageResources=false
--monitorJobs=true
--flinkMaster=[local]"
流式模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:flink:1.13" \
-Pnexmark.args="
--runner=FlinkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true
--flinkMaster=[local]"
在 Google Cloud Dataflow 上运行 SMOKE 套件
首先设置这些,以便以下命令有效
PROJECT=<your project>
ZONE=<your zone>
STAGING_LOCATION=gs://<a GCS path for staging>
PUBSUB_TOPCI=<existing pubsub topic>
启动
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:google-cloud-dataflow-java" \
-Pnexmark.args="
--runner=DataflowRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true
--project=${PROJECT}
--zone=${ZONE}
--workerMachineType=n1-highmem-8
--stagingLocation=${STAGING_LOCATION}
--sourceType=PUBSUB
--pubSubMode=PUBLISH_ONLY
--pubsubTopic=${PUBSUB_TOPIC}
--resourceNameMode=VERBATIM
--manageResources=false
--numEventGenerators=64
--numWorkers=16
--maxNumWorkers=16
--firstEventRate=100000
--nextEventRate=100000
--ratePeriodSec=3600
--isRateLimited=true
--avgPersonByteSize=500
--avgAuctionByteSize=500
--avgBidByteSize=500
--probDelayedEvent=0.000001
--occasionalDelaySec=3600
--numEvents=0
--useWallclockEventTime=true
--usePubsubPublishTime=true
--experiments=enable_custom_pubsub_sink"
使用 Apache Hadoop YARN 在 Spark 集群上运行查询 0
构建软件包
./gradlew :sdks:java:testing:nexmark:assemble
提交到集群
spark-submit \
--class org.apache.beam.sdk.nexmark.Main \
--master yarn-client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
sdks/java/testing/nexmark/build/libs/beam-sdks-java-nexmark-2.60.0-spark.jar \
--runner=SparkRunner \
--query=0 \
--streamTimeout=60 \
--streaming=false \
--manageResources=false \
--monitorJobs=true"
Nexmark 仪表板
以下仪表板用作 CI 机制,以检测 Beam 组件的无回归。它们不应是运行器或引擎的基准比较。特别是因为
- 运行器的参数不相同
- Nexmark 在本地(大多数情况下是嵌入式)模式下使用运行器运行
- Nexmark 在运行所有 CI 和构建的共享机器上运行。
- 运行器对 Beam 模型的支持不同
- 运行器具有不同的优势,这使得比较变得困难
- 一些运行器被设计为面向批处理,另一些则面向流式处理
- 有些被设计为亚秒级延迟,另一些则支持自动扩展
仪表板内容
在 master 上的每次提交时,Nexmark 套件都会运行,并在图表上创建绘图。所有指标仪表板都托管在 metrics.beam.apache.org 上。
有两种类型的仪表板
- 一个用于性能(查询的运行时间)
- 一个用于输出 PCollection 的大小(应该保持不变)
这些运行器有仪表板(其他运行器即将推出)
- spark
- flink
- direct 运行器
- 数据流
每个仪表盘包含
- 批处理模式图表
- 流式模式图表
- 所有查询的图表。