Beam SQL 扩展:CREATE EXTERNAL TABLE

Beam SQL 的 CREATE EXTERNAL TABLE 语句注册一个映射到 外部存储系统 的虚拟表。对于某些存储系统,CREATE EXTERNAL TABLE 不会在写入发生之前创建物理表。物理表存在后,您可以使用 SELECTJOININSERT INTO 语句访问该表。

CREATE EXTERNAL TABLE 语句包含模式和扩展子句。

语法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE type [LOCATION location] [TBLPROPERTIES tblProperties] simpleType: TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL | BOOLEAN | DATE | TIME | TIMESTAMP | CHAR | VARCHAR fieldType: simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType> | ROW<tableElement [, tableElement ]*> tableElement: columnName fieldType [ NOT NULL ]

BigQuery

语法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE bigquery LOCATION '[PROJECT_ID]:[DATASET].[TABLE]' TBLPROPERTIES '{"method": "DIRECT_READ"}'

读取模式

Beam SQL 支持读取具有简单类型(simpleType)和简单类型数组(ARRAY<simpleType>)的列。

使用 EXPORT 方法读取时,应设置以下管道选项

使用 DIRECT_READ 方法读取时,优化器将尝试执行项目和谓词下推,这可能会减少从 BigQuery 读取数据所需的时间。

有关 BigQuery 存储 API 的更多信息,请参见 此处

写入模式

如果该表不存在,则当写入第一条记录时,Beam 会创建 location 中指定的表。如果该表已存在,则指定的列必须与现有表匹配。

模式

与模式相关的错误会导致管道崩溃。不支持 Map 类型。Beam SQL 类型映射到 BigQuery 标准 SQL 类型,如下所示

Beam SQL 类型BigQuery 标准 SQL 类型
TINYINT、SMALLINT、INTEGER、BIGINT  INT64
FLOAT、DOUBLE、DECIMALFLOAT64
BOOLEANBOOL
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
CHAR、VARCHARSTRING
MAP(不支持)
ARRAYARRAY
ROWSTRUCT

示例

CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR) TYPE bigquery LOCATION 'testing-integration:apache.users'

Cloud Bigtable

语法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName ( key VARCHAR NOT NULL, family ROW<qualifier cells [, qualifier cells ]* > [, family ROW< qualifier cells [, qualifier cells ]* > ]* ) TYPE bigtable LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'

具有扁平模式的备用语法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName ( key VARCHAR NOT NULL, qualifier SIMPLE_TYPE [, qualifier SIMPLE_TYPE ]* ) TYPE bigtable LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]' TBLPROPERTIES '{ "columnsMapping": "family:qualifier[,family:qualifier]*" }'

读取模式

Beam SQL 支持读取具有强制性 key 字段的行,至少一个 family 具有至少一个 qualifier。单元格表示为简单类型 (SIMPLE_TYPE) 或具有强制性 val 字段、可选 timestampMicros 和可选 labels 的 ROW 类型。两者都读取列中的最新单元格。指定为简单类型数组 (ARRAY<simpleType>) 的单元格允许读取列的所有值。

对于扁平模式,仅允许使用 SIMPLE_TYPE 值。除了 key 之外的每个字段都必须与 columnsMapping 中指定的键值对相对应。

不需要提供所有现有的列族和限定符到模式中。

过滤器仅允许使用具有单个 LIKE 语句的 key 字段,该语句使用 RE2 语法 正则表达式,例如 SELECT * FROM table WHERE key LIKE '^key[012]{1}'

写入模式

仅支持扁平模式。

示例

CREATE EXTERNAL TABLE beamTable( key VARCHAR NOT NULL, beamFamily ROW< boolLatest BOOLEAN NOT NULL, longLatestWithTs ROW< val BIGINT NOT NULL, timestampMicros BIGINT NOT NULL > NOT NULL, allStrings ARRAY<VARCHAR> NOT NULL, doubleLatestWithTsAndLabels ROW< val DOUBLE NOT NULL, timestampMicros BIGINT NOT NULL, labels ARRAY<VARCHAR> NOT NULL > NOT NULL, binaryLatestWithLabels ROW< val BINARY NOT NULL, labels ARRAY<VARCHAR> NOT NULL > NOT NULL > NOT NULL ) TYPE bigtable LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/beamTable'

扁平模式示例

