Euphoria Java 8 DSL
什么是 Euphoria
易于使用的 Java 8 API,构建在 Beam 的 Java SDK 之上。API 提供了对数据转换的 高级抽象,专注于 Java 8 语言特性(例如 lambda 和流)。它与现有的 Beam SDK 完全互操作,可以来回转换。它允许通过使用(可选的)Kryo 基于编码器、lambda 和高级运算符来快速原型设计,并且可以无缝集成到现有的 Beam Pipelines
中。
Euphoria API 项目始于 2014 年,其明确目标是为 Seznam.cz 的数据基础设施提供主要构建块。在 2015 年,DataFlow 白皮书 启发了最初的作者更进一步,还为流处理和批处理提供统一的 API。该 API 于 2016 年开源,目前仍在积极开发中。由于 Beam 社区的目标非常相似,我们决定将该 API 贡献为 Beam Java SDK 之上的高级 DSL,并将我们的努力与社区分享。
Euphoria DSL 集成仍在进行中,并作为 BEAM-3900 的一部分进行跟踪。
WordCount 示例
让我们从一个小例子开始。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
// Use Kryo as coder fallback
KryoCoderProvider.of().registerTo(pipeline);
// Source of data loaded from Beam IO.
PCollection<String> input =
pipeline
.apply(Create.of(textLineByLine))
.setTypeDescriptor(TypeDescriptor.of(String.class));
// zero, one, or more output elements. From input lines we will get data set of words.
PCollection<String> words =
FlatMap.named("TOKENIZER")
.of(lines)
.using(
(String line, Collector<String> context) -> {
for (String word : Splitter.onPattern("\\s+").split(line)) {
context.collect(word);
}
})
.output();
// Now we can count input words - the operator ensures that all values for the same
// key (word in this case) end up being processed together. Then it counts number of appearances
// of the same key in 'words' PCollection and emits it to output.
PCollection<KV<String, Long>> counted =
CountByKey.named("COUNT")
.of(words)
.keyBy(w -> w)
.output();
// Format output.
PCollection<String> output =
MapElements.named("FORMAT")
.of(counted)
.using(p -> p.getKey() + ": " + p.getValue())
.output();
// Now we can again use Beam transformation. In this case we save words and their count
// into the text file.
output
.apply(TextIO.write()
.to("counted_words"));
pipeline.run();
Euphoria 指南
Euphoria API 由一组运算符组成,这些运算符允许您根据应用程序需求构建 Pipeline
。
输入和输出
输入数据可以通过 Beams IO 提供到 PCollection
中,与 Beam 中的方式相同。
添加运算符
Euphoria API 的真正强大之处在于它的 运算符套件。每个运算符都会消耗一个或多个输入,并生成一个输出 PCollection
。让我们看一个简单的 MapElements
示例。
input
,它会对 input
中的每个元素应用给定的 lambda 表达式 (String::valueOf
),并返回映射的 PCollection
。在创建运算符时,开发人员会通过一系列步骤进行指导,因此运算符的声明非常直观。要开始构建运算符,只需编写它的名称和 ‘.’(点)。您的 IDE 会提供提示。构建任何运算符的第一步是通过 named()
方法为其命名。名称会通过系统传播,以后可以在调试时使用。
编码器和类型
Beam 的 Java SDK 要求开发人员为自定义元素类型提供 Coder
,以便能够对元素进行物化。Euphoria 允许使用 Kryo 作为一种序列化方式。Kryo 位于 :sdks:java:extensions:kryo
模块中。
//gradle
dependencies {
compile "org.apache.beam:sdks:java:extensions:kryo:${beam.version}"
}
//maven
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-kryo</artifactId>
<version>${beam.version}</version>
</dependency>
您只需要创建 KryoCoderProvider
并将其注册到您的 Pipeline
中。有两种方法可以做到这一点。
在原型设计时,您可能决定不关心编码器,然后创建一个没有任何类注册到 Kryo 的 KryoCoderProvider
。
KryoCoderProvider
将为每个非基本元素类型返回 KryoCoder
。这当然会降低性能,因为 Kryo 无法有效地序列化未知类型的实例。但这会加快管道开发速度。此行为默认启用,可以在创建 Pipeline
时通过 KryoOptions
禁用。第二种更注重性能的方法是注册 Kryo 将序列化的所有类型。有时,注册 Kryo 自身的序列化器也是一个好主意。Euphoria 允许您通过实现自己的 KryoRegistrar
并将其用于创建 KryoCoderProvider
来做到这一点。
TypeDescriptor
。TypeDescriptors
时,Euphoria 运算符将使用 TypeDescriptor<Object>
。因此,如果 KryoOptions
允许,KryoCoderProvider
可能会为每个类型未知的元素返回 KryoCoder<Object>
。当使用 .setKryoRegistrationRequired(true)
时,提供 TypeDescriptors
变得强制性。指标和累加器
有关作业内部情况的统计信息在开发分布式作业时非常有用。Euphoria 将它们称为累加器。可以通过环境 Context
访问它们,Context
可以从 Collector
中获取,只要使用它即可。通常在从运算符预期零到多个输出元素的情况下存在。例如,在 FlatMap
的情况下。
MapElements
还允许通过提供 UnaryFunctionEnv
的实现(添加第二个上下文参数)而不是 UnaryFunctor
来访问 Context
。窗口化
Euphoria 遵循与 Beam Java SDK 相同的 窗口化原则。每个混洗运算符(需要通过网络混洗数据的运算符)都允许您设置它。需要与 Beam 中相同的参数。WindowFn
、Trigger
、WindowingStrategy
等。在构建运算符时,用户会被指导设置所有必需参数和几个可选参数,或者根本不设置任何参数。窗口化会通过 Pipeline
向下传播。
PCollection<KV<Integer, Long>> countedElements =
CountByKey.of(input)
.keyBy(e -> e)
.windowBy(FixedWindows.of(Duration.standardSeconds(1)))
.triggeredBy(DefaultTrigger.of())
.discardingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(5))
.withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.output();
如何获取 Euphoria
Euphoria 位于 Apache Beam 项目的 dsl-euphoria
分支、beam-sdks-java-extensions-euphoria
模块中。要构建 euphoria
子项目,请调用
./gradlew beam-sdks-java-extensions-euphoria:build
运算符参考
运算符基本上是更高级的数据转换,允许您以简单的方式构建数据处理作业的业务逻辑。所有 Euphoria 运算符都在本节中进行了介绍,包括示例。为了简单起见,没有使用 窗口化 的示例。有关更多详细信息,请参阅 窗口化部分。
CountByKey
计算具有相同键的元素数量。要求输入数据集由给定的键提取器 (UnaryFunction
) 映射到键,然后对这些键进行计数。输出以 KV<K, Long>
(K
是键类型)的形式发出,其中每个 KV
都包含键和输入数据集对该键的元素数量。
Distinct
输出不同的(基于 equals 方法)元素。它接受可选的 UnaryFunction
映射器参数,该参数将元素映射到输出类型。
Distinct
。Join
表示两个(左和右)数据集在给定键上的内连接,生成一个新的数据集。键是从两个数据集都提取的,因此左边的元素和右边的元素可以具有不同的类型,表示为 LeftT
和 RightT
。连接本身是由用户提供的 BinaryFunctor
执行的,该函数消耗来自两个数据集共享相同键的元素。并输出连接的结果 (OutputT
)。运算符会发出 KV<K, OutputT>
类型的输出数据集。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
Join.named("join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + r))
.output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"),
// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]
LeftJoin
表示在给定键上对两个(左和右)数据集进行左连接,生成一个新的数据集。键通过单独的提取器从两个数据集中提取,因此左和右中的元素可以具有不同的类型,表示为LeftT
和RightT
。连接本身由用户提供的BinaryFunctor
执行,它从两个数据集中分别消费一个元素,其中右元素是可选的,共享同一个键。并输出连接的结果(OutputT
)。该操作符输出类型为KV<K, OutputT>
的数据集。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
LeftJoin.named("left-join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Integer l, Optional<String> r, Collector<String> c) ->
c.collect(l + "+" + r.orElse(null)))
.output();
// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(1, "1+X")]
LeftJoin
的称为“BroadcastHashJoin”的性能优化。当连接两个数据集时,其中一个数据集适合内存(在LeftJoin
中,右侧数据集必须适合内存),广播连接效率非常高。如何在翻译部分中使用“广播哈希连接”。RightJoin
表示在给定键上对两个(左和右)数据集进行右连接,生成一个新的数据集。键通过单独的提取器从两个数据集中提取,因此左和右中的元素可以具有不同的类型,表示为LeftT
和RightT
。连接本身由用户提供的BinaryFunctor
执行,它从两个数据集中分别消费一个元素,其中左元素是可选的,共享同一个键。并输出连接的结果(OutputT
)。该操作符输出类型为KV<K, OutputT>
的数据集。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
RightJoin.named("right-join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Optional<Integer> l, String r, Collector<String> c) ->
c.collect(l.orElse(null) + "+" + r))
.output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
// KV(8, "null+elephant"), KV(5, "null+mouse")]
RightJoin
的称为“BroadcastHashJoin”的性能优化。当连接两个数据集时,其中一个数据集适合内存(在RightJoin
中,左侧数据集必须适合内存),广播连接效率非常高。如何在翻译部分中使用“广播哈希连接”。FullJoin
表示在给定键上对两个(左和右)数据集进行完全外部连接,生成一个新的数据集。键通过单独的提取器从两个数据集中提取,因此左和右中的元素可以具有不同的类型,表示为LeftT
和RightT
。连接本身由用户提供的BinaryFunctor
执行,它从两个数据集中分别消费一个元素,其中两个元素都是可选的,共享同一个键。并输出连接的结果(OutputT
)。该操作符输出类型为KV<K, OutputT>
的数据集。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
FullJoin.named("join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Optional<Integer> l, Optional<String> r, Collector<String> c) ->
c.collect(l.orElse(null) + "+" + r.orElse(null)))
.output();
// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, "1+X"),
// KV(1, "null+elephant"), KV(5, "null+mouse")]
MapElements
将一个输入类型为InputT
的输入元素转换为另一个(可能相同)类型为OutputT
的输出元素。转换通过用户指定的UnaryFunction
完成。
FlatMap
将一个输入类型为InputT
的输入元素转换为零个或多个另一个(可能相同)类型为OutputT
的输出元素。转换通过用户指定的UnaryFunctor
完成,其中Collector<OutputT>
用于输出元素。注意与MapElements
的相似性,它始终只能输出一个元素。
// suppose words contain: ["Brown", "fox", ".", ""]
PCollection<String> letters =
FlatMap.named("str2char")
.of(words)
.using(
(String s, Collector<String> collector) -> {
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
collector.collect(String.valueOf(c));
}
})
.output();
// characters will contain: ["B", "r", "o", "w", "n", "f", "o", "x", "."]
FlatMap
可用于确定元素的时间戳。它通过在构建时提供ExtractEventTime
时间提取器的实现来完成。有一个专门的AssignEventTime
操作符用于为元素分配时间戳。考虑使用它,您的代码可能更易读。// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
FlatMap.named("extract-event-time")
.of(events)
.using( (SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
.eventTimeBy(SomeEventObject::getEventTimeInMillis)
.output();
//Euphoria will now know event time for each event
Filter
Filter
丢弃所有不满足给定条件的元素。条件由用户作为UnaryPredicate
的实现提供。输入和输出元素具有相同的类型。
ReduceByKey
通过用户提供的reduce函数,对具有相同键的InputT
类型元素进行聚合。键通过UnaryFunction
从每个元素中提取,它接受输入元素并输出其类型为K
的键。元素可以选择映射到类型为V
的值,它发生在元素洗牌之前,因此它会对性能产生积极的影响。
最后,具有相同键的元素由用户定义的ReduceFunctor
、ReduceFunction
或CombinableReduceFunction
聚合。它们在它们所接受的参数数量和输出解释方式上有所不同。ReduceFunction
基本上是一个将Stream
的元素作为输入并输出一个聚合结果的函数。ReduceFunctor
接受第二个Collector
,它允许访问Context
。当提供CombinableReduceFunction
时,在洗牌之前执行部分约简,因此需要通过网络传输的数据更少。
以下示例展示了ReduceByKey
操作符的基本用法,包括值提取。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-counts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1)
.reduceBy(Stream::count)
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
现在假设我们想使用计数器跟踪我们的ReduceByKey
内部。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1)
.reduceBy(
(Stream<Integer> s, Collector<Long> collector) -> {
collector.collect(s.count());
collector.asContext().getCounter("num-of-keys").increment();
})
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
再次使用优化的组合输出的相同示例。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will e used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1L)
.combineBy(s -> s.mapToLong(l -> l).sum()) //Stream::count will not be enough
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
CombinableReduceFunction
必须是关联的和可交换的才能真正组合。因此它可以用于在洗牌之前计算部分结果。然后将部分结果合并为一个。这就是为什么简单的Stream::count
在本示例中无法像上一个示例那样工作的原因。Euphoria旨在使代码易于编写和阅读。因此,已经有一些支持以Fold
或折叠函数的形式编写可组合约简函数。它允许用户只提供约简逻辑(BinaryFunction
)并从中创建CombinableReduceFunction
。提供的BinaryFunction
仍然必须是关联的。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLenght =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1L)
.combineBy(Fold.of((l1, l2) -> l1 + l2))
.output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
ReduceWindow
约简窗口中的所有元素。该操作符对应于ReduceByKey
,所有元素的键都相同,因此实际的键仅由窗口定义。
//suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]
//lets assign time-stamp to each input element
PCollection<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output();
PCollection<Integer> output =
ReduceWindow.of(withEventTime)
.combineBy(Fold.of((i1, i2) -> i1 + i2))
.windowBy(FixedWindows.of(Duration.millis(5000)))
.triggeredBy(DefaultTrigger.of())
.discardingFiredPanes()
.output();
//output will contain: [ 10, 26 ]
SumByKey
对具有相同键的元素求和。要求输入数据集通过给定的键提取器(UnaryFunction
)映射到键。通过值提取器,同样是输出到Long
的UnaryFunction
,到值。然后这些值按键分组并求和。输出以KV<K, Long>
(K
是键类型)的形式发出,其中每个KV
包含键和输入数据集中该键的元素数量。
Union
合并至少两个相同类型的数据集,没有任何关于元素排序的保证。
//suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
//suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
PCollection<String> animals =
Union.named("to-animals")
.of(cats, rodents)
.output();
// animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"
TopPerKey
对每个键发出一个评级最高的元素。类型为K
的键由给定的UnaryFunction
提取。另一个UnaryFunction
提取器允许将输入元素转换为类型为V
的值。选择顶层元素基于分数,该分数通过用户提供的名为分数计算器(UnaryFunction
)从每个元素中获得。分数类型表示为ScoreT
,它需要扩展Comparable<ScoreT>
,以便可以直接比较两个元素的分数。输出数据集元素的类型为Triple<K, V, ScoreT>
。
// suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
TopPerKey.named("longest-animal-names")
.of(animals)
.keyBy(name -> name.charAt(0)) // first character is the key
.valueBy(UnaryFunction.identity()) // value type is the same as input element type
.scoreBy(String::length) // length defines score, note that Integer implements Comparable<Integer>
.output();
//longestNamesByLetter wil contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
TopPerKey
是一个洗牌操作符,因此它允许定义窗口。AssignEventTime
当应用窗口时,Euphoria需要知道如何从元素中提取时间戳。AssignEventTime
通过给定的ExtractEventTime
函数的实现来告诉 Euphoria 如何执行此操作。
// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
AssignEventTime.named("extract-event-time")
.of(events)
.using(SomeEventObject::getEventTimeInMillis)
.output();
//Euphoria will now know event time for each event
翻译
Euphoria API 建立在 Beam Java SDK 之上。该 API 在后台被透明地转换为 Beam 的PTransforms
。
Euphoria API 被转换为 Beam Java SDK 的事实给了我们微调翻译本身的选项。Operator
的翻译通过OperatorTranslator
的实现来实现。Euphoria 使用TranslationProvider
来决定应该使用哪个翻译器。Euphoria API 的用户可以通过TranslationProvider
提供自己的OperatorTranslator
,方法是扩展EuphoriaOptions
。Euphoria 已经包含了一些有用的实现。
翻译提供者
GenericTranslatorProvider
一般的TranslationProvider
。允许以三种不同的方式注册OperatorTranslator
- 通过操作符类注册操作符特定的翻译器。
- 通过操作符类和额外的用户定义谓词注册操作符特定的翻译器。
- 使用用户定义的谓词注册通用(不特定于一种操作符类型)翻译器。注册顺序很重要,因为
GenericTranslatorProvider
返回第一个合适的翻译器。
GenericTranslatorProvider.newBuilder()
.register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class
.register(
Join.class,
(Join op) -> {
String name = ((Optional<String>) op.getName()).orElse("");
return name.toLowerCase().startsWith("broadcast");
},
new BroadcastHashJoinTranslator<>()) // register by class and predicate
.register(
op -> op instanceof CompositeOperator,
new CompositeOperatorTranslator<>()) // register by predicate only
.build();
GenericTranslatorProvider
是默认提供者,请参见GenericTranslatorProvider.createWithDefaultTranslators()
。
CompositeProvider
以给定的顺序实现TranslationProvider
的链接。这反过来允许将用户定义的TranslationProvider
与 Euphoria API 已经提供的TranslationProvider
组合。
运算符翻译器
每个Operator
都需要转换为 Java Beam SDK。这由OperatorTranslator
的实现完成。Euphoria API 包含与它提供的每个Operator
实现一起提供的翻译器。某些操作符可能具有适合某些情况的替代翻译。Join
通常可能有很多实现。我们只在这里描述最有趣的实现。
BroadcastHashJoinTranslator
当其中一侧的整个数据集适合目标执行器的内存时,能够翻译LeftJoin
和RightJoin
。因此可以使用 Beam 的边输入进行分发。导致更好的性能。
CompositeOperatorTranslator
某些操作符是复合的。这意味着它们实际上是其他操作符的包装链。CompositeOperatorTranslator
确保它们在翻译过程中分解为基本操作符。
详细信息
大多数翻译发生在org.apache.beam.sdk.extensions.euphoria.core.translate
包中。其中最有趣的类是
OperatorTranslator
- 定义 Euphoria 到 Beam 翻译内部 API 的接口。TranslatorProvider
- 提供自定义翻译器的方式。OperatorTransform
- 控制 Euphoria 操作符对 Beam 的PTransform
的实际翻译和/或扩展。EuphoriaOptions
- 一个PipelineOptions
,允许设置自定义TranslatorProvider
。
该包还包含每个支持的操作符类型的OperatorTranslator
的实现(JoinTranslator
、FlatMapTranslator
、ReduceByKeyTranslator
)。并非每个操作符都需要拥有自己的翻译器。其中一些可以由其他操作符组成。这就是为什么操作符可以实现CompositeOperator
,它使它们可以选择扩展到一组其他 Euphoria 操作符。
翻译过程的设计考虑了灵活性。我们希望允许以不同的方式将高级 Euphoria 操作符转换为 Beam SDK 的基元。它允许根据用户选择或根据自动获取的有关数据的某些知识进行进一步的性能优化。
不支持的功能
原始 Euphoria包含一些 Beam 端口中尚未支持的功能和操作符。以下列出了尚未支持的功能
- 原始 Euphoria 中的
ReduceByKey
允许对输出值(按键)进行排序。这也无法转换为 Beam,因此不支持。