使用 Beam 和 Flink 构建可扩展的自管理流式基础设施

在本系列博客文章中,Talat Uyarer(架构师/高级首席工程师)Rishabh Kedia(首席工程师)David He(工程总监) 描述了我们如何使用 Apache Beam 和 Flink 构建自管理流式平台。在本系列文章的这一部分中,我们描述了我们为什么要以及如何通过从云管理流式服务迁移到 Flink 来构建一个大型可扩展的自管理流式基础设施和服务。我们还概述了操作可扩展性和可观察性、性能以及成本效益方面的经验教训。我们总结了在我们旅程中发现的有用技术。

使用 Flink 构建可扩展的自管理流式基础设施 - 第 1 部分

介绍

Palo Alto Networks (PANW) 是网络安全领域的领导者,为客户提供产品、服务和解决方案。数据是我们产品和服务的核心。我们在数据湖中流式传输和存储艾字节级的数据,具有近实时摄取、数据转换、数据插入到数据存储以及将数据转发到我们的内部基于 ML 的系统和外部 SIEM 的功能。我们支持每个组件中的多租户,以便我们可以隔离租户并提供最佳性能和 SLA。流式处理在管道中起着至关重要的作用。

在本系列文章的第二部分中,我们将更详细地描述流式基础设施的核心构建块,例如自动缩放器。我们还将提供更多关于我们定制的信息,这些定制使我们能够构建一个高性能的大型流式系统。最后,我们将解释我们如何解决具有挑战性的问题。

自管理流式基础设施的重要性

我们在 Google Cloud 上构建了一个大型数据平台。我们使用 Dataflow 作为管理流式服务。使用 Dataflow,我们使用流式引擎运行我们的应用程序,该应用程序使用 Apache Beam 和可观察性工具,例如 Cloud Logging 和 Cloud Monitoring。有关更多详细信息,请参见 [1]。该系统可以处理每秒 1500 万个事件和每天 1 万亿个事件,每天的数据量为 4 PB。我们运行大约 30,000 个 Dataflow 作业。每个作业可以有一个或数百个工作器,具体取决于客户的事件吞吐量。

我们支持使用不同端点的各种应用程序:BigQuery 数据存储、基于 HTTPS 的外部 SIEM 或内部端点、基于 Syslog 的 SIEM 和 Google Cloud Storage 端点。我们的客户和产品依赖于此数据平台来处理网络安全态势和反应。我们的流式基础设施非常灵活,可以通过流式作业订阅添加、更新和删除用例。例如,客户希望将来自防火墙设备的日志事件摄取到数据湖中,在 Kafka 主题中进行缓冲。一个流式作业订阅提取和过滤数据,转换数据格式,并实时将数据插入到我们的 BigQuery 数据仓库端点。客户可以使用我们的可视化和仪表板产品查看此防火墙捕获的流量或线程。下图说明了事件生成器、用例订阅工作流程以及流式平台的关键组件。

Streaming service design

这种基于 Dataflow 的管理流式基础设施运行良好,但存在一些注意事项

  1. 成本很高,因为它是一项管理服务。对于在 Dataflow 应用程序中使用的相同资源(例如 vCPU 和内存),成本比使用运行相同 Beam 应用程序代码的 Flink 等开源流式引擎高得多。
  2. 很难实现我们的延迟和 SLA 目标,因为很难扩展功能,例如基于不同应用程序、端点或一个应用程序中不同参数的自动缩放。
  3. 管道只能在 Google Cloud 上运行。

PANW 的流式用例的独特性是另一个原因,我们使用自管理服务。我们支持多租户。租户(客户)可以以非常高的速度(> 100k 个请求/秒)或非常低的速度(< 100 个请求/秒)摄取数据。Dataflow 作业在 VM 上运行,而不是在 Kubernetes 上运行,需要最少 1 个 vCPU 内核。对于小型租户,这会浪费资源。我们的流式基础设施支持数千个作业,如果我们不必为一个作业使用一个内核,CPU 利用率会更高。对于我们来说,使用在 Kubernetes 上运行的流式引擎是自然的,这样我们就可以为小型租户分配最少的资源,例如,使用带有 ½ 或更少 vCPU 内核的 Google Kubernetes Engine (GKE) Pod。

为了解决已经陈述的问题并找到最有效的解决方案,我们评估了各种流式框架,包括 Apache Samza、Apache Flink 和 Apache Spark,与 Dataflow 相比较。

