gpt4 book ai didi

Python - 将整数或字符串发送到 Spark-Streaming

转载 作者:太空宇宙 更新时间:2023-11-03 14:24:40 25 4
gpt4 key购买 nike

我可以通过 CSV 文件发送我的数据。首先,将我的随机数写入CSV文件然后发送,但是可以直接发送吗?我的套接字代码:

import socket
host = 'localhost'
port = 8080

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
while True:
print('\nListening for a client at',host , port)
conn, addr = s.accept()
print('\nConnected by', addr)
try:
print('\nReading file...\n')
while 1:
out = "test01"
print('Sending line', line)
conn.send(out)
except socket.error:
print ('Error Occured.\n\nClient disconnected.\n')
conn.close()

Spark 流代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]","deneme")
ssc = StreamingContext(sc, 10)
socket_stream = ssc.socketTextStream("localhost",8080)

random_integers = socket_stream.window( 30 )

digits = random_integers.flatMap(lambda line: line.split(" ")).map(lambda digit: (digit, 1))

digit_count = digits.reduceByKey(lambda x,y:x+y)
digit_count.pprint()

ssc.start()

最佳答案

这是因为套接字会阻止发送数据并且永远不会继续前进。最基本的解决方案是发送一些数据并关闭连接:

import socket
import time

host = 'localhost'
port = 50007

i = 0

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.bind((host, port))
s.listen(1)

try:
while True:
conn, addr = s.accept()
try:
for j in range(10):
conn.send(bytes("{}\n".format(i), "utf-8"))
i += 1
time.sleep(1)
conn.close()
except socket.error: pass
finally:
s.close()

要获得更有趣的内容,请检查带超时的非阻塞模式。

关于Python - 将整数或字符串发送到 Spark-Streaming,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47726552/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com