Euphoria Java 8 DSL

什么是 Euphoria

易于使用的 Java 8 API,构建在 Beam 的 Java SDK 之上。API 提供了对数据转换的 高级抽象,专注于 Java 8 语言特性(例如 lambda 和流)。它与现有的 Beam SDK 完全互操作,可以来回转换。它允许通过使用(可选的)Kryo 基于编码器、lambda 和高级运算符来快速原型设计,并且可以无缝集成到现有的 Beam Pipelines 中。

Euphoria API 项目始于 2014 年,其明确目标是为 Seznam.cz 的数据基础设施提供主要构建块。在 2015 年,DataFlow 白皮书 启发了最初的作者更进一步,还为流处理和批处理提供统一的 API。该 API 于 2016 年开源,目前仍在积极开发中。由于 Beam 社区的目标非常相似,我们决定将该 API 贡献为 Beam Java SDK 之上的高级 DSL,并将我们的努力与社区分享。

Euphoria DSL 集成仍在进行中,并作为 BEAM-3900 的一部分进行跟踪。

WordCount 示例

让我们从一个小例子开始。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

// Use Kryo as coder fallback
KryoCoderProvider.of().registerTo(pipeline);

// Source of data loaded from Beam IO.
PCollection<String> input =
    pipeline
        .apply(Create.of(textLineByLine))
        .setTypeDescriptor(TypeDescriptor.of(String.class));

// zero, one, or more output elements. From input lines we will get data set of words.
PCollection<String> words =
    FlatMap.named("TOKENIZER")
        .of(lines)
        .using(
            (String line, Collector<String> context) -> {
              for (String word : Splitter.onPattern("\\s+").split(line)) {
                context.collect(word);
              }
            })
        .output();

// Now we can count input words - the operator ensures that all values for the same
// key (word in this case) end up being processed together. Then it counts number of appearances
// of the same key in 'words' PCollection and emits it to output.
PCollection<KV<String, Long>> counted =
    CountByKey.named("COUNT")
        .of(words)
        .keyBy(w -> w)
        .output();

// Format output.
PCollection<String> output =
    MapElements.named("FORMAT")
        .of(counted)
        .using(p -> p.getKey() + ": " + p.getValue())
        .output();

// Now we can again use Beam transformation. In this case we save words and their count
// into the text file.
output
    .apply(TextIO.write()
    .to("counted_words"));

pipeline.run();

Euphoria 指南

Euphoria API 由一组运算符组成,这些运算符允许您根据应用程序需求构建 Pipeline

输入和输出

输入数据可以通过 Beams IO 提供到 PCollection 中,与 Beam 中的方式相同。

PCollection<String> input =
  pipeline
    .apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"))
    .setTypeDescriptor(TypeDescriptor.of(String.class));

添加运算符

Euphoria API 的真正强大之处在于它的 运算符套件。每个运算符都会消耗一个或多个输入,并生成一个输出 PCollection。让我们看一个简单的 MapElements 示例。

PCollection<Integer> input = ...

PCollection<String> mappedElements =
  MapElements
    .named("Int2Str")
    .of(input)
    .using(String::valueOf)
    .output();
该运算符会消耗 input,它会对 input 中的每个元素应用给定的 lambda 表达式 (String::valueOf),并返回映射的 PCollection。在创建运算符时,开发人员会通过一系列步骤进行指导,因此运算符的声明非常直观。要开始构建运算符,只需编写它的名称和 ‘.’(点)。您的 IDE 会提供提示。

构建任何运算符的第一步是通过 named() 方法为其命名。名称会通过系统传播,以后可以在调试时使用。

编码器和类型

Beam 的 Java SDK 要求开发人员为自定义元素类型提供 Coder,以便能够对元素进行物化。Euphoria 允许使用 Kryo 作为一种序列化方式。Kryo 位于 :sdks:java:extensions:kryo 模块中。

