- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个具有以下结构的 Spark 数据框。 bodyText_token 具有标记(已处理/单词集)。我有一个定义关键字的嵌套列表
root
|-- id: string (nullable = true)
|-- body: string (nullable = true)
|-- bodyText_token: array (nullable = true)
keyword_list=[['union','workers','strike','pay','rally','free','immigration',],
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']]
我需要检查每个关键字列表下有多少标记,并将结果添加为现有数据框的新列。例如:if tokens =["become", "farmer","rally","workers","student"]
结果将是 -> [1,2,0]
以下功能按预期工作。
def label_maker_topic(tokens,topic_words):
twt_list = []
for i in range(0, len(topic_words)):
count = 0
#print(topic_words[i])
for tkn in tokens:
if tkn in topic_words[i]:
count += 1
twt_list.append(count)
return twt_list
我在 withColumn
下使用 udf 访问该函数,但出现错误。我认为这是关于将外部列表传递给 udf。有没有一种方法可以将外部列表和数据框列传递给 udf 并向我的数据框添加一个新列?
topicWord = udf(label_maker_topic,StringType())
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list))
最佳答案
最干净的解决方案是使用闭包传递额外的参数:
def make_topic_word(topic_words):
return udf(lambda c: label_maker_topic(c, topic_words))
df = sc.parallelize([(["union"], )]).toDF(["tokens"])
(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
.show())
这不需要对 keyword_list
或您用 UDF 包装的函数进行任何更改。您还可以使用此方法传递任意对象。这可用于传递例如 sets
列表以进行高效查找。
如果您想使用当前的 UDF 并直接传递 topic_words
,您必须先将其转换为列文字:
from pyspark.sql.functions import array, lit
ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()
根据您的数据和要求,可以选择更高效的解决方案,这些解决方案不需要 UDF(分解 + 聚合 + 折叠)或查找(哈希 + 向量运算)。
关于python - 将数据框列和外部列表传递给 withColumn 下的 udf,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37409857/
我有一个数据框(mydf),如下所示: +---+---+---+---+ | F1| F2| F3| F4| +---+---+---+---+ | t| y4| 5|1.0| | x| y
我在spark中写了一些代码如下: val df = sqlContext.read.json("s3n://blah/blah.gz").repartition(200) val newdf = d
我有一个包含 2 列的数据框:account_id 和 email_address,现在我想再添加一列 updated_email_address,我称之为email_address 上的函数以获取
我注意到我的代码存储库警告我在 for/while 循环中使用 withColumn 是一种反模式。为什么不推荐这样做?这不是PySpark API的正常使用吗? 最佳答案 我们在实践中注意到,在 f
在我使用 RDD 进行了几个项目之后,我开始使用数据集。我正在使用 Java 进行开发。 据我了解,列是不可变的 - 列没有映射函数,映射列的标准方法是使用 withColumn 添加列。 我的问题是
这个问题已经有答案了: Multiple condition filter on dataframe (2 个回答) 已关闭 3 年前。 我是 Pyspark 新手 我有这段代码: df2 = df.
我有一个 df,其中包含一列 type,并且我有两个列表 women = ['0980981', '0987098'] men = ['1234567', '4567854'] 现在我想根据 type
我正在为某些要求创建一个空数据框,当我在其上调用 withColumn 函数时,我得到了列,但数据为空,如下所示- schema = StructType([]) df = sqlContext.cr
我有一个包含列“col1”和“col2”的数据框 df。我想创建第三列,它使用其中一列作为指数函数。 df = df.withColumn("col3", 100**(df("col1")))*df(
我有一些使用 的原型(prototype) Scala 代码 .withColumn("column_name_dod", $"column_name".getItem("dod")) 我知道with
如何在多个 when 条件下实现以下目标。 from pyspark.sql import functions as F df = spark.createDataFrame([(5000, 'US'
当多个 withColumn 时,Spark 是执行一次还是多次传递数据?函数是链式的? 例如: val dfnew = df.withColumn("newCol1", f1(col("a")))
我正在使用 Spark 和 PySpark。我正在尝试实现等效于以下伪代码的结果: df = df.withColumn('new_column', IF fruit1 == fruit2 T
我有一个 DataFrame,它有多个列,其中一些是结构。像这样的事情 root |-- foo: struct (nullable = true) | |-- bar: string (n
今天早上我们将 Spark 版本从 2.2.0 更新到 2.3.0,我遇到了相当奇怪的问题。 我有一个 UDF(),计算 2 点之间的距离 private static UDF4 calcDistan
我有一个用户定义函数的问题,该函数是为连接来自一个数据帧的值而构建的,该数据帧与来自另一个数据帧的索引值相匹配。 以下是我尝试匹配的简化数据框: a_df: +-------+------+ | in
我有这样一个数据集: a = sc.parallelize([[1,2,3],[0,2,1],[9,8,7]]).toDF(["one", "two", "three"]) 我想要一个数据集,它添加一
我有一个具有以下结构的 Spark 数据框。 bodyText_token 具有标记(已处理/单词集)。我有一个定义关键字的嵌套列表 root |-- id: string (nullable =
如果我有一个名为 df 的 DataFrame,它看起来像: +---+---+ | a1+ a2| +---+---+ |foo|bar| |N/A|baz| +---+---+ 我期望: val
我使用的是 UCI 的成人年收入。 我有一个数据框,其中一列中有一个类别变量,我想将其分组为不同的类别(一些常见的特征工程)。 df.groupBy('education').count().show
我是一名优秀的程序员,十分优秀!