测试您的管道

测试您的管道是在开发有效的​​数据处理解决方案中特别重要的一步。Beam 模型的间接性(其中您的用户代码构建了一个将在远程执行的管道图)会使调试失败的运行变得不平凡的任务。通常,在您的管道代码上执行本地单元测试比调试管道的远程执行速度更快、更简单。

在您在所选运行器上运行管道之前,在本地对您的管道代码进行单元测试通常是识别和修复管道代码中错误的最佳方法。在本地对您的管道进行单元测试还允许您使用您熟悉/喜欢的本地调试工具。

您可以使用DirectRunner,或者PrismRunner。两者都是本地运行器,有助于测试和本地开发。

在您在本地测试您的管道之后,您可以使用您选择的运行器来进行小规模测试。例如,将 Flink 运行器与本地或远程 Flink 集群一起使用。

Beam SDK 提供了许多方法来对您的管道代码进行单元测试,从最低级别到最高级别。从最低级别到最高级别,它们是

为了支持单元测试,Beam SDK for Java 在testing 包中提供了许多测试类。您可以使用这些测试作为参考和指南。

测试转换

要测试您创建的转换,您可以使用以下模式

TestPipeline

TestPipeline 是 Beam Java SDK 中专门用于测试转换的类。

TestPipeline 是 Beam Python SDK 中专门用于测试转换的类。

对于测试,使用 TestPipeline 代替 Pipeline 来创建管道对象。与 Pipeline.create 不同,TestPipeline.create 在内部处理设置 PipelineOptions

您可以按照以下步骤创建一个 TestPipeline

Pipeline p = TestPipeline.create();
with TestPipeline as p:
    ...
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"

// Override TestMain with ptest.Main,
// once per package.
func TestMain(m *testing.M) {
	ptest.Main(m)
}

func TestPipeline(t *testing.T) {
	...
	// The Go SDK doesn't use a TestPipeline concept,
	// and recommends using the ptest harness
	// to wrap pipeline construction.
	pr := ptest.BuildAndRun(t, func(s beam.Scope) {
		...
	})
	...
}

注意:在 Beam 中阅读有关测试无界管道的信息这篇博文

使用 Create 转换

您可以使用 Create 转换从标准内存中集合类(如 Java 或 Python List)创建一个 PCollection。有关更多信息,请参阅创建 PCollection

PAssert

PAssert 是 Beam Java SDK 中的一个类,是对 PCollection 内容的断言。您可以使用 PAssert 来验证 PCollection 是否包含一组特定的预期元素。

对于给定的 PCollection,您可以使用 PAssert 来验证内容,如下所示

PCollection<String> output = ...;

// Check whether a PCollection contains some elements in any order.
PAssert.that(output)
.containsInAnyOrder(
  "elem1",
  "elem3",
  "elem2");
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

output = ...

# Check whether a PCollection contains some elements in any order.
assert_that(
    output,
    equal_to(["elem1", "elem3", "elem2"]))
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"

output := ... // beam.PCollection

// Check whether a PCollection contains some elements in any order.
passert.EqualsList(s, output, ["elem1", "elem3", "elem2"])

任何使用 PAssert 的 Java 代码都必须链接到 JUnitHamcrest。如果您使用的是 Maven,您可以通过将以下依赖项添加到项目的 pom.xml 文件中来链接到 Hamcrest

<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest</artifactId>
    <version>2.2</version>
    <scope>test</scope>
</dependency>

有关这些类如何工作的更多信息,请参阅org.apache.beam.sdk.testing 包文档。

复合转换的示例测试

以下代码显示了复合转换的完整测试。测试将 Count 转换应用于 String 元素的输入 PCollection。测试使用 Create 转换从 List<String> 创建输入 PCollection

