确保 Python 类型安全
Python 是一种动态类型语言,没有静态类型检查。由于 Python 类型检查的工作方式以及运行器执行的延迟性,开发人员的生产力很容易受到因调查类型相关错误而浪费的时间的限制。
Apache Beam SDK for Python 在管道构建和运行时使用**类型提示**来尝试模拟通过真正的静态类型实现的正确性保证。此外,使用类型提示为后端服务执行有效的类型推断和 Coder
对象注册奠定了一些基础。
Python 版本 3.5 引入了一个名为**typing**的模块,为语言中的类型验证器提供提示。Beam SDK for Python 实现了一个PEP 484 的子集,并力争在自己的 typehints 模块中尽可能紧密地遵循它。
这些标志控制 Beam 类型安全
--no_pipeline_type_check
在管道构建期间禁用类型检查。默认情况下执行这些检查。
--runtime_type_check
启用每个元素的运行时类型检查。这可能会影响管道性能,因此默认情况下会跳过这些检查。
--type_check_additional
启用其他类型检查。默认情况下不启用这些检查以保留向后兼容性。此标志接受一个用逗号分隔的选项列表
all
: 启用所有其他检查。ptransform_fn
: 在使用@ptransform_fn
装饰器时启用类型提示装饰器。
类型提示的优势
当您使用类型提示时,Beam 会在管道构建时间(而不是运行时)引发异常。例如,如果 Beam 检测到您的管道应用了不匹配的 PTransforms
(其中一个转换的预期输出与下一个转换的预期输入不匹配),Beam 会生成一个异常。无论您的管道将在哪里执行,这些异常都会在管道构建时引发。为定义的 PTransforms
引入类型提示可以让您在本地运行器中提前捕获潜在的错误,而不是在执行了数分钟之后才进入一个深层、复杂的管道。
考虑以下示例,其中 numbers
是一个包含 str
值的 PCollection
然后,代码将 Filter
转换应用于 numbers
集合,该转换使用一个可调用函数来检索偶数。
当您调用 p.run()
时,这段代码在尝试执行此转换时会生成一个错误,因为 Filter
预期一个包含整数的 PCollection
,但它却得到一个包含字符串的 PCollection
。使用类型提示,此错误可以在管道构建时捕获,甚至在管道开始运行之前。
Beam SDK for Python 包含一些自动类型提示:例如,一些 PTransforms
(如 Create
和简单的 ParDo
转换)会尝试根据其输入推断其输出类型。但是,Beam 并非在所有情况下都可以推断类型。因此,建议您声明类型提示以帮助您执行自己的类型检查。
声明类型提示
您可以对可调用对象、DoFns
或整个 PTransforms
声明类型提示。有三种方法可以声明类型提示:在管道构建期间内联声明、作为使用装饰器的 DoFn
或 PTransform
的属性,或作为某些函数的 Python 3 类型注释。
您始终可以在管道构建期间内联声明类型提示,但是如果需要它们用于将要重用的代码,请将它们声明为注释或装饰器。例如,如果您的 DoFn
需要 int
输入,那么将输入的类型提示声明为 process
的参数的注释(或 DoFn
的属性)更有意义,而不是内联声明。
使用注释还有一个额外的好处,就是允许使用静态类型检查器(如 mypy)来额外检查您的代码类型。如果您已经使用类型检查器,那么使用注释而不是装饰器可以减少代码重复。但是,注释并没有涵盖装饰器和内联声明所能涵盖的所有用例。例如,它们不适用于 lambda 函数。
使用类型注释声明类型提示
版本 2.21.0 中的新功能。
要在某些函数上指定类型提示作为注释,请照常使用它们,并省略任何装饰器提示或内联提示。
目前支持在以下位置使用注释:
DoFn
子类的process()
方法。PTransform
子类的expand()
方法。- 传递给以下函数的函数:
ParDo
、Map
、FlatMap
、Filter
。
以下代码使用 my_fn
上的注释,在 to_id
转换上声明了一个 int
输入和 str
输出类型提示。
以下代码演示了如何在 PTransform
子类上使用注释。有效的注释是一个包装内部(嵌套)类型的 PCollection
、PBegin
、PDone
或 None
。以下代码使用注释,在自定义 PTransform 上声明类型提示,该 PTransform 接受一个 PCollection[int]
输入,并输出一个 PCollection[str]
。
以下代码使用 FilterEvensDoFn.process
上的注释,在 filter_evens
上声明了 int
输入和输出类型提示。由于 process
返回一个生成器,因此生成 PCollection[int]
的 DoFn 的输出类型被注释为 Iterable[int]
(Generator[int, None, None]
也适用于此)。Beam 将从 DoFn.process
方法和传递给 FlatMap
的函数的返回类型的外部可迭代对象中删除外部可迭代对象,以推断生成的 PCollection 的元素类型。对于这些函数,具有非可迭代的返回类型注释将是一个错误。其他支持的可迭代类型包括:Iterator
、Generator
、Tuple
、List
。
以下代码使用 FilterEvensDoubleDoFn.process
上的注释,在 double_evens
上声明了 int
输入和输出类型提示。由于 process
返回一个 list
或 None
,因此输出类型被注释为 Optional[List[int]]
。Beam 也将删除外部 Optional
以及(如上所述)外部可迭代对象,只在 DoFn.process
方法和传递给 FlatMap
的函数上进行删除。
内联声明类型提示
要内联指定类型提示,请使用 with_input_types
和 with_output_types
方法。以下示例代码内联声明了一个输入类型提示
当您将 Filter 转换应用于上面的示例中的 numbers 集合时,您将能够在管道构建期间捕获该错误。
使用装饰器声明类型提示
要将类型提示指定为 DoFn
或 PTransform
的属性,请使用 @with_input_types()
和 @with_output_types()
装饰器。
以下代码使用 @with_input_types()
装饰器,在 FilterEvensDoFn
上声明了一个 int
类型提示。
装饰器接收任意数量的位置参数和/或关键字参数,通常在它们所包装的函数的上下文中进行解释。通常,第一个参数是主输入的类型提示,其他参数是侧输入的类型提示。
禁用注释使用
由于这种类型提示声明方式默认情况下是启用的,因此这里有一些方法可以禁用它。
- 在您希望 Beam 忽略其注释的特定函数上使用
@beam.typehints.no_annotations
装饰器。 - 使用上面介绍的装饰器或内联方法声明类型提示。这些方法将优先于注释。
- 在创建管道之前调用
beam.typehints.disable_type_annotations()
。这将阻止 Beam 查看所有函数上的注释。
定义泛型类型
您可以使用类型提示注释来定义泛型类型。以下代码指定了一个输入类型提示,该提示断言泛型类型 T
,以及一个输出类型提示,该提示断言类型 Tuple[int, T]
。如果 MyTransform
的输入是 str
类型,Beam 将推断输出类型为 Tuple[int, str]
。
类型提示的种类
您可以对任何类使用类型提示,包括 Python 原生类型、容器类和用户定义类。所有类,例如 int
、float
和用户定义类,都可以用来定义类型提示,称为**简单类型提示**。容器类型,例如列表、元组和可迭代对象,也可以用来定义类型提示,称为**参数化类型提示**。最后,还有一些特殊类型不对应任何具体的 Python 类,例如 Any
、Optional
和 Union
,这些类型也可以作为类型提示使用。
Beam 定义了自己的内部类型提示类型,这些类型仍然可用于向后兼容。它还支持 Python 的 typing 模块类型,这些类型在内部会转换为 Beam 内部类型。
对于新代码,建议使用 typing 模块类型。
简单类型提示
类型提示可以是任何类,从 int
和 str
到用户定义类。如果您将类作为类型提示,您可能希望为其定义一个编码器。
参数化类型提示
参数化类型提示对于提示容器类 Python 对象(例如 list
)的类型很有用。这些类型提示进一步细化了这些容器对象中的元素。
参数化类型提示的参数可以是简单类型、参数化类型或类型变量。作为类型变量的元素类型(例如 T
)在操作的输入和输出之间建立关系(例如,List[T]
-> T
)。类型提示可以嵌套,允许您为复杂类型定义类型提示。例如,List[Tuple[int, int, str]]
。
为了避免与内置容器类型的命名空间冲突,第一个字母大写。
以下参数化类型提示是允许的
Tuple[T, U]
Tuple[T, ...]
List[T]
KV[T, U]
Dict[T, U]
Set[T]
FrozenSet[T]
Iterable[T]
Iterator[T]
Generator[T]
PCollection[T]
注意:Tuple[T, U]
类型提示是具有固定数量异构类型元素的元组,而 Tuple[T, ...]
类型提示是具有可变数量同构类型元素的元组。
特殊类型提示
以下是一些特殊类型提示,它们不对应于任何类,而是对应于 PEP 484 中引入的特殊类型。
Any
Union[T, U, V]
Optional[T]
运行时类型检查
除了在管道构建时使用类型提示进行类型检查外,您还可以启用运行时类型检查,以检查实际元素在管道执行期间是否满足声明的类型约束。
例如,以下管道发出错误类型的元素。根据运行器实现,其执行可能在运行时失败,也可能不失败。
但是,如果您启用运行时类型检查,则代码保证在运行时失败。要启用运行时类型检查,请将管道选项 runtime_type_check
设置为 True
。
请注意,由于运行时类型检查是对每个 PCollection
元素执行的,因此启用此功能可能会导致性能显着下降。因此,建议在生产管道中禁用运行时类型检查。有关更快、更适合生产环境的替代方法,请参阅下一节。
更快的运行时类型检查
您可以通过将管道选项 performance_runtime_type_check
设置为 True
来启用更快的基于采样的运行时类型检查。
这是一个仅 Python 3 功能,通过使用优化后的 Cython 代码对称为样本的少量值进行运行时类型检查来实现。
目前,此功能不支持对侧输入或合并操作进行运行时类型检查。这些功能将在 Beam 的未来版本中得到支持。
类型提示在编码器中的使用
当您的管道读取、写入或以其他方式具体化其数据时,您 PCollection
中的元素需要被编码和解码为字节字符串。字节字符串用于中间存储、在 GroupByKey
操作中比较键以及从源读取和写入接收器。
适用于 Python 的 Beam SDK 使用 Python 对未知类型对象进行序列化的原生支持,此过程称为**pickle**。但是,使用 PickleCoder
会带来一些缺点:它在时间和空间上效率较低,并且使用的编码不是确定性的,这会阻碍分布式分区、分组和状态查找。
为了避免这些缺点,您可以为编码和解码类型定义 Coder
类,从而以更高效的方式实现。您可以指定一个 Coder
来描述如何编码和解码给定 PCollection
的元素。
为了使 Coder
正确且高效,它需要类型信息,并且 PCollections
需要与特定类型相关联。类型提示正是使这种类型信息可用的方法。适用于 Python 的 Beam SDK 为标准 Python 类型(例如 int
、float
、str
、bytes
和 unicode
)提供内置编码器。
确定性编码器
如果您没有定义 Coder
,则默认情况下使用一个编码器,该编码器会对未知类型回退到 pickle。在某些情况下,您必须指定一个确定性 Coder
,否则会出现运行时错误。
例如,假设您有一个 PCollection
,它包含键值对,这些键值对的键是 Player
对象。如果您对这样的集合应用 GroupByKey
变换,则当使用非确定性编码器(例如默认的 pickle 编码器)时,其键对象可能在不同的机器上以不同的方式序列化。由于 GroupByKey
使用这种序列化表示来比较键,因此这可能会导致行为不正确。为了确保元素始终以相同的方式编码和解码,您需要为 Player
类定义一个确定性 Coder
。
以下代码显示了示例 Player
类以及如何为其定义 Coder
。当您使用类型提示时,Beam 会使用 beam.coders.registry
推断要使用的 Coders
。以下代码将 PlayerCoder
注册为 Player
类的编码器。在本例中,为 CombinePerKey
声明的输入类型是 Tuple[Player, int]
。在这种情况下,Beam 会推断要使用的 Coder
对象是 TupleCoder
、PlayerCoder
和 IntCoder
。
from typing import Tuple
class Player(object):
def __init__(self, team, name):
self.team = team
self.name = name
class PlayerCoder(beam.coders.Coder):
def encode(self, player):
return ('%s:%s' % (player.team, player.name)).encode('utf-8')
def decode(self, s):
return Player(*s.decode('utf-8').split(':'))
def is_deterministic(self):
return True
beam.coders.registry.register_coder(Player, PlayerCoder)
def parse_player_and_score(csv):
name, team, score = csv.split(',')
return Player(team, name), int(score)
totals = (
lines
| beam.Map(parse_player_and_score)
| beam.CombinePerKey(sum).with_input_types(Tuple[Player, int]))