Hadoop 输入/输出格式 IO
重要! 从 Apache Beam 2.10 开始,名为
HadoopInputFormatIO
的 Hadoop 输入格式 IO 的先前实现已弃用。请使用当前的HadoopFormatIO
,它同时支持InputFormat
和OutputFormat
。
HadoopFormatIO
是一种转换,用于从任何源读取数据或将数据写入任何实现 Hadoop 的 InputFormat
或 OutputFormat
的接收器。例如,Cassandra、Elasticsearch、HBase、Redis、Postgres 等。
HadoopFormatIO
允许您连接到许多尚未拥有 Beam IO 转换的数据源/接收器。但是,HadoopFormatIO
在连接到 InputFormat
或 OutputFormat
时必须进行一些性能权衡。因此,如果存在另一个 Beam IO 转换专门用于连接到您选择的数据源/接收器,我们建议您使用该转换。
使用 HadoopFormatIO 读取
您需要传递一个 Hadoop Configuration
,其中包含指定读取方式的参数。Configuration
的许多属性是可选的,而某些属性对于某些 InputFormat
类是必需的,但以下属性必须为所有 InputFormat
类设置
mapreduce.job.inputformat.class
- 用于连接到您选择的数据源的InputFormat
类。key.class
- 由mapreduce.job.inputformat.class
中的InputFormat
返回的Key
类。value.class
- 由mapreduce.job.inputformat.class
中的InputFormat
返回的Value
类。
例如
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
您需要检查 InputFormat
输出的 Key
和 Value
类是否具有可用的 Beam Coder
。如果没有,您可以使用 withKeyTranslation
或 withValueTranslation
来指定将这些类实例转换为 Beam Coder
支持的另一个类的方法。这些设置是可选的,您不需要为键和值都指定转换。
例如
SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
public MyKeyClass apply(InputFormatKeyClass input) {
// ...logic to transform InputFormatKeyClass to MyKeyClass
}
};
SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
new SimpleFunction<InputFormatValueClass, MyValueClass>() {
public MyValueClass apply(InputFormatValueClass input) {
// ...logic to transform InputFormatValueClass to MyValueClass
}
};
仅使用 Hadoop 配置读取数据。
使用配置和键转换读取数据
例如,Beam Coder
不适用于 Key
类,因此需要键转换。
使用配置和值转换读取数据
例如,Beam Coder
不适用于 Value
类,因此需要值转换。
使用配置、值转换和键转换读取数据
例如,Beam Coders 不适用于 InputFormat
的 Key
类和 Value
类,因此需要键和值转换。
特定 InputFormats 的示例
Cassandra - CqlInputFormat
要从 Cassandra 读取数据,请使用 org.apache.cassandra.hadoop.cql3.CqlInputFormat
,它需要设置以下属性
Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", java.lang.Long Long.class, Object.class);
cassandraConf.setClass("value.class", com.datastax.driver.core.Row Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat CqlInputFormat.class, InputFormat.class);
按如下所示调用 Read 转换
CqlInputFormat
键类是 java.lang.Long
Long
,它具有 Beam Coder
。CqlInputFormat
值类是 com.datastax.driver.core.Row
Row
,它没有 Beam Coder
。您可以提供自己的转换方法,而不是编写新的编码器,如下所示
Elasticsearch - EsInputFormat
要从 Elasticsearch 读取数据,请使用 EsInputFormat
,它需要设置以下属性
Configuration elasticsearchConf = new Configuration();
elasticsearchConf.set("es.nodes", ElasticsearchHostIp);
elasticsearchConf.set("es.port", "9200");
elasticsearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticsearchConf.setClass("key.class", org.apache.hadoop.io.Text Text.class, Object.class);
elasticsearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable.class, Object.class);
elasticsearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat.class, InputFormat.class);
按如下所示调用 Read 转换
org.elasticsearch.hadoop.mr.EsInputFormat
的 EsInputFormat
键类是 org.apache.hadoop.io.Text
Text
,其值类是 org.elasticsearch.hadoop.mr.LinkedMapWritable
LinkedMapWritable
。键和值类都具有 Beam Coders。
HCatalog - HCatInputFormat
要使用 HCatalog 读取数据,请使用 org.apache.hive.hcatalog.mapreduce.HCatInputFormat
,它需要设置以下属性
Configuration hcatConf = new Configuration();
hcatConf.setClass("mapreduce.job.inputformat.class", HCatInputFormat.class, InputFormat.class);
hcatConf.setClass("key.class", LongWritable.class, Object.class);
hcatConf.setClass("value.class", HCatRecord.class, Object.class);
hcatConf.set("hive.metastore.uris", "thrift://metastore-host:port");
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hcatConf, "my_database", "my_table", "my_filter");
按如下所示调用 Read 转换
Amazon DynamoDB - DynamoDBInputFormat
要从 Amazon DynamoDB 读取数据,请使用 org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
。DynamoDBInputFormat 实现旧的 org.apache.hadoop.mapred.InputFormat
接口,为了使其与使用较新的抽象类 org.apache.hadoop.mapreduce.InputFormat
的 HadoopFormatIO 兼容,需要一个包装器 API 作为 HadoopFormatIO 和 DynamoDBInputFormat(或通常是实现 org.apache.hadoop.mapred.InputFormat
的任何 InputFormat)之间的适配器。以下示例使用一个这样的可用的包装器 API - https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java
Configuration dynamoDBConf = new Configuration();
Job job = Job.getInstance(dynamoDBConf);
com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper.setInputFormat(org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.class, job);
dynamoDBConf = job.getConfiguration();
dynamoDBConf.setClass("key.class", Text.class, WritableComparable.class);
dynamoDBConf.setClass("value.class", org.apache.hadoop.dynamodb.DynamoDBItemWritable.class, Writable.class);
dynamoDBConf.set("dynamodb.servicename", "dynamodb");
dynamoDBConf.set("dynamodb.input.tableName", "table_name");
dynamoDBConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
dynamoDBConf.set("dynamodb.regionid", "us-west-1");
dynamoDBConf.set("dynamodb.throughput.read", "1");
dynamoDBConf.set("dynamodb.throughput.read.percent", "1");
dynamoDBConf.set("dynamodb.version", "2011-12-05");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, "aws_access_key");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, "aws_secret_key");
按如下所示调用 Read 转换
Apache HBase - TableSnapshotInputFormat
要从 HBase 表快照读取数据,请使用 org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
。从表快照读取会绕过 HBase 区域服务器,而是直接从文件系统读取 HBase 数据文件。这对于读取历史数据或从 HBase 集群卸载工作等情况非常有用。在某些情况下,这可能比使用 HBaseIO
通过区域服务器访问内容更快。
可以使用 HBase shell 或以编程方式获取表快照
TableSnapshotInputFormat
的配置如下
// Construct a typical HBase scan
Scan scan = new Scan();
scan.setCaching(1000);
scan.setBatch(1000);
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_1"));
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_2"));
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1:2181");
hbaseConf.set("hbase.rootdir", "/hbase");
hbaseConf.setClass(
"mapreduce.job.inputformat.class", TableSnapshotInputFormat.class, InputFormat.class);
hbaseConf.setClass("key.class", ImmutableBytesWritable.class, Writable.class);
hbaseConf.setClass("value.class", Result.class, Writable.class);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));
// Make use of existing utility methods
Job job = Job.getInstance(hbaseConf); // creates internal clone of hbaseConf
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
hbaseConf = job.getConfiguration(); // extract the modified clone
按如下所示调用 Read 转换
使用 HadoopFormatIO 写入
您需要传递一个 Hadoop Configuration
,其中包含指定写入方式的参数。Configuration
的许多属性是可选的,而某些属性对于某些 OutputFormat
类是必需的,但以下属性必须为所有 OutputFormat
设置
mapreduce.job.id
- 写入作业的标识符。例如:窗口的结束时间戳。mapreduce.job.outputformat.class
- 用于连接到您选择的数据接收器的OutputFormat
类。mapreduce.job.output.key.class
- 传递给mapreduce.job.outputformat.class
中OutputFormat
的键类。mapreduce.job.output.value.class
- 传递给mapreduce.job.outputformat.class
中OutputFormat
的值类。mapreduce.job.reduces
- 减少任务的数量。该值等于将生成的写入任务的数量。对于Write.PartitionedWriterBuilder#withoutPartitioning()
写入,此属性不是必需的。mapreduce.job.partitioner.class
- 用于将记录分配到分区之间的 Hadoop 分区器类。对于Write.PartitionedWriterBuilder#withoutPartitioning()
写入,此属性不是必需的。
注意:所有提到的值都有相应的常量。例如:HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR
。
例如
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop OutputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.outputformat.class",
MyDbOutputFormatClass, OutputFormat.class);
myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
MyDbOutputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
MyDbOutputFormatValueClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
MyPartitionerClass, Object.class);
myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
您需要在 Hadoop Configuration
中设置OutputFormat
的键和值类(即“mapreduce.job.output.key.class”和“mapreduce.job.output.value.class”),它们分别等于KeyT
和ValueT
。如果设置的OutputFormat
键或值类与OutputFormat
的实际键或值类不同,则会抛出IllegalArgumentException
。
批量写入
// Data which will we want to write
PCollection<KV<Text, LongWritable>> boundedWordsCount = ...
// Hadoop configuration for write
// We have partitioned write, so Partitioner and reducers count have to be set - see withPartitioning() javadoc
Configuration myHadoopConfiguration = ...
// Path to directory with locks
String locksDirPath = ...;
boundedWordsCount.apply(
"writeBatch",
HadoopFormatIO.<Text, LongWritable>write()
.withConfiguration(myHadoopConfiguration)
.withPartitioning()
.withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
流写入
// Data which will we want to write
PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;
// Transformation which transforms data of one window into one hadoop configuration
PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
configTransform = ...;
unboundedWordsCount.apply(
"writeStream",
HadoopFormatIO.<Text, LongWritable>write()
.withConfigurationTransform(configTransform)
.withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
最后更新时间:2024/10/31
您找到您要找的所有内容了吗?
所有内容是否有用且清晰?您想更改任何内容吗?请告诉我们!