python - ValueError: too many values to unpack (while reducing with foldByKey)


I'm struggling with a "ValueError: too many values to unpack" error when running the following code, which is intended to build a histogram of values for each key:

%pyspark

import datetime
from pyspark.sql import SQLContext, Row

def featVecSms( x ):
    sttm = datetime.datetime.strptime( x[1], '%Y%m%dT%H%M%S.%f' )
    hourOfDay = int( sttm.strftime( '%H' ) )
    dayOfWeek = int( sttm.strftime( '%w' ) )
    dayOfMonth = int( sttm.strftime( '%d' ) )
    duration = datetime.datetime.strptime( x[2], '%Y%m%dT%H%M%S.%f' ) - sttm
    duration = duration.total_seconds()
    service = x[3]
    resultCode = int( x[4] )
    msc = x[5]
    actionMap = {
        "0":'fsm',
        "1":'fsm',
        "2000":'sri',
        "2001":'sri',
        "2100":'sri',
        "2101":'sri',
        "2102":'fsm',
        "2200":'sri',
        "2201":'sri',
        "2202":'fsm',
        "2203":'fsm',
        "2204":'fsm',
        "2205":'fsm',
        "2206":'fsm',
        "2207":'sri',
        "2208":'sri',
        "2209":'sri',
        "2210":'fsm',
        "2211":'fsm',
        "2212":'fsm',
        "2213":'fsm',
        "2214":'fsm',
        "2215":'sri',
        "2216":'fsm'
    }
    action = actionMap.get( x[4] )
    return ( x[0], hourOfDay, dayOfWeek, dayOfMonth, duration, service, resultCode, msc, action )

textFile = sc.textFile("/export/sampleMsesAll.txt")
enTuples = textFile.map(lambda x: x.split("', u'"))
msRec = enTuples.map( featVecSms )

def countByCrit( accVal, currVal, idx ):
    accVal[ int( currVal[ idx ] ) ] = accVal[ int( currVal[ idx ] ) ] + 1
    return accVal

def countByTod( accVal, currVal ):
    return countByCrit( accVal, currVal, 1 )

todmap = [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ]
msTodSuccess = msRec.filter( lambda x: x[2] >= 0 ).foldByKey( todmap, countByTod )
#.map( lambda x: ( x[0], reduce( lambda x,y: x + str(y), x[2], "" ) ) )

msTodSuccess.collect()

This throws the following error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 52.0 failed 1 times, most recent failure: Lost task 1.0 in stage 52.0 (TID 115, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 1780, in combineLocally
merger.mergeValues(iterator)
File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/shuffle.py", line 266, in mergeValues
for k, v in iterator:
ValueError: too many values to unpack

The data looks like this:

$ head -15 /export/sampleMses10M.txt/part-00000 
(u'263775998314', u'20151119T180719.000349', u'20151120T074928.837095', u'GoodMorning', u'2210', u'263775998314')
(u'263779563529', u'20151119T181318.000201', u'20151120T122346.432229', u'GoodMorning', u'2204', u'undefined')
(u'263783104169', u'20151120T092503.000629', u'20151120T111833.430649', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092316.000331', u'20151120T125251.794699', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T125514.904726', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T135521.395529', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092503.000629', u'20151120T145418.069707', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T145526.133207', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154208.000410', u'20151120T154345.379585', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T154647.354102', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T154904.993095', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T164653.173588', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T164909.888433', u'Strangenet', u'2215', u'263770010027')
(u'263774918225', u'20151120T090505.000269', u'20151120T102248.630188', u'StrangeCash', u'0', u'263770010027')
(u'263782099158', u'20151119T182038.000537', u'20151120T064040.240860', u'GoodMorning', u'0', u'263770010500')

The sample is only 123k; in the real application there should be tens of millions of records.

Best Answer

The problem with your code is a type mismatch.

First of all, the *byKey methods operate on PairwiseRDDs. In Python, that means an RDD of length-2 tuples (or other length-2 structures; let's call them pairs) that can be unpacked like this:

k, v = pair

msRec, whose elements have length 9, obviously won't work here.
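
One simple way to get a pair RDD out of it (a minimal sketch, assuming the first field is the intended key, which the original post does not state) is to split each record into a key and a value tuple:

pairs = msRec.map( lambda rec: ( rec[0], rec[1:] ) )   # now k, v = pair unpacks cleanly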

The second problem is that you are using the wrong transformation. Take a look at the signature of foldByKey in Scala:

def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)] 

where V is the type of the values in RDD[(K, V)]. As you can see, the zeroValue and the function's arguments and return value must all have the same type as the values, which is clearly not the case here.

If the result type is different from the input type, you should use combineByKey or aggregateByKey instead.
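
For contrast, aggregateByKey's Scala signature lets the result type U differ from the value type V:

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): RDD[(K, U)]

Putting both fixes together, here is a minimal PySpark sketch, not the original poster's code: it assumes the record layout returned by featVecSms above, keys each record on its first field as in the pairs sketch earlier, and builds a per-key hour-of-day histogram:

def addHour( hist, val ):
    # val = (hourOfDay, dayOfWeek, dayOfMonth, duration, service, resultCode, msc, action)
    hist = hist[:]            # copy the accumulator defensively
    hist[ val[0] ] += 1       # bump the bucket for this record's hour of day
    return hist

def mergeHists( a, b ):
    # merge two partial histograms element-wise
    return [ x + y for x, y in zip( a, b ) ]

pairs = msRec.map( lambda rec: ( rec[0], rec[1:] ) )
todHist = pairs.aggregateByKey( [0] * 24, addHour, mergeHists )
todHist.collect()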

Regarding python - ValueError: too many values to unpack (while reducing with foldByKey), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34546404/
