Apache Beam Typescript SDK
Apache Beam 的 Typescript SDK 提供了一个简单而强大的 API,用于构建批处理和流式数据处理管道。
使用 Typescript SDK 入门
使用 Beam Typescript SDK 快速入门 开始使用,设置您的开发环境,获取 Beam SDK for Typescript 以及运行示例管道。然后,阅读 Beam 编程指南,了解适用于 Beam 中所有 SDK 的基本概念。
概述
我们通常尝试以 TypeScript 惯用的方式应用 Beam API 中的概念。此外,我们还从传统的 SDK 中做出了一些显著的改变。
我们采用“关系基础”方法,其中 带模式的数据 是与数据交互的主要方式,我们通常避免使用需要键值转换的转换,而采用更灵活的方法来命名字段或表达式。例如,我们更喜欢更灵活的 GroupBy PTransform,而不是传统的 GroupByKey。JavaScript 的原生 Object 用作行类型。
作为模式优先的一部分,我们还淡化了 Coders 作为 SDK 中的一级概念,将其降级为用于互操作的更高级功能。虽然我们可以从单个元素推断出模式,但仍然需要确定是否以及如何利用类型系统和/或函数自省来定期在构造时推断模式。当我们没有足够的类型信息时,将使用使用 BSON 编码的备用编码器。
我们在 PCollection 对象中添加了其他方法,特别是
map
和flatmap
, 而不是只允许 apply。此外,apply
可以接受函数参数(PCollection) => ...
以及 PTransform 子类,这将把这个可调用函数视为 PTransform 的扩展。另一方面,我们从 API 中删除了 存在问题的 Pipeline 对象,而是提供一个
Root
PValue,在该 PValue 上构建管道,并调用 Runner 上的 run()。我们提供了一个不易出错的Runner.run
,它只有在管道完全完成时才结束,以及Runner.runAsync
,它返回正在运行管道的句柄。与其引入 PCollectionTuple、PCollectionList 等,不如让 PValue 实际上是一个 包含 PValue 值的数组或对象,转换可以消耗或生成这些值。这些值通过用
P
运算符包装它们来应用,例如P([pc1, pc2, pc3]).apply(new Flatten())
。与 Python 一样,
flatMap
和ParDo.process
通过从生成器中生成多个元素来返回多个元素,而不是调用传递给它的回调。目前有一个操作可以根据元素的属性将 PCollection 拆分为多个 PCollection,我们可能会考虑使用回调来进行侧输出。map
、flatMap
和ParDo.process
方法接受一个额外的(可选)上下文参数,它类似于 Python 中使用的关键字参数。这些是 javascript 对象,其成员可能是常量(按原样传递)或特殊的 DoFnParam 对象,这些对象提供获取器来获取运行时特定于元素的信息(例如当前时间戳、窗口或侧输入)。与其将多输出复杂性引入 map/do 操作本身,不如通过使用新的
Split
原语来生成多个输出,该原语接收PCollection<{a?: AType, b: BType, ... }>
并生成一个对象{a: PCollection<AType>, b: PCollection<BType>, ...}
。JavaScript 支持(并鼓励)异步编程模型,许多库需要使用 async/await 范式。由于没有办法(设计上)从异步风格回到同步风格,因此在设计 API 时需要考虑这一点。我们目前提供
PValue.apply(...)
的异步变体(除了同步变体,因为它们更容易连接)以及使Runner.run
异步。待定,对所有用户回调也这样做。
可以在 wordcount.ts 中找到示例管道,并在 beam 编程指南 中找到更多文档。
管道 I/O
请参见 Beam 提供的 I/O 转换 页面,了解当前可用的 I/O 转换列表。
支持的功能
Typescript SDK 仍在开发中,但已经支持 Beam 模型目前支持的许多功能,但并非全部功能,包括批处理和流式处理。它还对 跨语言转换 提供广泛的支持,可以利用这些转换来使用 Typescript 管道的更高级功能。
序列化
由于 Beam 设计为在分布式环境中运行,因此所有函数和数据都需要可序列化。
默认情况下,数据使用 BSON 编码进行序列化,但这可以通过将 withRowCoder 或 withCoderInternal 转换应用于 PCollection 来进行自定义。
在转换中使用的函数(例如 map
),包括闭包及其捕获的数据,通过 ts-serialize-closures 进行序列化。虽然这在大多数情况下都能很好地处理,但它仍然有一些限制,它仍然可以捕获,并且在对引用对象的传递闭包进行遍历时,它可能会捕获那些更适合导入而不是序列化的对象。为了避免这些限制,可以显式地使用 requireForSerialization 函数注册引用,如下所示。
// in module my_package/module_to_be_required
import { requireForSerialization } from "apache-beam/serialization";
// define or import various objects_to_register here
requireForSerialization(
"my_package/module_to_be_required", { objects_to_register });
入门项目有一个这样的 示例。