I want to do sentiment analysis with Kafka and Spark. The idea is to read streaming data from Kafka and process it in batches with Spark. Each batch should then be analyzed with a sentimentPredict() function that I built with TensorFlow. This is what I have so far...
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
from multiprocessing import Lock
lock = Lock()
numDimensions = 300
maxSeqLength = 70
batchSize = 24
lstmUnits = 128
numClasses = 2
iterations = 100000
import numpy as np
import pickle
from nltk.tokenize import word_tokenize
import DataPreprocessing as proc
import time
with open('dictionary.pickle', 'rb') as handle:
    wordsList = pickle.load(handle)
wordVectors = np.load('final_embeddings.npy')
import tensorflow as tf
tf.reset_default_graph()
labels = tf.placeholder(tf.float32, [batchSize, numClasses])
input_data = tf.placeholder(tf.int32, [batchSize, maxSeqLength])
data = tf.Variable(tf.zeros([batchSize, maxSeqLength, numDimensions]),dtype=tf.float32)
data = tf.nn.embedding_lookup(wordVectors,input_data)
lstmCell = tf.contrib.rnn.BasicLSTMCell(lstmUnits)
lstmCell = tf.contrib.rnn.DropoutWrapper(cell=lstmCell, output_keep_prob=0.25)
value, _ = tf.nn.dynamic_rnn(lstmCell, data, dtype=tf.float32)
weight = tf.Variable(tf.truncated_normal([lstmUnits, numClasses]))
bias = tf.Variable(tf.constant(0.1, shape=[numClasses]))
value = tf.transpose(value, [1, 0, 2])
last = tf.gather(value, int(value.get_shape()[0]) - 1)
prediction = (tf.matmul(last, weight) + bias)
correctPred = tf.equal(tf.argmax(prediction,1), tf.argmax(labels,1))
accuracy = tf.reduce_mean(tf.cast(correctPred, tf.float32))
sess = tf.InteractiveSession()
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('models'))
def getSentenceMatrix(sentence):
    arr = np.zeros([batchSize, maxSeqLength])
    sentenceMatrix = np.zeros([batchSize, maxSeqLength], dtype='int32')
    cleanedSentence = proc.cleanSentences(sentence)
    split = cleanedSentence.split()
    for indexCounter, word in enumerate(split):
        try:
            if word in wordsList:
                sentenceMatrix[0, indexCounter] = wordsList[word]
            else:
                sentenceMatrix[0, indexCounter] = 0  # Vector for unknown words
        except ValueError:
            sentenceMatrix[0, indexCounter] = 399999  # Vector for unknown words
    return sentenceMatrix
def sentimentCorrect(data):
    try:
        sentiment_results = {}
        #sentences = data['sentences']
        string = data.split(' ')
        exact = [spell.correction(word) for word in string]
        exact = ' '.join(exact)
        inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(exact))))
        predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
        # predictedSentiment[0] represents output score for positive sentiment
        # predictedSentiment[1] represents output score for negative sentiment
        print("Positive : ", predictedSentiment[0])
        print("Negative : ", predictedSentiment[1])
        if predictedSentiment[0] > predictedSentiment[1]:
            result = "Positive"
        else:
            result = "Negative"
        sentiment_results["sentences"] = data
        sentiment_results["positiveScores"] = str(predictedSentiment[0])
        sentiment_results["negativeScores"] = str(predictedSentiment[1])
        sentiment_results["sentiment"] = result
        return sentiment_results
    except:
        print("Delay for 5 seconds")
        time.sleep(5)
def sentimentPredict(data):
    try:
        sentiment_results = {}
        #sentences = data['sentences']
        #string = sentences.split(' ')
        #exact = [get_exact_words(word) for word in string]
        #exact = ' '.join(exact)
        inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(data))))
        predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
        # predictedSentiment[0] represents output score for positive sentiment
        # predictedSentiment[1] represents output score for negative sentiment
        print("Positive : ", predictedSentiment[0])
        print("Negative : ", predictedSentiment[1])
        if predictedSentiment[0] > predictedSentiment[1]:
            result = "Positive"
        else:
            result = "Negative"
        sentiment_results["sentences"] = data
        sentiment_results["positiveScores"] = str(predictedSentiment[0])
        sentiment_results["negativeScores"] = str(predictedSentiment[1])
        sentiment_results["sentiment"] = result
        return sentiment_results
    except TypeError:
        raise
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 2)
#kafkaStream = KafkaUtils.createStream(ssc, 'NLP:2181', 'spark-streaming', {'weblogs':1})
kafkaStream = KafkaUtils.createDirectStream(ssc, topics = ['weblogs'], kafkaParams = {"metadata.broker.list": "NLP:9092"})
# Parse the inbound messages as JSON
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
#parsed.saveAsTextFiles("file:///D:/spark-kafka.txt")
id_twitter = parsed.map(lambda tweet: tweet["id"])
id_twitter.saveAsTextFiles("file:///D:/id-tweet.txt")
id_twitter.count().map(lambda x:'ID in this batch: %s' % x).pprint()
name = parsed.map(lambda tweet: tweet["name"])
name.saveAsTextFiles("file:///D:/name-tweet.txt")
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
text = parsed.map(lambda tweet: tweet["text"])
text.saveAsTextFiles("file:///D:/text-tweet.txt")
sentiment = text.mapPartitions(sentimentPredict)
sentiment.saveAsTextFiles("file:///D:/sentiment-tweet.txt")
#sentiment_result = text.map(sentimentPredict(text))
#sentiment_result = text.flatMap(sentimentPredict(text))
#print(sentiment_result)
#parsed.map(lambda x:'Tweets in this batch: %s' % x).pprint()
#parsed.encode("utf-8").pprint()
#print(parsed)
#print(soup.encode("utf-8"))
#sentiment_result.saveAsTextFiles("file:///D:/spark-kafka.txt")
ssc.start()
ssc.awaitTermination()
When I run this, the job fails with the following error:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
2018-04-09 16:21:48 ERROR JobScheduler:91 - Error generating jobs for time 1523265708000 ms
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
return r._jrdd
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
self._jrdd_deserializer, profiler)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
return cloudpickle.dumps(obj, 2)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
cp.dump(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Traceback (most recent call last):
File "D:/PROJECT_MABESPOLRI/progress_spark_sentiment/spark+sentiment.py", line 171, in <module>
ssc.awaitTermination()
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\context.py", line 206, in awaitTermination
self._jssc.awaitTermination()
File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o22.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
return r._jrdd
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
self._jrdd_deserializer, profiler)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
return cloudpickle.dumps(obj, 2)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
cp.dump(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Best Answer
Try putting your Spark code in a separate function that takes only the arguments it needs. When you run a Spark action, Spark tries to pickle everything in the enclosing scope (in your case, the top level of the script), and it raises an error as soon as it hits an object that cannot be pickled. In your case I suspect the error is caused by the variable `lock`.
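A minimal sketch of that restructuring, reusing wordVectors, getSentenceMatrix and the hyper-parameters already defined in the question (the names get_model, _model and analyzePartition are my own, not part of the original code): the graph and session are built lazily inside the function that mapPartitions calls, so the closure Spark has to pickle no longer references sess or lock, only plain Python and NumPy objects.

_model = {}  # lazy cache; in the worst case the graph is rebuilt once per partition

def get_model():
    # Rebuild the graph and restore the checkpoint exactly as in the driver
    # code above, but only when first called inside a Spark task, so no
    # Session (and no Lock) is ever captured in a pickled closure.
    if not _model:
        tf.reset_default_graph()
        input_data = tf.placeholder(tf.int32, [batchSize, maxSeqLength])
        data = tf.nn.embedding_lookup(wordVectors, input_data)
        cell = tf.contrib.rnn.DropoutWrapper(
            cell=tf.contrib.rnn.BasicLSTMCell(lstmUnits), output_keep_prob=0.25)
        value, _ = tf.nn.dynamic_rnn(cell, data, dtype=tf.float32)
        weight = tf.Variable(tf.truncated_normal([lstmUnits, numClasses]))
        bias = tf.Variable(tf.constant(0.1, shape=[numClasses]))
        last = tf.gather(tf.transpose(value, [1, 0, 2]), maxSeqLength - 1)
        prediction = tf.matmul(last, weight) + bias
        sess = tf.Session()
        tf.train.Saver().restore(sess, tf.train.latest_checkpoint('models'))
        _model.update(sess=sess, input_data=input_data, prediction=prediction)
    return _model

def analyzePartition(records):
    # records is an iterator over the tweet texts of one partition
    m = get_model()
    for sentence in records:
        scores = m['sess'].run(m['prediction'],
                               {m['input_data']: getSentenceMatrix(sentence)})[0]
        yield {"sentences": sentence,
               "positiveScores": str(scores[0]),
               "negativeScores": str(scores[1]),
               "sentiment": "Positive" if scores[0] > scores[1] else "Negative"}

sentiment = text.mapPartitions(analyzePartition)
sentiment.saveAsTextFiles("file:///D:/sentiment-tweet.txt")

With this layout, drop (or at least never reference from the streaming functions) the top-level lock = Lock() and sess = tf.InteractiveSession(); the lock is the suspect named above, and the TensorFlow session is another likely offender since sessions generally cannot be pickled either. Once nothing unpicklable is reachable from the functions passed to mapPartitions, cloudpickle no longer encounters a _thread.RLock and job generation should proceed normally.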
Regarding python - _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49729648/