
hadoop - Dataproc conflict in Hadoop temporary tables


I have a process that executes Spark jobs on Dataproc clusters in parallel for different regions. It creates a cluster for each region, runs the Spark job, and deletes the cluster once the job finishes.

The Spark job saves data to BigQuery tables using the org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset method, passing a BigQuery Configuration. Each job saves data to more than one table, calling saveAsNewAPIHadoopDataset several times per job.
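
For context, below is a minimal sketch of what such a save call typically looks like with the direct output format from this connector line. The helper name, identifiers, and schema string are placeholders, and the BigQueryConfiguration constants are assumed from the 0.10.x connector's documented mapred.bq.* output settings:

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;

// Hypothetical helper illustrating the direct-write path described above.
static void saveToBigQuery(JavaPairRDD<String, JsonObject> rdd,
                           String project, String dataset, String table,
                           String schemaJson) {
    Configuration conf = new Configuration();
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, project);
    conf.set(BigQueryConfiguration.OUTPUT_PROJECT_ID_KEY, project);
    conf.set(BigQueryConfiguration.OUTPUT_DATASET_ID_KEY, dataset);
    conf.set(BigQueryConfiguration.OUTPUT_TABLE_ID_KEY, table);
    conf.set(BigQueryConfiguration.OUTPUT_TABLE_SCHEMA_KEY, schemaJson);
    conf.set("mapreduce.job.outputformat.class",
        BigQueryOutputFormat.class.getName());
    // While the job runs, the connector stages rows through a temporary
    // "<dataset>_hadoop_temporary_job_<id>" BigQuery dataset, the one
    // named in the 409 error below.
    rdd.saveAsNewAPIHadoopDataset(conf);
}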

The problem is that sometimes the job fails with an error caused by a conflict on the Hadoop temporary BigQuery dataset that the connector creates internally to run the job:

Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 409 Conflict
{
  "code" : 409,
  "errors" : [ {
    "domain" : "global",
    "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013",
    "reason" : "duplicate"
  } ],
  "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013"
}
  at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
  at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
  at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
  at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
  at com.google.cloud.hadoop.io.bigquery.BigQueryOutputCommitter.setupJob(BigQueryOutputCommitter.java:107)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1150)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
  at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:819)
  ...

The timestamp 201802250620_0013 in the exception above has the _0013 suffix, and I am not sure whether it represents time.

My guess is that sometimes jobs run at the same moment and try to create a temporary dataset with the same timestamp in its name, either across parallel jobs or within the same job on another saveAsNewAPIHadoopDataset call.

How can we avoid this error without delaying the job executions?

The dependency I am using is:
<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>0.10.2-hadoop2</version>
    <scope>provided</scope>
</dependency>

The Dataproc image version is 1.1.

编辑1:

I tried using IndirectBigQueryOutputFormat, but now I get an error saying that the GCS output path already exists, even though a different path is passed on each saveAsNewAPIHadoopDataset call.

Here is my code:
SparkConf sc = new SparkConf().setAppName("MyApp");
try (JavaSparkContext jsc = new JavaSparkContext(sc)) {
    JavaPairRDD<String, String> filesJson = jsc.wholeTextFiles(jsonFolder, parts);
    JavaPairRDD<String, String> jsons = filesJson.flatMapToPair(new FileSplitter()).repartition(parts);
    JavaPairRDD<Object, JsonObject> objsJson = jsons.flatMapToPair(new JsonParser()).filter(t -> t._2() != null).cache();

    objsJson
        .filter(new FilterType(MSG_TYPE1))
        .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE1", "gs://my-bucket/tmp1"));

    objsJson
        .filter(new FilterType(MSG_TYPE2))
        .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE2", "gs://my-bucket/tmp2"));

    objsJson
        .filter(new FilterType(MSG_TYPE3))
        .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE3", "gs://my-bucket/tmp3"));

    // here goes another ingestion process. same code as above but different params, parsers, etc.
}

Configuration createConf(String table, String outGCS) {
    Configuration conf = new Configuration();
    BigQueryOutputConfiguration.configure(conf, table, null, outGCS, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
    conf.set("mapreduce.job.outputformat.class", IndirectBigQueryOutputFormat.class.getName());
    return conf;
}

Best Answer

I believe what may be happening is that each mapper tries to create its own temporary dataset. This is rather inefficient (and burns daily quota proportional to the number of mappers).

An alternative is to use IndirectBigQueryOutputFormat as the output class:

IndirectBigQueryOutputFormat works by first buffering all the data into a Cloud Storage temporary table, and then, on commitJob, copies all data from Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs since it only requires one BigQuery "load" job per Hadoop/Spark job, as compared to BigQueryOutputFormat, which performs one BigQuery job for each Hadoop/Spark task.
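
In other words, instead of every Spark task streaming rows into BigQuery through its own temporary dataset, all tasks write files under a single Cloud Storage path and one load job imports them when the job commits, so the per-mapper temporary-dataset creation that produces the 409 conflict never happens.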



See the example here: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
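
On the Edit 1 error: the indirect format writes through a FileOutputFormat, which fails if its Cloud Storage working path already exists when the job starts, so a path left behind by an earlier or failed run trips the "already exists" error. One possible workaround, sketched below against the createConf from the question (the UUID suffix is my own illustration, not something the connector mandates), is to derive a fresh working path on every call:

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;

Configuration createConf(String table, String outGCS) throws java.io.IOException {
    Configuration conf = new Configuration();
    // A unique suffix per call keeps this run's working directory from
    // colliding with earlier runs' leftovers or with the other saves.
    String workingPath = outGCS + "-" + UUID.randomUUID();
    BigQueryOutputConfiguration.configure(conf, table, null, workingPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
    conf.set("mapreduce.job.outputformat.class",
        IndirectBigQueryOutputFormat.class.getName());
    return conf;
}

Cleaning these per-run directories up afterwards (or deleting a fixed path before each run with Hadoop's FileSystem API) is the remaining housekeeping.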

Regarding "hadoop - Dataproc conflict in Hadoop temporary tables", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49267443/
