确保 Python 类型安全

Python 是一种动态类型语言,没有静态类型检查。由于 Python 类型检查的工作方式以及运行器执行的延迟性,开发人员的生产力很容易受到因调查类型相关错误而浪费的时间的限制。

Apache Beam SDK for Python 在管道构建和运行时使用**类型提示**来尝试模拟通过真正的静态类型实现的正确性保证。此外,使用类型提示为后端服务执行有效的类型推断和 Coder 对象注册奠定了一些基础。

Python 版本 3.5 引入了一个名为**typing**的模块,为语言中的类型验证器提供提示。Beam SDK for Python 实现了一个PEP 484 的子集,并力争在自己的 typehints 模块中尽可能紧密地遵循它。

这些标志控制 Beam 类型安全

类型提示的优势

当您使用类型提示时,Beam 会在管道构建时间(而不是运行时)引发异常。例如,如果 Beam 检测到您的管道应用了不匹配的 PTransforms(其中一个转换的预期输出与下一个转换的预期输入不匹配),Beam 会生成一个异常。无论您的管道将在哪里执行,这些异常都会在管道构建时引发。为定义的 PTransforms 引入类型提示可以让您在本地运行器中提前捕获潜在的错误,而不是在执行了数分钟之后才进入一个深层、复杂的管道。

考虑以下示例,其中 numbers 是一个包含 str 值的 PCollection

p = TestPipeline()

numbers = p | beam.Create(['1', '2', '3'])

然后,代码将 Filter 转换应用于 numbers 集合,该转换使用一个可调用函数来检索偶数。

evens = numbers | beam.Filter(lambda x: x % 2 == 0)

当您调用 p.run() 时,这段代码在尝试执行此转换时会生成一个错误,因为 Filter 预期一个包含整数的 PCollection,但它却得到一个包含字符串的 PCollection。使用类型提示,此错误可以在管道构建时捕获,甚至在管道开始运行之前。

Beam SDK for Python 包含一些自动类型提示:例如,一些 PTransforms(如 Create 和简单的 ParDo 转换)会尝试根据其输入推断其输出类型。但是,Beam 并非在所有情况下都可以推断类型。因此,建议您声明类型提示以帮助您执行自己的类型检查。

声明类型提示

您可以对可调用对象、DoFns 或整个 PTransforms 声明类型提示。有三种方法可以声明类型提示:在管道构建期间内联声明、作为使用装饰器的 DoFnPTransform 的属性,或作为某些函数的 Python 3 类型注释。

您始终可以在管道构建期间内联声明类型提示,但是如果需要它们用于将要重用的代码,请将它们声明为注释或装饰器。例如,如果您的 DoFn 需要 int 输入,那么将输入的类型提示声明为 process 的参数的注释(或 DoFn 的属性)更有意义,而不是内联声明。

使用注释还有一个额外的好处,就是允许使用静态类型检查器(如 mypy)来额外检查您的代码类型。如果您已经使用类型检查器,那么使用注释而不是装饰器可以减少代码重复。但是,注释并没有涵盖装饰器和内联声明所能涵盖的所有用例。例如,它们不适用于 lambda 函数。

使用类型注释声明类型提示

版本 2.21.0 中的新功能。

要在某些函数上指定类型提示作为注释,请照常使用它们,并省略任何装饰器提示或内联提示。

目前支持在以下位置使用注释:

以下代码使用 my_fn 上的注释,在 to_id 转换上声明了一个 int 输入和 str 输出类型提示。

def my_fn(element: int) -> str:
  return 'id_' + str(element)

ids = numbers | 'to_id' >> beam.Map(my_fn)

以下代码演示了如何在 PTransform 子类上使用注释。有效的注释是一个包装内部(嵌套)类型的 PCollectionPBeginPDoneNone。以下代码使用注释,在自定义 PTransform 上声明类型提示,该 PTransform 接受一个 PCollection[int] 输入,并输出一个 PCollection[str]

from apache_beam.pvalue import PCollection

class IntToStr(beam.PTransform):
  def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
    return pcoll | beam.Map(lambda elem: str(elem))

ids = numbers | 'convert to str' >> IntToStr()

以下代码使用 FilterEvensDoFn.process 上的注释,在 filter_evens 上声明了 int 输入和输出类型提示。由于 process 返回一个生成器,因此生成 PCollection[int] 的 DoFn 的输出类型被注释为 Iterable[int]Generator[int, None, None] 也适用于此)。Beam 将从 DoFn.process 方法和传递给 FlatMap 的函数的返回类型的外部可迭代对象中删除外部可迭代对象,以推断生成的 PCollection 的元素类型。对于这些函数,具有非可迭代的返回类型注释将是一个错误。其他支持的可迭代类型包括:IteratorGeneratorTupleList