性能

  • 一个值得注意的因素是 Apache Flink 对 Kubernetes 的原生支持。与缺乏原生 Kubernetes 支持并需要 Apache Zookeeper 进行协调的 Samza 不同,Flink 与 Kubernetes 无缝集成。这种集成消除了不必要的复杂性。在性能方面,Samza 和 Flink 是势均力敌的竞争对手。
  • Apache Spark 虽然很流行,但在我们的测试中证明速度明显更慢。在 Beam 峰会上的一次演讲中,透露了 Apache Beam 的 Spark Runner 比原生 Apache Spark 慢了大约 10 倍 [3]。我们无法承受如此巨大的性能损失。用原生 Spark 重写我们的整个 Beam 代码库不是一个可行的选择,尤其是考虑到我们在过去四年中使用 Apache Beam 构建的庞大代码库。

社区

社区支持的稳健性在我们决策过程中发挥了关键作用。Dataflow 提供了出色的支持,但我们需要在我们选择的开源框架中获得保证。Apache Flink 生机勃勃的社区以及来自多家公司的积极贡献提供了一种无与伦比的信心水平。这种协作环境意味着错误识别和修复是持续进行的过程。事实上,在我们自己的旅程中,我们已经使用社区提供的许多 Flink 修复程序来修补我们的系统。

  • 我们通过合并 Flink 1.15 开源修复程序 FLINK-26063(我们使用的是 1.13)修复了 Google Cloud Storage 文件读取异常。
  • 我们修复了来自 FLINK-31963 的有状态作业工作器重启问题。

在我们自己的旅程中,我们还通过发现和修复开源代码中的错误为社区做出贡献。有关详细信息,请参见 FLINK-32700 以了解 Flink Kubernetes Operator。我们还为 Kubernetes 客户端创建了新的 GKE Auth 支持,并将其合并到 GitHub 上的 [4] 中。

集成

Apache Flink 与 Kubernetes 的无缝集成为我们提供了用于编排的灵活且可扩展的平台。Apache Flink 和 Kubernetes 之间的协同作用不仅优化了我们的数据处理工作流程,而且使我们的系统具有未来可扩展性。

架构和部署工作流程

在实时数据处理和分析领域,Apache Flink 以其强大的多功能框架而著称。与 Kubernetes(行业标准的容器编排系统)结合使用时,Flink 应用程序可以横向扩展并具有强大的管理功能。我们探索了一种尖端的架构,其中 Apache Flink 和 Kubernetes 由于 Apache Flink Kubernetes Operator 而无缝协同工作。

Flink Kubernetes Operator 位于其核心,充当控制平面,镜像管理 Flink 部署的人工操作员的知识和操作。与传统方法不同,Operator 自动化了关键活动,从启动和停止应用程序到处理升级和错误。其多功能功能集包括全自动作业生命周期管理、支持不同 Flink 版本以及多种部署模式,例如应用程序集群和会话作业。此外,Operator 的操作能力扩展到指标、日志记录,甚至使用作业自动缩放器进行动态缩放。

构建无缝的部署工作流程

想象一个强大的系统,其中 Flink 作业可以轻松地部署、勤勉地监控并主动地管理。我们的团队通过集成 Apache Flink、Apache Flink Kubernetes Operator 和 Kubernetes 创建了此工作流程。此设置的核心是我们定制构建的 Apache Flink Kubernetes Operator 客户端库。该库充当桥梁,使启动、停止、更新和取消 Flink 作业等原子操作成为可能。

Streaming service changes

部署流程

在我们的代码中,客户端提供 Apache Beam 管道选项,其中包含基本信息,例如 Kubernetes 集群的 API 端点、身份验证详细信息、用于上传 JAR 文件的 Google Cloud/S3 临时位置以及工作器类型规范。Kubernetes Operator 库使用此信息来协调无缝的部署流程。以下部分解释了采取的步骤。大多数核心步骤在我们代码库中是自动化的。

步骤 1

  • 客户端希望为客户和特定应用程序启动作业。

步骤 2

  • **生成唯一的作业 ID:**库生成一个唯一的作业 ID,该 ID 设置为 Kubernetes 标签。此标识符有助于跟踪和管理已部署的 Flink 作业。
  • **配置和代码上传:**库将所有必要的配置和用户代码上传到 Google Cloud Storage 或 Amazon S3 上的指定位置。此步骤确保 Flink 应用程序的资源可用于部署。
  • **YAML 有效负载生成:**上传过程完成后,库会构造一个 YAML 有效负载。此有效负载包含关键部署信息,包括基于指定工作器类型的资源设置。

