Cdap IO

CdapIO 是一个从源读取数据或将数据写入接收器 CDAP 插件的转换。

批处理插件支持

CdapIO 当前通过引用 CDAP 插件 类名支持以下 CDAP 批处理插件

此外,还可以使用基于 Hadoop 的 InputFormatOutputFormat 的任何其他 CDAP 批处理插件。它们可以轻松地添加到支持的插件列表中,有关更多详细信息,请参阅 CdapIO 自述文件

流式插件支持

CdapIO 当前支持基于 Apache Spark Receiver 的 CDAP 流式插件。

CDAP 流式插件的要求

使用 CdapIO 进行批处理读取

为了从 CDAP 插件读取数据,您需要传递

您可以通过指定以下内容轻松地使用 ConfigWrapper 类构建 PluginConfig 对象

例如

Map<String, Object> myPluginConfigParams = new HashMap<>();
// Read plugin parameters (e.g. from PipelineOptions) and put them into 'myPluginConfigParams' map.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(myPluginConfigParams).build();

按插件类名读取数据

某些 CDAP 插件已获得支持,只需使用插件类名即可使用。

例如

CdapIO.Read<NullWritable, JsonElement> readTransform =
  CdapIO.<NullWritable, JsonElement>read()
    .withCdapPluginClass(HubspotBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(JsonElement.class);
p.apply("read", readTransform);

使用构建的批处理插件读取数据

如果 CDAP 插件不受插件类名支持,您可以通过传递以下参数轻松地构建 Plugin 对象

然后,您可以将此 Plugin 对象传递给 CdapIO

例如

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPlugin(
      Plugin.createBatch(
        MyCdapPlugin.class,
        MyInputFormat.class,
        MyInputFormatProvider.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(String.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

特定 CDAP 插件的示例

CDAP Hubspot 批处理源插件

SourceHubspotConfig pluginConfig =
  new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, JsonElement> readTransform =
  CdapIO.<NullWritable, JsonElement>read()
    .withCdapPluginClass(HubspotBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(JsonElement.class);
p.apply("readFromHubspotPlugin", readTransform);

CDAP Salesforce 批处理源插件

SalesforceSourceConfig pluginConfig =
  new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<Schema, LinkedHashMap> readTransform =
  CdapIO.<Schema, LinkedHashMap>read()
    .withCdapPluginClass(SalesforceBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(Schema.class)
    .withValueClass(LinkedHashMap.class);
p.apply("readFromSalesforcePlugin", readTransform);

CDAP ServiceNow 批处理源插件

ServiceNowSourceConfig pluginConfig =
  new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
  CdapIO.<NullWritable, StructuredRecord>read()
    .withCdapPluginClass(ServiceNowSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(StructuredRecord.class);
p.apply("readFromServiceNowPlugin", readTransform);

CDAP Zendesk 批处理源插件

ZendeskBatchSourceConfig pluginConfig =
  new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
  CdapIO.<NullWritable, StructuredRecord>read()
    .withCdapPluginClass(ZendeskBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(StructuredRecord.class);
p.apply("readFromZendeskPlugin", readTransform);

要了解更多信息,请查看 完整的示例

使用 CdapIO 进行批处理写入

为了写入 CDAP 插件,您需要传递

您可以通过指定以下内容轻松地使用 ConfigWrapper 类构建 PluginConfig 对象

例如

MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();

按插件类名写入数据

某些 CDAP 插件已获得支持,只需使用插件类名即可使用。

例如

CdapIO.Write<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>write()
    .withCdapPluginClass(HubspotBatchSink.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);

使用构建的批处理插件写入数据

如果 CDAP 插件不受插件类名支持,您可以通过传递以下参数轻松地构建 Plugin 对象

然后,您可以将此 Plugin 对象传递给 CdapIO

例如

CdapIO.Write<String, String> writeTransform =
  CdapIO.<String, String>write()
    .withCdapPlugin(
      Plugin.createBatch(
        MyCdapPlugin.class,
        MyOutputFormat.class,
        MyOutputFormatProvider.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(String.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);

特定 CDAP 插件的示例

CDAP Hubspot 批处理接收器插件

SinkHubspotConfig pluginConfig =
  new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, String> writeTransform =
  CdapIO.<NullWritable, String>write()
    .withCdapPluginClass(pluginClass)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("writeToHubspotPlugin", writeTransform);

CDAP Salesforce 批处理接收器插件

SalesforceSinkConfig pluginConfig =
  new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, CSVRecord> writeTransform =
  CdapIO.<NullWritable, CSVRecord>write()
    .withCdapPluginClass(pluginClass)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(CSVRecord.class)
    .withLocksDirPath(locksDirPath);
p.apply("writeToSalesforcePlugin", writeTransform);

要了解更多信息,请查看 完整的示例

使用 CdapIO 进行流式读取

为了从 CDAP 插件读取数据,您需要传递

您可以通过指定以下内容轻松地使用 ConfigWrapper 类构建 PluginConfig 对象

例如

MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();

按插件类名读取数据

某些 CDAP 插件已获得支持,只需使用插件类名即可使用。

例如

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPluginClass(MyStreamingPlugin.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

使用构建的流式插件读取数据

如果 CDAP 插件不受插件类名支持,您可以通过传递以下参数轻松地构建 Plugin 对象

然后,您可以将此 Plugin 对象传递给 CdapIO

例如

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        MyStreamingPlugin.class,
        myGetOffsetFn,
        MyReceiver.class,
        myGetReceiverArgsFromConfigFn))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

使用可选参数读取数据

您还可以选择传递以下可选参数

例如

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPluginClass(MyStreamingPlugin.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withPullFrequencySec(1L)
    .withStartOffset(1L);
p.apply("read", readTransform);

特定 CDAP 插件的示例

CDAP Hubspot 流式源插件

HubspotStreamingSourceConfig pluginConfig =
  new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
    .withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        HubspotStreamingSource.class,
        GetOffsetUtils.getOffsetFnForHubspot(),
        HubspotReceiver.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("readFromHubspotPlugin", readTransform);

CDAP Salesforce 流式源插件

SalesforceStreamingSourceConfig pluginConfig =
  new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
    .withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        SalesforceStreamingSource.class,
        GetOffsetUtils.getOffsetFnForSalesforce(),
        SalesforceReceiver.class,
        config -> {
          SalesforceStreamingSourceConfig salesforceConfig =
            SalesforceStreamingSourceConfig) config;
          return new Object[] {
            salesforceConfig.getAuthenticatorCredentials(),
            salesforceConfig.getPushTopicName()
          };
        }))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("readFromSalesforcePlugin", readTransform);

要了解更多信息,请查看 完整的示例