
apache-spark - Reading from / writing to DynamoDB from Apache Spark

Reposted · Author: 行者123 · Updated: 2023-12-03 07:46:06

I would like to know whether there is any Java library that supports reading from / writing to DynamoDB (AWS) from Apache Spark running on Mesos. I know there are libraries that support Spark on EMR, per this post: https://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/. Please advise.

Thanks, Pradeep

Best Answer

You can use the emr-dynamodb-connector library to read items from a DynamoDB table and write items back to it from Apache Spark. To read data, use javaSparkContext.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class); to write data to DynamoDB, use javaPairRDD.saveAsHadoopDataset(jobConf);. Below is an example (it works both on EMR and in non-EMR environments):

import java.util.Map;

import org.apache.hadoop.dynamodb.DynamoDBItemWritable;
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;

import scala.Tuple2;

public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
            .setAppName("DynamoDBApplication")
            .setMaster("local[4]")
            .registerKryoClasses(new Class<?>[]{
                    Class.forName("org.apache.hadoop.io.Text"),
                    Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
            });

    JavaSparkContext sc = new JavaSparkContext(conf);

    JobConf jobConf = getDynamoDbJobConf(sc, "TableNameForRead", "TableNameForWrite");

    // read all items from the DynamoDB table named TableNameForRead
    JavaPairRDD<Text, DynamoDBItemWritable> javaPairRdd =
            sc.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class);
    System.out.println("count: " + javaPairRdd.count());

    // process the data in any way you need; below is just a simple example
    JavaRDD<Map<String, AttributeValue>> javaRDD = javaPairRdd.map(t -> {
        DynamoDBItemWritable item = t._2();
        Map<String, AttributeValue> attrs = item.getItem();
        String hashKey = attrs.get("key").getS();
        Long result = Long.valueOf(attrs.get("resultAttribute").getN());
        System.out.println(String.format("hashKey=%s, result=%d", hashKey, result));
        return attrs;
    });
    System.out.println("count: " + javaRDD.count());

    // transform the JavaPairRDD before storing it to DynamoDB;
    // below is just a simple example that updates the hash key
    JavaPairRDD<Text, DynamoDBItemWritable> updatedJavaPairRDD = javaPairRdd.mapToPair(t -> {
        DynamoDBItemWritable item = t._2();
        Map<String, AttributeValue> attrs = item.getItem();
        String hashKey = attrs.get("key").getS();
        String updatedHashKey = hashKey + "_new";
        attrs.get("key").setS(updatedHashKey);
        return new Tuple2<>(t._1(), item);
    });

    // write the items to the DynamoDB table named TableNameForWrite
    updatedJavaPairRDD.saveAsHadoopDataset(jobConf);

    sc.stop();
}


private static JobConf getDynamoDbJobConf(JavaSparkContext sc, String tableNameForRead, String tableNameForWrite) {
    final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
    jobConf.set("dynamodb.servicename", "dynamodb");

    jobConf.set("dynamodb.input.tableName", tableNameForRead);
    jobConf.set("dynamodb.output.tableName", tableNameForWrite);

    // replace the placeholders below with your credentials and the endpoint of your region
    jobConf.set("dynamodb.awsAccessKeyId", "YOUR_AWS_ACCESS_KEY");
    jobConf.set("dynamodb.awsSecretAccessKey", "YOUR_AWS_SECRET_KEY");
    jobConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");

    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");

    return jobConf;
}
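
The connector also accepts optional JobConf properties for throughput and region tuning. The property names below are assumptions based on the emr-dynamodb-connector documentation, not part of the original answer, so verify them against the connector version you use. A minimal sketch:

// Optional tuning (assumed emr-dynamodb-connector property names; verify for your version)
jobConf.set("dynamodb.regionid", "us-west-1");              // set the region explicitly, as an alternative to the endpoint
jobConf.set("dynamodb.throughput.read.percent", "0.5");     // consume at most ~50% of provisioned read capacity
jobConf.set("dynamodb.throughput.write.percent", "0.5");    // consume at most ~50% of provisioned write capacity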

To run this code, you need the following Maven dependencies:

<dependencies>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.10</artifactId>
        <version>2.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-emr</artifactId>
        <version>1.11.113</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-dynamodb</artifactId>
        <version>1.11.113</version>
    </dependency>

    <!-- https://github.com/awslabs/emr-dynamodb-connector -->
    <dependency>
        <groupId>com.amazon.emr</groupId>
        <artifactId>emr-dynamodb-hadoop</artifactId>
        <version>4.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.8.0</version>
    </dependency>

</dependencies>
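
Since the question asks about Spark on Mesos rather than EMR, note that nothing in the example above is EMR-specific; it runs wherever these dependencies are on the classpath. The only change is the Spark master URL. A minimal sketch, where the ZooKeeper/Mesos address is purely illustrative and not from the original answer:

import org.apache.spark.SparkConf;

// Point Spark at a Mesos cluster instead of local[4];
// replace the placeholder address with your cluster's master URL.
SparkConf conf = new SparkConf()
        .setAppName("DynamoDBApplication")
        .setMaster("mesos://zk://zk-host:2181/mesos");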

Regarding "apache-spark - Reading from / writing to DynamoDB from Apache Spark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46797324/
