- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个 PySpark 应用程序,它必须详细说明大约 5gb 的压缩数据(字符串)。我正在使用具有 12 个内核(24 个线程)和 72Gb RAM 的小型服务器。我的 PySpark 程序仅包含 2 个映射操作,借助于 3 个非常大的正则表达式(每个已编译 3gb)并加载了 pickle
。 Spark 在独立模式下工作,worker 和 master 在同一台机器上。
我的问题是:spark 是否会为每个执行程序核心复制每个变量?因为它使用了所有可用的内存,然后使用了大量的交换空间。或者它是否将所有分区加载到 RAM 中? RDD 包含大约 1000 万个必须由 3 正则表达式搜索的字符串。 RDD 大约有 1000 个分区。我很难完成这项任务,因为几分钟后内存已满, Spark 开始使用交换空间变得非常非常慢。 我注意到没有正则表达式情况是一样的。
这是我的代码,它删除了 twitter 推文的所有无用字段,并扫描推文的文本和描述中的特定单词:
import json
import re
import twitter_util as twu
import pickle
from pyspark import SparkContext
sc = SparkContext()
prefix = '/home/lucadiliello'
source = prefix + '/data/tweets'
dest = prefix + '/data/complete_tweets'
#Regex's path
companies_names_regex = prefix + '/data/comp_names_regex'
companies_names_dict = prefix + '/data/comp_names_dict'
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal'
#Loading the regex's
comp_regex = pickle.load(open(companies_names_regex))
comp_dict = pickle.load(open(companies_names_dict))
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal))
#Loading the RDD from textfile
tx = sc.textFile(source).map(lambda a: json.loads(a))
def get_device(input_text):
output_text = re.sub('<[^>]*>', '', input_text)
return output_text
def filter_data(a):
res = {}
try:
res['mentions'] = a['entities']['user_mentions']
res['hashtags'] = a['entities']['hashtags']
res['created_at'] = a['created_at']
res['id'] = a['id']
res['lang'] = a['lang']
if 'place' in a and a['place'] is not None:
res['place'] = {}
res['place']['country_code'] = a['place']['country_code']
res['place']['place_type'] = a['place']['place_type']
res['place']['name'] = a['place']['name']
res['place']['full_name'] = a['place']['full_name']
res['source'] = get_device(a['source'])
res['text'] = a['text']
res['timestamp_ms'] = a['timestamp_ms']
res['user'] = {}
res['user']['created_at'] = a['user']['created_at']
res['user']['description'] = a['user']['description']
res['user']['followers_count'] = a['user']['followers_count']
res['user']['friends_count'] = a['user']['friends_count']
res['user']['screen_name'] = a['user']['screen_name']
res['user']['lang'] = a['user']['lang']
res['user']['name'] = a['user']['name']
res['user']['location'] = a['user']['location']
res['user']['statuses_count'] = a['user']['statuses_count']
res['user']['verified'] = a['user']['verified']
res['user']['url'] = a['user']['url']
except KeyError:
return []
return [res]
results = tx.flatMap(filter_data)
def setting_tweet(tweet):
text = tweet['text'] if tweet['text'] is not None else ''
descr = tweet['user']['description'] if tweet['user']['description'] is not None else ''
del tweet['text']
del tweet['user']['description']
tweet['text'] = {}
tweet['user']['description'] = {}
del tweet['mentions']
#tweet
tweet['text']['original_text'] = text
tweet['text']['mentions'] = twu.find_retweet(text)
tweet['text']['links'] = []
for j in twu.find_links(text):
tmp = {}
try:
tmp['host'] = twu.get_host(j)
tmp['link'] = j
tweet['text']['links'].append(tmp)
except ValueError:
pass
tweet['text']['companies'] = []
for x in comp_regex.findall(text.lower()):
tmp = {}
tmp['id'] = comp_dict[x.lower()]
tmp['name'] = x
tmp['legalName'] = comp_dict_legal[x.lower()]
tweet['text']['companies'].append(tmp)
# descr
tweet['user']['description']['original_text'] = descr
tweet['user']['description']['mentions'] = twu.find_retweet(descr)
tweet['user']['description']['links'] = []
for j in twu.find_links(descr):
tmp = {}
try:
tmp['host'] = twu.get_host(j)
tmp['link'] = j
tweet['user']['description']['links'].append(tmp)
except ValueError:
pass
tweet['user']['description']['companies'] = []
for x in comp_regex.findall(descr.lower()):
tmp = {}
tmp['id'] = comp_dict[x.lower()]
tmp['name'] = x
tmp['legalName'] = comp_dict_legal[x.lower()]
tweet['user']['description']['companies'].append(tmp)
return tweet
res = results.map(setting_tweet)
res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec")
更新大约 1 小时后,内存 (72gb) 和交换空间 (72gb) 完全满了。就我而言,使用广播不是解决方案。
更新 2在不使用 pickle 加载 3 个变量的情况下,使用高达 10gb 的 RAM 而不是 144GB 的 RAM 就可以毫无问题地结束! (72GB 内存 + 72Gb 交换空间)
最佳答案
My question is: does spark replicate each variable for each executor core?
是的!
每个(局部)变量的副本数等于您分配给 Python worker 的线程数。
至于你的问题,尝试在不使用 pickle
的情况下加载 comp_regex
、comp_dict
和 comp_dict_legal
。
关于python - spark 做了多少环境副本?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43955326/
我有一个关于 JavaScript 语法的问题。实际上,我在自学 MEAN 堆栈教程时想出了编码(https://thinkster.io/mean-stack-tutorial#adding-aut
在我的书中它使用了这样的东西: for($ARGV[0]) { Expression && do { print "..."; last; }; ... } for 循环不完整吗?另外,do 的意义何
我已经编写了读取开关状态的代码,如果按 3 次 # 则退出。 void allkeypadTest(void) { static uint8_t modeKeyCount=0; do
因此,对于上周我必须做的作业,我必须使用 4 个 do-while 循环和 if 语句在 Java 中制作一个猜谜游戏。我无法成功完成它,类(class)已经继续,没有为我提供任何帮助。如果有人可以查
int i=1,j=0,n=10,k; do{ j+=i; i<<1; printf("%d\n",i); // printf("%d\n",12<<1); }while
此代码用于基本杂货计算器的按钮。当我按下按钮时,一个输入对话框会显示您输入商品价格的位置。我遇到的问题是我无法弄清楚如何获得 do ... while 循环以使输入对话框在输入后弹出。 我希望它始终恢
当我在循环中修改字符串或另一个变量时,它的条件是否每次都重新计算?或者在循环开始前一次 std::string a("aa"); do { a = "aaaa"; } while(a.size<10)
我刚刚写了这个,但我找不到问题。我使用代码块并编写了这个问题 error: expected 'while' before '{' token === Build finished: 1 errors
do { printf("Enter number (0-6): ", ""); scanf("%d", &Num); }while(Num >= 0 && Num 表示“超过”,<表
我有一个包含 10 个项目的 vector (为简单起见,所有项目都属于同一类,称其为“a”)。我想要做的是检查“A”不是 a) 隐藏墙壁或 b) 隐藏另一个“A”。我有一个碰撞函数可以做到这一点。
嗨,这是我的第二个问题。我有下表 |-----|-------|------|------| |._id.|..INFO.|.DONE.|.LAST.| |..1..|...A...|...N..|.
这个问题在这里已经有了答案: 关闭 12 年前。 Possible Duplicates: Why are there sometimes meaningless do/while and if/e
来自 wikibook在 F# 上有一小部分它说: What does let! do?# let! runs an async object on its own thread, then it i
我在 Real World Haskell 书中遇到了以下函数: namesMatching pat | not (isPattern pat) = do exists do
我有一个类似于下面的用例,我创建了多个图并使用 gridExtra 将它们排列到一些页面布局中,最后使用 ggsave 将其保存为 PDF : p1 % mutate(label2
当我使用具有 for 循环的嵌套 let 语句时,如果没有 (do (html5 ..)),我将无法运行内部 [:tr]。 (defpartial column-settings-layout [&
执行 vagrant up 时出现此错误: anr@anr-Lenovo-G505s ~ $ vagrant up Bringing machine 'default' up with 'virtua
# ################################################# # Subroutine to add data to the table Blas
我想创建一个检查特定日期格式的读取主机。此外,目标是检查用户输入是否正确,如果不正确,则提示应再次弹出。 当我刚接触编程时,发现了这段代码,这似乎很合适。我仍然在努力“直到” do {
我关注这个tutorial在谷歌云机器学习引擎上进行培训。我一步一步地跟着它,但是在将 ml 作业提交到云时我遇到了错误。我运行了这个命令。 sam@sam-VirtualBox:~/models/r
我是一名优秀的程序员,十分优秀!