SparkReceiver IO
SparkReceiverIO 是一个从 Apache Spark Receiver 读取数据的无界源转换。
Spark Receiver 支持
SparkReceiverIO
目前支持 Apache Spark Receiver.
Spark Receiver
的要求
- Spark 版本应为 2.4.*。
Spark Receiver
应支持与偏移量的协同工作。Spark Receiver
应实现 HasOffset 接口。- 记录应具有表示记录偏移量的数字字段。
有关更多详细信息,请参见 SparkReceiverIO 自述文件.
使用 SparkReceiverIO 进行流式读取
为了从 Spark Receiver
读取数据,你需要传入
getOffsetFn
,它是一个SerializableFunction
,用于定义如何从记录中获取Long
记录偏移量。receiverBuilder
,它用于构建使用 Apache Beam 机制(而不是 Spark 环境)的Spark Receiver
实例。
你可以通过传入以下参数轻松创建 receiverBuilder
对象
- 你的
Spark Receiver
的类。 - 创建你的
Spark Receiver
实例所需的构造函数参数。
例如
//In this example, MyReceiver accepts a MyConfig object as its only constructor parameter.
MyConfig myPluginConfig = new MyConfig(authToken, apiServerUrl);
Object[] myConstructorArgs = new Object[] {myConfig};
ReceiverBuilder<String, MyReceiver<String>> myReceiverBuilder =
new ReceiverBuilder<>(MyReceiver.class)
.withConstructorArgs(myConstructorArgs);
然后,你可以将此 receiverBuilder
对象传入 SparkReceiverIO
。
例如
使用可选参数读取数据
你还可以选择传入以下可选参数
pullFrequencySec
,它表示轮询新记录更新之间的延迟时间(以秒为单位)。startOffset
,它表示读取应该从哪个(包含)起始偏移量开始。timestampFn
,它是一个SerializableFunction
,用于定义如何从记录中获取Instant
时间戳。
例如
特定 Spark Receiver 的示例
CDAP Hubspot Receiver
ReceiverBuilder<String, HubspotReceiver<String>> hubspotReceiverBuilder =
new ReceiverBuilder<>(HubspotReceiver.class)
.withConstructorArgs(hubspotConfig);
SparkReceiverIO.Read<String> readTransform =
SparkReceiverIO.<String>read()
.withGetOffsetFn(GetOffsetUtils.getOffsetFnForHubspot())
.withSparkReceiverBuilder(hubspotReceiverBuilder)
p.apply("readFromHubspotReceiver", readTransform);
最后更新时间:2024/10/31
你是否找到了你想要的所有内容?
所有内容都实用且清晰吗?你有什么想要更改的内容吗?请告诉我们!