- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我希望我的 Spark 应用程序从 DynamoDB 读取表,执行操作,然后将结果写入 DynamoDB。
现在,我可以将表从 DynamoDB 读取到 Spark 中,格式为 hadoopRDD
并将其转换为 DataFrame。但是,我必须使用正则表达式从 AttributeValue
中提取值。 。有更好/更优雅的方式吗?在 AWS API 中找不到任何内容。
package main.scala.util
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable
object Tester {
// {S: 298905396168806365,}
def extractValue : (String => String) = (aws:String) => {
val pat_value = "\\s(.*),".r
val matcher = pat_value.findFirstMatchIn(aws)
matcher match {
case Some(number) => number.group(1).toString
case None => ""
}
}
def main(args: Array[String]) {
val spark = SparkSession.builder().getOrCreate()
val sparkContext = spark.sparkContext
import spark.implicits._
// UDF to extract Value from AttributeValue
val col_extractValue = udf(extractValue)
// Configure connection to DynamoDB
var jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
jobConf_add.set("dynamodb.input.tableName", "MyTable")
jobConf_add.set("dynamodb.output.tableName", "MyTable")
jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
// org.apache.spark.rdd.RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
var hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
// Convert HadoopRDD to RDD
val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
case (text, dbwritable) => (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
}
// Convert RDD to DataFrame and extract Values from AttributeValue
val df_add = rdd_add.toDF()
.withColumn("PIN", col_extractValue($"_1"))
.withColumn("Address", col_extractValue($"_2"))
.select("PIN","Address")
}
}
stackoverflow 和其他地方的许多答案都只指向 blog post和 emr-dynamodb-hadoop github 。这些资源都没有实际演示如何写入 DynamoDB。
I tried converting我的DataFrame
至RDD[Row]
没有成功。
df_add.rdd.saveAsHadoopDataset(jobConf_add)
将此 DataFrame 写入 DynamoDB 的步骤是什么? (如果你告诉我如何控制 overwrite
与 putItem
即可获得奖励积分;)
注:df_add
与 MyTable
具有相同的架构在 DynamoDB 中。
编辑:我遵循 this answer 的建议它指向 Using Spark SQL for ETL 上的这篇文章:
// Format table to DynamoDB format
val output_rdd = df_add.as[(String,String)].rdd.map(a => {
var ddbMap = new HashMap[String, AttributeValue]()
// Field PIN
var PINValue = new AttributeValue() // New AttributeValue
PINValue.setS(a._1) // Set value of Attribute as String. First element of tuple
ddbMap.put("PIN", PINValue) // Add to HashMap
// Field Address
var AddValue = new AttributeValue() // New AttributeValue
AddValue.setS(a._2) // Set value of Attribute as String
ddbMap.put("Address", AddValue) // Add to HashMap
var item = new DynamoDBItemWritable()
item.setItem(ddbMap)
(new Text(""), item)
})
output_rdd.saveAsHadoopDataset(jobConf_add)
但是,现在我得到 java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text
尽管遵循文档...您有什么建议吗?
编辑 2:更仔细地阅读 Using Spark SQL for ETL 上的这篇文章:
After you have the DataFrame, perform a transformation to have an RDD that matches the types that the DynamoDB custom output format knows how to write. The custom output format expects a tuple containing the Text and
DynamoDBItemWritable
types.
考虑到这一点,下面的代码正是 AWS 博客文章所建议的,除了我转换 output_df
否则作为 rdd saveAsHadoopDataset
不起作用。现在,我得到 Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
。我已经束手无策了!
// Format table to DynamoDB format
val output_df = df_add.map(a => {
var ddbMap = new HashMap[String, AttributeValue]()
// Field PIN
var PINValue = new AttributeValue() // New AttributeValue
PINValue.setS(a.get(0).toString()) // Set value of Attribute as String
ddbMap.put("PIN", PINValue) // Add to HashMap
// Field Address
var AddValue = new AttributeValue() // New AttributeValue
AddValue.setS(a.get(1).toString()) // Set value of Attribute as String
ddbMap.put("Address", AddValue) // Add to HashMap
var item = new DynamoDBItemWritable()
item.setItem(ddbMap)
(new Text(""), item)
})
output_df.rdd.saveAsHadoopDataset(jobConf_add)
最佳答案
我跟踪了“使用 Spark SQL 进行 ETL”链接,并发现了相同的“非法循环引用”异常。该异常的解决方案非常简单(但我花了 2 天时间才弄清楚),如下所示。关键点是在数据框的 RDD 上使用映射函数,而不是数据框本身。
val ddbConf = new JobConf(spark.sparkContext.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "<myTableName>")
ddbConf.set("dynamodb.throughput.write.percent", "1.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
val df_ddb = spark.read.option("header","true").parquet("<myInputFile>")
val schema_ddb = df_ddb.dtypes
var ddbInsertFormattedRDD = df_ddb.rdd.map(a => {
val ddbMap = new HashMap[String, AttributeValue]()
for (i <- 0 to schema_ddb.length - 1) {
val value = a.get(i)
if (value != null) {
val att = new AttributeValue()
att.setS(value.toString)
ddbMap.put(schema_ddb(i)._1, att)
}
}
val item = new DynamoDBItemWritable()
item.setItem(ddbMap)
(new Text(""), item)
}
)
ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)
关于scala - Spark 2.2.0 - 如何将 DataFrame 写入/读取 DynamoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47722648/
我有一些 Scala 代码,它用两个不同版本的类型参数化函数做了一些漂亮的事情。我已经从我的应用程序中简化了很多,但最后我的代码充满了形式 w(f[Int],f[Double]) 的调用。哪里w()是
如果我在同一目录中有两个单独的未编译的 scala 文件: // hello.scala object hello { def world() = println("hello world") }
val schema = df.schema val x = df.flatMap(r => (0 until schema.length).map { idx => ((idx, r.g
环境: Play 2.3.0/Scala 2.11.1/IntelliJ 13.1 我使用 Typesafe Activator 1.2.1 用 Scala 2.11.1 创建一个新项目。项目创建好后
我只是想知道如何使用我自己的类扩展 Scala 控制台和“脚本”运行程序,以便我可以通过使用实际的 Scala 语言与其通信来实际使用我的代码?我应将 jar 放在哪里,以便无需临时配置即可从每个 S
我已经根据 README.md 文件安装了 ensime,但是,我在低级 ensime-server 缓冲区中出现以下错误: 信息: fatal error :scala.tools.nsc.Miss
我正在阅读《Scala 编程》一书。在书中,它说“一个函数文字被编译成一个类,当在运行时实例化时它是一个函数值”。并且它提到“函数值是对象,因此您可以根据需要将它们存储在变量中”。 所以我尝试检查函数
我有 hello world scala native 应用程序,想对此应用程序运行小型 scala 测试我使用通常的测试命令,但它抛出异常: NativeMain.scala object Nati
有few resources在网络上,在编写与代码模式匹配的 Scala 编译器插件方面很有指导意义,但这些对生成代码(构建符号树)没有帮助。我应该从哪里开始弄清楚如何做到这一点? (如果有比手动构建
我是 Scala 的新手。但是,我用 创建了一个中等大小的程序。斯卡拉 2.9.0 .现在我想使用一个仅适用于 的开源库斯卡拉 2.7.7 . 是吗可能 在我的 Scala 2.9.0 程序中使用这个
有没有办法在 Scala 2.11 中使用 scala-pickling? 我在 sonatype 存储库中尝试了唯一的 scala-pickling_2.11 工件,但它似乎不起作用。我收到消息:
这与命令行编译器选项无关。如何以编程方式获取代码内的 Scala 版本? 或者,Eclipse Scala 插件 v2 在哪里存储 scalac 的路径? 最佳答案 这无需访问 scala-compi
我正在阅读《Scala 编程》一书,并在第 6 章中的类 Rational 实现中遇到了一些问题。 这是我的 Rational 类的初始版本(基于本书) class Rational(numerato
我是 Scala 新手,我正在尝试开发一个使用自定义库的小项目。我在库内创建了一个mysql连接池。这是我的库的build.sbt organization := "com.learn" name :
我正在尝试运行一些 Scala 代码,只是暂时打印出“Hello”,但我希望在 SBT 项目中编译 Scala 代码之前运行 Scala 代码。我发现在 build.sbt 中有以下工作。 compi
Here链接到 maven Scala 插件使用。但没有提到它使用的究竟是什么 Scala 版本。我创建了具有以下配置的 Maven Scala 项目: org.scala-tools
我对 Scala 还很陌生,请多多包涵。我有一堆包裹在一个大数组中的 future 。 future 已经完成了查看几 TB 数据的辛勤工作,在我的应用程序结束时,我想总结上述 future 的所有结
我有一个 scala 宏,它依赖于通过包含其位置的静态字符串指定的任意 xml 文件。 def myMacro(path: String) = macro myMacroImpl def myMacr
这是我的功能: def sumOfSquaresOfOdd(in: Seq[Int]): Int = { in.filter(_%2==1).map(_*_).reduce(_+_) } 为什么我
这个问题在这里已经有了答案: Calculating the difference between two Java date instances (45 个答案) 关闭 5 年前。 所以我有一个这
我是一名优秀的程序员,十分优秀!