gpt4 book ai didi

python - PySpark 马尔可夫模型的算法/编码帮助

转载 作者:太空狗 更新时间:2023-10-30 01:28:22 24 4
gpt4 key购买 nike

我需要一些帮助来让我的大脑围绕在 spark 中设计一个(高效的)马尔可夫链(通过 python)。我已经尽我所能地编写了它,但是我想出的代码无法扩展。基本上对于各个 map 阶段,我编写了自定义函数并且它们可以很好地处理几千个序列,但是当我们得到在 20,000 多个(我有一些高达 800k)中,事情进展缓慢。

对于那些不熟悉马尔可夫穆德尔的人来说,这就是它的要点..

这是我的数据。此时我在 RDD 中获得了实际数据(无 header )。

ID, SEQ
500, HNL, LNH, MLH, HML

我们查看元组中的序列,所以

(HNL, LNH), (LNH,MLH), etc..

我需要达到这一点.. 我返回一个字典(针对每一行数据),然后将其序列化并存储在内存数据库中。

{500:
{HNLLNH : 0.333},
{LNHMLH : 0.333},
{MLHHML : 0.333},
{LNHHNL : 0.000},
etc..
}

所以本质上,每个序列都与下一个序列组合(HNL,LNH 变成 'HNLLNH'),然后对于所有可能的转换(序列组合),我们计算它们的出现次数,然后除以转换总数(3 in这种情况)并得到它们的出现频率。

上面有 3 个转换,其中一个是 HNLLNH.. 所以对于 HNLLNH,1/3 = 0.333

另一方面,我不确定它是否相关,但序列中每个位置的值都是有限的。第一个位置 (H/M/L),第二个位置 (M/L),第三个位置(H,M,L)。

我的代码之前所做的是收集()rdd,并使用我编写的函数将其映射几次。这些函数首先将字符串变成一个列表,然后将 list[1] 与 list[2] 合并,然后将 list[2] 与 list[3] 合并,然后将 list[3] 与 list[4] 合并,等等。所以我结束了想出这样的东西..

[HNLLNH],[LNHMLH],[MHLHML], etc..

然后下一个函数从该列表中创建一个字典,使用列表项作为键,然后计算该键在完整列表中的总出现次数,除以 len(list) 以获得频率。然后我将该字典连同它的 ID 号一起包装在另一个字典中(产生第二个代码块,在上面)。

就像我说的,这对小序列很有效,但对长度超过 100k 的列表不太有效。

此外,请记住,这只是一行数据。我必须对 10-20k 行数据的任何位置执行此操作,数据行的长度在每行 500-800,000 个序列之间变化。

关于如何编写 pyspark 代码(使用 API map/reduce/agg/etc.. 函数)以高效执行此操作的任何建议?

编辑代码如下。从底部开始可能是有意义的。请记住,我正在学习这个(Python 和 Spark),我不是以此为生,所以我的编码标准不是很好..

def f(x):
# Custom RDD map function
# Combines two separate transactions
# into a single transition state

cust_id = x[0]
trans = ','.join(x[1])
y = trans.split(",")
s = ''
for i in range(len(y)-1):
s= s + str(y[i] + str(y[i+1]))+","
return str(cust_id+','+s[:-1])

def g(x):
# Custom RDD map function
# Calculates the transition state probabilities
# by adding up state-transition occurrences
# and dividing by total transitions
cust_id=str(x.split(",")[0])
trans = x.split(",")[1:]
temp_list=[]
middle = int((len(trans[0])+1)/2)
for i in trans:
temp_list.append( (''.join(i)[:middle], ''.join(i)[middle:]) )

state_trans = {}
for i in temp_list:
state_trans[i] = temp_list.count(i)/(len(temp_list))

my_dict = {}
my_dict[cust_id]=state_trans
return my_dict


def gen_tsm_dict_spark(lines):
# Takes RDD/string input with format CUST_ID(or)PROFILE_ID,SEQ,SEQ,SEQ....
# Returns RDD of dict with CUST_ID and tsm per customer
# i.e. {cust_id : { ('NLN', 'LNN') : 0.33, ('HPN', 'NPN') : 0.66}

# creates a tuple ([cust/profile_id], [SEQ,SEQ,SEQ])
cust_trans = lines.map(lambda s: (s.split(",")[0],s.split(",")[1:]))

with_seq = cust_trans.map(f)

full_tsm_dict = with_seq.map(g)

return full_tsm_dict


def main():
result = gen_tsm_spark(my_rdd)

# Insert into DB
for x in result.collect():
for k,v in x.iteritems():
db_insert(k,v)

最佳答案

您可以尝试以下操作。它在很大程度上取决于 tooolz但如果您希望避免外部依赖,您可以轻松地将其替换为一些标准 Python 库。

from __future__ import division
from collections import Counter
from itertools import product
from toolz.curried import sliding_window, map, pipe, concat
from toolz.dicttoolz import merge

# Generate all possible transitions
defaults = sc.broadcast(dict(map(
lambda x: ("".join(concat(x)), 0.0),
product(product("HNL", "NL", "HNL"), repeat=2))))

rdd = sc.parallelize(["500, HNL, LNH, NLH, HNL", "600, HNN, NNN, NNN, HNN, LNH"])

def process(line):
"""
>>> process("000, HHH, LLL, NNN")
('000', {'LLLNNN': 0.5, 'HHHLLL': 0.5})
"""
bits = line.split(", ")
transactions = bits[1:]
n = len(transactions) - 1
frequencies = pipe(
sliding_window(2, transactions), # Get all transitions
map(lambda p: "".join(p)), # Joins strings
Counter, # Count
lambda cnt: {k: v / n for (k, v) in cnt.items()} # Get frequencies
)
return bits[0], frequencies

def store_partition(iter):
for (k, v) in iter:
db_insert(k, merge([defaults.value, v]))

rdd.map(process).foreachPartition(store_partition)

既然您知道所有可能的转换,我建议您使用稀疏表示并忽略零。此外,您可以用稀疏向量替换字典以减少内存占用。

关于python - PySpark 马尔可夫模型的算法/编码帮助,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32641643/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com