gpt4 book ai didi

apache-spark - 如何将 Spark Streaming 与 Cassandra 连接起来?

转载 作者:行者123 更新时间:2023-12-02 01:29:30 26 4
gpt4 key购买 nike

我正在使用

Cassandra v2.1.12
Spark v1.4.1
Scala 2.10

cassandra 正在监听

rpc_address:127.0.1.1
rpc_port:9160

例如,要连接kafka和spark-streaming,同时每4秒听一次kafka,我有以下spark作业

sc = SparkContext(conf=conf)
stream=StreamingContext(sc,4)
map1={'topic_name':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)

并且 spark-streaming 每 4 秒持续监听 kafka broker 并输出内容。

同理,我想让spark streaming监听cassandra,每隔4秒输出指定表的内容

如何转换上面的流式代码,使其与 cassandra 而不是 kafka 一起工作?


非流媒体解决方案

我显然可以在无限循环中继续运行查询,但这不是真正的流式传输,对吧?

Spark 作业:

from __future__ import print_function
import time
import sys

from random import random
from operator import add
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming import *

sc = SparkContext(appName="sparkcassandra")
while(True):
time.sleep(5)
sqlContext = SQLContext(sc)
stream=StreamingContext(sc,4)
lines = stream.socketTextStream("127.0.1.1", 9160)
sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table="users", keyspace="keyspace2")\
.load()\
.show()

这样跑

sudo ./bin/spark-submit --packages \
datastax:spark-cassandra-connector:1.4.1-s_2.10 \
examples/src/main/python/sparkstreaming-cassandra2.py

然后我得到了大致看起来像的表值

lastname|age|city|email|firstname

那么从 cassandra“流式传输”数据的正确方法是什么?


最佳答案

目前,从 C* 流式传输数据的“正确方法”不是从 C* 流式传输数据 :) 相反,将消息队列(如 Kafka)放在 C* 前面并从中流式传输通常更有意义那。 C* 不容易支持增量表读取,但如果集群键基于插入时间,则可以做到这一点。

如果您有兴趣将 C* 用作流媒体源,请务必查看并发表评论 https://issues.apache.org/jira/browse/CASSANDRA-8844更改数据捕获

这很可能是您正在寻找的。

如果您实际上只是尝试定期读取整个表并做一些事情,那么您最好只使用 cron 作业来启动批处理操作,因为您确实无法恢复状态。

关于apache-spark - 如何将 Spark Streaming 与 Cassandra 连接起来?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34993290/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com