
java - How do I group data by a field in Spark?

Reposted. Author: 行者123. Updated: 2023-11-30 07:04:43

I want to read two columns from a database, group them by the first column, and then insert the result into another table using Spark. My program is written in Java. I tried the following:

public static void aggregateSessionEvents(org.apache.spark.SparkContext sparkContext) {
    com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD<String, String> logs = javaFunctions(sparkContext)
            .cassandraTable("dove", "event_log", mapColumnTo(String.class), mapColumnTo(String.class))
            .select("session_id", "event");
    logs.groupByKey();
    com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions(logs)
            .writerBuilder("dove", "event_aggregation", null)
            .saveToCassandra();
    sparkContext.stop();
}

This gives me the error:

The method cassandraTable(String, String, RowReaderFactory<T>) in the type SparkContextJavaFunctions is not applicable for the arguments (String, String, RowReaderFactory<String>, mapColumnTo(String.class))

My dependencies are:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.0.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.0.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
</dependencies>

How can I fix this?

Best Answer

Change this:

.cassandraTable("dove", "event_log", mapColumnTo(String.class), mapColumnTo(String.class))

to:

.cassandraTable("dove", "event_log", mapColumnTo(String.class))

You are passing an extra argument: that overload of cassandraTable expects a single RowReaderFactory, not two.
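As for the grouping step itself, Spark's groupByKey on a pair RDD collects all values that share the same key into one group. Conceptually it behaves like Collectors.groupingBy in plain Java. The sketch below is a local illustration of that semantics (not Spark code); the session_id/event pairs mirror the two columns selected in the question:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByField {

    // Group (key, value) pairs by key, keeping all values per key,
    // analogous to what Spark's groupByKey does across partitions.
    static Map<String, List<String>> groupByKey(List<Map.Entry<String, String>> pairs) {
        return pairs.stream()
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    public static void main(String[] args) {
        // (session_id, event) pairs, like the two selected columns
        List<Map.Entry<String, String>> logs = List.of(
                Map.entry("s1", "login"),
                Map.entry("s2", "click"),
                Map.entry("s1", "logout"));

        Map<String, List<String>> grouped = groupByKey(logs);
        System.out.println(grouped.get("s1")); // [login, logout]
    }
}
```

Note that in Spark, groupByKey returns a new RDD rather than mutating the original, so the result of `logs.groupByKey()` must be assigned to a variable before it is written out. Also, with a single mapColumnTo factory the RDD carries one column's values; the connector's Java API provides row-to-tuple mappers for reading both columns as a pair if key/value semantics are needed.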

Regarding "java - How do I group data by a field in Spark?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40340122/
