博客
2019/06/04
在 Beam SQL CLI 中添加新的数据源
Apache Beam 中一项令人兴奋的新功能是能够在管道中使用 SQL。这是通过在 Java 管道中使用 Beam 的 SqlTransform
来实现的。
Beam 还提供了一个炫酷的 SQL 命令行界面,您可以使用它来交互式地查询您的数据,无论是批处理还是流式处理。如果您还没有尝试过,可以查看 https://bit.ly/ExploreBeamSQL。
SQL CLI 的一个很酷的功能是您可以使用 CREATE EXTERNAL TABLE
命令来添加要在 CLI 中访问的数据源。目前,CLI 支持从 BigQuery、PubSub、Kafka 和文本文件创建表。在这篇文章中,我们将探讨如何添加新的数据源,以便您可以从其他 Beam 源消费数据。
我们将在这篇文章中实现的表提供程序将生成一个持续的无界整数流。它将基于 Beam SDK 中的 GenerateSequence
PTransform。最终,我们将能够像这样在 SQL 中定义和使用序列生成器
CREATE EXTERNAL TABLE -- all tables in Beam are external, they are not persisted
sequenceTable -- table alias that will be used in queries
(
sequence BIGINT, -- sequence number
event_timestamp TIMESTAMP -- timestamp of the generated event
)
TYPE sequence -- type identifies the table provider
TBLPROPERTIES '{ elementsPerSecond : 12 }' -- optional rate at which events are generated
我们将能够像这样在查询中使用它
SELECT sequence FROM sequenceTable;
让我们深入了解!
实现 TableProvider
Beam 的 SqlTransform
通过依赖 TableProvider
来工作,当使用 CREATE EXTERNAL TABLE
语句时,它会使用 TableProvider
。如果您想在 Beam SQL CLI 中添加一个新的数据源,那么您将需要添加一个 TableProvider
来完成此操作。在这篇文章中,我将展示为 Java SDK 中可用的 GenerateSequence
变换 创建一个新的表提供程序所需的步骤。
TableProvider
类位于 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/
。如果您查看该目录,您会找到所有可用数据源的提供程序及其实现。因此,您只需添加您想要的提供程序,以及一个 BaseBeamTable
的实现。
GenerateSequenceTableProvider
我们的表提供程序如下所示
它所做的就是为表提供一个类型 - 并且它实现了 buildBeamSqlTable
方法,该方法只返回由我们的 GenerateSequenceTable
实现定义的 BeamSqlTable
。
GenerateSequenceTable
我们想要一个支持流式处理的表实现,因此我们将允许用户定义每秒要发射的元素数量。我们将定义一个简单的表,该表以流式方式发射顺序整数。如下所示
class GenerateSequenceTable extends BaseBeamTable implements Serializable {
public static final Schema TABLE_SCHEMA =
Schema.of(Field.of("sequence", FieldType.INT64), Field.of("event_time", FieldType.DATETIME));
Integer elementsPerSecond = 5;
GenerateSequenceTable(Table table) {
super(TABLE_SCHEMA);
if (table.getProperties().containsKey("elementsPerSecond")) {
elementsPerSecond = table.getProperties().getInteger("elementsPerSecond");
}
}
@Override
public PCollection.IsBounded isBounded() {
return IsBounded.UNBOUNDED;
}
@Override
public PCollection<Row> buildIOReader(PBegin begin) {
return begin
.apply(GenerateSequence.from(0).withRate(elementsPerSecond, Duration.standardSeconds(1)))
.apply(
MapElements.into(TypeDescriptor.of(Row.class))
.via(elm -> Row.withSchema(TABLE_SCHEMA).addValues(elm, Instant.now()).build()))
.setRowSchema(getSchema());
}
@Override
public POutput buildIOWriter(PCollection<Row> input) {
throw new UnsupportedOperationException("buildIOWriter unsupported!");
}
}
真正的乐趣
现在我们已经实现了两个基本类(一个 BaseBeamTable
和一个 TableProvider
),我们可以开始使用它们了。在构建了 SQL CLI 之后,我们现在可以在表上执行选择操作
0: BeamSQL> CREATE EXTERNAL TABLE input_seq (
. . . . . > sequence BIGINT COMMENT 'this is the primary key',
. . . . . > event_time TIMESTAMP COMMENT 'this is the element timestamp'
. . . . . > )
. . . . . > TYPE 'sequence';
No rows affected (0.005 seconds)
让我们选择几行
0: BeamSQL> SELECT * FROM input_seq LIMIT 5;
+---------------------+------------+
| sequence | event_time |
+---------------------+------------+
| 0 | 2019-05-21 00:36:33 |
| 1 | 2019-05-21 00:36:33 |
| 2 | 2019-05-21 00:36:33 |
| 3 | 2019-05-21 00:36:33 |
| 4 | 2019-05-21 00:36:33 |
+---------------------+------------+
5 rows selected (1.138 seconds)
现在让我们尝试一些更有趣的事情。例如,分组。这也将让我们确保我们正在为每一行正确地提供时间戳
0: BeamSQL> SELECT
. . . . . > COUNT(sequence) as elements,
. . . . . > TUMBLE_START(event_time, INTERVAL '2' SECOND) as window_start
. . . . . > FROM input_seq
. . . . . > GROUP BY TUMBLE(event_time, INTERVAL '2' SECOND) LIMIT 5;
+---------------------+--------------+
| elements | window_start |
+---------------------+--------------+
| 6 | 2019-06-05 00:39:24 |
| 10 | 2019-06-05 00:39:26 |
| 10 | 2019-06-05 00:39:28 |
| 10 | 2019-06-05 00:39:30 |
| 10 | 2019-06-05 00:39:32 |
+---------------------+--------------+
5 rows selected (10.142 seconds)
瞧!我们可以开始对我们的序列生成器进行一些有趣的流式查询。