Beam SQL 扩展:CREATE EXTERNAL TABLE
Beam SQL 的 CREATE EXTERNAL TABLE
语句注册一个映射到 外部存储系统 的虚拟表。对于某些存储系统,CREATE EXTERNAL TABLE
不会在写入发生之前创建物理表。物理表存在后,您可以使用 SELECT
、JOIN
和 INSERT INTO
语句访问该表。
CREATE EXTERNAL TABLE
语句包含模式和扩展子句。
语法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE type [LOCATION location] [TBLPROPERTIES tblProperties] simpleType: TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL | BOOLEAN | DATE | TIME | TIMESTAMP | CHAR | VARCHAR fieldType: simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType> | ROW<tableElement [, tableElement ]*> tableElement: columnName fieldType [ NOT NULL ]
IF NOT EXISTS
:可选。如果该表已注册,Beam SQL 会忽略该语句,而不是返回错误。tableName
:要创建和注册的表的区分大小写的名称,指定为 标识符。表名不需要与底层数据存储系统中的名称匹配。tableElement
:columnName
fieldType
[ NOT NULL ]
columnName
:列的区分大小写的名称,指定为反引号引用的表达式。fieldType
:字段的类型,指定为以下类型之一simpleType
:TINYINT
、SMALLINT
、INTEGER
、BIGINT
、FLOAT
、DOUBLE
、DECIMAL
、BOOLEAN
、DATE
、TIME
、TIMESTAMP
、CHAR
、VARCHAR
MAP<simpleType, fieldType>
ARRAY<fieldType>
ROW<tableElement [, tableElement ]*>
NOT NULL
:可选。表示该列不可为空。
type
:支持虚拟表的 I/O 变换,指定为 标识符,其值为以下值之一bigquery
bigtable
pubsub
kafka
text
location
:底层表的 I/O 特定位置,指定为 字符串文字。有关location
格式要求,请参阅 I/O 特定部分。tblProperties
:包含额外配置的 I/O 特定引号引用的键值 JSON 对象,指定为 字符串文字。有关tblProperties
格式要求,请参阅 I/O 特定部分。
BigQuery
语法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE bigquery LOCATION '[PROJECT_ID]:[DATASET].[TABLE]' TBLPROPERTIES '{"method": "DIRECT_READ"}'
LOCATION
:BigQuery CLI 格式中表的路径。PROJECT_ID
:Google Cloud 项目的 ID。DATASET
:BigQuery 数据集 ID。TABLE
:BigQuery 数据集中表的 ID。
TBLPROPERTIES
:method
:可选。要使用的读取方法。提供以下选项DIRECT_READ
:使用 BigQuery 存储 API。EXPORT
:将数据导出到 Google Cloud Storage 中的 Avro 格式,并从该位置读取数据文件。- Beam 2.21+ 的默认值为
DIRECT_READ
(较早版本使用EXPORT
)。
读取模式
Beam SQL 支持读取具有简单类型(simpleType
)和简单类型数组(ARRAY<simpleType>
)的列。
使用 EXPORT
方法读取时,应设置以下管道选项
project
:Google Cloud 项目的 ID。tempLocation
:存储中间数据的存储桶。例如:gs://temp-storage/temp
。
使用 DIRECT_READ
方法读取时,优化器将尝试执行项目和谓词下推,这可能会减少从 BigQuery 读取数据所需的时间。
有关 BigQuery 存储 API 的更多信息,请参见 此处。
写入模式
如果该表不存在,则当写入第一条记录时,Beam 会创建 location
中指定的表。如果该表已存在,则指定的列必须与现有表匹配。
模式
与模式相关的错误会导致管道崩溃。不支持 Map 类型。Beam SQL 类型映射到 BigQuery 标准 SQL 类型,如下所示
Beam SQL 类型 | BigQuery 标准 SQL 类型 |
TINYINT、SMALLINT、INTEGER、BIGINT | INT64 |
FLOAT、DOUBLE、DECIMAL | FLOAT64 |
BOOLEAN | BOOL |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
CHAR、VARCHAR | STRING |
MAP | (不支持) |
ARRAY | ARRAY |
ROW | STRUCT |
示例
CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR) TYPE bigquery LOCATION 'testing-integration:apache.users'
Cloud Bigtable
语法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName ( key VARCHAR NOT NULL, family ROW<qualifier cells [, qualifier cells ]* > [, family ROW< qualifier cells [, qualifier cells ]* > ]* ) TYPE bigtable LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'
key
:Bigtable 行的键family
:列族名称qualifier
:列限定符cells
:每个值TYPE
ARRAY<SIMPLE_TYPE>
LOCATION
:PROJECT_ID
:Google Cloud 项目的 ID。INSTANCE_ID
:Bigtable 实例 ID。TABLE
:Bigtable 表 ID。
TYPE
:SIMPLE_TYPE
或CELL_ROW
CELL_ROW
:ROW<val SIMPLE_TYPE [, timestampMicros BIGINT [NOT NULL]] [, labels ARRAY<VARCHAR> [NOT NULL]]
SIMPLE_TYPE
:以下之一BINARY
VARCHAR
BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
FLOAT
BOOLEAN
TIMESTAMP
具有扁平模式的备用语法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName ( key VARCHAR NOT NULL, qualifier SIMPLE_TYPE [, qualifier SIMPLE_TYPE ]* ) TYPE bigtable LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]' TBLPROPERTIES '{ "columnsMapping": "family:qualifier[,family:qualifier]*" }'
key
:Bigtable 行的键family
:列族名称qualifier
:列限定符LOCATION
:PROJECT_ID
:Google Cloud 项目的 ID。INSTANCE_ID
:Bigtable 实例 ID。TABLE
:Bigtable 表 ID。
TBLPROPERTIES
:包含columnsMapping
键的 JSON 对象,其中包含以冒号分隔的逗号分隔的键值对SIMPLE_TYPE
:与前一种语法相同
读取模式
Beam SQL 支持读取具有强制性 key
字段的行,至少一个 family
具有至少一个 qualifier
。单元格表示为简单类型 (SIMPLE_TYPE
) 或具有强制性 val
字段、可选 timestampMicros
和可选 labels
的 ROW 类型。两者都读取列中的最新单元格。指定为简单类型数组 (ARRAY<simpleType>
) 的单元格允许读取列的所有值。
对于扁平模式,仅允许使用 SIMPLE_TYPE
值。除了 key
之外的每个字段都必须与 columnsMapping
中指定的键值对相对应。
不需要提供所有现有的列族和限定符到模式中。
过滤器仅允许使用具有单个 LIKE
语句的 key
字段,该语句使用 RE2 语法 正则表达式,例如 SELECT * FROM table WHERE key LIKE '^key[012]{1}'
写入模式
仅支持扁平模式。
示例
CREATE EXTERNAL TABLE beamTable( key VARCHAR NOT NULL, beamFamily ROW< boolLatest BOOLEAN NOT NULL, longLatestWithTs ROW< val BIGINT NOT NULL, timestampMicros BIGINT NOT NULL > NOT NULL, allStrings ARRAY<VARCHAR> NOT NULL, doubleLatestWithTsAndLabels ROW< val DOUBLE NOT NULL, timestampMicros BIGINT NOT NULL, labels ARRAY<VARCHAR> NOT NULL > NOT NULL, binaryLatestWithLabels ROW< val BINARY NOT NULL, labels ARRAY<VARCHAR> NOT NULL > NOT NULL > NOT NULL ) TYPE bigtable LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/beamTable'
扁平模式示例
CREATE EXTERNAL TABLE flatTable( key VARCHAR NOT NULL, boolColumn BOOLEAN NOT NULL, longColumn BIGINT NOT NULL, stringColumn VARCHAR NOT NULL, doubleColumn DOUBLE NOT NULL, binaryColumn BINARY NOT NULL ) TYPE bigtable LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/flatTable' TBLPROPERTIES '{ "columnsMapping": "f:boolColumn,f:longColumn,f:stringColumn,f2:doubleColumn,f2:binaryColumn" }'
写入示例
INSERT INTO writeTable(key, boolColumn, longColumn, stringColumn, doubleColumn) VALUES ('key', TRUE, 10, 'stringValue', 5.5)
Pub/Sub
语法
嵌套模式
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName( event_timestamp TIMESTAMP, attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>], payload [BYTES, ROW<tableElement [, tableElement ]*>] ) TYPE pubsub LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
扁平化模式
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*) TYPE pubsub LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
在嵌套模式下,以下字段保存主题元数据。attributes
字段的存在会触发嵌套模式的使用。
event_timestamp
:与 Pub/Sub 消息关联的 PubsubIO 事件时间戳。它可以是以下之一- 消息发布时间,由 Pub/Sub 提供。如果未提供任何额外的配置,则这是默认值。
- 用户提供的消息属性中指定的时间戳。属性键由
tblProperties
blob 的timestampAttributeKey
字段配置。属性的值应符合 PubsubIO 的要求,即自 Unix 纪元以来的毫秒数或 RFC 339 日期字符串。
attributes
:来自 Pub/Sub 消息的用户提供属性映射;payload
:Pub/Sub 消息有效负载的模式。如果记录无法反序列化,则该记录将写入tblProperties
blob 的deadLeaderQueue
字段中指定的主题。如果在这种情况下没有指定死信队列,则会抛出异常,管道将崩溃。LOCATION
:PROJECT
:Google Cloud 项目的 IDTOPIC
:Pub/Sub 主题名称。订阅将自动创建,但不会自动清理。不支持指定现有订阅。
TBLPROPERTIES
:timestampAttributeKey
:可选。包含与 Pub/Sub 消息关联的事件时间戳的键。如果未指定,则消息发布时间将用作窗口化/水印的事件时间戳。deadLetterQueue
:如果有效负载未解析,则将消息写入其中的主题。如果未指定,则会因解析失败而抛出异常。format
:可选。允许您指定 Pubsub 有效负载格式。
读取模式
PubsubIO 支持通过创建新订阅从主题读取。
写入模式
PubsubIO 支持写入主题。
模式
Pub/Sub 消息包含与其关联的元数据,您可以在查询中引用此元数据。对于每条消息,除了有效负载(在一般情况下是非结构化的)之外,Pub/Sub 会公开其发布时间和用户提供的属性映射。此信息必须保留并可从 SQL 语句中访问。目前,这意味着 PubsubIO 表需要您声明一组特殊的列,如下所示。
支持的有效负载
- Pub/Sub 支持 通用有效负载处理。
示例
CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>) TYPE pubsub LOCATION 'projects/testing-integration/topics/user-location'
Pub/Sub Lite
语法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName( publish_timestamp DATETIME, event_timestamp DATETIME, message_key BYTES, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload [BYTES, ROW<tableElement [, tableElement ]*>] ) TYPE pubsublite // For writing LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]' // For reading LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'
LOCATION
:PROJECT
:Google Cloud 项目的 IDTOPIC
: Pub/Sub Lite 主题名称。SUBSCRIPTION
: Pub/Sub Lite 订阅名称。GCP-LOCATION
: 此 Pub/Sub Lite 主题或订阅的位置。
TBLPROPERTIES
:timestampAttributeKey
:可选。包含与 Pub/Sub 消息关联的事件时间戳的键。如果未指定,则消息发布时间将用作窗口化/水印的事件时间戳。deadLetterQueue
: 可选,支持 通用 DLQ 处理format
: 可选。允许您指定有效负载格式。
读取模式
PubsubLiteIO 支持从订阅中读取。
写入模式
PubsubLiteIO 支持写入主题。
支持的有效负载
- Pub/Sub Lite 支持 通用有效负载处理。
示例
CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>) TYPE pubsublite LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'
Kafka
KafkaIO 在 Beam SQL 中处于实验阶段。
语法
扁平化模式
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE kafka LOCATION 'my.company.url.com:2181/topic1' TBLPROPERTIES '{ "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"], "topics": ["topic2", "topic3"], "format": "json" }'
嵌套模式
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName ( event_timestamp DATETIME, message_key BYTES, headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload [BYTES, ROW<tableElement [, tableElement ]*>] ) TYPE kafka LOCATION 'my.company.url.com:2181/topic1' TBLPROPERTIES '{ "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"], "topics": ["topic2", "topic3"], "format": "json" }'
headers
字段的存在会触发嵌套模式的使用。
LOCATION
: 包含要使用的初始引导程序代理的 URL,以及作为路径提供的初始主题名称。TBLPROPERTIES
:bootstrap_servers
: 可选。允许您指定其他引导程序服务器,这些服务器将与LOCATION
中的服务器一起使用。topics
: 可选。允许您指定其他主题,这些主题将与LOCATION
中的主题一起使用。format
: 可选。允许您指定 Kafka 值格式。可能的值包括 {csv
,avro
,json
,proto
,thrift
}。在扁平模式下默认为csv
,在嵌套模式下默认为json
。csv
不支持嵌套模式。
读取模式
读取模式支持从主题读取。
写入模式
写入模式支持写入主题。
支持的格式
- CSV(默认)
- Beam 会解析消息,并尝试根据架构中指定的类型解析字段。
- Kafka 支持所有 通用有效负载处理 格式。
模式
对于 CSV,仅支持简单类型。
MongoDB
语法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE mongodb LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'
LOCATION
: 集合的位置。HOST
: MongoDB 服务器的位置。可以是 localhost 或 IP 地址。如果需要身份验证,则可以按如下方式指定用户名和密码:username:password@localhost
。PORT
: MongoDB 服务器监听的端口。DATABASE
: 要连接的数据库。COLLECTION
: 数据库中的集合。
读取模式
读取模式支持从集合读取。
写入模式
写入模式支持写入集合。
模式
仅支持简单类型。MongoDB 文档通过 JsonToRow
变换映射到 Beam SQL 类型。
示例
CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR) TYPE mongodb LOCATION 'mongodb://127.0.0.1:27017/apache/users'
文本
TextIO 在 Beam SQL 中处于实验阶段。读取模式和写入模式目前不访问相同的基础数据。
语法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) TYPE text LOCATION '/home/admin/orders' TBLPROPERTIES '{"format: "Excel"}'
LOCATION
: 读取模式的文件路径。写入模式的前缀。TBLPROPERTIES
:format
: 可选。允许您指定 CSV 格式,该格式控制字段分隔符、引号字符、记录分隔符和其他属性。请参见下表
format 的值 | 字段分隔符 | 引号 | 记录分隔符 | 忽略空行? | 允许缺少列名? |
---|---|---|---|---|---|
默认 | , | " | \r\n | 是 | 否 |
rfc4180 | , | " | \r\n | 否 | 否 |
excel | , | " | \r\n | 否 | 是 |
tdf | \t | " | \r\n | 是 | 否 |
mysql | \t | 无 | \n | 否 | 否 |
读取模式
读取模式支持从文件读取。
写入模式
写入模式支持写入一组文件。TextIO 在写入时创建文件。
支持的有效负载
- CSV
- Beam 会解析消息,并尝试使用 org.apache.commons.csv 根据架构中指定的类型解析字段。
模式
仅支持简单类型。
示例
CREATE EXTERNAL TABLE orders (id INTEGER, price INTEGER) TYPE text LOCATION '/home/admin/orders'
通用有效负载处理
某些数据源和接收器支持通用有效负载处理。此处理会将字节数组有效负载字段解析为表架构。此处理支持以下架构。所有架构至少都需要设置 "format": "<type>"
,并且可能还需要其他属性。
avro
: Avro- Avro 架构会从指定的字段类型自动生成。它用于解析传入消息并格式化传出消息。
json
: JSON 对象- Beam 会尝试将字节数组解析为 UTF-8 JSON 以匹配架构。
proto
: Protocol Buffers- Beam 会找到等效的 Protocol Buffer 类并使用它解析有效负载
protoClass
: 必需。要使用的 proto 类名。必须内置到已部署的 JAR 中。- 架构中的字段必须与给定
protoClass
的字段匹配。
thrift
: Thrift- 架构中的字段必须与给定
thriftClass
的字段匹配。 thriftClass
: 必需。允许您指定完整的 thrift java 类名。必须内置到已部署的 JAR 中。thriftProtocolFactoryClass
: 必需。允许您指定要用于 thrift 序列化的TProtocolFactory
的完整类名。必须内置到已部署的 JAR 中。- 用于 thrift 序列化的
TProtocolFactory
必须与提供的thriftProtocolFactoryClass
匹配。
- 架构中的字段必须与给定
通用 DLQ 处理
支持通用 DLQ 处理的源和接收器会指定格式为 "<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]"
的参数。支持以下类型的 DLQ 处理
bigquery
: BigQuery- DLQ_ID 是具有“error”字符串字段和“payload”字节数组字段的输出表的表规范。
pubsub
: Pub/Sub 主题- DLQ_ID 是 Pub/Sub 主题的完整路径。
pubsublite
: Pub/Sub Lite 主题- DLQ_ID 是 Pub/Sub Lite 主题的完整路径。