在 Beam SQL CLI 中添加新的数据源

Apache Beam 中一项令人兴奋的新功能是能够在管道中使用 SQL。这是通过在 Java 管道中使用 Beam 的 SqlTransform 来实现的。

Beam 还提供了一个炫酷的 SQL 命令行界面,您可以使用它来交互式地查询您的数据,无论是批处理还是流式处理。如果您还没有尝试过,可以查看 https://bit.ly/ExploreBeamSQL

SQL CLI 的一个很酷的功能是您可以使用 CREATE EXTERNAL TABLE 命令来添加要在 CLI 中访问的数据源。目前,CLI 支持从 BigQuery、PubSub、Kafka 和文本文件创建表。在这篇文章中,我们将探讨如何添加新的数据源,以便您可以从其他 Beam 源消费数据。

我们将在这篇文章中实现的表提供程序将生成一个持续的无界整数流。它将基于 Beam SDK 中的 GenerateSequence PTransform。最终,我们将能够像这样在 SQL 中定义和使用序列生成器

CREATE EXTERNAL TABLE                      -- all tables in Beam are external, they are not persisted
  sequenceTable                              -- table alias that will be used in queries
  (
         sequence BIGINT,                  -- sequence number
         event_timestamp TIMESTAMP         -- timestamp of the generated event
  )
TYPE sequence                              -- type identifies the table provider
TBLPROPERTIES '{ elementsPerSecond : 12 }' -- optional rate at which events are generated

我们将能够像这样在查询中使用它

SELECT sequence FROM sequenceTable;

让我们深入了解!

实现 TableProvider

Beam 的 SqlTransform 通过依赖 TableProvider 来工作,当使用 CREATE EXTERNAL TABLE 语句时,它会使用 TableProvider。如果您想在 Beam SQL CLI 中添加一个新的数据源,那么您将需要添加一个 TableProvider 来完成此操作。在这篇文章中,我将展示为 Java SDK 中可用的 GenerateSequence 变换 创建一个新的表提供程序所需的步骤。

TableProvider 类位于 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/。如果您查看该目录,您会找到所有可用数据源的提供程序及其实现。因此,您只需添加您想要的提供程序,以及一个 BaseBeamTable 的实现。

GenerateSequenceTableProvider

我们的表提供程序如下所示

@AutoService(TableProvider.class)
public class GenerateSequenceTableProvider extends InMemoryMetaTableProvider {

  @Override
  public String getTableType() {
    return "sequence";
  }

  @Override
  public BeamSqlTable buildBeamSqlTable(Table table) {
    return new GenerateSequenceTable(table);
  }
}

它所做的就是为表提供一个类型 - 并且它实现了 buildBeamSqlTable 方法,该方法只返回由我们的 GenerateSequenceTable 实现定义的 BeamSqlTable

GenerateSequenceTable

我们想要一个支持流式处理的表实现,因此我们将允许用户定义每秒要发射的元素数量。我们将定义一个简单的表,该表以流式方式发射顺序整数。如下所示

class GenerateSequenceTable extends BaseBeamTable implements Serializable {
  public static final Schema TABLE_SCHEMA =
      Schema.of(Field.of("sequence", FieldType.INT64), Field.of("event_time", FieldType.DATETIME));

  Integer elementsPerSecond = 5;

  GenerateSequenceTable(Table table) {
    super(TABLE_SCHEMA);
    if (table.getProperties().containsKey("elementsPerSecond")) {
      elementsPerSecond = table.getProperties().getInteger("elementsPerSecond");
    }
  }

  @Override
  public PCollection.IsBounded isBounded() {
    return IsBounded.UNBOUNDED;
  }

  @Override
  public PCollection<Row> buildIOReader(PBegin begin) {
    return begin
        .apply(GenerateSequence.from(0).withRate(elementsPerSecond, Duration.standardSeconds(1)))
        .apply(
            MapElements.into(TypeDescriptor.of(Row.class))
                .via(elm -> Row.withSchema(TABLE_SCHEMA).addValues(elm, Instant.now()).build()))
        .setRowSchema(getSchema());
  }

  @Override
  public POutput buildIOWriter(PCollection<Row> input) {
    throw new UnsupportedOperationException("buildIOWriter unsupported!");
  }
}

真正的乐趣

现在我们已经实现了两个基本类(一个 BaseBeamTable 和一个 TableProvider),我们可以开始使用它们了。在构建了 SQL CLI 之后,我们现在可以在表上执行选择操作

0: BeamSQL> CREATE EXTERNAL TABLE input_seq (
. . . . . >   sequence BIGINT COMMENT 'this is the primary key',
. . . . . >   event_time TIMESTAMP COMMENT 'this is the element timestamp'
. . . . . > )
. . . . . > TYPE 'sequence';
No rows affected (0.005 seconds)

让我们选择几行

0: BeamSQL> SELECT * FROM input_seq LIMIT 5;
+---------------------+------------+
|      sequence       | event_time |
+---------------------+------------+
| 0                   | 2019-05-21 00:36:33 |
| 1                   | 2019-05-21 00:36:33 |
| 2                   | 2019-05-21 00:36:33 |
| 3                   | 2019-05-21 00:36:33 |
| 4                   | 2019-05-21 00:36:33 |
+---------------------+------------+
5 rows selected (1.138 seconds)

现在让我们尝试一些更有趣的事情。例如,分组。这也将让我们确保我们正在为每一行正确地提供时间戳

0: BeamSQL> SELECT
. . . . . >   COUNT(sequence) as elements,
. . . . . >   TUMBLE_START(event_time, INTERVAL '2' SECOND) as window_start
. . . . . > FROM input_seq
. . . . . > GROUP BY TUMBLE(event_time, INTERVAL '2' SECOND) LIMIT 5;
+---------------------+--------------+
|      elements       | window_start |
+---------------------+--------------+
| 6                   | 2019-06-05 00:39:24 |
| 10                  | 2019-06-05 00:39:26 |
| 10                  | 2019-06-05 00:39:28 |
| 10                  | 2019-06-05 00:39:30 |
| 10                  | 2019-06-05 00:39:32 |
+---------------------+--------------+
5 rows selected (10.142 seconds)

瞧!我们可以开始对我们的序列生成器进行一些有趣的流式查询。