gpt4 book ai didi

python - Spark Streaming 不从本地目录读取

转载 作者:太空宇宙 更新时间:2023-11-03 16:42:28 25 4
gpt4 key购买 nike

我正在尝试使用 Spark Python API 编写 Spark 流应用程序。

应用程序应该从本地目录读取文本文件并将其发送到 Kafka 集群。

将python脚本提交到spark引擎时,根本没有发送任何内容到kafka。

我尝试打印事件而不是将其发送到 Kafka,但发现没有读取到任何内容。

这是脚本的代码。

#!/usr/lib/python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from kafka import KafkaProducer
import sys
import time
reload(sys)
sys.setdefaultencoding('utf8')


producer = KafkaProducer(bootstrap_servers="kafka-b01.css.org:9092,kafka-b02.css.org:9092,kafka-b03.css.org:9092,kafka-b04.css.org:9092,kafka-b05.css.org:9092")


def send_to_kafka(rdd):
tweets = rdd.collect()
print ("--------------------------")
print (tweets)
print "--------------------------"
#for tweet in tweets:
# producer.send('test_historical_job', value=bytes(tweet))


if __name__ == "__main__":

conf = SparkConf().setAppName("TestSparkFromPython")

sc = SparkContext(conf=conf)

ssc = StreamingContext(sc, 1)

tweetsDstream = ssc.textFileStream("/tmp/historical/")

tweetsDstream.foreachRDD(lambda rdd: send_to_kafka(rdd))
ssc.start()
ssc.awaitTermination()

我正在使用此命令提交脚本

./spark-submit --master spark://spark-master:7077 /apps/historical_streamer.py

打印语句的输出是一个空列表。

--------------------------
[]
--------------------------

编辑

基于this question我将数据目录的路径从 "/tmp/historical/" 更改为 "file:///tmp/historical/"

我尝试先运行该作业,然后将文件移动到该目录,但不幸的是它也不起作用。

最佳答案

基于文件流的源(例如 fileStreamtextFileStream)期望数据文件为:

be created in the dataDirectory by atomically moving or renaming them into the data directory.

如果给定窗口中没有新文件,则无需处理任何内容,因此不会读取预先存在的文件(似乎是此处的情况),也不会在输出中显示。

关于python - Spark Streaming 不从本地目录读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36677762/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com