gpt4 book ai didi

mysql - Spark rdd通过查询mysql进行过滤

转载 作者:行者123 更新时间:2023-11-29 10:58:09 25 4
gpt4 key购买 nike

我使用 Spark Streaming 从 Kafka 流式传输数据,我想通过 MySql 中的数据来过滤数据判断。

例如,我从 kafka 获取数据,如下所示:

{"id":1, "data":"abcdefg"}

MySql中有这样的数据:

id  | state  
1 | "success"

我需要查询 MySql 来获取术语 id 的状态。我可以在过滤器的函数中定义与MySql的连接,并且它可以工作。代码如下:

def isSuccess(x):
id = x["id"]
sql = """
SELECT *
FROM Test
WHERE id = "{0}"
""".format(id)
conn = mysql_connection(......)
result = rdbi.query_one(sql)
if result == None:
return False
else:
return True
successRDD = rdd.filter(isSuccess)

但是它会为RDD的每一行定义连接,并且会浪费大量的计算资源。

如何在过滤器中进行操作?

最佳答案

我建议您使用 Apache Spark 中提供的 mapPartition 来防止每个 RDD 的 MySQL 连接初始化。

这是我创建的 MySQL 表:

create table test2(id varchar(10), state varchar(10));

具有以下值:

+------+---------+
| id | state |
+------+---------+
| 1 | success |
| 2 | stopped |
+------+---------+

使用以下 PySpark 代码作为引用:

import MySQLdb

data1=[["1", "afdasds"],["2","dfsdfada"],["3","dsfdsf"]] #sampe data, in your case streaming data
rdd = sc.parallelize(data1)

def func1(data1):
con = MySQLdb.connect(host="127.0.0.1", user="root", passwd="yourpassword", db="yourdb")
c=con.cursor()
c.execute("select * from test2;")
data=c.fetchall()
dict={}
for x in data:
dict[x[0]]=x[1]
list1=[]
for x in data1:
if x[0] in dict:
list1.append([x[0], x[1], dict[x[0]]])
else:
list1.append([x[0], x[1], "none"]) # i assign none if id in table and one received from streaming dont match
return iter(list1)

print rdd.mapPartitions(func1).filter(lambda x: "none" not in x[2]).collect()

我得到的输出是:

[['1', 'afdasds', 'success'], ['2', 'dfsdfada', 'stopped']]

关于mysql - Spark rdd通过查询mysql进行过滤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42714394/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com