Test Your Pipeline
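Testing your pipeline is a particularly important step in developing an effective data processing solution. The indirect nature of the Beam model, in which your user code constructs a pipeline graph to be executed remotely, can make debugging failed runs a non-trivial task. Often it is faster and simpler to perform local unit testing on your pipeline code than to debug a pipeline's remote execution.
Before running your pipeline on the runner of your choice, unit testing your pipeline code locally is often the best way to identify and fix bugs in your pipeline code. Unit testing your pipeline locally also allows you to use your familiar or favorite local debugging tools.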
You can use the DirectRunner or the PrismRunner. Both are local runners that are helpful for testing and local development.
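After you test your pipeline locally, you can use the runner of your choice to test on a small scale. For example, use the Flink runner with a local or remote Flink cluster.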
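The Beam SDKs provide a number of ways to unit test your pipeline code. From the lowest to the highest level, these are:
- You can test the individual functions (for example, DoFns) used in your pipeline.
- You can test an entire transform as a unit.
- You can perform an end-to-end test for an entire pipeline.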
To support unit testing, the Beam SDK for Java provides a number of test classes in the testing package. You can use these tests as references and guides.
Testing Transforms
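To test a transform you've created, you can use the following pattern:
- Create a TestPipeline.
- Create some static, known test input data.
- Use the Create transform to create a PCollection of your input data.
- Apply your transform to the input PCollection and save the resulting output PCollection.
- Use PAssert and its subclasses to verify that the output PCollection contains the elements that you expect.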
TestPipeline
TestPipeline is a class included in the Beam Java and Python SDKs specifically for testing transforms.
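For tests, use TestPipeline in place of Pipeline when you create the pipeline object. Unlike Pipeline.create, TestPipeline.create handles setting PipelineOptions internally. In Java, declare the TestPipeline as a JUnit @Rule field; TestPipeline.run() refuses to run a pipeline that is not declared that way. The Go SDK uses the ptest harness instead, as follows: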
import (
	"testing"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
// Override TestMain with ptest.Main,
// once per package.
func TestMain(m *testing.M) {
	ptest.Main(m)
}
func TestPipeline(t *testing.T) {
	...
	// The Go SDK doesn't use a TestPipeline concept,
	// and recommends using the ptest harness
	// to wrap pipeline construction.
	pr := ptest.BuildAndRun(t, func(s beam.Scope) {
		...
	})
	...
}
Note: Read about testing unbounded pipelines in Beam in this blog post.
Using the Create Transform
You can use the Create transform to create a PCollection out of a standard in-memory collection class, such as a Java or Python List. See Creating a PCollection for more information.
PAssert
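PAssert is a class included in the Beam Java SDK that is an assertion on the contents of a PCollection. You can use PAssert to verify that a PCollection contains a specific set of expected elements. The Python SDK provides the same kind of check through the assert_that function in apache_beam.testing.util.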
For a given PCollection, you can use PAssert to verify its contents.
Any Java code that uses PAssert must link in JUnit and Hamcrest. If you're using Maven, you can link in Hamcrest by adding a Hamcrest dependency to your project's pom.xml file.
For more information on how these classes work, see the org.apache.beam.sdk.testing package documentation.
An Example Test for a Composite Transform
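The following code shows a complete test for a composite transform. The test applies the Count transform to an input PCollection of String elements. The test uses the Create transform to create the input PCollection from a List<String>.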
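import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
public class CountTest {
  // Create a test pipeline. TestPipeline must be declared as a JUnit @Rule.
  @Rule public final transient TestPipeline p = TestPipeline.create();
  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
    "hi", "there", "hi", "hi", "sue", "bob",
    "hi", "sue", "", "", "ZOW", "bob", ""};
  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
  @Test
  public void testCount() {
    // Create an input PCollection.
    PCollection<String> input = p.apply(Create.of(WORDS));
    // Apply the Count transform under test.
    PCollection<KV<String, Long>> output =
        input.apply(Count.<String>perElement());
    // Assert on the results.
    PAssert.that(output)
        .containsInAnyOrder(
            KV.of("hi", 4L),
            KV.of("there", 1L),
            KV.of("sue", 2L),
            KV.of("bob", 2L),
            KV.of("", 3L),
            KV.of("ZOW", 1L));
    // Run the pipeline and wait for it to finish.
    p.run().waitUntilFinish();
  }
}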
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
class CountTest(unittest.TestCase):
    def test_count(self):
        # Our static input data, which will make up the initial PCollection.
        WORDS = [
            "hi", "there", "hi", "hi", "sue", "bob",
            "hi", "sue", "", "", "ZOW", "bob", ""
        ]
        # Create a test pipeline.
        with TestPipeline() as p:
            # Create an input PCollection.
            input = p | beam.Create(WORDS)
            # Apply the Count transform under test.
            output = input | beam.combiners.Count.PerElement()
            # Assert on the results.
            assert_that(
                output,
                equal_to([
                    ("hi", 4),
                    ("there", 1),
                    ("sue", 2),
                    ("bob", 2),
                    ("", 3),
                    ("ZOW", 1)]))
        # The pipeline will run and verify the results.
import (
	"fmt"
	"testing"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)
// formatFn takes a key value pair and puts them
// into a single string for comparison.
func formatFn(w string, c int) string {
	return fmt.Sprintf("%s: %d", w, c)
}
// Register the functional DoFn to ensure execution on workers.
func init() {
	register.Function2x1(formatFn)
}
func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi", "there", "hi", "hi", "sue", "bob",
			"hi", "sue", "", "", "ZOW", "bob", ""}
		// Every distinct element is counted, including the empty string.
		wantCounts := []string{"hi: 4", "there: 1", "sue: 2", "bob: 2", ": 3", "ZOW: 1"}
		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)
		// Apply the Count transform under test.
		output := stats.Count(s, input)
		formatted := beam.ParDo(s, formatFn, output)
		// Assert that the output PCollection matches the wantCounts data.
		passert.EqualsList(s, formatted, wantCounts)
	})
}
Testing a Pipeline End-to-End
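You can use the test classes in the Beam SDKs (such as TestPipeline and PAssert in the Beam SDK for Java) to test an entire pipeline end to end. Typically, to test an entire pipeline, you do the following:
- For every source of input data to your pipeline, create some known static test input data.
- Create some static test output data that matches what you expect in your pipeline's final output PCollection(s).
- Create a TestPipeline in place of the standard Pipeline.create.
- In place of your pipeline's Read transform(s), use the Create transform to create one or more PCollections from your static input data.
- Apply your pipeline's transforms.
- In place of your pipeline's Write transform(s), use PAssert to verify that the contents of the final PCollections your pipeline produces match the expected values in your static output data.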
Testing the WordCount Pipeline
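The following example code shows how one might test the WordCount example pipeline. WordCount usually reads lines from a text file for input data; instead, the test creates a List<String> containing some text lines and uses a Create transform to create an initial PCollection.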
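WordCount's composite transform CountWords produces the per-word counts, which are then formatted into a PCollection<String> of word counts suitable for printing. Rather than write that PCollection to an output text file, our test pipeline uses PAssert to verify that the elements of the PCollection match those of a static String array containing our expected output data.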
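import java.util.Arrays;
import java.util.List;
import org.apache.beam.examples.WordCount.CountWords;
import org.apache.beam.examples.WordCount.FormatAsTextFn;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
public class WordCountTest {
  // TestPipeline must be declared as a JUnit @Rule.
  @Rule public final transient TestPipeline p = TestPipeline.create();
  // Our static input data, which will comprise the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
    "hi there", "hi", "hi sue bob",
    "hi sue", "", "bob hi"};
  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
  // Our static output data, which is the expected data that the final PCollection must match.
  static final String[] COUNTS_ARRAY = new String[] {
    "hi: 5", "there: 1", "sue: 2", "bob: 2"};
  // Example test that tests the pipeline's transforms.
  @Test
  public void testCountWords() throws Exception {
    // Create a PCollection from the WORDS static input data.
    PCollection<String> input = p.apply(Create.of(WORDS));
    // Run ALL the pipeline's transforms (in this case, the CountWords composite transform),
    // then format each word count as a "word: count" string.
    PCollection<String> output =
        input.apply(new CountWords()).apply(MapElements.via(new FormatAsTextFn()));
    // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
    PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
    // Run the pipeline and wait for it to finish.
    p.run().waitUntilFinish();
  }
}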
import re
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
class CountWords(beam.PTransform):
    # Simplified CountWords transform: split lines into words, then count each word.
    # Full transform can be found here - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py
    def expand(self, pcoll):
        return (
            pcoll
            | 'split' >> beam.FlatMap(lambda x: re.findall(r"[A-Za-z']+", x))
            | 'count' >> beam.combiners.Count.PerElement())
class WordCountTest(unittest.TestCase):
    # Our input data, which will make up the initial PCollection.
    WORDS = [
        "hi there", "hi", "hi sue bob",
        "hi sue", "", "bob hi"
    ]
    # Our output data, which is the expected data that the final PCollection must match.
    EXPECTED_COUNTS = ["hi: 5", "there: 1", "sue: 2", "bob: 2"]
    # Example test that tests the pipeline's transforms.
    def test_count_words(self):
        with TestPipeline() as p:
            # Create a PCollection from the WORDS static input data.
            input = p | beam.Create(self.WORDS)
            # Run ALL the pipeline's transforms (in this case, the CountWords composite transform),
            # then format each (word, count) pair as "word: count".
            output = (
                input
                | CountWords()
                | beam.MapTuple(lambda word, count: f"{word}: {count}"))
            # Assert that the output PCollection matches the EXPECTED_COUNTS data.
            assert_that(output, equal_to(self.EXPECTED_COUNTS), label='CheckOutput')
        # The pipeline will run and verify the results.
package wordcount
import (
	"testing"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
// CountWords and formatFn are omitted for conciseness.
// Code for the Full transforms can be found here:
// https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { ... }
func formatFn(w string, c int) string { ... }
func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		// Lines of text; CountWords splits each line into words.
		lines := []string{"hi there", "hi", "hi sue bob",
			"hi sue", "", "bob hi"}
		wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}
		// Create a PCollection from the lines static input data.
		input := beam.CreateList(s, lines)
		// Run ALL the pipeline's transforms
		// (in this case, the CountWords composite transform).
		output := CountWords(s, input)
		formatted := beam.ParDo(s, formatFn, output)
		// Assert that the output PCollection matches
		// the wantCounts data.
		passert.EqualsList(s, formatted, wantCounts)
	})
}
Last updated on 2024/10/31