Cdap IO
CdapIO
是一个从源读取数据或将数据写入接收器 CDAP 插件的转换。
批处理插件支持
CdapIO
当前通过引用 CDAP 插件
类名支持以下 CDAP 批处理插件
此外,还可以使用基于 Hadoop 的 InputFormat
或 OutputFormat
的任何其他 CDAP 批处理插件。它们可以轻松地添加到支持的插件列表中,有关更多详细信息,请参阅 CdapIO 自述文件。
流式插件支持
CdapIO
当前支持基于 Apache Spark Receiver 的 CDAP 流式插件。
CDAP 流式插件的要求
- CDAP 流式插件应基于
Spark Receiver
(Spark 2.4)。 - CDAP 流式插件应支持使用偏移量。
- 相应的 Spark Receiver 应实现 HasOffset 接口。
- 记录应具有表示记录偏移量的数字字段。
使用 CdapIO 进行批处理读取
为了从 CDAP 插件读取数据,您需要传递
Key
和Value
类。您需要检查这些类是否具有可用的 Beam Coder。PluginConfig
对象,其中包含特定 CDAP 插件的参数。
您可以通过指定以下内容轻松地使用 ConfigWrapper
类构建 PluginConfig
对象
- 所需的
PluginConfig
的类。 Map<String, Object>
参数映射,用于相应的 CDAP 插件。
例如
Map<String, Object> myPluginConfigParams = new HashMap<>();
// Read plugin parameters (e.g. from PipelineOptions) and put them into 'myPluginConfigParams' map.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyPluginConfig pluginConfig =
new ConfigWrapper<>(MyPluginConfig.class).withParams(myPluginConfigParams).build();
按插件类名读取数据
某些 CDAP 插件已获得支持,只需使用插件类名即可使用。
例如
使用构建的批处理插件读取数据
如果 CDAP 插件不受插件类名支持,您可以通过传递以下参数轻松地构建 Plugin
对象
- CDAP 批处理插件的类。
- 用于连接到所选 CDAP 插件的
InputFormat
类。 - 用于提供
InputFormat
的InputFormatProvider
类。
然后,您可以将此 Plugin
对象传递给 CdapIO
。
例如
CdapIO.Read<String, String> readTransform =
CdapIO.<String, String>read()
.withCdapPlugin(
Plugin.createBatch(
MyCdapPlugin.class,
MyInputFormat.class,
MyInputFormatProvider.class))
.withPluginConfig(pluginConfig)
.withKeyClass(String.class)
.withValueClass(String.class);
p.apply("read", readTransform);
特定 CDAP 插件的示例
CDAP Hubspot 批处理源插件
SourceHubspotConfig pluginConfig =
new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, JsonElement> readTransform =
CdapIO.<NullWritable, JsonElement>read()
.withCdapPluginClass(HubspotBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(JsonElement.class);
p.apply("readFromHubspotPlugin", readTransform);
CDAP Salesforce 批处理源插件
SalesforceSourceConfig pluginConfig =
new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<Schema, LinkedHashMap> readTransform =
CdapIO.<Schema, LinkedHashMap>read()
.withCdapPluginClass(SalesforceBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(Schema.class)
.withValueClass(LinkedHashMap.class);
p.apply("readFromSalesforcePlugin", readTransform);
CDAP ServiceNow 批处理源插件
ServiceNowSourceConfig pluginConfig =
new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
CdapIO.<NullWritable, StructuredRecord>read()
.withCdapPluginClass(ServiceNowSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(StructuredRecord.class);
p.apply("readFromServiceNowPlugin", readTransform);
CDAP Zendesk 批处理源插件
ZendeskBatchSourceConfig pluginConfig =
new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
CdapIO.<NullWritable, StructuredRecord>read()
.withCdapPluginClass(ZendeskBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(StructuredRecord.class);
p.apply("readFromZendeskPlugin", readTransform);
要了解更多信息,请查看 完整的示例。
使用 CdapIO 进行批处理写入
为了写入 CDAP 插件,您需要传递
Key
和Value
类。您需要检查这些类是否具有可用的 Beam Coder。locksDirPath
,这是存储锁的锁目录路径。此参数是 Hadoop 外部同步(获取与写入作业相关的锁的机制)所需的。PluginConfig
对象,其中包含特定 CDAP 插件的参数。
您可以通过指定以下内容轻松地使用 ConfigWrapper
类构建 PluginConfig
对象
- 所需的
PluginConfig
的类。 Map<String, Object>
参数映射,用于相应的 CDAP 插件。
例如
按插件类名写入数据
某些 CDAP 插件已获得支持,只需使用插件类名即可使用。
例如
使用构建的批处理插件写入数据
如果 CDAP 插件不受插件类名支持,您可以通过传递以下参数轻松地构建 Plugin
对象
- CDAP 插件的类。
- 用于连接到所选 CDAP 插件的
OutputFormat
类。 - 用于提供
OutputFormat
的OutputFormatProvider
类。
然后,您可以将此 Plugin
对象传递给 CdapIO
。
例如
CdapIO.Write<String, String> writeTransform =
CdapIO.<String, String>write()
.withCdapPlugin(
Plugin.createBatch(
MyCdapPlugin.class,
MyOutputFormat.class,
MyOutputFormatProvider.class))
.withPluginConfig(pluginConfig)
.withKeyClass(String.class)
.withValueClass(String.class)
.withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);
特定 CDAP 插件的示例
CDAP Hubspot 批处理接收器插件
SinkHubspotConfig pluginConfig =
new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, String> writeTransform =
CdapIO.<NullWritable, String>write()
.withCdapPluginClass(pluginClass)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class)
.withLocksDirPath(locksDirPath);
p.apply("writeToHubspotPlugin", writeTransform);
CDAP Salesforce 批处理接收器插件
SalesforceSinkConfig pluginConfig =
new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, CSVRecord> writeTransform =
CdapIO.<NullWritable, CSVRecord>write()
.withCdapPluginClass(pluginClass)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(CSVRecord.class)
.withLocksDirPath(locksDirPath);
p.apply("writeToSalesforcePlugin", writeTransform);
要了解更多信息,请查看 完整的示例。
使用 CdapIO 进行流式读取
为了从 CDAP 插件读取数据,您需要传递
Key
和Value
类。您需要检查这些类是否具有可用的 Beam Coder。PluginConfig
对象,其中包含特定 CDAP 插件的参数。
您可以通过指定以下内容轻松地使用 ConfigWrapper
类构建 PluginConfig
对象
- 所需的
PluginConfig
的类。 Map<String, Object>
参数映射,用于相应的 CDAP 插件。
例如
按插件类名读取数据
某些 CDAP 插件已获得支持,只需使用插件类名即可使用。
例如
使用构建的流式插件读取数据
如果 CDAP 插件不受插件类名支持,您可以通过传递以下参数轻松地构建 Plugin
对象
- CDAP 流式插件的类。
getOffsetFn
,这是一个SerializableFunction
,用于定义如何从记录中获取Long
记录偏移量。receiverClass
,这是与 CDAP 插件关联的 Spark (v 2.4)Receiver
类。- (可选)
getReceiverArgsFromConfigFn
,这是一个SerializableFunction
,用于定义如何使用PluginConfig
对象获取 SparkReceiver
的构造函数参数。
然后,您可以将此 Plugin
对象传递给 CdapIO
。
例如
CdapIO.Read<String, String> readTransform =
CdapIO.<String, String>read()
.withCdapPlugin(
Plugin.createStreaming(
MyStreamingPlugin.class,
myGetOffsetFn,
MyReceiver.class,
myGetReceiverArgsFromConfigFn))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("read", readTransform);
使用可选参数读取数据
您还可以选择传递以下可选参数
pullFrequencySec
,这是轮询新记录更新之间的延迟(以秒为单位)。startOffset
,这是应开始读取的包含的起始偏移量。
例如
特定 CDAP 插件的示例
CDAP Hubspot 流式源插件
HubspotStreamingSourceConfig pluginConfig =
new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
.withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
CdapIO.<NullWritable, String>read()
.withCdapPlugin(
Plugin.createStreaming(
HubspotStreamingSource.class,
GetOffsetUtils.getOffsetFnForHubspot(),
HubspotReceiver.class))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("readFromHubspotPlugin", readTransform);
CDAP Salesforce 流式源插件
SalesforceStreamingSourceConfig pluginConfig =
new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
.withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
CdapIO.<NullWritable, String>read()
.withCdapPlugin(
Plugin.createStreaming(
SalesforceStreamingSource.class,
GetOffsetUtils.getOffsetFnForSalesforce(),
SalesforceReceiver.class,
config -> {
SalesforceStreamingSourceConfig salesforceConfig =
SalesforceStreamingSourceConfig) config;
return new Object[] {
salesforceConfig.getAuthenticatorCredentials(),
salesforceConfig.getPushTopicName()
};
}))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("readFromSalesforcePlugin", readTransform);
要了解更多信息,请查看 完整的示例。
最后更新时间:2024/10/31
您是否找到了所需的所有内容?
所有内容是否都很有用且清晰?您是否想更改任何内容?请告诉我们!