PTransform 样式指南

编写新的可重用 PTransform 的样式指南。

语言中立的注意事项

一致性

与现有技术保持一致

公开 PTransform 与其他事物的区别

因此,您想开发一个人们将在 Beam 管道中使用的库 - 与第三方系统的连接器、机器学习算法等。您应该如何公开它?

执行

不要

命名

执行

不要

配置

配置内容与输入集合内容的区别

公开哪些参数

执行

不要

错误处理

转换配置错误

尽早检测错误。可以在以下阶段检测到错误

例如

运行时错误和数据一致性

优先考虑数据一致性高于一切。不要掩盖数据丢失或损坏。如果无法防止数据丢失,则失败。

执行

不要

性能

许多运行器以优化 ParDo 链的方式来提高性能,如果 ParDo 针对每个输入元素发出少量到中等数量的元素,或者每个元素的处理成本相对较低(例如 Dataflow 的“融合”),但如果违反了这些假设,则会限制并行化。在这种情况下,您可能需要“融合中断”(Reshuffle.of())来提高处理 ParDo 输出 PCollection 的并行化能力。

文档

记录如何配置转换(提供代码示例),以及它对输入或输出的预期保证,考虑到 Beam 模型。例如:

日志记录

预测转换用户可能遇到的异常情况。记录他们认为足以进行调试的信息,但要限制日志记录量。以下是一些适用于所有程序的建议,但在数据量巨大且执行分布式的情况下尤其重要。

执行

不要

测试

数据处理很棘手,充满了特殊情况,而且难以调试,因为管道运行时间很长,很难检查输出是否正确,你无法附加调试器,而且由于数据量大,你经常无法像你希望的那样记录很多东西。因此,测试尤其重要。

测试转换的运行时行为

测试转换的构造和验证

构造和验证转换的代码通常很简单,而且大部分是样板代码。但是,其中微小的错误或错别字可能会造成严重的后果(例如忽略用户设置的属性),因此也需要对其进行测试。然而,过多的简单测试可能难以维护,并给人一种转换已得到充分测试的错误印象。

执行

不要

兼容性

执行

不要

Java 特定注意事项

大多数以下实践的良好示例是JdbcIOMongoDbIO

API

选择输入和输出 PCollection 的类型

尽可能使用特定于转换性质的类型。人们可以根据需要使用自己的类型进行转换的DoFn对其进行包装。例如,数据存储连接器应该使用数据存储Entity类型,MongoDB 连接器应该使用 MongoDocument类型,而不是 JSON 的字符串表示形式。

有时这不可能(例如 JDBC 不提供与 Beam 兼容的(可以使用 Coder 编码)“JDBC 记录”数据类型) - 然后让用户提供一个函数来在特定于转换的类型和与 Beam 兼容的类型之间进行转换(例如,请参阅JdbcIOMongoDbGridFSIO)。

当转换在逻辑上应该返回一个尚未存在 Java 类的复合类型时,创建一个具有良好命名的字段的新 POJO 类。不要使用泛型元组类或KV(除非字段合法地是键和值)。

具有多个输出集合的转换

如果转换需要返回多个集合,它应该是一个PTransform<..., PCollectionTuple>,并为每个集合公开方法getBlahTag()

例如,如果你想要返回一个PCollection<Foo>和一个PCollection<Bar>,请公开TupleTag<Foo> getFooTag()TupleTag<Bar> getBarTag()

例如

public class MyTransform extends PTransform<..., PCollectionTuple> {
  private final TupleTag<Moo> mooTag = new TupleTag<Moo>() {};
  private final TupleTag<Blah> blahTag = new TupleTag<Blah>() {};
  ...
  PCollectionTuple expand(... input) {
    ...
    PCollection<Moo> moo = ...;
    PCollection<Blah> blah = ...;
    return PCollectionTuple.of(mooTag, moo)
                           .and(blahTag, blah);
  }

  public TupleTag<Moo> getMooTag() {
    return mooTag;
  }

  public TupleTag<Blah> getBlahTag() {
    return blahTag;
  }
  ...
}

配置的流畅构建器

使转换类不可变,并使用方法生成修改后的不可变对象。使用AutoValue。Autovalue 可以提供一个 Builder 帮助类。使用@Nullable标记没有默认值或其默认值为 null 的类类型的参数,但基本类型(例如 int)除外。

@AutoValue
public abstract static class MyTransform extends PTransform<...> {
  int getMoo();
  @Nullable abstract String getBlah();

  abstract Builder toBuilder();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setMoo(int moo);
    abstract Builder setBlah(String blah);