public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
  "hi", "there", "hi", "hi", "sue", "bob",
  "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

  public void testCount() {
    // Create a test pipeline.
    Pipeline p = TestPipeline.create();

    // Create an input PCollection.
    PCollection<String> input = p.apply(Create.of(WORDS));

    // Apply the Count transform under test.
    PCollection<KV<String, Long>> output =
      input.apply(Count.<String>perElement());

    // Assert on the results.
    PAssert.that(output)
      .containsInAnyOrder(
          KV.of("hi", 4L),
          KV.of("there", 1L),
          KV.of("sue", 2L),
          KV.of("bob", 2L),
          KV.of("", 3L),
          KV.of("ZOW", 1L));

    // Run the pipeline.
    p.run();
  }
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountTest(unittest.TestCase):

  def test_count(self):
    # Our static input data, which will make up the initial PCollection.
    WORDS = [
      "hi", "there", "hi", "hi", "sue", "bob",
      "hi", "sue", "", "", "ZOW", "bob", ""
    ]
    # Create a test pipeline.
    with TestPipeline() as p:

      # Create an input PCollection.
      input = p | beam.Create(WORDS)

      # Apply the Count transform under test.
      output = input | beam.combiners.Count.PerElement()

      # Assert on the results.
      assert_that(
        output,
        equal_to([
            ("hi", 4),
            ("there", 1),
            ("sue", 2),
            ("bob", 2),
            ("", 3),
            ("ZOW", 1)]))

      # The pipeline will run and verify the results.
import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

// formatFn takes a key value pair and puts them
// into a single string for comparison.
func formatFn(w string, c int) string {
	return fmt.Sprintf("%s: %d", w, c)
}

// Register the functional DoFn to ensure execution on workers.
func init() {
	register.Function2x1(formatFn)
}

func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi", "there", "hi", "hi", "sue", "bob",
			"hi", "sue", "", "", "ZOW", "bob", ""}

		wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}

		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)

		// Apply the Count transform under test.
		output := stats.Count(s, col)
		formatted := beam.ParDo(s, formatFn, output)

		// Assert that the output PCollection matches the wantCounts data.
		passert.Equals(s, formatted, wantCounts...)
	})
}

端到端测试管道

您可以使用 Beam SDK 中的测试类(例如 Beam SDK for Java 中的 TestPipelinePAssert)来端到端测试整个管道。通常,要测试整个管道,您需要执行以下操作

测试 WordCount 管道

以下示例代码展示了如何测试WordCount 示例管道WordCount 通常从文本文件读取行作为输入数据;相反,测试创建一个包含一些文本行的 List<String>,并使用 Create 变换来创建一个初始 PCollection

WordCount 的最终变换(来自复合变换 CountWords)生成一个格式化的词频统计结果的 PCollection<String>,适合打印。我们的测试管道没有将该 PCollection 写入输出文本文件,而是使用 PAssert 来验证 PCollection 的元素是否与包含我们预期输出数据的静态 String 数组的元素匹配。

public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.

    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS));

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountWords(beam.PTransform):
    # CountWords transform omitted for conciseness.
    # Full transform can be found here - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py

class WordCountTest(unittest.TestCase):

  # Our input data, which will make up the initial PCollection.
  WORDS = [
      "hi", "there", "hi", "hi", "sue", "bob",
      "hi", "sue", "", "", "ZOW", "bob", ""
  ]

  # Our output data, which is the expected data that the final PCollection must match.
  EXPECTED_COUNTS = ["hi: 5", "there: 1", "sue: 2", "bob: 2"]

  # Example test that tests the pipeline's transforms.

  def test_count_words(self):
    with TestPipeline() as p:

      # Create a PCollection from the WORDS static input data.
      input = p | beam.Create(WORDS)

      # Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      output = input | CountWords()

      # Assert that the output PCollection matches the EXPECTED_COUNTS data.
      assert_that(output, equal_to(EXPECTED_COUNTS), label='CheckOutput')

    # The pipeline will run and verify the results.
package wordcount

import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

// CountWords and formatFn are omitted for conciseness.
// Code for the Full transforms can be found here:
// https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go

func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { ... }

func formatFn(w string, c int) string { ... }

func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi", "there", "hi", "hi", "sue", "bob",
			"hi", "sue", "", "", "ZOW", "bob", ""}

		wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}

		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)

		// Run ALL the pipeline's transforms
		// (in this case, the CountWords composite transform).
		output := CountWords(s, input)
		formatted := beam.ParDo(s, formatFn, output)

		// Assert that the output PCollection matches
		// the wantCounts data.
		passert.Equals(s, formatted, wantCounts...)
	})
}