I am trying to implement the algorithm from Rocha & Thatte (http://cdsid.org.br/sbpo2015/wp-content/uploads/2015/08/142825.pdf) with PySpark and the Pregel wrapper from graphframes. Here I am stuck on the correct syntax for the message aggregation.
The idea is straightforward:
...In each pass, each active vertex of G sends a set of sequences of vertices to its out-neighbours as described next. In the first pass, each vertex v sends the message (v) to all its out-neighbours. In subsequent iterations, each active vertex v appends v to each sequence it received in the previous iteration. It then sends all the updated sequences to its out-neighbours. If v has not received any message in the previous iteration, then v deactivates itself. The algorithm terminates when all the vertices have been deactivated. ...
My idea is to send the vertex id to the destination vertex (dst) and collect the ids into a list in the aggregation function. In my vertex column "sequence" I then want to append/merge this new list with the existing one, and afterwards check in a when statement whether the current vertex id is already contained in the sequence. If it is, I can set a vertex column to true to mark those vertices as being part of a cycle. But I cannot find the correct syntax in Spark for how to chain this together. Does anyone have an idea, or has implemented something similar?
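For the append/merge and the membership check in isolation, the Spark SQL array functions below express the logic I am after. This is only a minimal sketch on a plain DataFrame with made-up column names, not yet wired into Pregel:
import pyspark.sql.functions as f
# hypothetical frame: a vertex id, its existing sequence and a freshly aggregated list of ids
df = sqlContext.createDataFrame([("2", ["1"], ["5"])], ["id", "sequence", "received"])
df = (df
    # append/merge the received ids onto the existing sequence
    .withColumn("sequence", f.concat(f.col("sequence"), f.col("received")))
    # check whether the vertex's own id already occurs in the sequence
    .withColumn("is_cycle", f.expr("array_contains(sequence, id)"))
)
df.show()
(f.array_union would additionally drop duplicates while merging.)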
My current code:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from pyspark.sql.functions import coalesce, col, lit, sum, when
from graphframes import GraphFrame
from graphframes.lib import *
SimpleCycle=[
("1","2"),
("2","3"),
("3","4"),
("4","5"),
("5","2"),
("5","6")
]
edges = sqlContext.createDataFrame(SimpleCycle,["src","dst"]) \
.withColumn("self_loop",when(col("src")==col("dst"),True).otherwise(False))
edges.show()
+---+---+---------+
|src|dst|self_loop|
+---+---+---------+
| 1| 2| false|
| 2| 3| false|
| 3| 4| false|
| 4| 5| false|
| 5| 2| false|
| 5| 6| false|
+---+---+---------+
vertices=edges.select("src").union(edges.select("dst")).distinct().withColumnRenamed('src', 'id')
#vertices = spark.createDataFrame([[1], [2], [3], [4],[5],[6],[7],[8],[9]], ["id"])
#vertices.sort("id").show()
graph = GraphFrame(vertices, edges)
cycles=graph.pregel \
.setMaxIter(5) \
.withVertexColumn("is_cycle", lit(""),lit("logic to be added")) \
.withVertexColumn("sequence", lit(""),Pregel.msg()) \
.sendMsgToDst(Pregel.src("id")) \
.aggMsgs(f.collect_list(Pregel.msg())) \
.run()
cycles.show()
+---+-----------------+--------+
| id| is_cycle|sequence|
+---+-----------------+--------+
| 3|logic to be added| [2]|
| 5|logic to be added| [4]|
| 6|logic to be added| [5]|
| 1|logic to be added| null|
| 4|logic to be added| [3]|
| 2|logic to be added| [5, 1]|
+---+-----------------+--------+
This code does not run, but I think the logic should look something like this:
cycles = graph.pregel \
    .setMaxIter(5) \
    .withVertexColumn("is_cycle", lit(""),
        when(Pregel.src("id").isin(Pregel.src("sequence")), True).otherwise(False)) \
    .withVertexColumn("sequence", lit("null"),
        Append_To_Existing_List(Pregel.msg())) \
    .sendMsgToDst(
        when(Pregel.src("sequence").isNull(), Pregel.src("id"))
        .otherwise(Pregel.src("sequence"))) \
    .aggMsgs(f.collect_list(Pregel.msg())) \
    .run()
# Append_To_Existing_List is a placeholder for the append/merge logic I am looking for
# I would like to have a result like
+---+--------+---------+
| id|is_cycle| sequence|
+---+--------+---------+
|  1|   false|      [1]|
|  2|    true|[2,3,4,5]|
|  3|    true|[2,3,4,5]|
|  4|    true|[2,3,4,5]|
|  5|    true|[2,3,4,5]|
|  6|   false|     null|
+---+--------+---------+
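For reference, here is a minimal runnable sketch (my assumption of what the documented Pregel builder in graphframes 0.8+ can express) that only flags cycle membership by propagating the set of ids that can reach each vertex. It drops the per-path sequences of the paper and always runs the full number of iterations, so it is a simplification rather than the full Rocha-Thatte algorithm:
import pyspark.sql.functions as f
from pyspark.sql.functions import lit
from graphframes.lib import Pregel
result = (graph.pregel
    .setMaxIter(5)
    # set of ids known to reach this vertex; starts with the vertex itself
    .withVertexColumn("reached",
        f.array(f.col("id")),
        f.when(Pregel.msg().isNull(), f.col("reached"))
         .otherwise(f.array_distinct(f.concat(f.col("reached"), Pregel.msg()))))
    # a vertex lies on a cycle once its own id comes back in a message
    .withVertexColumn("is_cycle",
        lit(False),
        f.when(Pregel.msg().isNull(), f.col("is_cycle"))
         .otherwise(f.col("is_cycle") | f.arrays_overlap(Pregel.msg(), f.array(f.col("id")))))
    .sendMsgToDst(Pregel.src("reached"))
    # merge all incoming id sets into one deduplicated array
    .aggMsgs(f.array_distinct(f.flatten(f.collect_list(Pregel.msg()))))
    .run())
result.select("id", "is_cycle").show()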
Best answer
In the end I implemented the Rocha-Thatte algorithm not via Pregel but via the underlying graphframes/GraphX message aggregation (aggregateMessages). In case anyone is interested, I would like to share the solution.
The solution works correctly and can handle very large graphs without failing. However, it becomes quite slow if the cycles are long or the graph is large. I am not sure yet how to improve this; perhaps by using checkpointing or broadcasting in a smart way.
I would be happy about any suggestions for improvement.
# python modules
import time
import logging
# spark modules
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as f
# graphframes modules
from graphframes import GraphFrame
from graphframes.lib import *
AM = AggregateMessages
_logger = logging.getLogger(__name__)
def find_cycles(sqlContext,sc,vertices,edges,max_iter=100000):
# Cycle detection via message aggregation
"""
This code is an implementation of the Rocha-Thatte algorithm for large-scale sparse graphs
Source:
==============
wiki: https://en.wikipedia.org/wiki/Rocha%E2%80%93Thatte_cycle_detection_algorithm
paper: https://www.researchgate.net/publication/283642998_Distributed_cycle_detection_in_large-scale_sparse_graphs
The basic idea:
===============
We propose a general algorithm for detecting cycles in a directed graph G by message passing among its vertices,
based on the bulk synchronous message passing abstraction. This is a vertex-centric approach in which the vertices
of the graph work together for detecting cycles. The bulk synchronous parallel model consists of a sequence of iterations,
in each of which a vertex can receive messages sent by other vertices in the previous iteration, and send messages to other
vertices.
In each pass, each active vertex of G sends a set of sequences of vertices to its out-neighbours as described next.
In the first pass, each vertex v sends the message (v) to all its out-neighbours. In subsequent iterations, each active vertex v
appends v to each sequence it received in the previous iteration. It then sends all the updated sequences to its out-neighbours.
If v has not received any message in the previous iteration, then v deactivates itself. The algorithm terminates when all the
vertices have been deactivated.
For a sequence (v1, v2, . . . , vk) received by vertex v, the appended sequence is not forwarded in two cases: (i) if v = v1,
then v has detected a cycle, which is reported (see line 9 of Algorithm 1); (ii) if v = vi for some i ∈ {2, 3, . . . , k},
then v has detected a sequence that contains the cycle (v = vi, vi+1, . . . , vk, vk+1 = v); in this case,
the sequence is discarded, since the cycle must have been detected in an earlier iteration (see line 11 of Algorithm 1);
to be precise, this cycle must have been detected in iteration k − i + 1. Every cycle (v1, v2, . . . , vk, vk+1 = v1)
is detected by all vi,i = 1 to k in the same iteration; it is reported by the vertex min{v1,...,vk} (see line 9 of Algorithm 1).
The total number of iterations of the algorithm is the number of vertices in the longest path in the graph, plus a few more steps
for deactivating the final vertices. During the analysis of the total number of iterations, we ignore the few extra iterations
needed for deactivating the final vertices and detecting the end of the computation, since it is O(1).
Pseudocode of the algorithm:
============================
M(v): Message received from vertex v
N+(v): all dst vertices of v
function COMPUTE(M(v)):
if i=0 then:
for each w ∈ N+(v) do:
send (v) to w
else if M(v) = ∅ then:
deactivate v and halt
else:
for each (v1,v2,...,vk) ∈ M(v) do:
if v1 = v and min{v1,v2,...,vk} = v then:
report (v1 = v,v2,...,vk,vk+1 = v)
else if v not ∈ {v2,...,vk} then:
for each w ∈ N+(v) do:
send (v1,v2,...,vk,v) to w
Scalability of the algorithm:
============================
The number of iterations depends on the length of the longest path in the graph.
The scaling is between O(log(n)) and at most O(n), where n = number of vertices,
so the number of iterations is at most linear in the number of vertices.
Additional edges (parallel edges etc.) do not affect the runtime.
For more details please refer to the original publication.
"""
_logger.warning("+++ find_cycles(): starting cycle search ...")
# create emtpy dataframe to collect all cycles
cycles = sqlContext.createDataFrame(sc.emptyRDD(),StructType([StructField("cycle",ArrayType(StringType()),True)]))
# initialize the messege column with own source id
init_vertices=(vertices
.withColumn("message",f.array(f.col("id")))
)
init_edges=(edges
.where(f.col("src")!=f.col("dst"))
.select("src","dst")
)
# create graph object that will be updated each iteration
gx = GraphFrame(init_vertices, init_edges)
# iterate until max_iter
# max_iter is a safeguard in case the break condition is never reached
# default value=100,000
for iter_ in range(max_iter):
# message that should be sent to the destination for aggregation
msgToDst = AM.src["message"]
# aggregate all messages that were received into a python set (drops duplicate messages)
agg = gx.aggregateMessages(
f.collect_set(AM.msg).alias("aggMess"),
sendToSrc=None,
sendToDst=msgToDst)
# BREAK condition: if no more messages are received, all cycles were found
# and we can quit the loop
if(len(agg.take(1))==0):
#print("THE END: All cycles found in " + str(iter_) + " iterations")
break
# apply the algorithm logic:
# filter for cycles that should be reported as found
# compose the new message to be sent in the next iteration
# the _ prefix marks temporary columns that are only used within the algorithm and then dropped again
checkVerties=(
agg
# explode the aggregated messages from [[2]] to [2] so that each row holds one 1D sequence
.withColumn("_flatten1",f.explode(f.col("aggMess")))
# take first element of the array
.withColumn("_first_element_agg",f.element_at(f.col("_flatten1"), 1))
# take minimum element of the array
.withColumn("_min_agg",f.array_min(f.col("_flatten1")))
# check if it is a cycle
# it is cycle when v1 = v and min{v1,v2,...,vk} = v
.withColumn("_is_cycle",f.when(
(f.col("id")==f.col("_first_element_agg")) &
(f.col("id")==f.col("_min_agg"))
,True)
.otherwise(False)
)
# pick cycles that should be reported = appended to the cycle list
.withColumn("_cycle_to_report",f.when(f.col("_is_cycle")==True,f.col("_flatten1")).otherwise(None))
# sort array to have duplicates the same
.withColumn("_cycle_to_report",f.sort_array("_cycle_to_report"))
# create a column with the first element removed, to check if the current vertex is part of (v2,...,vk)
.withColumn("_slice",f.array_except(f.col("_flatten1"), f.array(f.element_at(f.col("_flatten1"), 1))))
# check if the vertex is part of the slice and set a True/False column
.withColumn("_is_cycle2",f.lit(f.size(f.array_except(f.array(f.col("id")), f.col("_slice"))) == 0))
)
#print("checked Vertices")
#checkVerties.show(truncate=False)
# append found cycles to result dataframe via union
cycles=(
# take existing cycles dataframe
cycles
.union(
# union=append all cyles that are in the current reporting column
checkVerties
.where(f.col("_cycle_to_report").isNotNull())
.select("_cycle_to_report")
)
)
# create the list of new messages that will be sent to the vertices in the next iteration
newVertices=(
checkVerties
# append the current vertex id to the end of the received sequence
.withColumn("message",f.concat(
f.coalesce(f.col("_flatten1"), f.array()),
f.coalesce(f.array(f.col("id")), f.array())
))
# only forward sequences that do not already contain the current vertex
.where(f.col("_is_cycle2")==False)
.select("id","message")
)
print("vertics to send forward")
newVertices.sort("id").show(truncate=False)
# cache new vertices using workaround for SPARK-13346
cachedNewVertices = AM.getCachedDataFrame(newVertices)
# update graphframe object for next round
gx = GraphFrame(cachedNewVertices, gx.edges)
# materialize results and get number of found cycles
#cycles_count=cycles.persist().count()
_cycle_statistics=(
cycles
.withColumn("cycle_length",f.size(f.col("cycle")))
.agg(f.count(f.col("cycle")),f.max(f.col("cycle_length")),f.min(f.col("cycle_length")))
).collect()
cycle_statistics={"count":_cycle_statistics[0]["count(cycle)"],"max":_cycle_statistics[0]["max(cycle_length)"],"min":_cycle_statistics[0]["min(cycle_length)"]}
end_time = time.time()
_logger.warning("+++ find_cycles(): " + str(cycle_statistics["count"]) + " cycles found in " + str(iter_) + " iterations (min length=" + str(cycle_statistics["min"]) +", max length="+ str(cycle_statistics["max"]) +") in " + str(end_time-start_time) + " seconds")
_logger.warning("+++ #########################################################################################")
return cycles, cycle_statistics
This function takes graphs like the following two examples (the original post illustrated the simple cycle and the nested cycle with images):
SimpleCycle=[
("0","1"),
("1","2"),
("2","3"),
("3","4"),
("3","1")]
NestedCycle=[
("1","2"),
("2","3"),
("3","4"),
("4","1"),
("3","1"),
("5","1"),
("5","2")]
edges = sqlContext.createDataFrame(NestedCycle,["src","dst"])
vertices=edges.select("src").union(edges.select("dst")).distinct().withColumnRenamed('src', 'id')
edges.show()
# +---+---+
# |src|dst|
# +---+---+
# | 1| 2|
# | 2| 3|
# | 3| 4|
# | 4| 1|
# | 3| 1|
# | 5| 1|
# | 5| 2|
# +---+---+
raw_cycles, cycle_statistics = find_cycles(sqlContext,sc,vertices,edges,max_iter=1000)
raw_cycles.show()
# +------------+
# | cycle|
# +------------+
# | [1, 2, 3]|
# |[1, 2, 3, 4]|
# +------------+
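On the improvement question above: one untested idea (my assumption, not part of the solution) is to periodically checkpoint the vertices DataFrame inside the loop of find_cycles() to truncate the query lineage that grows with every iteration. DataFrame.checkpoint() needs a checkpoint directory to be set once:
# once, before calling find_cycles(); the path is just an example
sc.setCheckpointDir("/tmp/find_cycles_checkpoints")
# inside the loop, replacing the plain caching step; checkpoint_interval is a hypothetical parameter
if iter_ % checkpoint_interval == 0:
    # materializes the frame and cuts the lineage
    cachedNewVertices = newVertices.checkpoint()
else:
    cachedNewVertices = AM.getCachedDataFrame(newVertices)
gx = GraphFrame(cachedNewVertices, gx.edges)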
For the question "pyspark - How to implement cycle detection with the pyspark graphframe pregel API" a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58895858/