文件处理模式
此页面描述了常见的文件处理任务。有关基于文件的 I/O 的更多信息,请参见 管道 I/O 和 基于文件的数据输入和输出。
- Java SDK
- Python SDK
处理到达的文件
本节将向您展示如何处理文件系统或对象存储(如 Google Cloud Storage)中到达的文件。您可以连续读取文件,或者在文件到达时触发流和处理管道。
连续读取模式
您可以使用 FileIO
或 TextIO
连续读取源以查找新文件。
使用 FileIO
类连续监视单个文件模式。以下示例每 30 秒重复匹配一次文件模式,持续返回新匹配的文件作为无界 PCollection<Metadata>
,如果一小时内没有新文件出现,则停止
TextIO
类 watchForNewFiles
属性会流式传输新文件匹配项。
// This produces PCollection<String>
p.apply(
TextIO.read()
.from("<path-to-files>/*")
.watchForNewFiles(
// Check for new files every minute.
Duration.standardMinutes(1),
// Stop watching the file pattern if no new files appear for an hour.
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
某些运行器可能会在更新期间保留文件列表,但文件列表在您重新启动管道时不会持久化。您可以通过以下方式保存文件列表:
- 将已处理的文件名存储在外部文件中,并在下一个转换中对列表进行去重
- 向文件名添加时间戳,写入一个 glob 模式以仅提取新文件,并在管道重新启动时匹配该模式
Python 不提供连续读取选项。
由外部源触发的流处理
流式管道可以处理来自无界源的数据。例如,要使用 Google Cloud Pub/Sub 触发流处理,请:
- 使用外部进程检测新文件何时到达。
- 使用包含文件 URI 的 Google Cloud Pub/Sub 消息。
- 从
DoFn
访问 URI,该DoFn
位于 Google Cloud Pub/Sub 源之后。 - 处理文件。
由外部源触发的批处理
要在一个文件到达时启动或调度批处理管道作业,请将触发事件写入源文件本身。这将产生最大的延迟,因为管道必须在处理之前初始化。它最适合低频、大型、文件大小更新。
访问文件名
使用 FileIO
类在管道作业中读取文件名。FileIO
返回一个 PCollection<ReadableFile>
对象,ReadableFile
实例包含文件名。
要访问文件名,请:
- 使用
FileIO
创建一个ReadableFile
实例。FileIO
返回一个PCollection<ReadableFile>
对象。ReadableFile
类包含文件名。 - 调用
readFullyAsUTF8String()
方法将文件读取到内存中,并返回文件名作为String
对象。如果内存有限,可以使用诸如FileSystems
之类的实用程序类直接操作文件。
要在管道作业中读取文件名,请:
- 收集文件 URI 列表。您可以使用
FileSystems
模块获取与 glob 模式匹配的文件列表。 - 将文件 URI 传递给
PCollection
。
p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// The withCompression method is optional. By default, the Beam SDK detects compression from
// the filename.
.apply(FileIO.readMatches().withCompression(Compression.GZIP))
.apply(
ParDo.of(
new DoFn<FileIO.ReadableFile, String>() {
@ProcessElement
public void process(@Element FileIO.ReadableFile file) {
// We can now access the file and its metadata.
LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
}
}));
最后更新于 2024/10/31
您是否找到了您要查找的所有内容?
所有内容是否有用且清晰?您想更改任何内容吗?请告诉我们!