- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试解决这个 super 简单的问题,但我已经厌倦了,我希望有人能帮我解决这个问题。我有一个形状像这样的数据框:
---------------------------| Category | Product_ID ||------------+------------+| a | product 1 || a | product 2 || a | product 3 || a | product 1 || a | product 4 || b | product 5 || b | product 6 |---------------------------
How do i group these rows by category and apply complicated function in Scala?Maybe something like this:
val result = df.groupBy("Category").apply(myComplexFunction)
+--------------------------------------------------+| | Product_1 | Product_2 | Product_3 |+------------+------------+------------------------+| Product_1 | 1.0 | 0.1 | 0.8 || Product_2 | 0.1 | 1.0 | 0.5 || Product_3 | 0.8 | 0.5 | 1.0 |+--------------------------------------------------+
Here is the function i want to apply (it is just computing item-item cosine similarity within each category):
def myComplexFunction(context_data : DataFrame, country_name: String,
context_id: String, table_name_correlations: String,
context_layer: String, context_index: String) : Boolean = {
val unique_identifier = country_name + "_" + context_layer + "_" + context_index
val temp_table_vocabulary = "temp_vocabulary_" + unique_identifier
val temp_table_similarities = "temp_similarities_" + unique_identifier
val temp_table_correlations = "temp_correlations_" + unique_identifier
//context.count()
// fit a CountVectorizerModel from the corpus
//println("Creating sparse incidence matrix")
val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("words").setOutputCol("features").fit(context_data)
val incidence = cvModel.transform(context_data)
// ========================================================================================
// create dataframe of mapping from indices into the item id
//println("Creating vocabulary")
val vocabulary_rdd = sc.parallelize(cvModel.vocabulary)
val rows_vocabulary_rdd = vocabulary_rdd.zipWithIndex.map{ case (s,i) => Row(s,i)}
val vocabulary_field1 = StructField("Product_ID", StringType, true)
val vocabulary_field2 = StructField("Product_Index", LongType, true)
val schema_vocabulary = StructType(Seq(vocabulary_field1, vocabulary_field2))
val df_vocabulary = hiveContext.createDataFrame(rows_vocabulary_rdd, schema_vocabulary)
// ========================================================================================
//println("Computing similarity matrix")
val myvectors = incidence.select("features").rdd.map(r => r(0).asInstanceOf[Vector])
val mat: RowMatrix = new RowMatrix(myvectors)
val sims = mat.columnSimilarities(0.0)
// ========================================================================================
// Convert records of the Matrix Entry RDD into Rows
//println("Extracting paired similarities")
val rowRdd = sims.entries.map{case MatrixEntry(i, j, v) => Row(i, j, v)}
// ========================================================================================
// create dataframe schema
//println("Creating similarity dataframe")
val field1 = StructField("Product_Index", LongType, true)
val field2 = StructField("Neighbor_Index", LongType, true)
var field3 = StructField("Similarity_Score", DoubleType, true)
val schema_similarities = StructType(Seq(field1, field2, field3))
// create the dataframe
val df_similarities = hiveContext.createDataFrame(rowRdd, schema_similarities)
// ========================================================================================
//println("Register vocabulary and correlations as spark temp tables")
df_vocabulary.registerTempTable(temp_table_vocabulary)
df_similarities.registerTempTable(temp_table_similarities)
// ========================================================================================
//println("Extracting Product_ID")
val temp_corrs = hiveContext.sql(
s"SELECT T1.Product_ID, T2.Neighbor_ID, T1.Similarity_Score " +
s"FROM " +
s"(SELECT Product_ID, Neighbor_Index, Similarity_Score " +
s"FROM $temp_table_similarities LEFT JOIN $temp_table_vocabulary " +
s"WHERE $temp_table_similarities.Product_Index = $temp_table_vocabulary.Product_Index) AS T1 " +
s"LEFT JOIN " +
s"(SELECT Product_ID AS Neighbor_ID, Product_Index as Neighbor_Index FROM $temp_table_vocabulary) AS T2 " +
s"ON " +
s"T1.Neighbor_Index = T2.Neighbor_Index")
// ========================================================================================
val context_corrs = temp_corrs.withColumn("Context_Layer", lit(context_layer)).withColumn("Context_ID", lit(context_id)).withColumn("Country", lit(country_name))
context_corrs.registerTempTable(temp_table_correlations)
// ========================================================================================
hiveContext.sql(s"INSERT INTO TABLE $table_name_correlations SELECT * FROM $temp_table_correlations")
// ========================================================================================
// clean up environment
//println("Cleaning up temp tables")
hiveContext.dropTempTable(temp_table_correlations)
hiveContext.dropTempTable(temp_table_similarities)
hiveContext.dropTempTable(temp_table_vocabulary)
return true
}
val partitioned = tokenized.repartition(tokenized("context_id"))
val context_counts = partitioned.mapPartitions()
//val context_counts = model_code_ids.zipWithIndex.map{case (model_code_id, context_index) => compute_similarity(tokenized.filter(tokenized("context_id") === model_code_id), country_name, model_code_id.asInstanceOf[String], table_name_correlations, context_layer, context_index.toString)}
}
val category_ids = df.select("Category").distinct.collect()
val result = category_ids.map(category_id => myComplexFunction(df.filter(df("Category") <=> category_id)))
最佳答案
余弦相似度不是一个复杂的函数,可以使用标准 SQL 聚合来表示。让我们考虑以下示例:
val df = Seq(
("feat1", 1.0, "item1"),
("feat2", 1.0, "item1"),
("feat6", 1.0, "item1"),
("feat1", 1.0, "item2"),
("feat3", 1.0, "item2"),
("feat4", 1.0, "item3"),
("feat5", 1.0, "item3"),
("feat1", 1.0, "item4"),
("feat6", 1.0, "item4")
).toDF("feature", "value", "item")
feature
是特征标识符,
value
是一个对应的值和
item
是对象标识符和
feature
,
item
pair 只有一个对应的值。
val numer = df.as("this").withColumnRenamed("item", "this")
.join(df.as("other").withColumnRenamed("item", "other"), Seq("feature"))
.where($"this" < $"other")
.groupBy($"this", $"other")
.agg(sum($"this.value" * $"other.value").alias("dot"))
import org.apache.spark.sql.functions.sqrt
val norms = df.groupBy($"item").agg(sqrt(sum($"value" * $"value")).alias("norm"))
val cosine = ($"dot" / ($"this_norm.norm" * $"other_norm.norm")).as("cosine")
val similarities = numer
.join(norms.alias("this_norm").withColumnRenamed("item", "this"), Seq("this"))
.join(norms.alias("other_norm").withColumnRenamed("item", "other"), Seq("other"))
.select($"this", $"other", cosine)
+-----+-----+-------------------+
| this|other| cosine|
+-----+-----+-------------------+
|item1|item4| 0.8164965809277259|
|item1|item2|0.40824829046386296|
|item2|item4| 0.4999999999999999|
+-----+-----+-------------------+
import org.apache.spark.sql.functions.array
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.Vectors
val pivoted = df.groupBy("item").pivot("feature").sum()
.na.fill(0.0)
.orderBy("item")
val mat = new IndexedRowMatrix(pivoted
.select(array(pivoted.columns.tail.map(col): _*))
.rdd
.zipWithIndex
.map {
case (row, idx) =>
new IndexedRow(idx, Vectors.dense(row.getSeq[Double](0).toArray))
})
mat.toCoordinateMatrix.transpose
.toIndexedRowMatrix.columnSimilarities
.toBlockMatrix.toLocalMatrix
0.0 0.408248290463863 0.0 0.816496580927726
0.0 0.0 0.0 0.4999999999999999
0.0 0.0 0.0 0.0
0.0 0.0 0.0 0.0
collected
) 集合上运行。 myComplexFunction
不能进一步分布式,因为它是分布式数据结构和上下文。 关于apache-spark - Spark Scala - 如何对数据帧行进行分组并将复杂函数应用于组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40681794/
初学者 android 问题。好的,我已经成功写入文件。例如。 //获取文件名 String filename = getResources().getString(R.string.filename
我已经将相同的图像保存到/data/data/mypackage/img/中,现在我想显示这个全屏,我曾尝试使用 ACTION_VIEW 来显示 android 标准程序,但它不是从/data/dat
我正在使用Xcode 9,Swift 4。 我正在尝试使用以下代码从URL在ImageView中显示图像: func getImageFromUrl(sourceUrl: String) -> UII
我的 Ubuntu 安装 genymotion 有问题。主要是我无法调试我的数据库,因为通过 eclipse 中的 DBMS 和 shell 中的 adb 我无法查看/data/文件夹的内容。没有显示
我正在尝试用 PHP 发布一些 JSON 数据。但是出了点问题。 这是我的 html -- {% for x in sets %}
我观察到两种方法的结果不同。为什么是这样?我知道 lm 上发生了什么,但无法弄清楚 tslm 上发生了什么。 > library(forecast) > set.seed(2) > tts lm(t
我不确定为什么会这样!我有一个由 spring data elasticsearch 和 spring data jpa 使用的类,但是当我尝试运行我的应用程序时出现错误。 Error creatin
在 this vega 图表,如果我下载并转换 flare-dependencies.json使用以下 jq 到 csv命令, jq -r '(map(keys) | add | unique) as
我正在提交一个项目,我必须在其中创建一个带有表的 mysql 数据库。一切都在我这边进行,所以我只想检查如何将我所有的压缩文件发送给使用不同计算机的人。基本上,我如何为另一台计算机创建我的数据库文件,
我有一个应用程序可以将文本文件写入内部存储。我想仔细看看我的电脑。 我运行了 Toast.makeText 来显示路径,它说:/数据/数据/我的包 但是当我转到 Android Studio 的 An
我喜欢使用 Genymotion 模拟器以如此出色的速度加载 Android。它有非常好的速度,但仍然有一些不稳定的性能。 如何从 Eclipse 中的文件资源管理器访问 Genymotion 模拟器
我需要更改 Silverlight 中文本框的格式。数据通过 MVVM 绑定(bind)。 例如,有一个 int 属性,我将 1 添加到 setter 中的值并调用 OnPropertyChanged
我想向 Youtube Data API 提出请求,但我不需要访问任何用户信息。我只想浏览公共(public)视频并根据搜索词显示视频。 我可以在未经授权的情况下这样做吗? 最佳答案 YouTube
我已经设置了一个 Twilio 应用程序,我想向人们发送更新,但我不想回复单个文本。我只是想让他们在有问题时打电话。我一切正常,但我想在发送文本时显示传入文本,以确保我不会错过任何问题。我正在使用 p
我有一个带有表单的网站(目前它是纯 HTML,但我们正在切换到 JQuery)。流程是这样的: 接受用户的输入 --- 5 个整数 通过 REST 调用网络服务 在服务器端运行一些计算...并生成一个
假设我们有一个名为 configuration.js 的文件,当我们查看内部时,我们会看到: 'use strict'; var profile = { "project": "%Projec
这部分是对 Previous Question 的扩展我的: 我现在可以从我的 CI Controller 成功返回 JSON 数据,它返回: {"results":[{"id":"1","Sourc
有什么有效的方法可以删除 ios 中 CBL 的所有文档存储?我对此有疑问,或者,如果有人知道如何从本质上使该应用程序像刚刚安装一样,那也会非常有帮助。我们正在努力确保我们的注销实际上将应用程序设置为
我有一个 Rails 应用程序,它与其他 Rails 应用程序通信以进行数据插入。我使用 jQuery $.post 方法进行数据插入。对于插入,我的其他 Rails 应用程序显示 200 OK。但在
我正在为服务于发布请求的 API 调用运行单元测试。我正在传递请求正文,并且必须将响应作为帐户数据返回。但我只收到断言错误 注意:数据是从 Azure 中获取的 spec.js const accou
我是一名优秀的程序员,十分优秀!