PTransform 样式指南
编写新的可重用 PTransform 的样式指南。
语言中立的注意事项
一致性
与现有技术保持一致
- 请阅读贡献指南。
- 如果在某些 SDK 中已经存在类似的转换,请使您的转换的 API 与之相似,这样用户对其中一个的经验就可以转移到另一个。这适用于相同语言 SDK 和不同语言 SDK 中的转换。例外情况: 由于在制定本指南之前开发,明显违反当前样式指南的现有转换。在这种情况下,样式指南优先于与现有转换的一致性。
- 当不存在类似的转换时,请保持您选择的语言(例如 Java 或 Python)的惯用法。
- 在提出新的转换时,如果可能,请使用设计文档模板。
公开 PTransform 与其他事物的区别
因此,您想开发一个人们将在 Beam 管道中使用的库 - 与第三方系统的连接器、机器学习算法等。您应该如何公开它?
执行
- 将库完成的每个主要数据并行任务公开为复合
PTransform
。这允许转换的结构对使用它的代码透明地演变:例如,最初是ParDo
的东西,随着时间的推移可以成为更复杂的转换。 - 将转换代码中大型、非平凡、可重用、顺序的代码段公开为常规函数或类库,其他人可能希望以您未预料到的方式重用这些代码段。转换应该简单地将这些逻辑连接在一起。作为附带好处,您可以独立地对这些函数和类进行单元测试。示例: 在开发解析自定义数据格式文件的转换时,将格式解析器公开为库;同样,对于实现复杂机器学习算法的转换也是如此等等。
- 在某些情况下,这可能包括 Beam 特定类,例如
CombineFn
或非平凡的DoFn
(那些不仅仅是单个@ProcessElement
方法的)。一般来说:如果您预计完整的打包PTransform
可能不足以满足用户的需求,并且用户可能希望重用更低级的基元,那么就公开这些基元。
不要
- 不要公开转换的内部结构方式。例如:您的库的公共 API 表面通常(除上面最后一项外)不应公开
DoFn
、具体Source
或Sink
类等,以避免向用户展示在应用PTransform
或使用DoFn
/Source
/Sink
之间进行令人困惑的选择。
命名
执行
- 尊重特定语言的命名约定,例如在 Java 和 Python 中将类命名为
PascalCase
,在 Java 中将函数命名为camelCase
,而在 Python 中将函数命名为snake_case
等等。 - 将工厂函数命名为使函数名称是动词,或者引用转换时读起来像动词:例如
MongoDbIO.read()
、Flatten.iterables()
。 - 在类型化语言中,将
PTransform
类也命名为动词(例如:MongoDbIO.Read
、Flatten.Iterables
)。 - 将用于与存储系统交互的转换族命名为使用单词“IO”:
MongoDbIO
、JdbcIO
。
不要
- 不要在
PTransform
类名中使用单词transform
、source
、sink
、reader
、writer
、bound
、unbound
(注意:bounded
和unbounded
在指代PCollection
是有界还是无界时是可以的):这些单词是多余的、令人困惑的、过时的,或者命名了 SDK 中现有的不同概念。
配置
配置内容与输入集合内容的区别
- 进入输入
PCollection
:任何可能存在大量实例的内容(如果可能存在 > 1000 个,则应该放在PCollection
中),或者在管道构造时可能未知的内容。例如:要处理或写入第三方系统的记录;要读取的文件名。例外情况:有时 Beam API 需要在管道构造时知道一些事情 - 例如Bounded
/UnboundedSource
API。如果您绝对必须使用此类 API,则其输入当然只能进入转换配置。 - 进入转换配置:在整个转换过程中保持不变的内容(包括
ValueProvider
),并且不依赖于转换输入PCollection
的内容。例如:数据库查询或连接字符串;凭据;用户指定的回调;调整参数。将参数放入转换配置的一个优点是,它可以在管道构造时进行验证。
公开哪些参数
执行
- 公开计算输出所需的参数。
不要
- 不要公开调整旋钮,例如批次大小、连接池大小,除非无法自动提供或计算足够好的值(即,除非你能想象一个合理的人报告有关缺少此旋钮的错误)。
- 在开发连接到具有多个参数的库的连接器时,不要镜像底层库的每个参数 - 如果需要,请重用底层库的配置类,并让用户提供整个实例。示例:
JdbcIO
。例外情况 1: 如果底层库的某些参数与 Beam 语义非平凡地交互,那么公开这些参数。例如,在开发连接到具有发布者“传递保证”参数的发布/订阅系统的连接器时,公开该参数,但禁止与 Beam 模型(最多一次和恰好一次)不兼容的值。例外情况 2: 如果底层库的配置类使用起来很麻烦 - 例如没有声明稳定的 API、公开有问题的传递依赖关系或不遵守语义版本控制 - 在这种情况下,最好将其包装起来,并向转换的用户公开更清晰、更稳定的 API。
错误处理
转换配置错误
尽早检测错误。可以在以下阶段检测到错误
- (在编译语言中)编译用户管道的源代码
- 构造或设置转换
- 在管道中应用转换
- 运行管道
例如
- 在类型化语言中,通过使转换的 API 具有强类型来利用编译时错误检查
- 强类型配置:例如在 Java 中,作为 URL 的参数应该使用
URL
类,而不是String
类。 - 强类型输入和输出:例如,写入 Mongo DB 的转换应该使用
PCollection<Document>
而不是PCollection<String>
(假设可以为Document
提供Coder
)。
- 强类型配置:例如在 Java 中,作为 URL 的参数应该使用
- 在 setter 方法中检测单个参数的无效值。
- 在转换的 validate 方法中检测参数的无效组合。
运行时错误和数据一致性
优先考虑数据一致性高于一切。不要掩盖数据丢失或损坏。如果无法防止数据丢失,则失败。
执行
- 在
DoFn
中,如果操作很可能在重试时成功,则重试瞬态错误。在尽可能窄的范围内执行此类重试,以最大限度地减少重试工作量(即,理想情况下在 RPC 库本身级别,或在直接将失败的 RPC 发送到第三方系统级别)。否则,让运行器为您以适当的粒度级别重试工作(不同的运行器可能具有不同的重试行为,但大多数运行器都执行一些重试)。 - 如果转换具有副作用,请努力使其幂等(即可以安全地多次应用)。由于重试,副作用可能会被多次执行,甚至可能并行执行。
- 如果转换可能存在无法处理的(永久失败的)记录,并且您希望管道尽管如此继续执行
- 如果可以安全地忽略错误记录,请在指标中统计错误记录。确保转换的文档中提到了此聚合器。请注意,在执行过程中,无法从管道内部以编程方式访问聚合器值。
- 如果错误记录可能需要用户手动检查,请将它们发送到仅包含这些记录的输出中。
- 或者,使用一个(默认值为零)阈值,超过该阈值后元素失败将变为捆绑失败(将转换构建为统计元素总数和失败元素的数量,比较它们,如果失败超过阈值,则失败)。
- 如果用户请求比您能提供的更高的数据一致性保证,则失败。例如:如果用户从 MQTT 连接器请求 QoS 2(恰好一次传递),则连接器应该失败,因为 Beam 运行器可能会重试写入连接器,因此无法实现恰好一次传递。
不要
- 如果您无法处理失败,则根本不要捕获它。例外情况:*如果您能够提供原始错误没有的宝贵上下文,则捕获错误、记录消息并重新抛出它可能很有价值。
- 永远、永远、永远不要这样做:
catch(...) { log an error; return null or false or otherwise ignore; }
经验法则:如果一个捆绑没有失败,则其输出必须是正确和完整的。对于用户而言,记录错误但成功执行的转换是沉默的数据丢失。
性能
许多运行器以优化 ParDo
链的方式来提高性能,如果 ParDo
针对每个输入元素发出少量到中等数量的元素,或者每个元素的处理成本相对较低(例如 Dataflow 的“融合”),但如果违反了这些假设,则会限制并行化。在这种情况下,您可能需要“融合中断”(Reshuffle.of()
)来提高处理 ParDo
输出 PCollection
的并行化能力。
- 如果转换包含一个
ParDo
,该ParDo
对于每个输入元素输出潜在的大量元素,请在此ParDo
之后应用融合断点,以确保下游转换可以并行处理其输出。 - 如果转换包含一个
ParDo
,该ParDo
处理一个元素需要很长时间,请在此ParDo
之前插入融合断点,以确保所有或大多数元素可以并行处理,而无论其输入PCollection
是如何生成的。
文档
记录如何配置转换(提供代码示例),以及它对输入或输出的预期保证,考虑到 Beam 模型。例如:
- 此转换的输入和输出集合是有界的还是无界的,或者它可以与两者一起使用吗?
- 如果转换将数据写入第三方系统,它是否保证数据至少写入一次?最多写入一次?正好写入一次?(如果运行程序由于重试或推测性执行(又称备份)而多次执行捆绑包,它如何在情况下实现正好写入一次?)
- 如果转换从第三方系统读取数据,读取的最大潜在并行度是多少?例如,如果转换按顺序读取数据(例如执行单个 SQL 查询),文档应提及这一点。
- 如果转换在处理过程中查询外部系统(例如将
PCollection
与外部键值存储中的信息连接),查询数据的最新性保证是什么?例如,它是在转换开始时全部加载,还是按元素查询(在这种情况下,如果一个元素的数据在转换运行时发生变化,会怎么样)? - 如果输入
PCollection
中项目的到达与输出PCollection
中发出输出之间存在非平凡的关系,这种关系是什么?(例如,如果转换在内部执行窗口化、触发、分组,或者使用状态或计时器 API)
日志记录
预测转换用户可能遇到的异常情况。记录他们认为足以进行调试的信息,但要限制日志记录量。以下是一些适用于所有程序的建议,但在数据量巨大且执行分布式的情况下尤其重要。
执行
- 处理来自第三方系统的错误时,记录完整的错误,以及第三方系统提供的任何错误详细信息,并包含转换已知的任何其他上下文。这使用户能够根据消息中提供的信息采取行动。处理异常并重新抛出自己的异常时,将原始异常包装在其中(某些语言提供更高级的功能,例如 Java 的“抑制异常”)。绝不要静默丢弃有关错误的可用信息。
- 执行罕见(非按元素)且缓慢的操作(例如扩展大型文件模式或启动导入/导出作业)时,记录操作开始和结束的时间。如果操作有标识符,请记录标识符,以便用户可以稍后调试时查找操作。
- 计算对后续处理的正确性或性能至关重要的低容量内容时,记录输入和输出,以便用户在调试过程中可以对其进行健全性检查或手动复制异常结果。例如,扩展文件模式为文件时,记录文件模式是什么以及它被拆分为多少个部分;执行查询时,记录查询并记录它产生了多少结果。
不要
- 不要在
INFO
级别记录每个元素或每个捆绑包。DEBUG
/TRACE
可能没问题,因为这些级别默认情况下是被禁用的。 - 避免记录可能包含敏感信息的数据有效负载,或在记录之前对其进行消毒(例如用户数据、凭据等)。
测试
数据处理很棘手,充满了特殊情况,而且难以调试,因为管道运行时间很长,很难检查输出是否正确,你无法附加调试器,而且由于数据量大,你经常无法像你希望的那样记录很多东西。因此,测试尤其重要。
测试转换的运行时行为
- 使用
TestPipeline
和PAssert
对转换的整体语义进行单元测试。从针对直接运行程序进行测试开始。对PCollection
内容的断言应该是严格的:例如,如果预期从数据库读取数字 1 到 10,则断言输出PCollection
中不仅仅有 10 个元素,或者每个元素都在范围 [1, 10] 内 - 而是断言每个数字 1 到 10 恰好出现一次。 - 确定转换中难以使用
TestPipeline
可靠地模拟的特殊情况易发的非平凡的顺序逻辑,将此逻辑提取到可单元测试的函数中,并对它们进行单元测试。常见的特殊情况包括- 处理空捆绑包的
DoFn
- 处理极其大型捆绑包的
DoFn
(内容不适合内存,包括具有大量值的“热键”) - 第三方 API 失败
- 第三方 API 提供了非常不准确的信息
- 在失败情况下泄漏
Closeable
/AutoCloseable
资源 - 开发源时常见的特殊情况:
BoundedSource.split
中的复杂算术(例如拆分键或偏移量范围)、对空数据源或包含一些空组件的复合数据源进行迭代。
- 处理空捆绑包的
- 模拟与第三方系统的交互,或者更好的是,使用“假”实现(如果可用)。确保模拟的交互代表这些系统的实际行为的所有有趣情况。
- 要对
DoFn
、CombineFn
和BoundedSource
进行单元测试,请考虑分别使用DoFnTester
、CombineFnTester
和SourceTestUtils
,它们可以通过非平凡的方式来练习代码以找出潜在的错误。 - 对于在无界集合上工作的转换,使用
TestStream
测试它们在存在延迟或乱序数据时的行为。 - 测试必须始终通过,包括在不利的、CPU 或网络受限的环境中(持续集成服务器)。永远不要将依赖于时间的代码(例如睡眠)放入测试中。经验表明,无论睡眠多长时间都不够 - 代码可以暂停超过几秒钟。
- 有关测试代码组织的详细说明,请参阅Beam 测试指南.
测试转换的构造和验证
构造和验证转换的代码通常很简单,而且大部分是样板代码。但是,其中微小的错误或错别字可能会造成严重的后果(例如忽略用户设置的属性),因此也需要对其进行测试。然而,过多的简单测试可能难以维护,并给人一种转换已得到充分测试的错误印象。
执行
- 测试非平凡的验证代码,其中缺少/错误/不提供信息的验证可能会导致严重的问题:数据丢失、违反直觉的行为、属性值被静默忽略或其他难以调试的错误。为每个非平凡的验证错误类创建一个测试。一些应该测试的验证示例
- 如果属性
withFoo()
和withBar()
不能同时指定,请测试同时指定这两个属性的转换会被拒绝,而不是其中一个属性在运行时被静默忽略。 - 如果已知转换对于特定配置的行为不正确或违反直觉,请测试此配置会被拒绝,而不是在运行时产生错误的结果。例如,转换可能仅对有界集合或全局窗口化集合有效。或者,假设流式系统支持几个级别的“服务质量”,其中一个级别是“正好一次交付”。但是,写入此系统的转换可能无法提供正好一次的交付,因为在发生故障时会进行重试。在这种情况下,测试转换是否禁止指定正好一次的 QoS,而不是在运行时无法提供预期的端到端语义。
- 如果属性
- 测试每个
withFoo()
方法(包括每个重载)是否有效果(不被忽略),使用TestPipeline
和PAssert
创建测试,其中预期的测试结果取决于withFoo()
的值。
不要
- 不要测试成功的验证(例如“验证在转换配置正确时不会失败”)
- 不要测试简单的验证错误(例如“当属性未设置/为空/为空/为负/……时,验证会失败”)
兼容性
执行
- 通常,遵循语义版本控制的规则。
- 如果转换的 API 尚未稳定,请将其注释为
@Experimental
(Java)或@experimental
(Python)。 - 如果 API 已弃用,请将其注释为
@Deprecated
(Java)或@deprecated
(Python)。 - 注意转换 API 公开的第三方类的稳定性和版本控制:如果它们不稳定或版本控制不当(不遵守语义版本控制),最好将它们包装在自己的类中。
不要
- 不要静默更改转换的行为,这会导致代码继续编译但会执行与之前记录的行为不同的操作(例如,产生不同的输出或期望不同的输入,当然除非之前的输出不正确)。努力使这种不兼容的行为更改导致编译错误(例如,最好为新行为引入新的转换,并弃用然后删除旧的转换(在新的大版本中),而不是静默更改现有转换的行为),或者至少是运行时错误。
- 如果转换的行为保持不变,并且你只是在更改实现或 API - 不要以会导致用户代码停止编译的方式更改转换的 API。
Java 特定注意事项
大多数以下实践的良好示例是JdbcIO
和MongoDbIO
。
API
选择输入和输出 PCollection 的类型
尽可能使用特定于转换性质的类型。人们可以根据需要使用自己的类型进行转换的DoFn
对其进行包装。例如,数据存储连接器应该使用数据存储Entity
类型,MongoDB 连接器应该使用 MongoDocument
类型,而不是 JSON 的字符串表示形式。
有时这不可能(例如 JDBC 不提供与 Beam 兼容的(可以使用 Coder 编码)“JDBC 记录”数据类型) - 然后让用户提供一个函数来在特定于转换的类型和与 Beam 兼容的类型之间进行转换(例如,请参阅JdbcIO
和MongoDbGridFSIO
)。
当转换在逻辑上应该返回一个尚未存在 Java 类的复合类型时,创建一个具有良好命名的字段的新 POJO 类。不要使用泛型元组类或KV
(除非字段合法地是键和值)。
具有多个输出集合的转换
如果转换需要返回多个集合,它应该是一个PTransform<..., PCollectionTuple>
,并为每个集合公开方法getBlahTag()
。
例如,如果你想要返回一个PCollection<Foo>
和一个PCollection<Bar>
,请公开TupleTag<Foo> getFooTag()
和TupleTag<Bar> getBarTag()
。
例如
public class MyTransform extends PTransform<..., PCollectionTuple> {
private final TupleTag<Moo> mooTag = new TupleTag<Moo>() {};
private final TupleTag<Blah> blahTag = new TupleTag<Blah>() {};
...
PCollectionTuple expand(... input) {
...
PCollection<Moo> moo = ...;
PCollection<Blah> blah = ...;
return PCollectionTuple.of(mooTag, moo)
.and(blahTag, blah);
}
public TupleTag<Moo> getMooTag() {
return mooTag;
}
public TupleTag<Blah> getBlahTag() {
return blahTag;
}
...
}
配置的流畅构建器
使转换类不可变,并使用方法生成修改后的不可变对象。使用AutoValue。Autovalue 可以提供一个 Builder 帮助类。使用@Nullable
标记没有默认值或其默认值为 null 的类类型的参数,但基本类型(例如 int)除外。
@AutoValue
public abstract static class MyTransform extends PTransform<...> {
int getMoo();
@Nullable abstract String getBlah();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setMoo(int moo);
abstract Builder setBlah(String blah);
abstract MyTransform build();
}
...
}
工厂方法
提供一个单一的无参数静态工厂方法,要么在封闭类中(请参阅“打包一组转换”),要么在转换类本身中。
public class Thumbs {
public static Twiddle twiddle() {
return new AutoValue_Thumbs_Twiddle.Builder().build();
}
public abstract static class Twiddle extends PTransform<...> { ... }
}
// or:
public abstract static class TwiddleThumbs extends PTransform<...> {
public static TwiddleThumbs create() {
return new AutoValue_Thumbs_Twiddle.Builder().build();
}
...
}
例外情况:当转换只有一个压倒性的最重要的参数时,然后调用工厂方法of
并将参数放入工厂方法的参数中:ParDo.of(DoFn).withAllowedLateness()
。
用于设置参数的流畅构建器方法
将它们称为withBlah()
。所有构建器方法必须返回完全相同的类型;如果是参数化的(泛型)类型,则使用类型参数的相同值。
将withBlah()
方法视为一组无序的关键字参数 - 结果不能取决于调用withFoo()
和withBar()
的顺序(例如,withBar()
不能读取 foo 的当前值)。
每个 `withBlah` 方法的文档影响:何时使用此方法,允许哪些值,默认值是什么,更改值的影响是什么。
/**
* Returns a new {@link TwiddleThumbs} transform with moo set
* to the given value.
*
* <p>Valid values are 0 (inclusive) to 100 (exclusive). The default is 42.
*
* <p>Higher values generally improve throughput, but increase chance
* of spontaneous combustion.
*/
public Twiddle withMoo(int moo) {
checkArgument(moo >= 0 && moo < 100,
"Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
+ "Valid values are 0 (inclusive) to 100 (exclusive)",
moo);
return toBuilder().setMoo(moo).build();
}
参数的默认值
在工厂方法中指定它们(工厂方法返回一个包含默认值的**对象**)。
将多个参数打包到可重用的对象中
如果转换的几个参数在逻辑上紧密耦合,有时将它们封装到容器对象中是有意义的。对这个容器对象使用相同的准则(使其不可变,使用带有构建器的 AutoValue,记录 `withBlah()` 方法等)。例如,参见 JdbcIO.DataSourceConfiguration.
带有类型参数的转换
所有类型参数都应该在工厂方法上显式指定。构建器方法(`withBlah()`)不应该改变类型。
public class Thumbs {
public static Twiddle<T> twiddle() {
return new AutoValue_Thumbs_Twiddle.Builder<T>().build();
}
@AutoValue
public abstract static class Twiddle<T>
extends PTransform<PCollection<Foo>, PCollection<Bar<T>>> {
…
@Nullable abstract Bar<T> getBar();
abstract Builder<T> toBuilder();
@AutoValue.Builder
abstract static class Builder<T> {
…
abstract Builder<T> setBar(Bar<T> bar);
abstract Twiddle<T> build();
}
…
}
}
// User code:
Thumbs.Twiddle<String> twiddle = Thumbs.<String>twiddle();
// Or:
PCollection<Bar<String>> bars = foos.apply(Thumbs.<String>twiddle() … );
例外:当转换只有一个最重要的参数,并且该参数取决于类型 T 时,最好将其直接放入工厂方法中:例如 `Combine.globally(SerializableFunction<Iterable<V>,V>`。这将改善 Java 的类型推断,并允许用户不必显式指定类型参数。
当转换有多个类型参数,或者参数的含义不明显时,将类型参数命名为 `SomethingT`,例如:实现分类算法并为每个输入元素分配标签的 PTransform 可以被类型化为 `Classify<InputT, LabelT>`。
注入用户指定的行为
如果转换的某个行为方面需要由用户的代码进行定制,请做出以下决定
执行
- 如果可能,只使用 PTransform 组成作为可扩展性机制 - 也就是说,如果用户在他们的管道中应用转换并将其与另一个 `PTransform` 组成可以实现相同的效果,那么转换本身不应该可扩展。例如,将 JSON 对象写入第三方系统的转换应该接受一个 `PCollection<JsonObject>`(假设可以为 `JsonObject` 提供 `Coder`),而不是接受一个通用的 `PCollection<T>` 和一个 `ProcessFunction<T, JsonObject>`(应该修复的反例:`TextIO`)。
- 如果转换内部需要由用户代码进行扩展,请将用户代码作为 `ProcessFunction` 传递,或定义您自己的可序列化函数式类型(理想情况下是单方法,以便与 Java 8 lambda 互操作)。因为 Java 会擦除 lambda 的类型,所以即使用户提供了原始类型的 `ProcessFunction`,也应该确保有足够的类型信息。参见 `MapElements` 和 `FlatMapElements`,它们是如何使用 `ProcessFunction` 和 `InferableFunction` 来共同提供对 lambda 和具有类型信息的具体子类的良好支持的示例。
不要
- 不要使用继承来实现可扩展性:用户不应该子类化 `PTransform` 类。
打包转换族
在开发一个高度相关的转换系列时(例如,以不同的方式与同一个系统交互,或提供同一个高级任务的不同实现),使用一个顶级类作为命名空间,用多个工厂方法返回对应于每个单独用例的转换。
容器类必须有一个私有构造函数,因此它不能被直接实例化。
在 `FooIO` 级别记录通用内容,并单独记录每个工厂方法。
/** Transforms for clustering data. */
public class Cluster {
// Force use of static factory methods.
private Cluster() {}
/** Returns a new {@link UsingKMeans} transform. */
public static UsingKMeans usingKMeans() { ... }
public static Hierarchically hierarchically() { ... }
/** Clusters data using the K-Means algorithm. */
public static class UsingKMeans extends PTransform<...> { ... }
public static class Hierarchically extends PTransform<...> { ... }
}
public class FooIO {
// Force use of static factory methods.
private FooIO() {}
public static Read read() { ... }
...
public static class Read extends PTransform<...> { ... }
public static class Write extends PTransform<...> { ... }
public static class Delete extends PTransform<...> { ... }
public static class Mutate extends PTransform<...> { ... }
}
在支持多个具有不兼容 API 的版本时,也将版本作为命名空间类,并将不同 API 版本的实现放在不同的文件中。
// FooIO.java
public class FooIO {
// Force use of static factory methods.
private FooIO() {}
public static FooV1 v1() { return new FooV1(); }
public static FooV2 v2() { return new FooV2(); }
}
// FooV1.java
public class FooV1 {
// Force use of static factory methods outside the package.
FooV1() {}
public static Read read() { ... }
public static class Read extends PTransform<...> { ... }
}
// FooV2.java
public static class FooV2 {
// Force use of static factory methods outside the package.
FooV2() {}
public static Read read() { ... }
public static class Read extends PTransform<...> { ... }
}
行为
不可变性
- 转换类必须是不可变的:所有变量必须是私有的 final 且本身是不可变的(例如,如果是列表,则必须是 `ImmutableList`)。
- 所有 `PCollection` 的元素必须是不可变的。
序列化
`DoFn`、`PTransform`、`CombineFn` 和其他实例将被序列化。将序列化数据的数量降至最低:将不想序列化的字段标记为 `transient`。尽可能使类为 `static`(这样实例就不会捕获和序列化封闭类实例)。注意:在某些情况下,这意味着你不能使用匿名类。
验证
- 使用 `checkArgument()` 在 `withBlah()` 方法中验证单个参数。错误消息应提及参数的名称、实际值和有效值的范围。
- 在 `PTransform` 的 `expand()` 方法中验证参数组合和缺少的必需参数。
- 在 `PTransform` 的 `validate(PipelineOptions)` 方法中验证 `PTransform` 从 `PipelineOptions` 中获取的参数。当管道已经完全构建/扩展并即将使用特定的 `PipelineOptions` 运行时,这些验证将被执行。大多数 `PTransform` 不使用 `PipelineOptions`,因此不需要 `validate()` 方法 - 相反,它们应该通过上述两种方法执行验证。
@AutoValue
public abstract class TwiddleThumbs
extends PTransform<PCollection<Foo>, PCollection<Bar>> {
abstract int getMoo();
abstract String getBoo();
...
// Validating individual parameters
public TwiddleThumbs withMoo(int moo) {
checkArgument(
moo >= 0 && moo < 100,
"Moo must be between 0 (inclusive) and 100 (exclusive), but was: %s",
moo);
return toBuilder().setMoo(moo).build();
}
public TwiddleThumbs withBoo(String boo) {
checkArgument(boo != null, "Boo can not be null");
checkArgument(!boo.isEmpty(), "Boo can not be empty");
return toBuilder().setBoo(boo).build();
}
@Override
public void validate(PipelineOptions options) {
int woo = options.as(TwiddleThumbsOptions.class).getWoo();
checkArgument(
woo > getMoo(),
"Woo (%s) must be smaller than moo (%s)",
woo, getMoo());
}
@Override
public PCollection<Bar> expand(PCollection<Foo> input) {
// Validating that a required parameter is present
checkArgument(getBoo() != null, "Must specify boo");
// Validating a combination of parameters
checkArgument(
getMoo() == 0 || getBoo() == null,
"Must specify at most one of moo or boo, but was: moo = %s, boo = %s",
getMoo(), getBoo());
...
}
}
编码器
`Coder` 是 Beam 运行器在必要时物化中间数据或在工作器之间传输中间数据的一种方式。 `Coder` 不应该用作解析或写入二进制格式的通用 API,因为 `Coder` 的特定二进制编码旨在成为其私有实现细节。
为类型提供默认编码器
为所有新数据类型提供默认 `Coder`。使用 `@DefaultCoder` 注解或 `CoderProviderRegistrar` 类,这些类用 `@AutoService` 进行注解:参见 SDK 中这些类的用法以获取示例。如果性能不重要,您可以使用 `SerializableCoder` 或 `AvroCoder`。否则,开发一个高效的自定义编码器(为具体类型子类化 `AtomicCoder`,为泛型类型子类化 `StructuredCoder`)。
在输出集合上设置编码器
你的 `PTransform` 创建的所有 `PCollection`(包括输出集合和中间集合)都必须在其上设置 `Coder`:用户永远不需要调用 `setCoder()` 来“修复”你的 `PTransform` 生成的 `PCollection` 上的编码器(实际上,Beam 意图最终弃用 `setCoder`)。在某些情况下,编码器推断足以实现这一点;在其他情况下,你的转换需要显式调用其集合上的 `setCoder`。
如果集合是具体类型的,该类型通常具有相应的编码器。使用最有效的特定编码器(例如,字符串使用 `StringUtf8Coder.of()`,字节数组使用 `ByteArrayCoder.of()` 等),而不是像 `SerializableCoder` 这样的通用编码器。
如果集合的类型涉及泛型类型变量,情况会更加复杂
- 如果它与转换的输入类型一致,或者是对它的简单包装,则可以重用输入 `PCollection` 的编码器,通过 `input.getCoder()` 获取。
- 尝试通过 `input.getPipeline().getCoderRegistry().getCoder(TypeDescriptor)` 推断编码器。使用 `TypeDescriptors` 中的工具来获取泛型类型的 `TypeDescriptor`。有关此方法的示例,请参见 `AvroIO.parseGenericRecords()` 的实现。但是,泛型类型的编码器推断是尽力而为的,在某些情况下可能会由于 Java 类型擦除而失败。
- 始终使用户能够显式地为相关类型变量指定 `Coder`,作为你的 `PTransform` 的配置参数。(例如 `AvroIO.<T>parseGenericRecords().withCoder(Coder<T>)`)。如果编码器未被显式指定,则回退到推断。
最后更新于 2024/10/31
你找到了你想要找的所有东西吗?
所有内容都实用且清晰吗?你想要更改任何内容吗?请告诉我们!