Hadoop Input/Output Format IO

Important! The previous implementation of Hadoop Input Format IO, called HadoopInputFormatIO, is deprecated as of Apache Beam 2.10. Please use the current HadoopFormatIO, which supports both InputFormat and OutputFormat.

HadoopFormatIO is a transform for reading data from any source or writing data to any sink that implements Hadoop's InputFormat or OutputFormat, respectively. For example: Cassandra, Elasticsearch, HBase, Redis, Postgres, and more.

HadoopFormatIO allows you to connect to many data sources/sinks that do not yet have a Beam IO transform. However, HadoopFormatIO has to make some performance trade-offs to connect to an arbitrary InputFormat or OutputFormat. So, if there is another Beam IO transform built specifically for the data source/sink of your choice, we recommend using that one instead.

Reading using HadoopFormatIO

You will need to pass a Hadoop Configuration with parameters specifying how the read will occur. Many properties of the Configuration are optional, and some are required for certain InputFormat classes, but the following properties must be set for all InputFormat classes: the InputFormat itself ("mapreduce.job.inputformat.class") and the key and value classes it returns ("key.class" and "value.class").

For example:

Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
  InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

You will need to check whether the Key and Value classes output by the InputFormat have an available Beam Coder. If not, you can use withKeyTranslation or withValueTranslation to specify a method that transforms instances of those classes into another class that is supported by a Beam Coder. These settings are optional, and you don't need to specify a translation for both the key and the value.

For example:

SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
  public MyKeyClass apply(InputFormatKeyClass input) {
  // ...logic to transform InputFormatKeyClass to MyKeyClass
  }
};
SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
new SimpleFunction<InputFormatValueClass, MyValueClass>() {
  public MyValueClass apply(InputFormatValueClass input) {
  // ...logic to transform InputFormatValueClass to MyValueClass
  }
};
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Read data only with a Hadoop configuration.

p.apply("read",
  HadoopFormatIO.<InputFormatKeyClass, InputFormatKeyClass>read()
  .withConfiguration(myHadoopConfiguration);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Read data with configuration and key translation

For example, a Beam Coder is not available for the Key class, so key translation is required.

p.apply("read",
  HadoopFormatIO.<MyKeyClass, InputFormatKeyClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withKeyTranslation(myOutputKeyType);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Read data with configuration and value translation

For example, a Beam Coder is not available for the Value class, so value translation is required.

p.apply("read",
  HadoopFormatIO.<InputFormatKeyClass, MyValueClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withValueTranslation(myOutputValueType));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Read data with configuration, value translation and key translation

For example, Beam Coders are not available for both the Key class and the Value class of the InputFormat, so key and value translations are required.

p.apply("read",
  HadoopFormatIO.<MyKeyClass, MyValueClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withKeyTranslation(myOutputKeyType)
  .withValueTranslation(myOutputValueType));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Examples for specific InputFormats

Cassandra - CqlInputFormat

To read data from Cassandra, use org.apache.cassandra.hadoop.cql3.CqlInputFormat, which needs the following properties to be set:

Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", java.lang.Long Long.class, Object.class);
cassandraConf.setClass("value.class", com.datastax.driver.core.Row Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat CqlInputFormat.class, InputFormat.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<Long, String>> cassandraData =
  p.apply("read",
  HadoopFormatIO.<Long, String>read()
  .withConfiguration(cassandraConf)
  .withValueTranslation(cassandraOutputValueType));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

The CqlInputFormat key class is java.lang.Long, which has a Beam Coder. The CqlInputFormat value class is com.datastax.driver.core.Row, which does not have a Beam Coder. Rather than write a new coder, you can provide your own translation method, as follows:

SimpleFunction<Row, String> cassandraOutputValueType = new SimpleFunction<Row, String>()
{
  public String apply(Row row) {
    return row.getString("myColName");
  }
};
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Elasticsearch - EsInputFormat

To read data from Elasticsearch, use EsInputFormat, which needs the following properties to be set:

Configuration elasticsearchConf = new Configuration();
elasticsearchConf.set("es.nodes", ElasticsearchHostIp);
elasticsearchConf.set("es.port", "9200");
elasticsearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticsearchConf.setClass("key.class", org.apache.hadoop.io.Text Text.class, Object.class);
elasticsearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable.class, Object.class);
elasticsearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat.class, InputFormat.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<Text, LinkedMapWritable>> elasticData = p.apply("read",
  HadoopFormatIO.<Text, LinkedMapWritable>read().withConfiguration(elasticsearchConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

The org.elasticsearch.hadoop.mr.EsInputFormat key class is org.apache.hadoop.io.Text and its value class is org.elasticsearch.hadoop.mr.LinkedMapWritable. Both key and value classes have Beam Coders.
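
Because both classes have Beam Coders, the elasticData collection above can be consumed directly with ordinary Beam transforms. Below is a minimal sketch that pulls one field out of each document; the field name "myColName" is a hypothetical placeholder and the extraction step is not part of HadoopFormatIO (LinkedMapWritable behaves as a Map of Writable keys and values).

// Extract a single field from each Elasticsearch document; "myColName" is a placeholder.
PCollection<String> fieldValues = elasticData.apply(
  "extractField",
  MapElements.into(TypeDescriptors.strings())
      .via((KV<Text, LinkedMapWritable> doc) ->
          String.valueOf(doc.getValue().get(new Text("myColName")))));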

HCatalog - HCatInputFormat

To read data using HCatalog, use org.apache.hive.hcatalog.mapreduce.HCatInputFormat, which needs the following properties to be set:

Configuration hcatConf = new Configuration();
hcatConf.setClass("mapreduce.job.inputformat.class", HCatInputFormat.class, InputFormat.class);
hcatConf.setClass("key.class", LongWritable.class, Object.class);
hcatConf.setClass("value.class", HCatRecord.class, Object.class);
hcatConf.set("hive.metastore.uris", "thrift://metastore-host:port");

org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hcatConf, "my_database", "my_table", "my_filter");
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<Long, HCatRecord>> hcatData =
  p.apply("read",
  HadoopFormatIO.<Long, HCatRecord>read()
  .withConfiguration(hcatConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Amazon DynamoDB - DynamoDBInputFormat

To read data from Amazon DynamoDB, use org.apache.hadoop.dynamodb.read.DynamoDBInputFormat. DynamoDBInputFormat implements the older org.apache.hadoop.mapred.InputFormat interface. To make it compatible with HadoopFormatIO, which uses the newer abstract class org.apache.hadoop.mapreduce.InputFormat, a wrapper API is required that acts as an adapter between HadoopFormatIO and DynamoDBInputFormat (or, in general, any InputFormat implementing org.apache.hadoop.mapred.InputFormat). The example below uses one such available wrapper API - https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java

Configuration dynamoDBConf = new Configuration();
Job job = Job.getInstance(dynamoDBConf);
com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper.setInputFormat(org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.class, job);
dynamoDBConf = job.getConfiguration();
dynamoDBConf.setClass("key.class", Text.class, WritableComparable.class);
dynamoDBConf.setClass("value.class", org.apache.hadoop.dynamodb.DynamoDBItemWritable.class, Writable.class);
dynamoDBConf.set("dynamodb.servicename", "dynamodb");
dynamoDBConf.set("dynamodb.input.tableName", "table_name");
dynamoDBConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
dynamoDBConf.set("dynamodb.regionid", "us-west-1");
dynamoDBConf.set("dynamodb.throughput.read", "1");
dynamoDBConf.set("dynamodb.throughput.read.percent", "1");
dynamoDBConf.set("dynamodb.version", "2011-12-05");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, "aws_access_key");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, "aws_secret_key");
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<Text, DynamoDBItemWritable>> dynamoDBData =
  p.apply("read",
  HadoopFormatIO.<Text, DynamoDBItemWritable>read()
  .withConfiguration(dynamoDBConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Apache HBase - TableSnapshotInputFormat

To read data from an HBase table snapshot, use org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat. Reading from a table snapshot bypasses the HBase region servers and instead reads HBase data files directly from the filesystem. This is useful for cases such as reading historical data or offloading work from the HBase cluster. In certain scenarios this may be faster than accessing the table contents through the region servers using HBaseIO.

A table snapshot can be taken using the HBase shell or programmatically:

try (
    Connection connection = ConnectionFactory.createConnection(hbaseConf);
    Admin admin = connection.getAdmin()
  ) {
  admin.snapshot(
    "my_snaphshot",
    TableName.valueOf("my_table"),
    HBaseProtos.SnapshotDescription.Type.FLUSH);
}
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

A TableSnapshotInputFormat is configured as follows:

// Construct a typical HBase scan
Scan scan = new Scan();
scan.setCaching(1000);
scan.setBatch(1000);
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_1"));
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_2"));

Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1:2181");
hbaseConf.set("hbase.rootdir", "/hbase");
hbaseConf.setClass(
    "mapreduce.job.inputformat.class", TableSnapshotInputFormat.class, InputFormat.class);
hbaseConf.setClass("key.class", ImmutableBytesWritable.class, Writable.class);
hbaseConf.setClass("value.class", Result.class, Writable.class);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));

// Make use of existing utility methods
Job job = Job.getInstance(hbaseConf); // creates internal clone of hbaseConf
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
hbaseConf = job.getConfiguration(); // extract the modified clone
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<ImmutableBytesWritable, Result>> hbaseSnapshotData =
  p.apply("read",
  HadoopFormatIO.<ImmutableBytesWritable, Result>read()
  .withConfiguration(hbaseConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Writing using HadoopFormatIO

You will need to pass a Hadoop Configuration with parameters specifying how the write will occur. Many properties of the Configuration are optional, and some are required for certain OutputFormat classes, but the following properties must be set for all OutputFormats: the OutputFormat itself ("mapreduce.job.outputformat.class"), the output key and value classes ("mapreduce.job.output.key.class" and "mapreduce.job.output.value.class") and, for a partitioned write, the partitioner class and the number of reduce tasks ("mapreduce.job.partitioner.class" and "mapreduce.job.reduces").

Note: All the mentioned values have corresponding constants. For example: HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR.

For example:

Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop OutputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.outputformat.class",
   MyDbOutputFormatClass, OutputFormat.class);
myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
   MyDbOutputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
   MyDbOutputFormatValueClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
   MyPartitionerClass, Object.class);
myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.
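
Because the keys above have corresponding constants, the raw string literals can be avoided. A minimal sketch using only HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, the constant mentioned above (the other constant names are not listed in this guide):

// Equivalent to setting "mapreduce.job.outputformat.class" through a string literal.
myHadoopConfiguration.setClass(HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR,
   MyDbOutputFormatClass, OutputFormat.class);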

You will need to set the OutputFormat key and value classes (i.e. "mapreduce.job.output.key.class" and "mapreduce.job.output.value.class") in the Hadoop Configuration, and they must be equal to KeyT and ValueT, respectively. If you set a different OutputFormat key or value class than the OutputFormat's actual key or value class, an IllegalArgumentException is thrown.
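
For instance, for the PCollection<KV<Text, LongWritable>> written in the batch example below, the configured output key and value classes must be Text and LongWritable; a minimal sketch:

// KeyT = Text and ValueT = LongWritable, so the configured classes must match them;
// a mismatch results in the IllegalArgumentException described above.
myHadoopConfiguration.setClass("mapreduce.job.output.key.class", Text.class, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class", LongWritable.class, Object.class);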

Batch writing

// Data which we want to write
PCollection<KV<Text, LongWritable>> boundedWordsCount = ...;

// Hadoop configuration for the write
// We have a partitioned write, so the Partitioner and the number of reducers have to be set - see the withPartitioning() javadoc
Configuration myHadoopConfiguration = ...;
// Path to directory with locks
String locksDirPath = ...;

boundedWordsCount.apply(
    "writeBatch",
    HadoopFormatIO.<Text, LongWritable>write()
        .withConfiguration(myHadoopConfiguration)
        .withPartitioning()
        .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Stream writing

// Data which we want to write
PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;

// Path to directory with locks, as in the batch example
String locksDirPath = ...;

// Transformation which transforms data of one window into one hadoop configuration
PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
  configTransform = ...;

unboundedWordsCount.apply(
  "writeStream",
  HadoopFormatIO.<Text, LongWritable>write()
      .withConfigurationTransform(configTransform)
      .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.