使用 BatchElements
转换动态分组元素
为什么要批处理元素?
Apache Beam 提供了 BatchElements
转换 来摊销后续操作的处理时间。这最适合应用于那些每次调用都有相当大的固定成本,而每次调用中处理的每个元素的成本较小的操作。简单地说,批处理元素是在效率更高的操作之前进行的优化步骤,这些操作一次处理多个元素,与一次处理一个元素相比,效率更高。
示例:RunInference 中的批处理
在 RunInference
的上下文中,批处理元素主要用于减少对用于推理的模型的调用次数。这对于在工作程序推理上下文中提高效率很重要,但在查看发送到远程服务的推理调用时影响更大。当 API 速率限制需要考虑时,尽可能地批处理元素是减少超过配额风险的重要方法。RunInference
框架 总是 调用 BatchElements
,进一步突出了此步骤对于推理工作负载的重要性。
捆绑内的批处理
用于批处理元素的主要方法是在元素捆绑内进行批处理。遍历捆绑,将元素追加到捆绑,直到达到目标批处理大小,然后发出。同一批次中的元素保证存在于同一事件时间窗口中,因为批处理函数是窗口感知的。如果在达到捆绑结束时缓冲了一个未完成的批次,则会发出该批次。
参数调整注意事项
这种方式下批处理的大部分调整围绕着用于根据下游性能估计最佳批处理大小的参数。_BatchSizeEstimator
对象 保留有关批处理大小和已发出捆绑的时钟时间的数据点,然后尝试使用线性回归求解当前理想的批处理大小。BatchElements
的 文档字符串 详细解释了这一点以及此处处理的参数。对于大多数用户来说,最重要的是最小和最大批处理大小(将它们设置为相同的值会产生固定批处理大小而不是动态批处理大小)和元素大小函数。后者允许用户定义一个 lambda,它可以单独对每个输入进行大小调整,而不是简单地将每个元素计为大小 1。如果输入在传递到模型时具有明显不同的处理时间,这将非常有用;例如,将文本正文发送到模型可以按字符长度进行大小调整。
跨捆绑的批处理
在捆绑内批处理元素有一个主要缺点:如果捆绑很小(甚至单个元素),则操作实际上是无操作的。这在流管道中尤其如此,因为我们预计捆绑仅包含 1 到 2 个元素。为了在小捆绑的情况下启用批处理,我们必须能够跨捆绑批处理元素。为了深入了解其工作原理的技术解释,跨捆绑批处理(也称为有状态批处理)的设计文档概述了代码;然而,高级解释是,我们利用 Beam 的 状态 API 跨捆绑存储元素,在我们达到所需捆绑大小或将捆绑缓冲了某个最大时间量后,在下游发出这些元素。
要启用有状态批处理,请将 max_batch_duration_secs
参数 传递给 Apache Beam 版本 2.52.0 或更高版本中的 BatchElements
。这将对输入元素进行键控,以确保状态 API 可以使用,然后使用有状态批处理函数。当批次达到当前目标批次大小或批次已缓冲的时间量大于或等于 max_batch_duration_secs
时,将发出批次。需要注意的是,基于时间的行为是使用 计时器 API 实现的,并且是尽力而为的,因此在发出之前,批次实际保持的时间可能大于设置的最大值。
跨捆绑批处理有其自身的缺点
- 缓冲批次确实会在管道中引入瓶颈,与在捆绑内批处理相比,吞吐量会降低
- 允许状态 API 正常工作的键控步骤会在工作程序之间对数据进行洗牌,如果正在处理的数据量很大,这可能会在工作程序之间产生大量网络流量
参数调整注意事项
除了用于在捆绑内批处理的所有调整参数之外,调整 max_batch_duration_secs
参数也会对转换的吞吐量产生重大影响。选择最大批处理持续时间时进行的权衡是在吞吐量和一致的批处理大小之间进行权衡。较大的值通常会降低吞吐量,因为不完整的批次将在发送到推理调用本身之前保持更长时间;然而,这将更一致地产生完整的批次。另一方面,较小的值通常会提高吞吐量,但可能会导致批处理大小较小。您优先考虑哪一项取决于您的用例。
需要注意的是,这些趋势并不总是成立!在流式处理环境中,元素被摄入管道的频率存在相当大的差异。你可能能够以较低的最大批处理时长达到目标批处理大小,但这与以较长的最大批处理时长始终达到该批处理大小是不同的。最好将这些描述的权衡视为平均情况。
最后更新于 2024/10/31
您找到您要找的所有内容了吗?
所有内容都实用且清晰吗?您想更改任何内容吗?请告诉我们!