gpt4 book ai didi

java - Spark on Java - 在所有工作人员上拥有静态对象的正确方法是什么

转载 作者:行者123 更新时间:2023-12-02 06:21:25 26 4
gpt4 key购买 nike

我需要在 Spark 中所有执行器的函数中使用不可序列化的第三方类,例如:

JavaRDD<String> resRdd = origRdd
.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String t) throws Exception {

//A DynamoDB mapper I don't want to initialise every time
DynamoDBMapper mapper = new DynamoDBMapper(new AmazonDynamoDBClient(credentials));

Set<String> userFav = mapper.load(userDataDocument.class, userId).getFav();

return userFav;
}
});

我想要一个静态的 DynamoDBMapper 映射器,我为每个执行器初始化一次并能够反复使用它。

由于它不是可序列化的,因此我无法在驱动器中初始化它并广播它。

注意:这是这里的答案( What is the right way to have a static object on all workers ),但仅适用于 Scala。

最佳答案

您可以使用mapPartitionforeachPartition。这是摘自 Learning Spark 的片段

By using partition- based operations, we can share a connection pool to this database to avoid setting up many connections, and reuse our JSON parser. As Examples 6-10 through 6-12 show, we use the mapPartitions() function, which gives us an iterator of the elements in each partition of the input RDD and expects us to return an iterator of our results.

这允许我们为每个执行程序初始化一个连接,然后根据需要迭代分区中的元素。这对于将数据保存到某些外部数据库或创建昂贵的可重用对象非常有用。

这是一个简单的 scala 示例,摘自链接的书籍。如果需要的话可以将其翻译成java。只是在这里展示 mapPartition 和 foreachPartition 的简单用例。

ipAddressRequestCount.foreachRDD { rdd => rdd.foreachPartition { partition =>
// Open connection to storage system (e.g. a database connection)
partition.foreach { item =>
// Use connection to push item to system
}
// Close connection
}
}

Here is a link一个java例子。

关于java - Spark on Java - 在所有工作人员上拥有静态对象的正确方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35018033/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com