from typing import Iterable

class TypedFilterEvensDoFn(beam.DoFn):
  def process(self, element: int) -> Iterable[int]:
    if element % 2 == 0:
      yield element

evens = numbers | 'filter_evens' >> beam.ParDo(TypedFilterEvensDoFn())

以下代码使用 FilterEvensDoubleDoFn.process 上的注释,在 double_evens 上声明了 int 输入和输出类型提示。由于 process 返回一个 listNone,因此输出类型被注释为 Optional[List[int]]。Beam 也将删除外部 Optional 以及(如上所述)外部可迭代对象,只在 DoFn.process 方法和传递给 FlatMap 的函数上进行删除。

from typing import List, Optional

class FilterEvensDoubleDoFn(beam.DoFn):
  def process(self, element: int) -> Optional[List[int]]:
    if element % 2 == 0:
      return [element, element]
    return None

evens = numbers | 'double_evens' >> beam.ParDo(FilterEvensDoubleDoFn())

内联声明类型提示

要内联指定类型提示,请使用 with_input_typeswith_output_types 方法。以下示例代码内联声明了一个输入类型提示

evens = numbers | beam.Filter(lambda x: x % 2 == 0).with_input_types(int)

当您将 Filter 转换应用于上面的示例中的 numbers 集合时,您将能够在管道构建期间捕获该错误。

使用装饰器声明类型提示

要将类型提示指定为 DoFnPTransform 的属性,请使用 @with_input_types()@with_output_types() 装饰器。

以下代码使用 @with_input_types() 装饰器,在 FilterEvensDoFn 上声明了一个 int 类型提示。

@beam.typehints.with_input_types(int)
class FilterEvensDoFn(beam.DoFn):
  def process(self, element):
    if element % 2 == 0:
      yield element

evens = numbers | beam.ParDo(FilterEvensDoFn())

装饰器接收任意数量的位置参数和/或关键字参数,通常在它们所包装的函数的上下文中进行解释。通常,第一个参数是主输入的类型提示,其他参数是侧输入的类型提示。

禁用注释使用

由于这种类型提示声明方式默认情况下是启用的,因此这里有一些方法可以禁用它。

  1. 在您希望 Beam 忽略其注释的特定函数上使用 @beam.typehints.no_annotations 装饰器。
  2. 使用上面介绍的装饰器或内联方法声明类型提示。这些方法将优先于注释。
  3. 在创建管道之前调用 beam.typehints.disable_type_annotations()。这将阻止 Beam 查看所有函数上的注释。

定义泛型类型

您可以使用类型提示注释来定义泛型类型。以下代码指定了一个输入类型提示,该提示断言泛型类型 T,以及一个输出类型提示,该提示断言类型 Tuple[int, T]。如果 MyTransform 的输入是 str 类型,Beam 将推断输出类型为 Tuple[int, str]

from typing import Tuple, TypeVar

T = TypeVar('T')

@beam.typehints.with_input_types(T)
@beam.typehints.with_output_types(Tuple[int, T])
class MyTransform(beam.PTransform):
  def expand(self, pcoll):
    return pcoll | beam.Map(lambda x: (len(x), x))

words_with_lens = words | MyTransform()

类型提示的种类

您可以对任何类使用类型提示,包括 Python 原生类型、容器类和用户定义类。所有类,例如 intfloat 和用户定义类,都可以用来定义类型提示,称为**简单类型提示**。容器类型,例如列表、元组和可迭代对象,也可以用来定义类型提示,称为**参数化类型提示**。最后,还有一些特殊类型不对应任何具体的 Python 类,例如 AnyOptionalUnion,这些类型也可以作为类型提示使用。

Beam 定义了自己的内部类型提示类型,这些类型仍然可用于向后兼容。它还支持 Python 的 typing 模块类型,这些类型在内部会转换为 Beam 内部类型。

对于新代码,建议使用 typing 模块类型。

简单类型提示

类型提示可以是任何类,从 intstr 到用户定义类。如果您将类作为类型提示,您可能希望为其定义一个编码器。

参数化类型提示

参数化类型提示对于提示容器类 Python 对象(例如 list)的类型很有用。这些类型提示进一步细化了这些容器对象中的元素。

参数化类型提示的参数可以是简单类型、参数化类型或类型变量。作为类型变量的元素类型(例如 T)在操作的输入和输出之间建立关系(例如,List[T] -> T)。类型提示可以嵌套,允许您为复杂类型定义类型提示。例如,List[Tuple[int, int, str]]

