
java - Map-only MapReduce job with BigQuery


We created a MapReduce job to ingest data into BigQuery. There is not much filtering in our job, so we would like to make it a map-only job to make it faster and more efficient.

However, com.google.gson.JsonObject, the Java class that BigQuery accepts, does not implement the Writable interface required by Hadoop's Mapper interface. JsonObject is also final, so we cannot extend it...

Any suggestions on how we can work around this?

谢谢,
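
As the answer below demonstrates, a map-only job does not actually need a Writable here: with zero reduce tasks the mapper's output goes straight to the OutputFormat and never passes through Hadoop's Writable-based shuffle serialization. If a Writable really were required (for example, in a job with a reduce phase), the usual workaround for a final class such as JsonObject is composition rather than inheritance: wrap the object and serialize it as a JSON string. A minimal sketch, where JsonObjectWritable is a hypothetical helper class and not part of any library:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/** Hypothetical Writable wrapper: holds a JsonObject and serializes it as a JSON string. */
public class JsonObjectWritable implements Writable {
  private JsonObject json = new JsonObject();

  public void set(JsonObject json) { this.json = json; }
  public JsonObject get() { return json; }

  @Override
  public void write(DataOutput out) throws IOException {
    // writeUTF is limited to 64 KB of encoded text, which is fine for small records.
    out.writeUTF(json.toString());
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    json = new JsonParser().parse(in.readUTF()).getAsJsonObject();
  }
}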

Best answer

To add to William's answer: I wanted to test this myself, so I created a new cluster with the BigQuery connector installed and ran the following map-only job:

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat;
import com.google.common.base.Splitter;
import com.google.gson.JsonObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * An example map-only job with BigQuery output.
 */
public class MapOnlyJob {
  public static class MapOnlyMapper extends Mapper<LongWritable, Text, LongWritable, JsonObject> {
    private static final LongWritable KEY_OUT = new LongWritable(0L);
    // This requires a new version of Guava be included in a shaded / repackaged libjar
    // (a shade-plugin sketch follows the dependency list below).
    private static final Splitter SPLITTER =
        Splitter.on(Pattern.compile("\\s+"))
            .trimResults()
            .omitEmptyStrings();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      for (String word : SPLITTER.split(line)) {
        JsonObject json = new JsonObject();
        json.addProperty("word", word);
        json.addProperty("mapKey", key.get());
        context.write(KEY_OUT, json);
      }
    }
  }

  /**
   * Configures and runs the main Hadoop job.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    if (args.length != 3) {
      System.out.println("Usage: hadoop MapOnlyJob "
          + "[projectId] [input_file] [fullyQualifiedOutputTableId]");
      String indent = "    ";
      System.out.println(indent
          + "projectId - Project under which to issue the BigQuery operations. "
          + "Also serves as the default project for table IDs which don't explicitly specify a "
          + "project for the table.");
      System.out.println(indent
          + "input_file - Input file pattern of the form "
          + "gs://foo/bar*.txt or hdfs:///foo/bar*.txt or foo*.txt");
      System.out.println(indent
          + "fullyQualifiedOutputTableId - Output table ID of the form "
          + "<optional projectId>:<datasetId>.<tableId>");
      System.exit(1);
    }

    // Global parameters from args.
    String projectId = args[0];

    // Set InputFormat parameters from args.
    String inputPattern = args[1];

    // Set OutputFormat parameters from args.
    String fullyQualifiedOutputTableId = args[2];

    // Default OutputFormat parameters for this sample.
    String outputTableSchema =
        "[{'name': 'word','type': 'STRING'},{'name': 'mapKey','type': 'INTEGER'}]";

    Configuration conf = parser.getConfiguration();
    Job job = Job.getInstance(conf);
    // Set the job-level projectId on the job's own Configuration; Job.getInstance
    // copies conf, so setting it on conf at this point would have no effect.
    job.getConfiguration().set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
    // Set classes and configure them:
    job.setOutputFormatClass(BigQueryOutputFormat.class);
    BigQueryConfiguration.configureBigQueryOutput(
        job.getConfiguration() /* Required as Job made a new Configuration object */,
        fullyQualifiedOutputTableId,
        outputTableSchema);
    // Configure file-based input:
    FileInputFormat.setInputPaths(job, inputPattern);

    job.setJarByClass(MapOnlyMapper.class);
    job.setMapperClass(MapOnlyMapper.class);
    // The key will be discarded by BigQueryOutputFormat.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(JsonObject.class);
    // Make this a map-only job.
    job.setNumReduceTasks(0);

    job.waitForCompletion(true);
  }
}
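
For reference, the usage string above corresponds to an invocation along the following lines (the jar name, bucket, and table ID here are purely illustrative):

hadoop jar maponly-job.jar MapOnlyJob my-project gs://my-bucket/input*.txt my-project:my_dataset.words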

I have the following dependencies:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>bigquery-connector</artifactId>
  <version>0.7.0-hadoop1</version>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>18.0</version>
</dependency>
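
About the Guava comment in MapOnlyMapper: the cluster's classpath may already carry an older Guava, so the newer Splitter API has to be relocated into a private package inside the job jar. A minimal maven-shade-plugin sketch of such a relocation (the shadedPattern name is only an example):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- Move Guava to a private package so the cluster's copy cannot clash. -->
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>repackaged.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>

With the relocation applied, the compiled classes reference repackaged.com.google.common.base.Splitter, so they never collide with whatever Guava version the cluster provides.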

For this question on java - Map-only MapReduce job with BigQuery, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/30484670/
