模式模式
此页面上的示例描述了使用模式的常见模式。模式为我们提供了 Beam 记录的类型系统,它独立于任何特定的编程语言类型。可能有多个 Java 类都具有相同的模式(例如 Protocol-Buffer 类或 POJO 类),并且 Beam 允许我们无缝地在这些类型之间转换。模式还提供了一种简单的方法来推断跨不同编程语言 API 的类型。有关更多信息,请参见 编程指南中关于模式的部分.
- Java SDK
使用连接
Beam 支持对模式 PCollections
的等值连接,其中连接条件取决于字段子集的相等性。
如果你有多个集合提供有关相关事物的的信息,并且它们的结构已知,请考虑使用 Join
.
例如,假设我们有两个不同的集合包含用户数据:一个集合包含姓名和电子邮件地址;另一个集合包含姓名和电话号码。我们可以使用姓名作为公共键,将这两个集合连接起来,并将其他数据作为关联的值。连接后,我们得到一个集合,其中包含与每个姓名关联的所有信息(电子邮件地址和电话号码)。
以下概念示例使用两个输入集合来展示 Join
的机制。
首先,我们定义模式和用户数据。
// Define Schemas
Schema emailSchema =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("email", Schema.FieldType.STRING));
Schema phoneSchema =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("phone", Schema.FieldType.STRING));
// Create User Data Collections
final List<Row> emailUsers =
Arrays.asList(
Row.withSchema(emailSchema).addValue("person1").addValue("person1@example.com").build(),
Row.withSchema(emailSchema).addValue("person2").addValue("person2@example.com").build(),
Row.withSchema(emailSchema).addValue("person3").addValue("person3@example.com").build(),
Row.withSchema(emailSchema).addValue("person4").addValue("person4@example.com").build(),
Row.withSchema(emailSchema)
.addValue("person6")
.addValue("person6@example.com")
.build());
final List<Row> phoneUsers =
Arrays.asList(
Row.withSchema(phoneSchema).addValue("person1").addValue("111-222-3333").build(),
Row.withSchema(phoneSchema).addValue("person2").addValue("222-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person3").addValue("444-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person4").addValue("555-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person5").addValue("777-333-4444").build());
然后我们为用户数据创建 Pcollections
,并使用 Join
对两个 PCollections
进行连接。
// Create/Read Schema PCollections
PCollection<Row> emailList =
p.apply("CreateEmails", Create.of(emailUsers).withRowSchema(emailSchema));
PCollection<Row> phoneList =
p.apply("CreatePhones", Create.of(phoneUsers).withRowSchema(phoneSchema));
// Perform Join
PCollection<Row> resultRow =
emailList.apply("Apply Join", Join.<Row, Row>innerJoin(phoneList).using("name"));
// Preview Result
resultRow.apply(
"Preview Result",
MapElements.into(TypeDescriptors.strings())
.via(
x -> {
System.out.println(x);
return "";
}));
/* Sample Output From the pipeline:
Row:[Row:[person1, person1@example.com], Row:[person1, 111-222-3333]]
Row:[Row:[person2, person2@example.com], Row:[person2, 222-333-4444]]
Row:[Row:[person4, person4@example.com], Row:[person4, 555-333-4444]]
Row:[Row:[person3, person3@example.com], Row:[person3, 444-333-4444]]
*/
结果 Row
的类型为 Row: [Row(emailSchema), Row(phoneSchema)]
,可以将其转换为所需格式,如下面的代码片段所示。
PCollection<String> result =
resultRow.apply(
"Format Output",
MapElements.into(TypeDescriptors.strings())
.via(
x -> {
String userInfo =
"Name: "
+ x.getRow(0).getValue("name")
+ " Email: "
+ x.getRow(0).getValue("email")
+ " Phone: "
+ x.getRow(1).getValue("phone");
System.out.println(userInfo);
return userInfo;
}));
/* Sample output From the pipeline
Name: person4 Email: person4@example.com Phone: 555-333-4444
Name: person2 Email: person2@example.com Phone: 222-333-4444
Name: person3 Email: person3@example.com Phone: 444-333-4444
Name: person1 Email: person1@example.com Phone: 111-222-3333
*/
上次更新于 2024/10/31
你找到了所有你需要的东西吗?
所有内容是否有用且清晰?有什么你想改变的吗?请告诉我们!