    abstract MyTransform build();
  }
  ...
}
工厂方法

提供一个单一的无参数静态工厂方法,要么在封闭类中(请参阅“打包一组转换”),要么在转换类本身中。

public class Thumbs {
  public static Twiddle twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder().build();
  }

  public abstract static class Twiddle extends PTransform<...> { ... }
}

// or:
public abstract static class TwiddleThumbs extends PTransform<...> {
  public static TwiddleThumbs create() {
    return new AutoValue_Thumbs_Twiddle.Builder().build();
  }
  ...
}

例外情况:当转换只有一个压倒性的最重要的参数时,然后调用工厂方法of并将参数放入工厂方法的参数中:ParDo.of(DoFn).withAllowedLateness()

用于设置参数的流畅构建器方法

将它们称为withBlah()。所有构建器方法必须返回完全相同的类型;如果是参数化的(泛型)类型,则使用类型参数的相同值。

withBlah()方法视为一组无序的关键字参数 - 结果不能取决于调用withFoo()withBar()的顺序(例如,withBar()不能读取 foo 的当前值)。

每个 `withBlah` 方法的文档影响:何时使用此方法,允许哪些值,默认值是什么,更改值的影响是什么。

/**
 * Returns a new {@link TwiddleThumbs} transform with moo set
 * to the given value.
 *
 * <p>Valid values are 0 (inclusive) to 100 (exclusive). The default is 42.
 *
 * <p>Higher values generally improve throughput, but increase chance
 * of spontaneous combustion.
 */
public Twiddle withMoo(int moo) {
  checkArgument(moo >= 0 && moo < 100,
      "Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
      + "Valid values are 0 (inclusive) to 100 (exclusive)",
      moo);
  return toBuilder().setMoo(moo).build();
}
参数的默认值

在工厂方法中指定它们(工厂方法返回一个包含默认值的**对象**)。

public class Thumbs {
  public static Twiddle twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder().setMoo(42).build();
  }
  ...
}
将多个参数打包到可重用的对象中

如果转换的几个参数在逻辑上紧密耦合,有时将它们封装到容器对象中是有意义的。对这个容器对象使用相同的准则(使其不可变,使用带有构建器的 AutoValue,记录 `withBlah()` 方法等)。例如,参见 JdbcIO.DataSourceConfiguration.

带有类型参数的转换

所有类型参数都应该在工厂方法上显式指定。构建器方法(`withBlah()`)不应该改变类型。

public class Thumbs {
  public static Twiddle<T> twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder<T>().build();
  }

  @AutoValue
  public abstract static class Twiddle<T>
       extends PTransform<PCollection<Foo>, PCollection<Bar<T>>> {
    
    @Nullable abstract Bar<T> getBar();

    abstract Builder<T> toBuilder();

    @AutoValue.Builder
    abstract static class Builder<T> {
      
      abstract Builder<T> setBar(Bar<T> bar);

      abstract Twiddle<T> build();
    }
    
  }
}

// User code:
Thumbs.Twiddle<String> twiddle = Thumbs.<String>twiddle();
// Or:
PCollection<Bar<String>> bars = foos.apply(Thumbs.<String>twiddle()  );

例外:当转换只有一个最重要的参数,并且该参数取决于类型 T 时,最好将其直接放入工厂方法中:例如 `Combine.globally(SerializableFunction<Iterable<V>,V>`。这将改善 Java 的类型推断,并允许用户不必显式指定类型参数。

当转换有多个类型参数,或者参数的含义不明显时,将类型参数命名为 `SomethingT`,例如:实现分类算法并为每个输入元素分配标签的 PTransform 可以被类型化为 `Classify<InputT, LabelT>`。

注入用户指定的行为

如果转换的某个行为方面需要由用户的代码进行定制,请做出以下决定

执行

不要

打包转换族

在开发一个高度相关的转换系列时(例如,以不同的方式与同一个系统交互,或提供同一个高级任务的不同实现),使用一个顶级类作为命名空间,用多个工厂方法返回对应于每个单独用例的转换。

容器类必须有一个私有构造函数,因此它不能被直接实例化。

在 `FooIO` 级别记录通用内容,并单独记录每个工厂方法。

/** Transforms for clustering data. */
public class Cluster {
  // Force use of static factory methods.
  private Cluster() {}

  /** Returns a new {@link UsingKMeans} transform. */
  public static UsingKMeans usingKMeans() { ... }
  public static Hierarchically hierarchically() { ... }

