gpt4 book ai didi

python发送csv数据到spark streaming

转载 作者:太空宇宙 更新时间:2023-11-03 11:23:55 25 4
gpt4 key购买 nike

我想尝试在 python 中加载一个 csv 数据,并通过 SPark Streaming 流式传输每一行 Spark 。

我对网络的东西还很陌生。我不完全是如果我应该创建一个服务器 python 脚本,一旦它建立连接(使用 spark 流)它就会开始发送每一行。在 Spark Streaming 文档中,他们做了一个 nc -l 9999,如果我正确的话,这是一个监听端口 9999 的 netcat 服务器。所以我尝试创建一个类似于解析 csv 并在端口 60000 上发送的 python 脚本

import socket                   # Import socket module
import csv

port = 60000 # Reserve a port for your service.
s = socket.socket() # Create a socket object
host = socket.gethostname() # Get local machine name
s.bind((host, port)) # Bind to the port
s.listen(5) # Now wait for client connection.

print('Server listening....')

while True:
conn, addr = s.accept() # Establish connection with client.
print('Got connection from', addr)



csvfile = open('Titantic.csv', 'rb')

reader = csv.reader(csvfile, delimiter = ',')
for row in reader:
line = ','.join(row)

conn.send(line)
print(line)

csvfile.close()

print('Done sending')
conn.send('Thank you for connecting')
conn.close()

SPark 流媒体脚本 -

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines_RDD = ssc.socketTextStream("localhost", 60000)

# Split each line into words
data_RDD = lines_RDD.flatMap(lambda line: line.split(","))

data_RDD.pprint()

ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate

当运行 spark 脚本时(顺便说一句,这是在 Jupyter Notebooks 中)我得到这个错误 -IllegalArgumentException: '要求失败:没有注册输出操作,所以没有执行'

我认为我没有正确地执行我的套接字脚本,但我真的不确定该怎么做我基本上是在尝试复制 nc -lk 9999 所做的,这样我就可以通过端口发送文本数据,然后 spark streaming 正在监听它并接收数据并进行处理。

任何帮助将不胜感激

最佳答案

我正在尝试做类似的事情,但我想每 10 秒流式传输一次。我用这个脚本解决了:

import socket
from time import sleep

host = 'localhost'
port = 12345

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
while True:
print('\nListening for a client at',host , port)
conn, addr = s.accept()
print('\nConnected by', addr)
try:
print('\nReading file...\n')
with open('iris_test.csv') as f:
for line in f:
out = line.encode('utf-8')
print('Sending line',line)
conn.send(out)
sleep(10)
print('End Of Stream.')
except socket.error:
print ('Error Occured.\n\nClient disconnected.\n')
conn.close()

希望这对您有所帮助。

关于python发送csv数据到spark streaming,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37667771/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com