作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Spark-streaming 读取 kafka 流消息。现在我想将 Cassandra 设置为我的输出。我在 cassandra“test_table”中创建了一个表,其中包含“key:text 主键”和“value:text”列我已成功将数据映射到 JavaDStream<Tuple2<String,String>> data
像这样:
JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< Tuple2<String,String>, Tuple2<String,String> >()
{
public Tuple2<String,String> call(Tuple2<String, String> message)
{
return new Tuple2<String,String>( message._1(), message._2() );
}
}
);
然后我创建了一个列表:
List<TestTable> list = new ArrayList<TestTable>();
其中 TestTable 是我的自定义类,其结构与我的 Cassandra 表相同,成员为“key”和“value”:
class TestTable
{
String key;
String val;
public TestTable() {}
public TestTable(String k, String v)
{
key=k;
val=v;
}
public String getKey(){
return key;
}
public void setKey(String k){
key=k;
}
public String getVal(){
return val;
}
public void setVal(String v){
val=v;
}
public String toString(){
return "Key:"+key+",Val:"+val;
}
}
请建议一种如何添加 JavaDStream<Tuple2<String,String>> data
中的数据的方法进入List<TestTable> list
。我这样做是为了随后可以使用
JavaRDD<TestTable> rdd = sc.parallelize(list);
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");
将 RDD 数据保存到 Cassandra 中。
我尝试过这样编码:
messages.foreachRDD(new Function<Tuple2<String,String>, String>()
{
public List<TestTable> call(Tuple2<String,String> message)
{
String k = message._1();
String v = message._2();
TestTable tbl = new TestTable(k,v);
list.put(tbl);
}
}
);
但似乎发生了某种类型不匹配。请帮忙。
最佳答案
假设该程序的目的是将kafka中的流数据保存到Cassandra中,则无需转储JavaDStream<Tuple2<String,String>>
数据写入List<TestTable>
列表。
DataStax 的 Spark-Cassandra 连接器直接通过 Spark Streaming extensions 支持此功能。 。
在 JavaDStream
上使用此类扩展应该足够了:
javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
而不是在中间列表上耗尽数据。
关于cassandra - Spark-streaming:如何将流数据输出到cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27318202/
我是一名优秀的程序员,十分优秀!