为了避免与内置容器类型的命名空间冲突,第一个字母大写。

以下参数化类型提示是允许的

注意:Tuple[T, U] 类型提示是具有固定数量异构类型元素的元组,而 Tuple[T, ...] 类型提示是具有可变数量同构类型元素的元组。

特殊类型提示

以下是一些特殊类型提示,它们不对应于任何类,而是对应于 PEP 484 中引入的特殊类型。

运行时类型检查

除了在管道构建时使用类型提示进行类型检查外,您还可以启用运行时类型检查,以检查实际元素在管道执行期间是否满足声明的类型约束。

例如,以下管道发出错误类型的元素。根据运行器实现,其执行可能在运行时失败,也可能不失败。

p = TestPipeline()
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)

但是,如果您启用运行时类型检查,则代码保证在运行时失败。要启用运行时类型检查,请将管道选项 runtime_type_check 设置为 True

p = TestPipeline(options=PipelineOptions(runtime_type_check=True))
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
p.run()

请注意,由于运行时类型检查是对每个 PCollection 元素执行的,因此启用此功能可能会导致性能显着下降。因此,建议在生产管道中禁用运行时类型检查。有关更快、更适合生产环境的替代方法,请参阅下一节。

更快的运行时类型检查

您可以通过将管道选项 performance_runtime_type_check 设置为 True 来启用更快的基于采样的运行时类型检查。

这是一个仅 Python 3 功能,通过使用优化后的 Cython 代码对称为样本的少量值进行运行时类型检查来实现。

目前,此功能不支持对侧输入或合并操作进行运行时类型检查。这些功能将在 Beam 的未来版本中得到支持。

类型提示在编码器中的使用

当您的管道读取、写入或以其他方式具体化其数据时,您 PCollection 中的元素需要被编码和解码为字节字符串。字节字符串用于中间存储、在 GroupByKey 操作中比较键以及从源读取和写入接收器。

适用于 Python 的 Beam SDK 使用 Python 对未知类型对象进行序列化的原生支持,此过程称为**pickle**。但是,使用 PickleCoder 会带来一些缺点:它在时间和空间上效率较低,并且使用的编码不是确定性的,这会阻碍分布式分区、分组和状态查找。

为了避免这些缺点,您可以为编码和解码类型定义 Coder 类,从而以更高效的方式实现。您可以指定一个 Coder 来描述如何编码和解码给定 PCollection 的元素。

为了使 Coder 正确且高效,它需要类型信息,并且 PCollections 需要与特定类型相关联。类型提示正是使这种类型信息可用的方法。适用于 Python 的 Beam SDK 为标准 Python 类型(例如 intfloatstrbytesunicode)提供内置编码器。

确定性编码器

如果您没有定义 Coder,则默认情况下使用一个编码器,该编码器会对未知类型回退到 pickle。在某些情况下,您必须指定一个确定性 Coder,否则会出现运行时错误。

例如,假设您有一个 PCollection,它包含键值对,这些键值对的键是 Player 对象。如果您对这样的集合应用 GroupByKey 变换,则当使用非确定性编码器(例如默认的 pickle 编码器)时,其键对象可能在不同的机器上以不同的方式序列化。由于 GroupByKey 使用这种序列化表示来比较键,因此这可能会导致行为不正确。为了确保元素始终以相同的方式编码和解码,您需要为 Player 类定义一个确定性 Coder

以下代码显示了示例 Player 类以及如何为其定义 Coder。当您使用类型提示时,Beam 会使用 beam.coders.registry 推断要使用的 Coders。以下代码将 PlayerCoder 注册为 Player 类的编码器。在本例中,为 CombinePerKey 声明的输入类型是 Tuple[Player, int]。在这种情况下,Beam 会推断要使用的 Coder 对象是 TupleCoderPlayerCoderIntCoder

from typing import Tuple

class Player(object):
  def __init__(self, team, name):
    self.team = team
    self.name = name

class PlayerCoder(beam.coders.Coder):
  def encode(self, player):
    return ('%s:%s' % (player.team, player.name)).encode('utf-8')

  def decode(self, s):
    return Player(*s.decode('utf-8').split(':'))

  def is_deterministic(self):
    return True

beam.coders.registry.register_coder(Player, PlayerCoder)

def parse_player_and_score(csv):
  name, team, score = csv.split(',')
  return Player(team, name), int(score)

totals = (
    lines
    | beam.Map(parse_player_and_score)
    | beam.CombinePerKey(sum).with_input_types(Tuple[Player, int]))