
python - No module named 'pyspark.streaming.kafka' even with an older Spark version


In another, similar question, the suggestion was to "install the older Spark 2.4.5."
Edit: the solution linked above says to install Spark 2.4.5 because it does include kafkautils. But the problem is that I cannot download Spark 2.4.5; it is not available even in the archive.
I followed the advice and installed an older version, Spark 2.4.6 (the only older version still available), together with Python 3.7 and the kafka-python and pyspark libraries.
I have a spark_job.py file that needs to use Kafka:

from pyspark.streaming.kafka import KafkaUtils
When I run python spark_job.py, I get:
ModuleNotFoundError: No module named 'pyspark.streaming.kafka'
The error persists!
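One sanity check is to confirm which pyspark installation Python actually imports, since the standalone Spark 2.4.6 download and a pip-installed pyspark can be two different installations (a minimal sketch, not from the original post):

import pyspark
print(pyspark.__version__)  # pyspark.streaming.kafka only exists in 2.4.x and earlier
print(pyspark.__file__)     # shows which installation is being picked up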
spark_job.py:
from __future__ import print_function
import sys
import os
import shutil

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils  # this is the problem
import json


outputPath = 'C:/Users/Admin/Desktop/kafka_project/checkpoint_01'


def getSparkSessionInstance(sparkConf):
    # Lazily create (or reuse) a single SparkSession shared across batches
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

#-------------------------------------------------
# What I want to do per each RDD...
#-------------------------------------------------
def process(time, rdd):

    print("===========-----> %s <-----===========" % str(time))

    try:
        spark = getSparkSessionInstance(rdd.context.getConf())

        rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))

        testDataFrame = spark.createDataFrame(rowRdd)

        testDataFrame.createOrReplaceTempView("treasury_stream")

        sql_query = get_sql_query()
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://myhabrtest.cuyficqfa1h0.ap-south-1.rds.amazonaws.com:5432/habrDB") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
        except Exception as e:
            print("--> Oops! It seems there is an error with the DB write!", e)

    except Exception as e:
        print("--> Oops! It seems there is an error!", e)

#-------------------------------------------------
# General function
#-------------------------------------------------
def createContext():

    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")

    ssc = StreamingContext(sc, 2)

    broker_list, topic = sys.argv[1:]

    try:
        directKafkaStream = KafkaUtils.createDirectStream(
            ssc,
            [topic],
            {"metadata.broker.list": broker_list})
    except Exception:
        raise ConnectionError("Kafka error: Connection refused: "
                              "broker_list={} topic={}".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)

    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: spark_job.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)

    print("--> Creating new context")
    if os.path.exists(outputPath):
        shutil.rmtree(outputPath)

    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
    ssc.start()
    ssc.awaitTermination()
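For reference, the script takes the broker list and the topic as its two positional arguments, so it is launched roughly like this (the broker address and topic name below are placeholders, not the real ones):

python spark_job.py localhost:9092 my_topic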

Best Answer

I just downgraded it with pip:

pip install --force-reinstall pyspark==2.4.6
I was not using Poetry at all. After the reinstall, the kafkaUtils package is recognized.
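To verify the downgrade took effect, the import should now succeed (assuming pip installed 2.4.6 into the same interpreter that runs the job):

import pyspark
from pyspark.streaming.kafka import KafkaUtils  # no ModuleNotFoundError after the reinstall

print(pyspark.__version__)  # expected: 2.4.6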

Regarding "python - No module named 'pyspark.streaming.kafka' even with an older Spark version", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63053460/
