
scala - Spark 2.2.0 - How to write/read a DataFrame to/from DynamoDB


I would like my Spark application to read a table from DynamoDB, do some processing, and then write the result back to DynamoDB.

Read the table into a DataFrame

Right now, I can read the table from DynamoDB into Spark as a hadoopRDD and convert it to a DataFrame. However, I have to use a regular expression to extract the value from each AttributeValue. Is there a better/more elegant way? I couldn't find anything in the AWS API.

package main.scala.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable

object Tester {

  // Example input: {S: 298905396168806365,}
  def extractValue: (String => String) = (aws: String) => {
    val pat_value = "\\s(.*),".r

    val matcher = pat_value.findFirstMatchIn(aws)
    matcher match {
      case Some(number) => number.group(1).toString
      case None => ""
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession.builder().getOrCreate()
    val sparkContext = spark.sparkContext

    import spark.implicits._

    // UDF to extract Value from AttributeValue
    val col_extractValue = udf(extractValue)

    // Configure connection to DynamoDB
    var jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
    jobConf_add.set("dynamodb.input.tableName", "MyTable")
    jobConf_add.set("dynamodb.output.tableName", "MyTable")
    jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

    // org.apache.spark.rdd.RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
    var hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

    // Convert HadoopRDD to RDD
    val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
      case (text, dbwritable) => (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
    }

    // Convert RDD to DataFrame and extract Values from AttributeValue
    val df_add = rdd_add.toDF()
      .withColumn("PIN", col_extractValue($"_1"))
      .withColumn("Address", col_extractValue($"_2"))
      .select("PIN", "Address")
  }
}
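As an aside on the regex question above: AttributeValue exposes typed getters, so the toString()-plus-regex step can likely be avoided altogether by calling getS() on each attribute. A minimal sketch of the mapping step inside main, assuming PIN and Address are stored as DynamoDB String (S) attributes (getS() returns null for non-S types):

// Sketch: extract values via AttributeValue.getS() instead of regex-parsing toString()
val rdd_typed: RDD[(String, String)] = hadooprdd_add.map {
  case (text, dbwritable) =>
    val item = dbwritable.getItem() // java.util.Map[String, AttributeValue]
    (item.get("PIN").getS(), item.get("Address").getS())
}

val df_typed = rdd_typed.toDF("PIN", "Address") // no extraction UDF needed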

Write the DataFrame to DynamoDB

Many answers on Stack Overflow and elsewhere only point to the blog post and the emr-dynamodb-hadoop GitHub repository. None of those resources actually demonstrate how to write to DynamoDB.

I tried converting my DataFrame to an RDD[Row], without success.

df_add.rdd.saveAsHadoopDataset(jobConf_add)

What are the steps to write this DataFrame to DynamoDB? (Bonus points if you tell me how to control overwrite vs putItem ;)

Note: df_add has the same schema as MyTable in DynamoDB.
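(Aside on the bonus question: as far as I can tell, the emr-dynamodb-hadoop output format always issues plain puts, which replace any existing item with the same key; I have not found a switch for conditional writes. If put-if-absent semantics are needed, one option is to bypass the Hadoop connector and write per partition with the AWS SDK directly. The sketch below is hypothetical and assumes the same two-string PIN/Address schema as df_add:)

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{ConditionalCheckFailedException, PutItemRequest}
import scala.collection.JavaConverters._

// Hypothetical put-if-absent writer: bypasses the Hadoop connector entirely
df_add.foreachPartition { rows =>
  val client = AmazonDynamoDBClientBuilder.defaultClient() // one client per partition
  rows.foreach { row =>
    val item = Map(
      "PIN"     -> new AttributeValue().withS(row.getAs[String]("PIN")),
      "Address" -> new AttributeValue().withS(row.getAs[String]("Address"))
    ).asJava
    val request = new PutItemRequest()
      .withTableName("MyTable")
      .withItem(item)
      .withConditionExpression("attribute_not_exists(PIN)") // only write new keys
    try client.putItem(request)
    catch { case _: ConditionalCheckFailedException => () } // key already present; skip
  }
}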

Edit: I followed the advice of this answer, which points to this post on Using Spark SQL for ETL:

// Format table to DynamoDB format
val output_rdd = df_add.as[(String, String)].rdd.map(a => {
  var ddbMap = new HashMap[String, AttributeValue]()

  // Field PIN
  var PINValue = new AttributeValue() // New AttributeValue
  PINValue.setS(a._1) // Set value of Attribute as String. First element of tuple
  ddbMap.put("PIN", PINValue) // Add to HashMap

  // Field Address
  var AddValue = new AttributeValue() // New AttributeValue
  AddValue.setS(a._2) // Set value of Attribute as String
  ddbMap.put("Address", AddValue) // Add to HashMap

  var item = new DynamoDBItemWritable()
  item.setItem(ddbMap)

  (new Text(""), item)
})

output_rdd.saveAsHadoopDataset(jobConf_add)

However, now I am getting java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text despite following the documentation... Do you have any suggestions?

Edit 2: Reading this post on Using Spark SQL for ETL more carefully:

After you have the DataFrame, perform a transformation to have an RDD that matches the types that the DynamoDB custom output format knows how to write. The custom output format expects a tuple containing the Text and DynamoDBItemWritable types.

With that in mind, the code below is exactly what the AWS blog post suggests, except that I convert output_df to an rdd, since saveAsHadoopDataset does not work otherwise. Now I get Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience. I am at the end of my rope!

// Format table to DynamoDB format
val output_df = df_add.map(a => {
  var ddbMap = new HashMap[String, AttributeValue]()

  // Field PIN
  var PINValue = new AttributeValue() // New AttributeValue
  PINValue.setS(a.get(0).toString()) // Set value of Attribute as String
  ddbMap.put("PIN", PINValue) // Add to HashMap

  // Field Address
  var AddValue = new AttributeValue() // New AttributeValue
  AddValue.setS(a.get(1).toString()) // Set value of Attribute as String
  ddbMap.put("Address", AddValue) // Add to HashMap

  var item = new DynamoDBItemWritable()
  item.setItem(ddbMap)

  (new Text(""), item)
})

output_df.rdd.saveAsHadoopDataset(jobConf_add)

Best Answer

I followed the "Using Spark SQL for ETL" link and ran into the same "illegal cyclic reference" exception. The solution is pretty simple (though it took me two days to figure out), as shown below. The key point is to use the map function on the DataFrame's RDD, not on the DataFrame itself.

val ddbConf = new JobConf(spark.sparkContext.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "<myTableName>")
ddbConf.set("dynamodb.throughput.write.percent", "1.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

val df_ddb = spark.read.option("header", "true").parquet("<myInputFile>")
val schema_ddb = df_ddb.dtypes

var ddbInsertFormattedRDD = df_ddb.rdd.map(a => {
  val ddbMap = new HashMap[String, AttributeValue]()

  for (i <- 0 to schema_ddb.length - 1) {
    val value = a.get(i)
    if (value != null) {
      val att = new AttributeValue()
      att.setS(value.toString)
      ddbMap.put(schema_ddb(i)._1, att)
    }
  }

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)

  (new Text(""), item)
})

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)
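One caveat about the code above: it calls setS on every column, so numeric columns end up stored as DynamoDB strings. A variation that keeps Spark numeric types as DynamoDB Number (N) attributes, reusing df_ddb and schema_ddb from above (a sketch; extend the match for other Spark types as needed):

// Sketch: same structure as above, but map Spark numeric columns to DynamoDB
// Number (N) attributes instead of storing everything as strings.
// df.dtypes reports types as strings, e.g. ("PIN", "LongType") or ("price", "DecimalType(10,2)").
val ddbTypedRDD = df_ddb.rdd.map(a => {
  val ddbMap = new HashMap[String, AttributeValue]()

  for (i <- 0 to schema_ddb.length - 1) {
    val value = a.get(i)
    if (value != null) {
      val att = new AttributeValue()
      schema_ddb(i)._2 match {
        case "IntegerType" | "LongType" | "FloatType" | "DoubleType" =>
          att.setN(value.toString) // DynamoDB numbers are transmitted as strings
        case t if t.startsWith("DecimalType") =>
          att.setN(value.toString)
        case _ =>
          att.setS(value.toString) // fall back to String for all other types
      }
      ddbMap.put(schema_ddb(i)._1, att)
    }
  }

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)

  (new Text(""), item)
})

ddbTypedRDD.saveAsHadoopDataset(ddbConf)

Also note dynamodb.throughput.write.percent in the JobConf above: it tells the connector what fraction of the table's provisioned write capacity to consume (1.5 = 150%), and is the main knob for tuning write speed.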

A similar question about "scala - Spark 2.2.0 - How to write/read a DataFrame to/from DynamoDB" can be found on Stack Overflow: https://stackoverflow.com/questions/47722648/
