文件处理模式

此页面描述了常见的文件处理任务。有关基于文件的 I/O 的更多信息,请参见 管道 I/O基于文件的数据输入和输出

处理到达的文件

本节将向您展示如何处理文件系统或对象存储(如 Google Cloud Storage)中到达的文件。您可以连续读取文件,或者在文件到达时触发流和处理管道。

连续读取模式

您可以使用 FileIOTextIO 连续读取源以查找新文件。

使用 FileIO 类连续监视单个文件模式。以下示例每 30 秒重复匹配一次文件模式,持续返回新匹配的文件作为无界 PCollection<Metadata>,如果一小时内没有新文件出现,则停止

// This produces PCollection<MatchResult.Metadata>
p.apply(
    FileIO.match()
        .filepattern("...")
        .continuously(
            Duration.standardSeconds(30),
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

TextIOwatchForNewFiles 属性会流式传输新文件匹配项。

// This produces PCollection<String>
p.apply(
    TextIO.read()
        .from("<path-to-files>/*")
        .watchForNewFiles(
            // Check for new files every minute.
            Duration.standardMinutes(1),
            // Stop watching the file pattern if no new files appear for an hour.
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

某些运行器可能会在更新期间保留文件列表,但文件列表在您重新启动管道时不会持久化。您可以通过以下方式保存文件列表:

Python 不提供连续读取选项。

由外部源触发的流处理

流式管道可以处理来自无界源的数据。例如,要使用 Google Cloud Pub/Sub 触发流处理,请:

  1. 使用外部进程检测新文件何时到达。
  2. 使用包含文件 URI 的 Google Cloud Pub/Sub 消息。
  3. DoFn 访问 URI,该 DoFn 位于 Google Cloud Pub/Sub 源之后。
  4. 处理文件。

由外部源触发的批处理

要在一个文件到达时启动或调度批处理管道作业,请将触发事件写入源文件本身。这将产生最大的延迟,因为管道必须在处理之前初始化。它最适合低频、大型、文件大小更新。

访问文件名

使用 FileIO 类在管道作业中读取文件名。FileIO 返回一个 PCollection<ReadableFile> 对象,ReadableFile 实例包含文件名。

要访问文件名,请:

  1. 使用 FileIO 创建一个 ReadableFile 实例。FileIO 返回一个 PCollection<ReadableFile> 对象。ReadableFile 类包含文件名。
  2. 调用 readFullyAsUTF8String() 方法将文件读取到内存中,并返回文件名作为 String 对象。如果内存有限,可以使用诸如 FileSystems 之类的实用程序类直接操作文件。

要在管道作业中读取文件名,请:

  1. 收集文件 URI 列表。您可以使用 FileSystems 模块获取与 glob 模式匹配的文件列表。
  2. 将文件 URI 传递给 PCollection

p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
    // The withCompression method is optional. By default, the Beam SDK detects compression from
    // the filename.
    .apply(FileIO.readMatches().withCompression(Compression.GZIP))
    .apply(
        ParDo.of(
            new DoFn<FileIO.ReadableFile, String>() {
              @ProcessElement
              public void process(@Element FileIO.ReadableFile file) {
                // We can now access the file and its metadata.
                LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
              }
            }));
with beam.Pipeline() as pipeline:
  readable_files = (
      pipeline
      | fileio.MatchFiles('hdfs://path/to/*.txt')
      | fileio.ReadMatches()
      | beam.Reshuffle())
  files_and_contents = (
      readable_files
      | beam.Map(lambda x: (x.metadata.path, x.read_utf8())))