gpt4 book ai didi

cassandra - Spark-streaming:如何将流数据输出到cassandra

转载 作者:行者123 更新时间:2023-12-02 23:10:14 26 4
gpt4 key购买 nike

我正在使用 Spark-streaming 读取 kafka 流消息。现在我想将 Cassandra 设置为我的输出。我在 cassandra“test_table”中创建了一个表,其中包含“key:text 主键”和“value:text”列我已成功将数据映射到 JavaDStream<Tuple2<String,String>> data像这样:

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< Tuple2<String,String>, Tuple2<String,String> >()
{
public Tuple2<String,String> call(Tuple2<String, String> message)
{
return new Tuple2<String,String>( message._1(), message._2() );
}
}
);

然后我创建了一个列表:

List<TestTable> list = new ArrayList<TestTable>();

其中 TestTable 是我的自定义类,其结构与我的 Cassandra 表相同,成员为“key”和“value”:

class TestTable
{
String key;
String val;

public TestTable() {}

public TestTable(String k, String v)
{
key=k;
val=v;
}

public String getKey(){
return key;
}

public void setKey(String k){
key=k;
}

public String getVal(){
return val;
}

public void setVal(String v){
val=v;
}

public String toString(){
return "Key:"+key+",Val:"+val;
}
}

请建议一种如何添加 JavaDStream<Tuple2<String,String>> data 中的数据的方法进入List<TestTable> list 。我这样做是为了随后可以使用

JavaRDD<TestTable> rdd = sc.parallelize(list); 
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

将 RDD 数据保存到 Cassandra 中。

我尝试过这样编码:

messages.foreachRDD(new Function<Tuple2<String,String>, String>()
{
public List<TestTable> call(Tuple2<String,String> message)
{
String k = message._1();
String v = message._2();
TestTable tbl = new TestTable(k,v);
list.put(tbl);
}
}
);

但似乎发生了某种类型不匹配。请帮忙。

最佳答案

假设该程序的目的是将kafka中的流数据保存到Cassandra中,则无需转储JavaDStream<Tuple2<String,String>>数据写入List<TestTable>列表。

DataStax 的 Spark-Cassandra 连接器直接通过 Spark Streaming extensions 支持此功能。 。

JavaDStream 上使用此类扩展应该足够了:

javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

而不是在中间列表上耗尽数据。

关于cassandra - Spark-streaming:如何将流数据输出到cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27318202/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com