java - Spark executes every action twice


I've created a simple Java application that uses Apache Spark to retrieve data from Cassandra, apply some transformations, and save it to another Cassandra table.

I'm using Apache Spark 1.4.1, configured in standalone cluster mode with one master and one slave on my machine.
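(For context: the question doesn't show how sparkContext and sqlContext are created. With the Spark Cassandra Connector for Spark 1.4 the setup is typically along the following lines; the app name, master URL, and Cassandra host here are assumptions, not taken from the question.)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.cassandra.CassandraSQLContext;

// Assumed setup: app name, master URL and Cassandra host are placeholders.
SparkConf conf = new SparkConf()
    .setAppName("DidNotBuyProductReport")
    .setMaster("spark://master-host:7077")
    .set("spark.cassandra.connection.host", "127.0.0.1");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
// CassandraSQLContext provides the cassandraSql(...) method used below.
CassandraSQLContext sqlContext = new CassandraSQLContext(sparkContext.sc());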

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer " +
"WHERE CAST(store_id as string) = '" + storeId + "'");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
"WHERE CAST(store_id as string) = '" + storeId + "' AND product_id = " + productId + "");

// We need only the customers who did not order the product
// We cache the DataFrame because we use it twice.
DataFrame customersWhoHaventOrderedTheProduct = customers
.join(customersWhoOrderedTheProduct
.select(customersWhoOrderedTheProduct.col("email")), customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")), "leftouter")
.where(customersWhoOrderedTheProduct.col("email").isNull())
.drop(customersWhoOrderedTheProduct.col("email"))
.cache();

int numberOfCustomers = (int) customersWhoHaventOrderedTheProduct.count();

Date reportTime = new Date();

// Prepare the Broadcast values. They are used in the map below.
Broadcast<String> bStoreId = sparkContext.broadcast(storeId, classTag(String.class));
Broadcast<String> bReportName = sparkContext.broadcast(MessageBrokerQueue.report_did_not_buy_product.toString(), classTag(String.class));
Broadcast<java.sql.Timestamp> bReportTime = sparkContext.broadcast(new java.sql.Timestamp(reportTime.getTime()), classTag(java.sql.Timestamp.class));
Broadcast<Integer> bNumberOfCustomers = sparkContext.broadcast(numberOfCustomers, classTag(Integer.class));

// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
.map(row -> new StoreCustomerReport(bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))), StoreCustomerReport.class);


// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
.option("keyspace", "my_keyspace")
.option("table", "my_report")
.format("org.apache.spark.sql.cassandra")
.save();
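(A side note on the classTag(...) calls in the broadcast section: that helper isn't shown in the question. A common hand-rolled version, assumed here, is a thin wrapper over Scala's ClassTag factory:)

import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

// Hypothetical helper matching the classTag(...) calls above.
private static <T> ClassTag<T> classTag(Class<T> clazz) {
    return ClassTag$.MODULE$.apply(clazz);
}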

As you can see, I cache the customersWhoHaventOrderedTheProduct DataFrame, after which I perform a count and call toJavaRDD.

By my reckoning, these actions should each be executed only once. But when I look at the Spark UI for the current job, I see the following stages: [screenshot of the Spark UI stages]

As you can see, every action is executed twice.

Am I doing something wrong? Is there a setting I've missed?

Any ideas are greatly appreciated.


Edit:

After I call System.out.println(storeCustomerReport.toJavaRDD().toDebugString());,

this is the debug string:

(200) MapPartitionsRDD[43] at toJavaRDD at DidNotBuyProductReport.java:93 []
| MapPartitionsRDD[42] at createDataFrame at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[41] at map at DidNotBuyProductReport.java:90 []
| MapPartitionsRDD[40] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[39] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[38] at toJavaRDD at DidNotBuyProductReport.java:89 []
| ZippedPartitionsRDD2[37] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[31] at toJavaRDD at DidNotBuyProductReport.java:89 []
| ShuffledRDD[30] at toJavaRDD at DidNotBuyProductReport.java:89 []
+-(2) MapPartitionsRDD[29] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[28] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[27] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[3] at cache at DidNotBuyProductReport.java:76 []
| CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:15 []
| MapPartitionsRDD[36] at toJavaRDD at DidNotBuyProductReport.java:89 []
| ShuffledRDD[35] at toJavaRDD at DidNotBuyProductReport.java:89 []
+-(2) MapPartitionsRDD[34] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[33] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[32] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[5] at cache at DidNotBuyProductReport.java:76 []
| CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:15 []

Edit 2:

So, after some research and some trial and error, I managed to optimize the job.

I created an RDD from customersWhoHaventOrderedTheProduct and cached it before calling the count() action. (I moved the caching from the DataFrame to the RDD.)

After that, I used this RDD to create the storeCustomerReport DataFrame:

JavaRDD<Row> customersWhoHaventOrderedTheProductRdd = customersWhoHaventOrderedTheProduct.javaRDD().cache();
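(Pieced together from the description above, the reworked section looks roughly like this; a sketch that reuses the broadcast variables and the StoreCustomerReport class from the original code:)

JavaRDD<Row> customersWhoHaventOrderedTheProductRdd =
    customersWhoHaventOrderedTheProduct.javaRDD().cache();

// count() is now the first action on the cached RDD, so it materializes the cache.
int numberOfCustomers = (int) customersWhoHaventOrderedTheProductRdd.count();

// The report DataFrame is then built on top of the already-cached RDD.
DataFrame storeCustomerReport = sqlContext.createDataFrame(
    customersWhoHaventOrderedTheProductRdd.map(row -> new StoreCustomerReport(
        bStoreId.value(), bReportName.getValue(), bReportTime.getValue(),
        bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))),
    StoreCustomerReport.class);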

The stages now look like this:

[screenshot of the Spark UI stages after the change]

As you can see, the two counts and the caches are now gone, but there are still two "javaRDD" actions. I have no idea where they come from, since I only call toJavaRDD once in my code.

Best Answer

It looks like you are applying two actions in the code snippet below:

// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
.map(row -> new StoreCustomerReport(bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))), StoreCustomerReport.class);


// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
.option("keyspace", "my_keyspace")

one at sqlContext.createDataFrame() and the other at storeCustomerReport.write(), and both of these require customersWhoHaventOrderedTheProduct.toJavaRDD().

Persisting the RDD produced by customersWhoHaventOrderedTheProduct.toJavaRDD() should solve the issue:

JavaRDD<Row> cachedRdd = customersWhoHaventOrderedTheProduct.toJavaRDD().persist(StorageLevel.MEMORY_AND_DISK()); // or any other storage level
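Applied to the code from the question, the persisted RDD then feeds both downstream uses and can be released once the write has finished. A sketch (imports as in the question, plus org.apache.spark.storage.StorageLevel):

JavaRDD<Row> cachedRdd = customersWhoHaventOrderedTheProduct.toJavaRDD()
    .persist(StorageLevel.MEMORY_AND_DISK()); // computed once, reused below

// Build the report DataFrame on top of the persisted RDD.
DataFrame storeCustomerReport = sqlContext.createDataFrame(
    cachedRdd.map(row -> new StoreCustomerReport(
        bStoreId.value(), bReportName.getValue(), bReportTime.getValue(),
        bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))),
    StoreCustomerReport.class);

// The write now reads the persisted partitions instead of recomputing the lineage.
storeCustomerReport.write().mode(SaveMode.Append)
    .option("keyspace", "my_keyspace")
    .option("table", "my_report")
    .format("org.apache.spark.sql.cassandra")
    .save();

cachedRdd.unpersist(); // release the cached partitions once the report is saved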

Regarding java - Spark executes every action twice, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33760644/
