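Pipeline option patterns
The samples on this page show you common pipeline configurations. For more information about pipeline configuration options, see Creating a pipeline and Configuring pipeline options.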
- Java SDK
- Python SDK
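Retroactively logging runtime parameters
Use the ValueProvider interface to access runtime parameters after a pipeline job has completed.
You can use the ValueProvider interface to pass runtime parameters to your pipeline, but you can only log those parameters from within the Beam DAG. One solution is to add a pipeline branch with a DoFn that processes a placeholder value and then logs the runtime parameters.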
/** Sample of PipelineOptions with a ValueProvider option argument. */
public interface MyOptions extends PipelineOptions {
  @Description("My option")
  @Default.String("Hello world!")
  ValueProvider<String> getStringValue();

  void setStringValue(ValueProvider<String> value);
}

public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create pipeline.
  Pipeline p = Pipeline.create(options);

  // Add a branch for logging the ValueProvider value.
  p.apply(Create.of(1))
      .apply(
          ParDo.of(
              new DoFn<Integer, Integer>() {
                // Define the DoFn that logs the ValueProvider value.
                @ProcessElement
                public void process(ProcessContext c) {
                  MyOptions ops = c.getPipelineOptions().as(MyOptions.class);
                  // This example logs the ValueProvider value, but you could store it by
                  // pushing it to an external database.
                  // LOG is assumed to be an SLF4J Logger declared in the enclosing class.
                  LOG.info("Option StringValue was {}", ops.getStringValue());
                }
              }));

  // The main pipeline.
  p.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());

  p.run();
}
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider


class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--string_value', type=str)


class LogValueProvidersFn(beam.DoFn):
  def __init__(self, string_vp):
    self.string_vp = string_vp

  # Define the DoFn that logs the ValueProvider value.
  # The DoFn is called when creating the pipeline branch.
  # This example logs the ValueProvider value, but
  # you could store it by pushing it to an external database.
  def process(self, an_int):
    logging.info('The string_value is %s' % self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is:
    logging.info(
        'The string value is %s' %
        RuntimeValueProvider.get_value('string_value', str, ''))


beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)

# Create pipeline.
with beam.Pipeline(options=beam_options) as pipeline:

  # Add a branch for logging the ValueProvider value.
  _ = (
      pipeline
      | beam.Create([None])
      | 'LogValueProvs' >> beam.ParDo(LogValueProvidersFn(args.string_value)))

  # The main pipeline.
  result_pc = (
      pipeline
      | "main_pc" >> beam.Create([1, 2, 3])
      | beam.combiners.Sum.Globally())
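To see why the logging has to happen inside a DoFn, it can help to model the behavior without Beam at all. The sketch below is a simplified, standard-library-only stand-in and is not Beam's implementation: DeferredValue, bind, LogValueFn, and run_demo are hypothetical names invented for illustration. It only mimics the idea that a runtime value provider cannot be read while the pipeline is being constructed, but can be read once the job is running and the runtime options are known.

```python
class DeferredValue:
    """Toy stand-in for a runtime value provider (hypothetical, not Beam's class)."""

    def __init__(self, name):
        self.name = name
        self._runtime_options = None  # filled in only when the "job" starts

    def is_accessible(self):
        return self._runtime_options is not None

    def get(self):
        # Reading the value before the job runs is an error in this model.
        if not self.is_accessible():
            raise RuntimeError(
                "%s is not available until the job runs" % self.name)
        return self._runtime_options[self.name]

    def bind(self, runtime_options):
        # Hypothetical hook: simulates the runner supplying runtime options.
        self._runtime_options = runtime_options


class LogValueFn:
    """Toy DoFn-like object: keeps the provider and reads it only in process()."""

    def __init__(self, provider):
        self.provider = provider  # store the provider, not its value
        self.logged = []

    def process(self, element):
        # By the time process() runs, the value has been supplied.
        self.logged.append(self.provider.get())


def run_demo():
    provider = DeferredValue("string_value")
    fn = LogValueFn(provider)

    # "Construction time": the value cannot be read yet.
    try:
        provider.get()
        early = "resolved"
    except RuntimeError:
        early = "unavailable"

    # "Run time": options are supplied, then the placeholder element is processed.
    provider.bind({"string_value": "hello"})
    fn.process(None)
    return early, fn.logged
```

In this model, calling run_demo() shows the value being unavailable at construction time and readable inside process(), which mirrors why the samples above pass the provider into the DoFn and call get() only while processing the placeholder element.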
Last updated on 2024/10/31