我们使用了一种约定来命名我们的工作器 VM 实例类型。我们的约定类似于 Google Cloud 使用的命名约定。名称 n1-standard-1 指的是特定预定义的 VM 机器类型。让我们分解名称的每个组件的含义

  • **n1** 指示实例的 CPU 类型。在这种情况下,它指的是基于 N1 系列中实例的英特尔。Google Cloud 具有多代实例,具有不同的硬件和性能特征。
  • standard 代表机器类型系列。标准机器类型为任务管理器提供 1 个虚拟 CPU (vCPU) 和 4 GB 内存的平衡比例,以及为作业管理器提供 0.5 个 vCPU 和 2 GB 内存的平衡比例。
  • 1 代表实例中可用的 vCPU 数量。在 n1-standard-1 的情况下,表示该实例具有 1 个 vCPU。

步骤 3

  • 使用 Fabric8 调用 Kubernetes API:为了启动部署,库使用 Fabric8 与 Kubernetes API 交互。Fabric8 最初不支持 Google Kubernetes Engine 或 Amazon Elastic Kubernetes Service (EKS) 中的身份验证。为了解决此限制,我们的团队实现了必要的身份验证支持,可以在 GitHub PR [4] 上的合并请求中找到。

步骤 4

  • Flink Operator 部署:当它接收到 YAML 有效负载时,Flink Operator 负责部署 Flink 作业的各个组件。任务包括预配资源和管理 Flink 作业管理器、任务管理器和作业服务的部署。

步骤 5

  • 作业提交和执行:当 Flink 作业管理器运行时,它会从指定的 Google Cloud Storage 或 S3 位置获取 JAR 文件和配置。拥有所有必要的资源后,它会将 Flink 作业提交到独立的 Flink 集群以供执行。

步骤 6

  • 持续监控:部署后,我们的操作员会持续监控正在运行的 Flink 作业的状态。这种实时反馈循环使我们能够立即解决出现的任何问题,确保 Flink 应用程序的整体健康状况和最佳性能。

总之,我们的部署过程利用 Apache Beam 管道选项,与 Kubernetes 和 Flink Operator 无缝集成,并使用自定义逻辑来处理配置上传和身份验证。这种端到端工作流程确保在 Kubernetes 集群中可靠高效地部署 Flink 应用程序,同时保持持续监控以实现平稳运行。以下时序图显示了这些步骤。

Job Start Activity Diagram

开发自动缩放器

拥有自动缩放器对于拥有一个自管理的流式服务至关重要。互联网上没有足够的资源让我们学习构建自己的自动缩放器,这使得工作流的这一部分变得困难。

自动缩放器会增加任务管理器的数量以消除滞后并跟上吞吐量。它还会缩减处理传入流量所需的最低资源数量,以降低成本。我们需要经常这样做,同时将处理中断降至最低。

我们对自动缩放器进行了广泛的调整,以满足延迟的 SLA。这种调整涉及成本权衡。我们还使自动缩放器特定于应用程序,以满足某些应用程序的特定需求。每个决定都有一个隐藏的成本。本博文的第二部分将详细介绍自动缩放器。

创建用于流式作业开发的客户端库

要使用 Flink Kubernetes Operator 部署作业,您需要了解 Kubernetes 的工作原理。以下步骤说明了如何创建一个 Flink 作业。

  1. 定义一个具有适当规范的 YAML 文件。下图提供了一个示例。
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-reactive-example
spec:
  image: flink:1.13
  flinkVersion: v1_13
  flinkConfiguration:
    scheduler-mode: REACTIVE
    taskmanager.numberOfTaskSlots: "2"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
          - mountPath: /flink-data
            name: flink-volume
      volumes:
      - name: flink-volume
        hostPath:
          # directory location on host
          path: /tmp/flink
          # this field is optional
          type: Directory
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
  mode: standalone
  1. SSH 到您的 Flink 集群并运行以下命令
kubectl create -f job1.yaml
  1. 使用以下命令检查作业状态
kubectl get flinkdeployment job1

此过程影响我们的可扩展性。由于我们经常更新作业,因此无法手动执行所有运行作业的这些步骤。这样做会非常容易出错且耗时。YAML 中的一个错误空格可能会导致部署失败。这种方法也是创新的障碍,因为您需要了解 Kubernetes 才能与 Flink 作业交互。

我们构建了一个库,为任何想要启动、删除、更新或获取作业状态的团队和应用程序提供接口。

Flink Kubernetes Operator Library