  /** Clusters data using the K-Means algorithm. */
  public static class UsingKMeans extends PTransform<...> { ... }
  public static class Hierarchically extends PTransform<...> { ... }
}

public class FooIO {
  // Force use of static factory methods.
  private FooIO() {}

  public static Read read() { ... }
  ...

  public static class Read extends PTransform<...> { ... }
  public static class Write extends PTransform<...> { ... }
  public static class Delete extends PTransform<...> { ... }
  public static class Mutate extends PTransform<...> { ... }
}

在支持多个具有不兼容 API 的版本时,也将版本作为命名空间类,并将不同 API 版本的实现放在不同的文件中。

// FooIO.java
public class FooIO {
  // Force use of static factory methods.
  private FooIO() {}

  public static FooV1 v1() { return new FooV1(); }
  public static FooV2 v2() { return new FooV2(); }
}

// FooV1.java
public class FooV1 {
  // Force use of static factory methods outside the package.
  FooV1() {}
  public static Read read() { ... }
  public static class Read extends PTransform<...> { ... }
}

// FooV2.java
public static class FooV2 {
  // Force use of static factory methods outside the package.
  FooV2() {}
  public static Read read() { ... }

  public static class Read extends PTransform<...> { ... }
}

行为

不可变性

序列化

`DoFn`、`PTransform`、`CombineFn` 和其他实例将被序列化。将序列化数据的数量降至最低:将不想序列化的字段标记为 `transient`。尽可能使类为 `static`(这样实例就不会捕获和序列化封闭类实例)。注意:在某些情况下,这意味着你不能使用匿名类。

验证

@AutoValue
public abstract class TwiddleThumbs
    extends PTransform<PCollection<Foo>, PCollection<Bar>> {
  abstract int getMoo();
  abstract String getBoo();

  ...
  // Validating individual parameters
  public TwiddleThumbs withMoo(int moo) {
    checkArgument(
        moo >= 0 && moo < 100,
        "Moo must be between 0 (inclusive) and 100 (exclusive), but was: %s",
        moo);
    return toBuilder().setMoo(moo).build();
  }

  public TwiddleThumbs withBoo(String boo) {
    checkArgument(boo != null, "Boo can not be null");
    checkArgument(!boo.isEmpty(), "Boo can not be empty");
    return toBuilder().setBoo(boo).build();
  }

  @Override
  public void validate(PipelineOptions options) {
    int woo = options.as(TwiddleThumbsOptions.class).getWoo();
    checkArgument(
       woo > getMoo(),
      "Woo (%s) must be smaller than moo (%s)",
      woo, getMoo());
  }

  @Override
  public PCollection<Bar> expand(PCollection<Foo> input) {
    // Validating that a required parameter is present
    checkArgument(getBoo() != null, "Must specify boo");

    // Validating a combination of parameters
    checkArgument(
        getMoo() == 0 || getBoo() == null,
        "Must specify at most one of moo or boo, but was: moo = %s, boo = %s",
        getMoo(), getBoo());

    ...
  }
}

编码器

`Coder` 是 Beam 运行器在必要时物化中间数据或在工作器之间传输中间数据的一种方式。 `Coder` 不应该用作解析或写入二进制格式的通用 API,因为 `Coder` 的特定二进制编码旨在成为其私有实现细节。

为类型提供默认编码器

为所有新数据类型提供默认 `Coder`。使用 `@DefaultCoder` 注解或 `CoderProviderRegistrar` 类,这些类用 `@AutoService` 进行注解:参见 SDK 中这些类的用法以获取示例。如果性能不重要,您可以使用 `SerializableCoder` 或 `AvroCoder`。否则,开发一个高效的自定义编码器(为具体类型子类化 `AtomicCoder`,为泛型类型子类化 `StructuredCoder`)。

在输出集合上设置编码器

你的 `PTransform` 创建的所有 `PCollection`(包括输出集合和中间集合)都必须在其上设置 `Coder`:用户永远不需要调用 `setCoder()` 来“修复”你的 `PTransform` 生成的 `PCollection` 上的编码器(实际上,Beam 意图最终弃用 `setCoder`)。在某些情况下,编码器推断足以实现这一点;在其他情况下,你的转换需要显式调用其集合上的 `setCoder`。

如果集合是具体类型的,该类型通常具有相应的编码器。使用最有效的特定编码器(例如,字符串使用 `StringUtf8Coder.of()`,字节数组使用 `ByteArrayCoder.of()` 等),而不是像 `SerializableCoder` 这样的通用编码器。

如果集合的类型涉及泛型类型变量,情况会更加复杂