
java - Cannot create a DynamoDB client in a Spark Executor

Reposted. Author: 行者123. Updated: 2023-11-30 06:36:00

I need to load streaming data into a DynamoDB table. I tried the code below.

import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, Table}
import org.apache.spark.sql.SparkSession

object UnResolvedLoad {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("unresolvedload").enableHiveSupport().getOrCreate()
    val tokensDf = spark.sql("select * from unresolved_logic.unresolved_dynamo_load")
    tokensDf.foreachPartition { x => loadFunc(x) }
  }

  def loadFunc(iter: Iterator[org.apache.spark.sql.Row]): Unit = {
    // The client is not serializable, so it must be created here,
    // once per partition on the executor, not on the driver.
    val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.standard().build()
    val dynamoDB: DynamoDB = new DynamoDB(client)
    val table: Table = dynamoDB.getTable("UnResolvedTokens")

    while (iter.hasNext) {
      val cur = iter.next()
      val item: Item = new Item()
        .withString("receiverId", cur.get(2).asInstanceOf[String])
        .withString("payload_id", cur.get(0).asInstanceOf[String])
        .withString("payload_confirmation_code", cur.get(1).asInstanceOf[String])
        .withString("token", cur.get(3).asInstanceOf[String])

      table.putItem(item)
    }
  }
}

When I run spark-submit, the job fails because it cannot instantiate the client class on the executors. The error messages are below. Any help is appreciated. Is there a way to save a Spark Dataset to Amazon DynamoDB?

, executor 5): java.lang.NoClassDefFoundError: Could not initialize class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
at com.dish.payloads.UnResolvedLoad$.loadFunc(UnResolvedLoad.scala:22)
at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16)
at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

17/07/19 17:35:15 INFO TaskSetManager: Lost task 26.0 in stage 0.0 (TID 26) on ip-10-176-225-151.us-west-2.compute.internal, executor 5: java.lang.NoClassDefFoundError (Could not initialize class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder) [duplicate 1]
17/07/19 17:35:15 WARN TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, ip-10-176-225-151.us-west-2.compute.internal, executor 5): java.lang.IllegalAccessError: tried to access class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientConfigurationFactory from class com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder.<clinit>(AmazonDynamoDBClientBuilder.java:30)
at com.dish.payloads.UnResolvedLoad$.loadFunc(UnResolvedLoad.scala:22)
at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16)
at com.dish.payloads.UnResolvedLoad$$anonfun$main$1.apply(UnResolvedLoad.scala:16)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Best Answer

I was finally able to solve this by using a lower version of the DynamoDB API. EMR 5.7 only supports AWS SDK 1.10.75.1. Below is the code that works for me.

import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object UnResolvedLoad {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("unresolvedload").enableHiveSupport().getOrCreate()
    val tokensDf = spark.sql("select * from unresolved_logic.unresolved_dynamo_load")
    tokensDf.foreachPartition { x => loadFunc(x) }
  }

  def loadFunc(iter: Iterator[org.apache.spark.sql.Row]): Unit = {
    // The older constructor-style client is compatible with SDK 1.10.75.1,
    // the version that EMR 5.7 ships on the executor classpath.
    val client: AmazonDynamoDBClient = new AmazonDynamoDBClient()
    val usWest2 = Region.getRegion(Regions.US_WEST_2)
    client.setRegion(usWest2)

    while (iter.hasNext) {
      val cur = iter.next()

      val putMap = Map(
        "receiverId" -> new AttributeValue(cur.get(2).asInstanceOf[String]),
        "payload_id" -> new AttributeValue(cur.get(0).asInstanceOf[String]),
        "payload_confirmation_code" -> new AttributeValue(cur.get(1).asInstanceOf[String]),
        "token" -> new AttributeValue(cur.get(3).asInstanceOf[String])).asJava

      val putItemRequest: PutItemRequest = new PutItemRequest("UnResolvedTokens", putMap)
      client.putItem(putItemRequest)
    }
  }
}
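The IllegalAccessError in the log is the classic symptom of two binary-incompatible AWS SDK versions ending up on the same classpath: the application jar bundles a newer SDK while EMR's cluster classpath provides 1.10.75.1. An alternative to rewriting the client code is to make the build agree with the cluster. A minimal build.sbt sketch, under the assumptions that EMR 5.7 ships Spark 2.1.1 and that the 1.10.75.1 version string from the answer above is published to Maven Central (if not, the nearest published 1.10.x release would be the fallback):

```scala
// build.sbt sketch: align bundled dependencies with what EMR 5.7 provides,
// so the application jar and the executor classpath do not conflict.
libraryDependencies ++= Seq(
  // Spark is provided by the cluster and must not be bundled into the jar.
  "org.apache.spark" %% "spark-sql" % "2.1.1" % "provided",
  // Pin the DynamoDB SDK to the version on the EMR classpath.
  "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.10.75.1"
)
```

Another common workaround for this kind of conflict is submitting with `--conf spark.executor.userClassPathFirst=true`, which makes executors prefer the classes bundled in the application jar over the cluster-provided ones, though that setting is documented as experimental.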

Regarding "java - Cannot create a DynamoDB client in a Spark Executor", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45197850/