该库扩展了 Fabric8 客户端和 FlinkDeployment CRD。FlinkDeployment CRD 由 Flink Kubernetes Operator 公开。CRD 允许您存储和检索结构化数据。通过扩展 CRD,我们获得了对 POJO 的访问权限,这使得操作 YAML 文件变得更加容易。

该库支持以下任务

  1. 身份验证以确保您被允许对 Flink 集群执行操作。
  2. 验证(从 AWS/Google Cloud Storage 获取模板以进行验证)获取用户变量输入并根据策略、规则和 YAML 格式进行验证。
  3. 操作执行将 Java 调用转换为调用 Kubernetes 操作。

在此过程中,我们学习了以下教训

  1. 特定于应用程序的操作员服务:在我们的规模下,操作员无法处理如此大量的作业。Kubernetes 调用开始超时并失败。为了解决这个问题,我们在高流量区域创建了多个操作员(大约 4 个)来处理每个应用程序。
  2. Kube 调用缓存:为了防止过载,我们缓存了 Kubernetes 调用的结果 30 到 60 秒。
  3. 标签支持:为使用特定于客户端的变量搜索作业提供标签支持,减少了对 Kube 的负载,并将作业搜索速度提高了 5 倍。

以下是公开库后我们取得的一些最大成果

  1. 标准化的作业管理:用户可以使用单个库在 Kubernetes 环境中启动、删除和获取 Flink 作业的状态更新。
  2. 抽象的 Kubernetes 复杂性:团队不再需要担心 Kubernetes 的内部工作原理或格式化作业部署 YAML 文件。库在内部处理这些细节。
  3. 简化的升级:凭借底层的 Kubernetes 基础设施,该库为 Flink 作业管理带来了稳健性和容错能力,确保最小的停机时间和高效的恢复。

可观察性和警报

在大型生产系统中运行时,可观察性非常重要。我们在 PANW 中大约有 30,000 个流式作业。每个作业为特定应用程序的客户提供服务。每个作业还从 Kafka 中的多个主题读取数据,执行转换,然后将数据写入各种接收器和端点。

约束可能出现在管道或其端点的任何地方,例如客户 API、BigQuery 等。我们希望确保流式延迟满足 SLA。因此,了解作业是否健康、是否满足 SLA,以及在需要时发出警报和干预非常具有挑战性。

为了实现我们的运营目标,我们构建了先进的可观察性和警报功能。我们在以下部分介绍了三种可观察性和调试工具。

每个 Flink 作业都会将各种指标发送到我们的 Prometheus,其中包含基数详细信息,例如应用程序名称、客户 ID 和区域,以便我们可以查看每个作业。关键指标包括输入流量速率、输出吞吐量、Kafka 中的积压、基于时间戳的延迟、任务 CPU 使用率、任务数量、OOM 计数等等。

以下图表提供了一些示例。这些图表提供了有关特定客户对 Kafka 的摄取流量速率、流式作业的整体吞吐量、每个 vCPU 的吞吐量、Kafka 中的积压以及基于观察到的积压的 worker 自动缩放的详细信息。

Flink Job Metrics

Flink Job Autoscaling Metrics

以下图表显示了基于时间戳水印的流式延迟。除了 Kafka 中的事件数量作为积压外,了解端到端流式的延迟时间也很重要,这样我们就可以定义和监控 SLA。延迟定义为流式处理所花费的时间,从摄取时间戳开始,到发送到流式端点的時間戳。水印是最后处理的事件的时间。使用水印,我们跟踪 P100 延迟。我们跟踪每个事件的流式延迟,以便了解每个 Kafka 主题和分区或 Flink 作业管道问题。以下示例显示了每个事件流及其延迟

Apache Beam Watermark Metrics

我们使用并扩展了 Apache Flink 仪表板 UI 来监控作业和任务,例如检查点持续时间、大小和故障。我们使用的一个重要扩展是作业历史记录页面,它让我们可以查看作业的启动和更新时间线以及详细信息,这有助于我们调试问题。

Flink Checkpoint UI

积压和延迟的仪表板和警报

我们大约有 30,000 个作业,我们希望密切监控这些作业并接收处于异常状态的作业的警报,以便我们可以进行干预。我们为每个应用程序创建了仪表板,以便我们可以显示具有最高延迟的作业列表并为警报创建阈值。以下示例显示了某个应用程序的基于时间戳的延迟仪表板。如果延迟大于阈值(例如 10 分钟),并在一段时间内持续存在,我们可以设置警报。

Latency Graph

以下示例显示了更多基于积压的仪表板

