Beam SQL Shell
概述
从版本 2.6.0 开始,Beam SQL 包含一个交互式 shell,称为 Beam SQL shell。该 shell 允许您将管道编写为 SQL 查询,而无需使用 Java SDK。默认情况下,Beam 使用 DirectRunner
将查询作为 Beam 管道执行。
本页介绍如何使用 shell,但不会关注 Beam SQL 的特定功能。有关本页示例中使用功能的更详细概述,请参阅 Beam SQL 文档 中的相关部分。
快速入门
要使用 Beam SQL shell,您必须先克隆 Beam SDK 存储库。然后,从存储库克隆的根目录执行以下命令以运行 shell
./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
./sdks/java/extensions/sql/shell/build/install/shell/bin/shell
运行命令后,SQL shell 将启动,您可以键入查询
Welcome to Beam SQL 2.6.0-SNAPSHOT (based on sqlline version 1.4.0)
0: BeamSQL>
注意:如果您在运行 Gradle 命令之前没有构建项目,则该命令将需要几分钟时间,因为 Gradle 必须首先构建所有依赖项。
shell 将查询转换为 Beam 管道,使用 DirectRunner
运行它们,并在管道完成后将结果作为表返回
0: BeamSQL> SELECT 'foo' AS NAME, 'bar' AS TYPE, 'num' AS NUMBER;
+------+------+--------+
| NAME | TYPE | NUMBER |
+------+------+--------+
| foo | bar | num |
+------+------+--------+
1 row selected (0.826 seconds)
声明表
在从源读取数据或将数据写入目标之前,您必须使用 CREATE EXTERNAL TABLE
语句声明一个虚拟表。例如,如果您在当前文件夹中有一个本地 CSV 文件 "test-file.csv"
,则可以使用以下语句创建一个表
0: BeamSQL> CREATE EXTERNAL TABLE csv_file (field1 VARCHAR, field2 INTEGER) TYPE text LOCATION 'test-file.csv';
No rows affected (0.042 seconds)
CREATE EXTERNAL TABLE
语句将 CSV 文件注册为 Beam SQL 中的表,并指定表的模式。该语句不会直接创建持久物理表;它只是向 Beam SQL 描述源/接收器,以便您可以在读取数据和写入数据的查询中使用该表。
有关 CREATE EXTERNAL TABLE
语法和支持的表类型的更多信息,请参阅 CREATE EXTERNAL TABLE 参考页面。
读取和写入数据
要从上一节中声明的本地 CSV 文件中读取数据,请执行以下查询
0: BeamSQL> SELECT field1 AS field FROM csv_file;
+--------+
| field |
+--------+
| baz |
| foo |
| bar |
| bar |
| foo |
+--------+
有关 SELECT
语法的更多信息,请参阅 查询语法页面。
要将数据写入 CSV 文件,请使用 INSERT INTO … SELECT ...
语句
0: BeamSQL> INSERT INTO csv_file SELECT 'foo', 'bar';
读取和写入行为取决于表的类型。例如
text
表类型使用TextIO
实现,因此写入text
表可能会生成多个编号文件。pubsub
表类型是无界源,因此从pubsub
表读取永远不会完成。
使用无界源进行开发
当您想在开发过程中检查来自无界源的数据时,您必须在 SELECT
语句的末尾指定 LIMIT x
子句,以将输出限制为 x
个记录。否则,管道将永远不会完成。
0: BeamSQL> SELECT field1 FROM unbounded_source LIMIT 10 ;
到目前为止显示的示例查询是快速查询,它们在本地执行。这些查询在您调查数据并迭代地设计管道时非常有用。理想情况下,您希望查询快速完成并在完成时返回输出。
当您对 SQL 语句的逻辑感到满意时,您可以通过删除 LIMIT x
语句将语句作为长期运行的作业提交。然后,如果其中一个表代表无界源,则管道可能会永远运行。
指定运行器
默认情况下,Beam 使用 DirectRunner
在您执行命令的机器上运行管道。如果您想使用不同的运行器运行管道,则必须执行两个步骤
确保 SQL shell 包含所需的运行器。将相应的项目 ID 添加到 Gradle 调用的
-Pbeam.sql.shell.bundled
参数中 (源代码,项目 ID)。例如,使用以下命令包含 Flink 运行器和 KafkaIO./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
注意:您可以以相同的方式捆绑多个运行器(使用逗号分隔的列表)或其他附加组件。例如,您可以添加对更多 I/O 的支持。
然后,使用
SET
命令 (参考页面) 指定运行器0: BeamSQL> SET runner='FlinkRunner';
Beam 将所有将来的 INSERT
语句作为管道提交到指定的运行器。在这种情况下,Beam SQL shell 不会显示查询结果。您必须通过相应运行器的 UI 管理提交的作业(例如,使用 Flink UI 或命令行)。
指定 PipelineOptions
要配置运行器,您必须使用 SET
命令 (详细信息) 指定 PipelineOptions
0: BeamSQL> SET projectId='gcpProjectId';
0: BeamSQL> SET tempLocation='/tmp/tempDir';
打包 SQL Shell
您还可以使用 distZip
或 distTar
任务构建您自己的 SQL shell 独立包。例如
./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' distZip
ls ./sdks/java/extensions/sql/shell/build/distributions/
beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.tar beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.zip