CREATE EXTERNAL TABLE flatTable( key VARCHAR NOT NULL, boolColumn BOOLEAN NOT NULL, longColumn BIGINT NOT NULL, stringColumn VARCHAR NOT NULL, doubleColumn DOUBLE NOT NULL, binaryColumn BINARY NOT NULL ) TYPE bigtable LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/flatTable' TBLPROPERTIES '{ "columnsMapping": "f:boolColumn,f:longColumn,f:stringColumn,f2:doubleColumn,f2:binaryColumn" }'

写入示例

INSERT INTO writeTable(key, boolColumn, longColumn, stringColumn, doubleColumn) VALUES ('key', TRUE, 10, 'stringValue', 5.5)

Pub/Sub

语法

嵌套模式

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName( event_timestamp TIMESTAMP, attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>], payload [BYTES, ROW<tableElement [, tableElement ]*>] ) TYPE pubsub LOCATION 'projects/[PROJECT]/topics/[TOPIC]'

扁平化模式

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*) TYPE pubsub LOCATION 'projects/[PROJECT]/topics/[TOPIC]'

在嵌套模式下,以下字段保存主题元数据。attributes 字段的存在会触发嵌套模式的使用。

读取模式

PubsubIO 支持通过创建新订阅从主题读取。

写入模式

PubsubIO 支持写入主题。

模式

Pub/Sub 消息包含与其关联的元数据,您可以在查询中引用此元数据。对于每条消息,除了有效负载(在一般情况下是非结构化的)之外,Pub/Sub 会公开其发布时间和用户提供的属性映射。此信息必须保留并可从 SQL 语句中访问。目前,这意味着 PubsubIO 表需要您声明一组特殊的列,如下所示。

支持的有效负载

示例

CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>) TYPE pubsub LOCATION 'projects/testing-integration/topics/user-location'

Pub/Sub Lite

语法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName( publish_timestamp DATETIME, event_timestamp DATETIME, message_key BYTES, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload [BYTES, ROW<tableElement [, tableElement ]*>] ) TYPE pubsublite // For writing LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]' // For reading LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'

读取模式

PubsubLiteIO 支持从订阅中读取。

写入模式

PubsubLiteIO 支持写入主题。

支持的有效负载

示例

CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>) TYPE pubsublite LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'

Kafka

KafkaIO 在 Beam SQL 中处于实验阶段。

语法

扁平化模式

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE kafka LOCATION 'my.company.url.com:2181/topic1' TBLPROPERTIES '{ "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"], "topics": ["topic2", "topic3"], "format": "json" }'

嵌套模式

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName ( event_timestamp DATETIME, message_key BYTES, headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload [BYTES, ROW<tableElement [, tableElement ]*>] ) TYPE kafka LOCATION 'my.company.url.com:2181/topic1' TBLPROPERTIES '{ "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"], "topics": ["topic2", "topic3"], "format": "json" }'

headers 字段的存在会触发嵌套模式的使用。

读取模式

读取模式支持从主题读取。

写入模式

写入模式支持写入主题。

支持的格式

模式

对于 CSV,仅支持简单类型。

MongoDB

语法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE mongodb LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'

读取模式

读取模式支持从集合读取。

写入模式

写入模式支持写入集合。

模式

仅支持简单类型。MongoDB 文档通过 JsonToRow 变换映射到 Beam SQL 类型。

示例

CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR) TYPE mongodb LOCATION 'mongodb://127.0.0.1:27017/apache/users'

文本

TextIO 在 Beam SQL 中处于实验阶段。读取模式和写入模式目前不访问相同的基础数据。

语法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE text LOCATION '/home/admin/orders' TBLPROPERTIES '{"format: "Excel"}'
format 的值字段分隔符引号记录分隔符忽略空行?允许缺少列名?
默认,"\r\n
rfc4180,"\r\n
excel,"\r\n
tdf\t"\r\n
mysql\t\n

读取模式

读取模式支持从文件读取。

写入模式

写入模式支持写入一组文件。TextIO 在写入时创建文件。

支持的有效负载

模式

仅支持简单类型。

示例

CREATE EXTERNAL TABLE orders (id INTEGER, price INTEGER) TYPE text LOCATION '/home/admin/orders'

通用有效负载处理

某些数据源和接收器支持通用有效负载处理。此处理会将字节数组有效负载字段解析为表架构。此处理支持以下架构。所有架构至少都需要设置 "format": "<type>",并且可能还需要其他属性。

通用 DLQ 处理

支持通用 DLQ 处理的源和接收器会指定格式为 "<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]" 的参数。支持以下类型的 DLQ 处理