Backlog Graph

这些警报基于阈值,我们经常检查指标。如果达到阈值并在一段时间内持续存在,我们会向我们的内部 Slack 频道或 PagerDuty 发出警报,以便立即关注。我们调整警报,以提高准确性。

成本优化策略和调整

我们还迁移到自管理的流式服务,以提高成本效率。一些微调使我们能够将成本降低一半,而且我们还有更多改进的机会。

以下列表包括一些帮助过我们的提示

  • 使用 Google Cloud Storage 作为检查点存储。
  • 减少写入 Google Cloud Storage 的频率。
  • 使用合适的机器类型。例如,在 Google Cloud 中,N2D 机器比 N2 机器便宜 15%。
  • 自动缩放任务以使用最佳资源,同时保持延迟 SLA。

以下部分将详细介绍前两个提示。

Google Cloud Storage 和检查点

我们使用 Google Cloud Storage 作为我们的检查点存储,因为它具有成本效益、可扩展性和耐用性。在使用 Google Cloud Storage 时,以下设计注意事项和最佳实践可以帮助您优化扩展和性能

  • 使用数据分区方法,例如范围分区(根据特定属性划分数据)和哈希分区(使用哈希函数均匀分配数据)。
  • 避免使用连续的键名称,尤其是时间戳,以避免热点和不均匀的数据分布。相反,为对象分布引入随机前缀。
  • 使用分层文件夹结构来改善数据管理并减少单个目录中的对象数量。
  • 将小文件合并成大文件以提高读取吞吐量。最小化小文件的数量可以减少低效的存储使用和元数据操作。

调整写入 Google Cloud Storage 的频率

高效地扩展作业是我们面临的主要挑战之一。无状态作业相对简单,但仍然存在障碍,特别是在 Flink 需要处理大量 worker 的情况下。为了克服这一挑战,我们将 state.storage.fs.memory-threshold 设置从 20KB 增加到 1 MB (??)。此配置使我们能够将小型检查点文件在作业管理器级别合并成更大的文件,并减少元数据调用。

优化 Google Cloud 操作的性能是另一个挑战。虽然 Google Cloud Storage 非常适合流式传输大量数据,但它在处理高频 I/O 请求时存在局限性。为了缓解这个问题,我们在键名称中引入了随机前缀,避免使用连续的键名称,并优化了 Google Cloud Storage 分片技术。这些方法显著提高了 Google Cloud Storage 的性能,使我们的无状态作业能够平稳运行。

以下图表显示了更改内存阈值后 Google Cloud Storage 写入量减少的情况

GCS write Graph

结论

Palo Alto Networks® Cortex Data Lake 已从 Dataflow 流式引擎完全迁移到 Flink 自管理流式引擎基础设施。我们已实现目标,以更经济高效的方式运行系统(成本降低一半以上),并在 GCP 和 AWS 等多个云上运行基础设施。我们已了解如何基于开源构建大规模可靠的生产系统。由于我们拥有很大的自由度来定制开源代码和配置,因此我们看到了根据特定需求定制系统的巨大潜力。在下一部分的第二篇文章中,我们将详细介绍自动缩放和性能调优部分。我们希望我们的经验能帮助读者探索类似的解决方案以应用于其组织。

其他资源

我们在此提供相关演示的链接,供有兴趣实施类似解决方案的读者作为进一步阅读材料。通过添加此部分,我们希望您可以找到更多关于构建全托管流式基础设施的详细信息,从而使读者更容易了解我们的故事和经验。

[1] PANW 在 Apache Beam 上发布的流式框架:https://beam.apache.org/case-studies/paloalto/

[2] PANW 在 Beam Summit 2023 上的演讲:https://youtu.be/IsGW8IU3NfA?feature=shared

[3] 在 Beam Summit 2021 上发布的基准测试:https://2021.beamsummit.org/sessions/tpc-ds-and-apache-beam/

[4] PANW 对 Flink 的开源贡献,用于 GKE Auth 支持:https://github.com/fabric8io/kubernetes-client/pull/4185

鸣谢

这是构建新的基础设施并从云提供商管理的流式基础设施迁移到基于 Flink 的自管理流式基础设施的大规模工作。感谢 Palo Alto Networks CDL 流式团队帮助实现这一目标:Kishore Pola、Andrew Park、Hemant Kumar、Manan Mangal、Helen Jiang、Mandy Wang、Praveen Kumar Pasupuleti、JM Teo、Rishabh Kedia、Talat Uyarer、Naitk Dani 和 David He。