//gradle
dependencies {
    compile "org.apache.beam:sdks:java:extensions:kryo:${beam.version}"
}
//maven
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-kryo</artifactId>
  <version>${beam.version}</version>
</dependency>

您只需要创建 KryoCoderProvider 并将其注册到您的 Pipeline 中。有两种方法可以做到这一点。

在原型设计时,您可能决定不关心编码器,然后创建一个没有任何类注册到 KryoKryoCoderProvider

//Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type
KryoCoderProvider.of().registerTo(pipeline);
此类 KryoCoderProvider 将为每个非基本元素类型返回 KryoCoder。这当然会降低性能,因为 Kryo 无法有效地序列化未知类型的实例。但这会加快管道开发速度。此行为默认启用,可以在创建 Pipeline 时通过 KryoOptions 禁用。
PipelineOptions options = PipelineOptionsFactory.create();
options.as(KryoOptions.class).setKryoRegistrationRequired(true);

第二种更注重性能的方法是注册 Kryo 将序列化的所有类型。有时,注册 Kryo 自身的序列化器也是一个好主意。Euphoria 允许您通过实现自己的 KryoRegistrar 并将其用于创建 KryoCoderProvider 来做到这一点。

//Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
options.as(KryoOptions.class).setKryoRegistrationRequired(true);

KryoCoderProvider.of(
        (kryo) -> { //KryoRegistrar of your uwn
          kryo.register(KryoSerializedElementType.class); //other may follow
        })
    .registerTo(pipeline);
Beam 使用元素的类型解析编码器。当元素类型由 lambda 实现描述时,运行时不会提供类型信息。这是由于类型擦除和 lambda 表达式的动态特性。因此,有一个可选的方法可以在 Operator 构造期间每次引入新类型时提供 TypeDescriptor
PCollection<Integer> input = ...

MapElements
  .named("Int2Str")
  .of(input)
  .using(String::valueOf, TypeDescriptors.strings())
  .output();
当用户没有提供 TypeDescriptors 时,Euphoria 运算符将使用 TypeDescriptor<Object>。因此,如果 KryoOptions 允许,KryoCoderProvider 可能会为每个类型未知的元素返回 KryoCoder<Object>。当使用 .setKryoRegistrationRequired(true) 时,提供 TypeDescriptors 变得强制性。

指标和累加器

有关作业内部情况的统计信息在开发分布式作业时非常有用。Euphoria 将它们称为累加器。可以通过环境 Context 访问它们,Context 可以从 Collector 中获取,只要使用它即可。通常在从运算符预期零到多个输出元素的情况下存在。例如,在 FlatMap 的情况下。

Pipeline pipeline = ...
PCollection<String> dataset = ..

PCollection<String> mapped =
FlatMap
  .named("FlatMap1")
  .of(dataset)
  .using(
    (String value, Collector<String> context) -> {
      context.getCounter("my-counter").increment();
        context.collect(value);
    })
  .output();
MapElements 还允许通过提供 UnaryFunctionEnv 的实现(添加第二个上下文参数)而不是 UnaryFunctor 来访问 Context
Pipeline pipeline = ...
PCollection<String> dataset = ...

PCollection<String> mapped =
  MapElements
    .named("MapThem")
    .of(dataset)
    .using(
      (input, context) -> {
        // use simple counter
        context.getCounter("my-counter").increment();
        return input.toLowerCase();
        })
      .output();
累加器会在后台转换为 Beam 指标,因此可以以相同的方式查看它们。翻译后的指标命名空间设置为运算符的名称。

窗口化

Euphoria 遵循与 Beam Java SDK 相同的 窗口化原则。每个混洗运算符(需要通过网络混洗数据的运算符)都允许您设置它。需要与 Beam 中相同的参数。WindowFnTriggerWindowingStrategy 等。在构建运算符时,用户会被指导设置所有必需参数和几个可选参数,或者根本不设置任何参数。窗口化会通过 Pipeline 向下传播。

