
java - How to replace a Java loop with direct Spark Cassandra table operations


I am trying to make my code more efficient because I have to process billions of rows of data in Cassandra. I currently use a Java loop with the DataStax Spark Cassandra Connector to pull the data out and put it into a format I am familiar with (a Multimap) before handing it to Spark. I would like to replace this Multimap loop with direct Spark operations on the Cassandra table, to save time and make everything more efficient. I would greatly appreciate any code suggestions to accomplish this. Here is my existing code:

Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
stmt.setFetchSize(2000000);
ResultSet results = session.execute(stmt);

// Channel bucket bounds
double channel;
double channel_end = 1.6159E8;
double increment = 5000;

// Get the variables from each row of Cassandra data
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results) {
    // Column names in Cassandra (case sensitive)
    double start_frequency = row.getDouble("Start_Frequency");
    float power = row.getFloat("Power");
    double bandwidth = row.getDouble("Bandwidth");

    // Create channel power buckets
    for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
        if (channel >= start_frequency && channel <= (start_frequency + bandwidth)) {
            data.put(channel, power);
        }
    }
}

// Create Spark list for DataFrame
List<Value> values = data.asMap().entrySet()
    .stream()
    .flatMap(x -> x.getValue()
        .stream()
        .map(y -> new Value(x.getKey(), y)))
    .collect(Collectors.toList());

// Create DataFrame and calculate results
sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
    .agg(min("power"), max("power"), avg("power"))
    .write().mode(SaveMode.Append)
    .option("table", "results")
    .option("keyspace", "model")
    .format("org.apache.spark.sql.cassandra").save();

} // end session
} // end compute

Best Answer

You can skip the driver-side loop entirely by letting the connector read the table straight into an RDD of your bean class and doing the bucketing in a flatMap, so the work is distributed across the executors (javaFunctions and mapRowTo are static imports from com.datastax.spark.connector.japi.CassandraJavaUtil):

JavaRDD<MeasuredValue> rdd = javaFunctions(sc)
    .cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class));

JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>() {
    @Override
    public Iterable<Value> call(MeasuredValue row) throws Exception {
        double start_frequency = row.getStart_frequency();
        float power = row.getPower();
        double bandwidth = row.getBandwidth();

        // Channel bucket bounds
        double channel;
        double channel_end = 1.6159E8;
        double increment = 5000;

        // Create channel power buckets for this row
        List<Value> list = new ArrayList<Value>();
        for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
            if (channel >= start_frequency && channel <= (start_frequency + bandwidth)) {
                list.add(new Value(channel, power));
            }
        }
        return list;
    }
});

sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel"))
    .agg(min("power"), max("power"), avg("power"))
    .write().mode(SaveMode.Append)
    .option("table", "results")
    .option("keyspace", "model")
    .format("org.apache.spark.sql.cassandra").save();

} // end session
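
With Java 8 the anonymous FlatMapFunction can be collapsed into a lambda. A minimal sketch of the same bucketing logic, assuming Spark 1.x (where FlatMapFunction.call returns an Iterable) and the same MeasuredValue and Value beans:

JavaRDD<Value> valueRdd = rdd.flatMap(row -> {
    double end = row.getStart_frequency() + row.getBandwidth();
    List<Value> list = new ArrayList<>();
    // Keep every channel on the grid that falls inside this measurement's band
    for (double channel = 1.6000E8; channel <= 1.6159E8; channel += 5000) {
        if (channel >= row.getStart_frequency() && channel <= end) {
            list.add(new Value(channel, row.getPower()));
        }
    }
    return list;
});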

public static class MeasuredValue implements Serializable {

    public MeasuredValue() { }

    private double start_frequency;
    public double getStart_frequency() { return start_frequency; }
    public void setStart_frequency(double start_frequency) { this.start_frequency = start_frequency; }

    private double bandwidth;
    public double getBandwidth() { return bandwidth; }
    public void setBandwidth(double bandwidth) { this.bandwidth = bandwidth; }

    private float power;
    public float getPower() { return power; }
    public void setPower(float power) { this.power = power; }

}
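
The Value bean is not shown in the original post; judging from new Value(channel, power) and the groupBy(col("channel")) / agg over "power" above, a minimal sketch would be:

public static class Value implements Serializable {

    public Value() { }

    public Value(double channel, float power) {
        this.channel = channel;
        this.power = power;
    }

    private double channel;
    public double getChannel() { return channel; }
    public void setChannel(double channel) { this.channel = channel; }

    private float power;
    public float getPower() { return power; }
    public void setPower(float power) { this.power = power; }

}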

Regarding java - How to replace a Java loop with direct Spark Cassandra table operations, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/35515113/
