
apache-spark - Pickling error - Cython with Pyspark: scikit-learn knn with user-defined metric for large dataset


I want to use Cython with Pyspark to speed up scikit-learn's kNN with a user-defined metric on a large dataset with 400,000 rows and 65 columns. I have followed the instructions here and here. I am using Spark 1.6.0 and Python 2.7.13.

I wrote the following code for a small sample dataset, but I get the pickling error below:

Traceback (most recent call last):
  File "/farzanadata/main.py", line 26, in <module>
    bc_nbrs = sc.broadcast(nbrs)
  File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 741, in broadcast
  File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 70, in __init__
  File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 78, in dump
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
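
The error is reproducible without Spark: cPickle in Python 2 can only pickle functions by reference to a module-level name, so the nested closure returned by spark_cython cannot be serialized. A minimal sketch (factory and wrapped are illustrative names, not from the post):

import cPickle

def factory():
    def wrapped():
        return 42
    return wrapped

# raises cPickle.PicklingError: Can't pickle <type 'function'>:
# attribute lookup __builtin__.function failed
cPickle.dumps(factory())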

spark_tools.py
def spark_cython(module, method):
    def wrapped(*args, **kwargs):
        global cython_function_
        try:
            # reuse the compiled function if this worker already imported it
            return cython_function_(*args, **kwargs)
        except NameError:
            # first call on this worker: compile the .pyx on the fly
            import pyximport
            pyximport.install()
            cython_function_ = getattr(__import__(module), method)
            return cython_function_(*args, **kwargs)
    return wrapped

clinical_kernel.pyx
cimport cython
from libc cimport math
cimport numpy as cnp

cnp.import_array()

def mydist(cnp.npy_double[:] x, cnp.npy_double[:] y):
    # clinical kernel: range-normalized similarity for the 3 continuous
    # features, exact-match similarity for the 2 categorical features
    cdef double ranges[3]
    cdef int k
    cdef double out = 0, out2 = 0
    ranges[:] = [0.04028, 0.0983, 0.06602]
    for k in range(3):
        out += (ranges[k] - math.fabs(x[k] - y[k])) / ranges[k]
    for k in range(3, 5):
        out2 += x[k] == y[k]
    return (out + out2) / 5
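
For reference, a plain-NumPy mirror of the same metric can be useful for sanity-checking the Cython build on known inputs. This sketch (mydist_py is a hypothetical helper, not part of the original post) assumes the same layout of 3 continuous plus 2 categorical columns:

import numpy as np

def mydist_py(x, y):
    # pure-Python equivalent of clinical_kernel.mydist, for testing only
    ranges = np.array([0.04028, 0.0983, 0.06602])
    cont = np.sum((ranges - np.abs(x[:3] - y[:3])) / ranges)
    cat = np.sum(x[3:5] == y[3:5])
    return (cont + cat) / 5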

main.py
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from sklearn.neighbors import NearestNeighbors
import numpy as np
import pyximport

from spark_tools import spark_cython

conf = SparkConf().setAppName('Fibo')
sc = SparkContext(conf=conf)

# ship the Cython source and the wrapper module to the executors
sc.addFile('file:///farzanadata/clinical_kernel.pyx')
sc.addFile('file:///farzanadata/spark_tools.py')

sqlContext = SQLContext(sc)

# compile the .pyx on the driver
pyximport.install()
import clinical_kernel

df = sc.parallelize([[0.72694, 1.4742, 0.32396, 1, 1],
                     [0.74173, 1.5257, 0.36116, 0, 0],
                     [0.76722, 1.5725, 0.38998, 1, 0],
                     [0.76722, 1.5725, 0.38998, 0, 1]])
X = np.array(df.collect())

mapper = spark_cython('clinical_kernel', 'mydist')
nbrs = NearestNeighbors(n_neighbors=4, metric=mapper)
nbrs.fit(X)

# fails here: the fitted estimator holds the closure returned by
# spark_cython, which cPickle cannot serialize
bc_nbrs = sc.broadcast(nbrs)

# query each row for its 4 nearest neighbours
neighbors = df.map(lambda x: bc_nbrs.value.kneighbors(x, n_neighbors=4, return_distance=False))
neigh_df = neighbors.map(lambda x: x.tolist()).toDF(["neighbors"])
neigh_df.show()

Instead of broadcasting the kNN tree, the following works perfectly, although of course it is not practical for a large dataset:

neighbors = nbrs.kneighbors(X, n_neighbors=4, return_distance=False)

Using import dill as pickle did not help either.
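
(dill itself can serialize such closures; the likely reason swapping it in has no effect is that sc.broadcast goes through PySpark's own serializer, which imports cPickle directly in Spark 1.6's broadcast.py, so a renamed import in user code is never consulted. A quick check, under that assumption:)

import dill

def factory():
    def wrapped():
        return 42
    return wrapped

dill.dumps(factory())  # succeeds -- but sc.broadcast never calls dill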

Best Answer

Changing spark_tools.py as follows solves the problem:

def spark_cython(*args, **kwargs):
    # module-level function: pickle serializes it by reference
    # (module name + attribute name), so an estimator holding it
    # can be broadcast
    global cython_function_
    module = 'clinical_kernel'
    method = 'mydist'
    try:
        return cython_function_(*args, **kwargs)
    except NameError:
        # first call on this worker: build and import the Cython module
        import pyximport
        pyximport.install()
        cython_function_ = getattr(__import__(module), method)
        return cython_function_(*args, **kwargs)
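
The corresponding change in main.py (a sketch, assuming the rest of the script is unchanged and that sc.addFile still ships spark_tools.py so the executors can import it when unpickling) is to pass the module-level function itself as the metric:

from spark_tools import spark_cython

# spark_cython is now a plain module-level function, so cPickle can
# serialize it by reference when the fitted estimator is broadcast
nbrs = NearestNeighbors(n_neighbors=4, metric=spark_cython)
nbrs.fit(X)
bc_nbrs = sc.broadcast(nbrs)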

A similar question on this pickling error (Cython with Pyspark: scikit-learn knn with user-defined metric for a large dataset) can be found on Stack Overflow: https://stackoverflow.com/questions/49764147/