PCollection<KV<Integer, Long>> countedElements =
  CountByKey.of(input)
      .keyBy(e -> e)
      .windowBy(FixedWindows.of(Duration.standardSeconds(1)))
      .triggeredBy(DefaultTrigger.of())
      .discardingFiredPanes()
      .withAllowedLateness(Duration.standardSeconds(5))
      .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
      .withTimestampCombiner(TimestampCombiner.EARLIEST)
      .output();

如何获取 Euphoria

Euphoria 位于 Apache Beam 项目的 dsl-euphoria 分支、beam-sdks-java-extensions-euphoria 模块中。要构建 euphoria 子项目,请调用

./gradlew beam-sdks-java-extensions-euphoria:build

运算符参考

运算符基本上是更高级的数据转换,允许您以简单的方式构建数据处理作业的业务逻辑。所有 Euphoria 运算符都在本节中进行了介绍,包括示例。为了简单起见,没有使用 窗口化 的示例。有关更多详细信息,请参阅 窗口化部分

CountByKey

计算具有相同键的元素数量。要求输入数据集由给定的键提取器 (UnaryFunction) 映射到键,然后对这些键进行计数。输出以 KV<K, Long>K 是键类型)的形式发出,其中每个 KV 都包含键和输入数据集对该键的元素数量。

// suppose input: [1, 2, 4, 1, 1, 3]
PCollection<KV<Integer, Long>> output =
  CountByKey.of(input)
    .keyBy(e -> e)
    .output();
// Output will contain:  [KV(1, 3), KV(2, 1), KV(3, 1), (4, 1)]

Distinct

输出不同的(基于 equals 方法)元素。它接受可选的 UnaryFunction 映射器参数,该参数将元素映射到输出类型。

// suppose input: [1, 2, 3, 3, 2, 1]
Distinct.named("unique-integers-only")
  .of(input)
  .output();
// Output will contain:  1, 2, 3
 
带有映射器的 Distinct
// suppose keyValueInput: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]
Distinct.named("unique-keys-only")
  .of(keyValueInput)
  .projected(KV::getKey)
  .output();
// Output will contain kvs with keys:  1, 3, 42 with some arbitrary values associated with given keys

Join

