gpt4 book ai didi

pyspark - 如何使用 PySpark Structure Streaming +Kafka

转载 作者:行者123 更新时间:2023-12-04 09:30:23 27 4
gpt4 key购买 nike

我尝试在 kafka 中使用 Spark 结构流,但在使用 Spark 提交时遇到问题,消费者仍然从产品接收数据,但 Spark 结构是错误的。请帮我在我的代码中找到问题
这是我在 test.py 中的代码:

from kafka import KafkaProducer
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('stream_test').getOrCreate()
import random

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
for i in range(0,100):
lg_value = str(random.uniform(5000, 10000))
producer.send(topic = 'test', value = bytes(lg_value, encoding='utf-8'))
producer.flush()

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092") \
.option("subscribe","test").load()
df_to_string = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
print("done")
当我运行时:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 test.py
终端输出:
> 20/07/12 19:39:09 INFO Executor: Starting executor ID driver on host
> 192.168.31.129 20/07/12 19:39:09 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on
> port 38885. 20/07/12 19:39:09 INFO NettyBlockTransferService: Server
> created on 192.168.31.129:38885 20/07/12 19:39:09 INFO BlockManager:
> Using org.apache.spark.storage.RandomBlockReplicationPolicy for block
> replication policy 20/07/12 19:39:09 INFO BlockManagerMaster:
> Registering BlockManager BlockManagerId(driver, 192.168.31.129, 38885,
> None) 20/07/12 19:39:09 INFO BlockManagerMasterEndpoint: Registering
> block manager 192.168.31.129:38885 with 413.9 MiB RAM,
> BlockManagerId(driver, 192.168.31.129, 38885, None) 20/07/12 19:39:09
> INFO BlockManagerMaster: Registered BlockManager
> BlockManagerId(driver, 192.168.31.129, 38885, None) 20/07/12 19:39:09
> INFO BlockManager: Initialized BlockManager: BlockManagerId(driver,
> 192.168.31.129, 38885, None) 20/07/12 19:39:11 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of
> spark.sql.warehouse.dir ('file:/home/thoaint2/spark-warehouse').
> 20/07/12 19:39:11 INFO SharedState: Warehouse path is
> 'file:/home/thoaint2/spark-warehouse'. Traceback (most recent call
> last): File "/home/thoaint2/test.py", line 15, in <module>
> df = spark.readStream.format("kafka").option('kafka.bootstrap.servers','localhost:9092')
> \ File
> "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 420, in load File
> "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1304, in __call__ File
> "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 131, in deco File
> "/home/thoaint2/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value py4j.protocol.Py4JJavaError: An error
> occurred while calling o31.load. : java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArraySerializer at
> org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:557)
> at
> org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
> at
> org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:325)
> at
> org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:70)
> at
> org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:220)
> at
> org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:112)
> at
> org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:112)
> at
> org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
> at
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:205)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at
> py4j.Gateway.invoke(Gateway.java:282) at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79) at
> py4j.GatewayConnection.run(GatewayConnection.java:238) at
> java.lang.Thread.run(Thread.java:748) Caused by:
> java.lang.ClassNotFoundException:
> org.apache.kafka.common.serialization.ByteArraySerializer at
> java.net.URLClassLoader.findClass(URLClassLoader.java:382)

最佳答案

您需要将 kafka-clients JAR 添加到您的 --packages另请注意,Spark 也可以作为生产者,因此您不需要不同的 Python Kafka 库。
如果您只想在不使用 JVM 的情况下处理 Kafka Streams,那么请查看 Faust

关于pyspark - 如何使用 PySpark Structure Streaming +Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62868678/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com