ParDo
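A transform for generic parallel processing. A ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero or more elements to an output PCollection.
For more information, see the Beam Programming Guide.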
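To make the "zero or more elements per input" contract concrete, here is a minimal plain-Java sketch. It is a model of the semantics only and does not use the Beam API: the class `ParDoSketch` and the helper `parDo` are hypothetical names introduced for illustration, and an in-memory `List` stands in for a PCollection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-Java model of ParDo semantics; this is NOT the Beam API.
class ParDoSketch {

  // Hypothetical helper: applies fn to every input element. fn receives the
  // element and an emitter, and may call the emitter zero or more times.
  static <I, O> List<O> parDo(List<I> input, BiConsumer<I, Consumer<O>> fn) {
    List<O> output = new ArrayList<>();
    for (I element : input) {
      fn.accept(element, output::add);
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> lines = List.of("a b", "", "c");
    // Split each line into words: one line may emit several words or none.
    List<String> words = parDo(lines, (String line, Consumer<String> out) -> {
      for (String w : line.split(" ")) {
        if (!w.isEmpty()) {
          out.accept(w);
        }
      }
    });
    System.out.println(words); // [a, b, c]
  }
}
```

In a real pipeline the elements are processed in parallel across workers rather than in a single loop, so the sketch should not be read as a statement about ordering or execution.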
Examples
Example 1: Passing side inputs
// Pass side inputs to your ParDo transform by invoking .withSideInputs.
// Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.
// The input PCollection to ParDo.
PCollection<String> words = ...;
// A PCollection of word lengths that we'll combine into a single value.
PCollection<Integer> wordLengths = ...; // Singleton PCollection
// Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
final PCollectionView<Integer> maxWordLengthCutOffView =
    wordLengths.apply(Combine.globally(Max.ofIntegers()).asSingletonView());
// Apply a ParDo that takes maxWordLengthCutOffView as a side input.
PCollection<String> wordsBelowCutOff =
    words.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String word = c.element();
            // In our DoFn, access the side input.
            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
            if (word.length() <= lengthCutOff) {
              c.output(word);
            }
          }
        }).withSideInputs(maxWordLengthCutOffView)
    );
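As a rough illustration of what Example 1 computes, the following plain-Java sketch reduces the word lengths to a single cutoff and then filters the words against it. It does not use the Beam API: `SideInputSketch` and `wordsBelowCutOff` are hypothetical names, lists stand in for PCollections, and the sketch assumes `wordLengths` is non-empty.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of Example 1; this is NOT the Beam API.
class SideInputSketch {

  // Hypothetical helper mirroring the pipeline: reduce wordLengths to one
  // value (the "side input"), then keep words no longer than that value.
  // Assumes wordLengths is non-empty; Collections.max throws otherwise.
  static List<String> wordsBelowCutOff(List<String> words, List<Integer> wordLengths) {
    // Stands in for the globally combined maximum exposed as a singleton view.
    int lengthCutOff = Collections.max(wordLengths);

    List<String> output = new ArrayList<>();
    for (String word : words) {
      // Every element reads the same, read-only cutoff value.
      if (word.length() <= lengthCutOff) {
        output.add(word);
      }
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> words = List.of("a", "bbb", "ccccc");
    List<Integer> wordLengths = List.of(1, 3);
    System.out.println(wordsBelowCutOff(words, wordLengths)); // [a, bbb]
  }
}
```

The point of the side input is visible in the loop: the cutoff is computed once from a different collection and is then available, unchanged, while each main-input element is processed.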
Example 2: Emitting to multiple outputs in your DoFn
// To emit elements to multiple output PCollections, create a TupleTag object to identify each collection
// that your ParDo produces. For example, if your ParDo produces three output PCollections (the main output
// and two additional outputs), you must create three TupleTags. The following example code shows how to
// create TupleTags for a ParDo with three output PCollections.
// Input PCollection to our ParDo.
PCollection<String> words = ...;
// The ParDo will filter words whose length is below a cutoff and add them to
// the main output PCollection<String>.
// If a word is above the cutoff, the ParDo will add the word length to an
// output PCollection<Integer>.
// If a word starts with the string "MARKER", the ParDo will add that word to an
// output PCollection<String>.
final int wordLengthCutOff = 10;
// Create three TupleTags, one for each output PCollection.
// Output that contains words below the length cutoff.
final TupleTag<String> wordsBelowCutOffTag =
    new TupleTag<String>(){};
// Output that contains word lengths.
final TupleTag<Integer> wordLengthsAboveCutOffTag =
    new TupleTag<Integer>(){};
// Output that contains "MARKER" words.
final TupleTag<String> markedWordsTag =
    new TupleTag<String>(){};
// Passing Output Tags to ParDo:
// After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking
// .withOutputTags. You pass the tag for the main output first, and then the tags for any additional outputs
// in a TupleTagList. Building on our previous example, we pass the three TupleTags for our three output
// PCollections to our ParDo. Note that all of the outputs (including the main output PCollection) are
// bundled into the returned PCollectionTuple.
PCollectionTuple results =
    words.apply(ParDo
        .of(new DoFn<String, String>() {
          // DoFn continues here.
          ...
        })
        // Specify the tag for the main output.
        .withOutputTags(wordsBelowCutOffTag,
            // Specify the tags for the two additional outputs as a TupleTagList.
            TupleTagList.of(wordLengthsAboveCutOffTag)
                .and(markedWordsTag)));
Example 3: Tags for multiple outputs
// Inside your ParDo's DoFn, you can emit an element to a specific output PCollection by passing in the
// appropriate TupleTag when you call ProcessContext.output.
// After your ParDo, extract the resulting output PCollections from the returned PCollectionTuple.
// Based on the previous example, this shows the DoFn emitting to the main output and two additional outputs.
    .of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String word = c.element();
        if (word.length() <= wordLengthCutOff) {
          // Emit short word to the main output.
          // In this example, it is the output with tag wordsBelowCutOffTag.
          c.output(word);
        } else {
          // Emit long word length to the output with tag wordLengthsAboveCutOffTag.
          c.output(wordLengthsAboveCutOffTag, word.length());
        }
        if (word.startsWith("MARKER")) {
          // Emit word to the output with tag markedWordsTag.
          c.output(markedWordsTag, word);
        }
      }}));
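The routing logic of this DoFn can be traced with a small plain-Java sketch. It does not use the Beam API: `MultiOutputSketch`, `route`, and the string keys are hypothetical stand-ins for the DoFn and its TupleTags, and a map of lists stands in for the returned PCollectionTuple. It is only meant to show which output each word lands in.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the tagged-output routing; this is NOT the Beam API.
class MultiOutputSketch {
  // Hypothetical string keys standing in for the three TupleTags.
  static final String WORDS_BELOW_CUTOFF = "wordsBelowCutOff";
  static final String WORD_LENGTHS_ABOVE_CUTOFF = "wordLengthsAboveCutOff";
  static final String MARKED_WORDS = "markedWords";

  // Routes each word with the same branching as the DoFn in this example.
  static Map<String, List<Object>> route(List<String> words, int wordLengthCutOff) {
    Map<String, List<Object>> outputs = new LinkedHashMap<>();
    outputs.put(WORDS_BELOW_CUTOFF, new ArrayList<>());
    outputs.put(WORD_LENGTHS_ABOVE_CUTOFF, new ArrayList<>());
    outputs.put(MARKED_WORDS, new ArrayList<>());

    for (String word : words) {
      if (word.length() <= wordLengthCutOff) {
        // Main output: the short word itself.
        outputs.get(WORDS_BELOW_CUTOFF).add(word);
      } else {
        // Additional output: the length of the long word.
        outputs.get(WORD_LENGTHS_ABOVE_CUTOFF).add(word.length());
      }
      if (word.startsWith("MARKER")) {
        // Checked independently, so a word can reach two outputs.
        outputs.get(MARKED_WORDS).add(word);
      }
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<String> words = List.of("short", "MARKER_x", "averyveryverylongword");
    System.out.println(route(words, 10));
  }
}
```

Because the "MARKER" check is a separate `if` rather than part of the `else` chain, a single input element such as `MARKER_x` is emitted to both the main output and the marked-words output.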
Example 4: Applying a simple custom function
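Related transforms
- MapElements applies a simple 1-to-1 mapping function over each element in the collection.
- Filter is useful if the function only decides whether to output an element.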
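One way to think about how these transforms relate to ParDo is by analogy with Java streams: a 1-to-1 mapping resembles `Stream.map`, a keep-or-drop decision resembles `Stream.filter`, and the general zero-or-more case resembles `Stream.flatMap`. The sketch below uses only the Java standard library, not the Beam API; `RelatedTransformsSketch` and its method names are hypothetical, and the analogy is loose rather than a description of how Beam implements these transforms.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Java-streams analogy for the related transforms; this is NOT the Beam API.
class RelatedTransformsSketch {

  // 1-to-1 mapping (analogous to MapElements): exactly one output per input.
  static List<Integer> lengths(List<String> words) {
    return words.stream().map(String::length).collect(Collectors.toList());
  }

  // Keep-or-drop decision (analogous to Filter): zero or one output per input.
  static List<String> shortWords(List<String> words, int cutOff) {
    return words.stream()
        .filter(w -> w.length() <= cutOff)
        .collect(Collectors.toList());
  }

  // General case (analogous to ParDo): zero or more outputs per input.
  static List<String> splitWords(List<String> lines) {
    return lines.stream()
        .flatMap(line -> Arrays.stream(line.split(" ")))
        .filter(w -> !w.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(lengths(Arrays.asList("a", "bbb")));                // [1, 3]
    System.out.println(shortWords(Arrays.asList("a", "bbb", "ccccc"), 3)); // [a, bbb]
    System.out.println(splitWords(Arrays.asList("a b", "", "c")));         // [a, b, c]
  }
}
```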
Last updated on 2024/10/31