表示两个(左和右)数据集在给定键上的内连接,生成一个新的数据集。键是从两个数据集都提取的,因此左边的元素和右边的元素可以具有不同的类型,表示为 LeftTRightT。连接本身是由用户提供的 BinaryFunctor 执行的,该函数消耗来自两个数据集共享相同键的元素。并输出连接的结果 (OutputT)。运算符会发出 KV<K, OutputT> 类型的输出数据集。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  Join.named("join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + r))
    .output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"),
// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]

LeftJoin

表示在给定键上对两个(左和右)数据集进行左连接,生成一个新的数据集。键通过单独的提取器从两个数据集中提取,因此左和右中的元素可以具有不同的类型,表示为LeftTRightT。连接本身由用户提供的BinaryFunctor执行,它从两个数据集中分别消费一个元素,其中右元素是可选的,共享同一个键。并输出连接的结果(OutputT)。该操作符输出类型为KV<K, OutputT>的数据集。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  LeftJoin.named("left-join-length-to-words")
      .of(left, right)
      .by(le -> le, String::length) // key extractors
      .using(
          (Integer l, Optional<String> r, Collector<String> c) ->
              c.collect(l + "+" + r.orElse(null)))
      .output();
// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(1, "1+X")]
Euphoria支持针对LeftJoin的称为“BroadcastHashJoin”的性能优化。当连接两个数据集时,其中一个数据集适合内存(在LeftJoin中,右侧数据集必须适合内存),广播连接效率非常高。如何在翻译部分中使用“广播哈希连接”。

RightJoin

表示在给定键上对两个(左和右)数据集进行右连接,生成一个新的数据集。键通过单独的提取器从两个数据集中提取,因此左和右中的元素可以具有不同的类型,表示为LeftTRightT。连接本身由用户提供的BinaryFunctor执行,它从两个数据集中分别消费一个元素,其中左元素是可选的,共享同一个键。并输出连接的结果(OutputT)。该操作符输出类型为KV<K, OutputT>的数据集。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  RightJoin.named("right-join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using(
      (Optional<Integer> l, String r, Collector<String> c) ->
        c.collect(l.orElse(null) + "+" + r))
    .output();
    // joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),
    // KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
    // KV(8, "null+elephant"), KV(5, "null+mouse")]
Euphoria支持针对RightJoin的称为“BroadcastHashJoin”的性能优化。当连接两个数据集时,其中一个数据集适合内存(在RightJoin中,左侧数据集必须适合内存),广播连接效率非常高。如何在翻译部分中使用“广播哈希连接”。

FullJoin

表示在给定键上对两个(左和右)数据集进行完全外部连接,生成一个新的数据集。键通过单独的提取器从两个数据集中提取,因此左和右中的元素可以具有不同的类型,表示为LeftTRightT。连接本身由用户提供的BinaryFunctor执行,它从两个数据集中分别消费一个元素,其中两个元素都是可选的,共享同一个键。并输出连接的结果(OutputT)。该操作符输出类型为KV<K, OutputT>的数据集。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  FullJoin.named("join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using(
      (Optional<Integer> l, Optional<String> r, Collector<String> c) ->
        c.collect(l.orElse(null) + "+" + r.orElse(null)))
    .output();
// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, "1+X"),
//  KV(1, "null+elephant"), KV(5, "null+mouse")]

MapElements

将一个输入类型为InputT的输入元素转换为另一个(可能相同)类型为OutputT的输出元素。转换通过用户指定的UnaryFunction完成。

// suppose inputs contains: [ 0, 1, 2, 3, 4, 5]
PCollection<String> strings =
  MapElements.named("int2str")
    .of(input)
    .using(i -> "#" + i)
    .output();
// strings will contain: [ "#0", "#1", "#2", "#3", "#4", "#5"]

FlatMap

将一个输入类型为InputT的输入元素转换为零个或多个另一个(可能相同)类型为OutputT的输出元素。转换通过用户指定的UnaryFunctor完成,其中Collector<OutputT>用于输出元素。注意与MapElements的相似性,它始终只能输出一个元素。

// suppose words contain: ["Brown", "fox", ".", ""]
PCollection<String> letters =
  FlatMap.named("str2char")
    .of(words)
    .using(
      (String s, Collector<String> collector) -> {
        for (int i = 0; i < s.length(); i++) {
          char c = s.charAt(i);
          collector.collect(String.valueOf(c));
        }
      })
    .output();
// characters will contain: ["B", "r", "o", "w", "n",  "f", "o", "x", "."]
FlatMap可用于确定元素的时间戳。它通过在构建时提供ExtractEventTime时间提取器的实现来完成。有一个专门的AssignEventTime操作符用于为元素分配时间戳。考虑使用它,您的代码可能更易读。
// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
  FlatMap.named("extract-event-time")
    .of(events)
    .using( (SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
    .eventTimeBy(SomeEventObject::getEventTimeInMillis)
    .output();
//Euphoria will now know event time for each event

Filter

Filter丢弃所有不满足给定条件的元素。条件由用户作为UnaryPredicate的实现提供。输入和输出元素具有相同的类型。

// suppose nums contains: [0,  1, 2, 3, 4, 5, 6, 7, 8, 9]
PCollection<Integer> divisibleBythree =
  Filter.named("divisibleByThree").of(nums).by(e -> e % 3 == 0).output();
//divisibleBythree will contain: [ 0, 3, 6, 9]

ReduceByKey

通过用户提供的reduce函数,对具有相同键的InputT类型元素进行聚合。键通过UnaryFunction从每个元素中提取,它接受输入元素并输出其类型为K的键。元素可以选择映射到类型为V的值,它发生在元素洗牌之前,因此它会对性能产生积极的影响。

最后,具有相同键的元素由用户定义的ReduceFunctorReduceFunctionCombinableReduceFunction聚合。它们在它们所接受的参数数量和输出解释方式上有所不同。ReduceFunction基本上是一个将Stream的元素作为输入并输出一个聚合结果的函数。ReduceFunctor接受第二个Collector,它允许访问Context。当提供CombinableReduceFunction时,在洗牌之前执行部分约简,因此需要通过网络传输的数据更少。

以下示例展示了ReduceByKey操作符的基本用法,包括值提取。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-counts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1)
    .reduceBy(Stream::count)
    .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

现在假设我们想使用计数器跟踪我们的ReduceByKey内部。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1)
    .reduceBy(
      (Stream<Integer> s, Collector<Long> collector) -> {
        collector.collect(s.count());
        collector.asContext().getCounter("num-of-keys").increment();
      })
      .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

再次使用优化的组合输出的相同示例。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will e used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1L)
    .combineBy(s -> s.mapToLong(l -> l).sum()) //Stream::count will not be enough
    .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
