Beam SQL 入门指南
此页面通过示例代码演示了 Beam SQL 的用法。
Beam 架构和行
SQL 查询只能应用于 PCollection<T>
,其中 T
已注册架构,或 PCollection<Row>
。有关为类型 T
注册架构的详细信息,请参阅 Beam 编程指南中的 架构文档。
如果您没有现有的类型 T
,可以通过多种方式获取 PCollection<Row>
,例如
**从内存中数据**(通常用于单元测试)。
**注意:**您必须显式指定
Row
编码器。在本例中,我们通过调用Create.of(..)
来实现。// Define the schema for the records. Schema appSchema = Schema .builder() .addInt32Field("appId") .addStringField("description") .addDateTimeField("rowtime") .build(); // Create a concrete row with that type. Row row = Row .withSchema(appSchema) .addValues(1, "Some cool app", new Date()) .build(); // Create a source PCollection containing only that row PCollection<Row> testApps = PBegin .in(p) .apply(Create .of(row) .withCoder(RowCoder.of(appSchema)));
**从某种其他类型的记录的
PCollection<T>
**(即T
尚未为Row
)中获取,通过应用将输入记录转换为Row
格式的ParDo
// An example POJO class. class AppPojo { Integer appId; String description; Date timestamp; } // Acquire a collection of POJOs somehow. PCollection<AppPojo> pojos = ... // Convert them to Rows with the same schema as defined above via a DoFn. PCollection<Row> apps = pojos .apply( ParDo.of(new DoFn<AppPojo, Row>() { @ProcessElement public void processElement(ProcessContext c) { // Get the current POJO instance AppPojo pojo = c.element(); // Create a Row with the appSchema schema // and values from the current POJO Row appRow = Row .withSchema(appSchema) .addValues( pojo.appId, pojo.description, pojo.timestamp) .build(); // Output the Row representing the current POJO c.output(appRow); } })).setRowSchema(appSchema);
**作为另一个
SqlTransform
的输出**。下一节将详细介绍。
一旦您获得了 PCollection<Row>
,就可以使用 SqlTransform
对其应用 SQL 查询。
SqlTransform
SqlTransform.query(queryString)
方法是唯一一个从 SQL 查询的字符串表示创建 PTransform
的 API。您可以将此 PTransform
应用于单个 PCollection
或包含多个 PCollections
的 PCollectionTuple
当应用于单个
PCollection
时,它可以在查询中通过表名PCOLLECTION
来引用当应用于
PCollectionTuple
时,元组中每个PCollection
的元组标签定义了可用于查询它的表名。请注意,表名绑定到特定的PCollectionTuple
,因此仅在应用于它的查询上下文中有效。例如,您可以联接两个
PCollections
// Create the schema for reviews Schema reviewSchema = Schema .builder() .addInt32Field("appId") .addInt32Field("reviewerId") .addFloatField("rating") .addDateTimeField("rowtime") .build(); // Obtain the reviews records with this schema PCollection<Row> reviewsRows = ... // Create a PCollectionTuple containing both PCollections. // TupleTags IDs will be used as table names in the SQL query PCollectionTuple namesAndFoods = PCollectionTuple .of(new TupleTag<>("Apps"), appsRows) // appsRows from the previous example .and(new TupleTag<>("Reviews"), reviewsRows); // Compute the total number of reviews // and average rating per app // by joining two PCollections PCollection<Row> output = namesAndFoods.apply( SqlTransform.query( "SELECT Apps.appId, COUNT(Reviews.rating), AVG(Reviews.rating) " + "FROM Apps INNER JOIN Reviews ON Apps.appId = Reviews.appId " + "GROUP BY Apps.appId"));
代码库中的 BeamSqlExample 演示了这两个 API 的基本用法。