请注意,提供的CombinableReduceFunction必须是关联的和可交换的才能真正组合。因此它可以用于在洗牌之前计算部分结果。然后将部分结果合并为一个。这就是为什么简单的Stream::count在本示例中无法像上一个示例那样工作的原因。

Euphoria旨在使代码易于编写和阅读。因此,已经有一些支持以Fold或折叠函数的形式编写可组合约简函数。它允许用户只提供约简逻辑(BinaryFunction)并从中创建CombinableReduceFunction。提供的BinaryFunction仍然必须是关联的。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLenght =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1L)
    .combineBy(Fold.of((l1, l2) -> l1 + l2))
    .output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

ReduceWindow

约简窗口中的所有元素。该操作符对应于ReduceByKey,所有元素的键都相同,因此实际的键仅由窗口定义。

//suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]
//lets assign time-stamp to each input element
PCollection<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output();

PCollection<Integer> output =
  ReduceWindow.of(withEventTime)
    .combineBy(Fold.of((i1, i2) -> i1 + i2))
    .windowBy(FixedWindows.of(Duration.millis(5000)))
    .triggeredBy(DefaultTrigger.of())
    .discardingFiredPanes()
    .output();
//output will contain: [ 10, 26 ]

SumByKey

对具有相同键的元素求和。要求输入数据集通过给定的键提取器(UnaryFunction)映射到键。通过值提取器,同样是输出到LongUnaryFunction,到值。然后这些值按键分组并求和。输出以KV<K, Long>K是键类型)的形式发出,其中每个KV包含键和输入数据集中该键的元素数量。

//suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
PCollection<KV<Integer, Long>> output =
  SumByKey.named("sum-odd-and-even")
    .of(input)
    .keyBy(e -> e % 2)
    .valueBy(e -> (long) e)
    .output();
// output will contain: [ KV.of(0, 20L), KV.of(1, 25L)]

Union

合并至少两个相同类型的数据集,没有任何关于元素排序的保证。

//suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
//suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
PCollection<String> animals =
  Union.named("to-animals")
    .of(cats, rodents)
    .output();
// animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"

TopPerKey

对每个键发出一个评级最高的元素。类型为K的键由给定的UnaryFunction提取。另一个UnaryFunction提取器允许将输入元素转换为类型为V的值。选择顶层元素基于分数,该分数通过用户提供的名为分数计算器(UnaryFunction)从每个元素中获得。分数类型表示为ScoreT,它需要扩展Comparable<ScoreT>,以便可以直接比较两个元素的分数。输出数据集元素的类型为Triple<K, V, ScoreT>

// suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
  TopPerKey.named("longest-animal-names")
    .of(animals)
    .keyBy(name -> name.charAt(0)) // first character is the key
    .valueBy(UnaryFunction.identity()) // value type is the same as input element type
    .scoreBy(String::length) // length defines score, note that Integer implements Comparable<Integer>
    .output();
//longestNamesByLetter wil contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
TopPerKey是一个洗牌操作符,因此它允许定义窗口。

AssignEventTime

当应用窗口时,Euphoria需要知道如何从元素中提取时间戳。AssignEventTime通过给定的ExtractEventTime函数的实现来告诉 Euphoria 如何执行此操作。

// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
  AssignEventTime.named("extract-event-time")
    .of(events)
    .using(SomeEventObject::getEventTimeInMillis)
    .output();
//Euphoria will now know event time for each event

翻译

Euphoria API 建立在 Beam Java SDK 之上。该 API 在后台被透明地转换为 Beam 的PTransforms

Euphoria API 被转换为 Beam Java SDK 的事实给了我们微调翻译本身的选项。Operator的翻译通过OperatorTranslator的实现来实现。Euphoria 使用TranslationProvider来决定应该使用哪个翻译器。Euphoria API 的用户可以通过TranslationProvider提供自己的OperatorTranslator,方法是扩展EuphoriaOptions。Euphoria 已经包含了一些有用的实现。

翻译提供者

GenericTranslatorProvider

一般的TranslationProvider。允许以三种不同的方式注册OperatorTranslator

GenericTranslatorProvider.newBuilder()
  .register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class
  .register(
    Join.class,
    (Join op) -> {
      String name = ((Optional<String>) op.getName()).orElse("");
      return name.toLowerCase().startsWith("broadcast");
    },
    new BroadcastHashJoinTranslator<>()) // register by class and predicate
  .register(
    op -> op instanceof CompositeOperator,
    new CompositeOperatorTranslator<>()) // register by predicate only
  .build();

GenericTranslatorProvider是默认提供者,请参见GenericTranslatorProvider.createWithDefaultTranslators()

CompositeProvider

以给定的顺序实现TranslationProvider的链接。这反过来允许将用户定义的TranslationProvider与 Euphoria API 已经提供的TranslationProvider组合。

CompositeProvider.of(
  CustomTranslatorProvider.of(), // first ask CustomTranslatorProvider for translator
  GenericTranslatorProvider.createWithDefaultTranslators()); // then ask default provider if needed

运算符翻译器

每个Operator都需要转换为 Java Beam SDK。这由OperatorTranslator的实现完成。Euphoria API 包含与它提供的每个Operator实现一起提供的翻译器。某些操作符可能具有适合某些情况的替代翻译。Join通常可能有很多实现。我们只在这里描述最有趣的实现。

BroadcastHashJoinTranslator

当其中一侧的整个数据集适合目标执行器的内存时,能够翻译LeftJoinRightJoin。因此可以使用 Beam 的边输入进行分发。导致更好的性能。

CompositeOperatorTranslator

某些操作符是复合的。这意味着它们实际上是其他操作符的包装链。CompositeOperatorTranslator确保它们在翻译过程中分解为基本操作符。

详细信息

大多数翻译发生在org.apache.beam.sdk.extensions.euphoria.core.translate包中。其中最有趣的类是

该包还包含每个支持的操作符类型的OperatorTranslator的实现(JoinTranslatorFlatMapTranslatorReduceByKeyTranslator)。并非每个操作符都需要拥有自己的翻译器。其中一些可以由其他操作符组成。这就是为什么操作符可以实现CompositeOperator,它使它们可以选择扩展到一组其他 Euphoria 操作符。

翻译过程的设计考虑了灵活性。我们希望允许以不同的方式将高级 Euphoria 操作符转换为 Beam SDK 的基元。它允许根据用户选择或根据自动获取的有关数据的某些知识进行进一步的性能优化。

不支持的功能

原始 Euphoria包含一些 Beam 端口中尚未支持的功能和操作符。以下列出了尚未支持的功能