python - 创建表示 3 位二进制字符串的所有组合的最小图形

我有一个算法可以创建一个图形,该图形具有以最短图形路径形式编码的 3 位二进制字符串的所有表示,其中路径中的偶数表示 0,而奇数表示 1:

from itertools import permutations, product
import networkx as nx
import progressbar
import itertools

def groups(sources, template):
func = permutations
keys = sources.keys()
combos = [func(sources[k], template.count(k)) for k in keys]
for t in product(*combos):
d = {k: iter(n) for k, n in zip(keys, t)}
yield [next(d[k]) for k in template]

g = nx.Graph()

added = []
good = []
index = []
# I create list with 3-bit binary strings
# I do not include one of the pairs of binary strings that have a mirror image
list_1 = [list(i) for i in itertools.product(tuple(range(2)), repeat=3) if tuple(reversed(i)) >= tuple(i)]
count = list(range(len(list_1)))

h = 0
while len(added) < len(list_1):
# In each next step I enlarge the list 'good` by the next even and odd number
if h != 0:
for q in range(2):
good.append([i for i in good if i%2 == q][-1] + 2)
# I create a list `c` with string indices from the list` list_1`, that are not yet used.
# Whereas the `index` list stores the numbering of strings from the list` list_1`, whose representations have already been correctly added to the `added` list.
c = [item for item in count if item not in index]

for m in c:
# I create representations of binary strings, where 0 is 'v0' and 1 is 'v1'. For example, the '001' combination is now 'v0v0v1'
a = ['v{}'.format(x%2) for x in list_1[m]]

if h == 0:
for w in range(2):
if len([i for i in good if i%2 == w]) < a.count('v{}'.format(w)):
for j in range(len([i for i in good if i%2 == w]), a.count('v{}'.format(w))):
good.insert(j,2*j + w)

for x in range(2):
sources["v{0}".format(x)] = [n for n in good if n%2 == x]
# for each representation in the form 'v0v0v1' for example, I examine all combinations of strings where 'v0' is an even number 'a' v1 'is an odd number, choosing values from the' dobre2 'list and checking the following conditions.
for aaa_binary in groups(sources, a):
# Here, the edges and nodes are added to the graph from the combination of `aaa_binary` and checking whether the combination meets the conditions. If so, it is added to the `added` list. If not, the newly added edges are removed and the next `aaa_binary` combination is taken.
g.add_nodes_from (aaa_binary)
t1 = (aaa_binary[0],aaa_binary[1])
t2 = (aaa_binary[1],aaa_binary[2])

added_now = []
for edge in (t1,t2):
if not g.has_edge(*edge):


for f in range(len(added)):
if nx.shortest_path(g, aaa_binary[0], aaa_binary[2]) != aaa_binary or nx.shortest_path(g, added[f][0], added[f][2]) != added[f]:
for edge in added_now:
# Calling a good combination search interrupt if it was found and the result added to the `added` list, while the index from the list 'list_1` was added to the` index` list
if m in index:


h = h+1

表示来自 added 的 3 位二进制字符串的输出路径:
[[0, 2, 4], [0, 2, 1], [2, 1, 3], [1, 3, 5], [0, 3, 6], [3, 0, 7]]

所以这些是 3 位二进制字符串的表示:
[[0, 0, 0], [0, 0, 1], [0, 1, 1], [1, 1, 1], [0, 1, 0], [1, 0, 1]]

在哪一步 h = 0找到前 4 个子列表,并且在步骤 h = 1 中添加了最后两个子列表。



enter image description here


现在我想在 for m in c 上使用多处理循环,因为查找元素的顺序在这里无关紧要。

from multiprocessing import Pool

added = []

def foo(i):
added = []
# do something
return added

if __name__ == '__main__':

h = 0
while len(added)<len(c):

pool = Pool(4)
result = pool.imap_unordered(foo, c)


h = h + 1

多处理发生在 while 循环和 foo 中。函数, added列表已创建。在每个后续步骤中 h在循环中,列表 added应按后续值递增,当前列表 added应该在函数 foo 中使用.是否可以在循环的每个后续步骤中将列表的当前内容传递给函数?因为在上面的代码中, foo函数创建 added 的新内容每次都从头开始列出。如何解决这个问题?

[[0, 2, 4], [0, 2, 1], [2, 1, 3], [1, 3, 5], [0, 1, 2], [1, 0, 3]]

因为对于这样的图,节点和边,不满足条件 nx.shortest_path (graph, i, j) == added[k]对于每个最终节点 i, j来自 added[k] for k in added list .

去哪里 h = 0到元素 [0, 2, 4], [0, 2, 1], [2, 1, 3], [1, 3, 5]很好,而在步骤中添加的元素 h = 1 , 即 [0, 1, 2], [1, 0, 3]显然可以在不影响上一步中的元素的情况下找到。


我意识到这是一种顺序算法,但我也对部分解决方案感兴趣,即即使在算法的某些部分也可以并行处理。例如, h的步骤while 循环按顺序运行,但 for m in c循环是多处理的。或其他部分解决方案,可以改进更大组合的整个算法。



我认为您不能像目前那样并行化代码。您想要并行化的部分,for m in c循环访问三个全局列表 good , addedindex和图形g本身。您可以使用 multiprocessing.Array对于列表,但这会破坏并行化的整个点,如 multiprocessing.Array ( docs ) 是同步的,因此进程实际上不会并行运行。


  • 初始化以设置需要执行的作业队列(按顺序运行)
  • 有一个 worker 池,它们都从该队列中提取作业(并行运行)
  • 作业队列用完后,汇总结果并构建最终解决方案(按顺序运行)

  • 在这种情况下 1.将是 list_1 的设置代码, count可能还有 h == 0案件。之后,您将构建一个“作业订单”队列,这将是 c列表 -> 将该列表传递给一群 worker -> 返回结果并汇总。问题是每次执行 for m in c loop 可以访问全局状态,并且每次迭代后全局状态都会发生变化。这在逻辑上意味着您不能并行运行代码,因为第一次迭代会更改全局状态并影响第二次迭代的操作。也就是说,根据定义,一种顺序算法。您不能(至少不容易)并行化迭代构建图的算法。

    您可以使用 multiprocessing.starmapmultiprocessing.Array ,但这并不能解决问题。你还有图 g这也在所有进程之间共享。所以整个事情需要以这样的方式重构,每次迭代 for m in c循环独立于该循环的任何其他迭代,或者必须更改整个逻辑,以便 for m in c不需要循环开始。



    目前,对于新的三元组(例如 '101'),您将在现有图中生成所有可能的连接点,然后将新的三元组添加到图中并根据测量最短路径消除节点。这需要检查图形上的最短路径并进行修改,这会阻止并行化。


    注 2:在下面的讨论中 '101' (注意引号 '' 是二进制字符串, '00''1' 也是如此,其中 104 等(图中没有引号)是顶点标签。

  • 生成 job_queue其中包含所有三元组
  • 取第一个并插入,例如,'000'这将是 ( 0 , 2 , 4 ) - 这是微不足道的,无需检查任何内容,因为在您开始时图形是空的,因此最短路径是根据定义您插入的路径。

  • 此时,您还有 '011' 的部分路径, '001' , '010'相反( '110''001' 因为图是无向的)。我们将利用现有图包含 job_queue 中剩余三元组的子解决方案这一事实。 .假设下一个三元组是 '010' ,您遍历二进制字符串 '010'list('010')
  • 如果 '0' 的路径/顶点图中已存在 --> 继续
  • 如果 '01' 的路径/顶点图中已存在 --> 继续
  • 如果 '010' 的路径/顶点存在你已经完成了,不需要添加任何东西(这实际上是一个失败案例:'010' 不应该再出现在作业队列中,因为它已经被解决了)。

  • 第二个要点会失败,因为 '01'图中不存在。插入 '1'在这种情况下将是节点 1到图形并将其连接到三个 even 之一节点,我认为哪个节点并不重要,但您必须记录它连接到哪个节点,假设您选择了 0 .该图现在看起来像
     0 - 2 - 4
    \ *
    \ *

    完成路径的最佳边是 1 - 2 (标有星号)获取路径 0 - 1 - 2'010' ,如果边 1-2,这是最大化编码的三元组数量的路径被添加到图形中。如果您添加 1-4你只编码 '010'三重,如 1 - 2编码 '010'还有 '001''100' .

    顺便说一句,假设您已连接 12起初,而不是 0 (第一个连接是随机选择的),你现在有一个图表
     0 - 2 - 4

    您可以连接 14或到 0 ,但您再次得到一个图形,该图形对 job_queue 中剩余的最大三元组数进行编码。 .

    那么如何检查潜在的新路径编码了多少个三元组呢?您可以相对轻松地检查这一点,更重要的是,检查可以在不修改图形的情况下并行完成 g ,对于 3 位字符串,并行的节省不是那么大,但对于 32 位字符串,它们会。这是它的工作原理。
  • (顺序)从子路径 0-1 生成所有可能的完整路径-> (0-1-2), (0-1-4) .
  • (并行)对于每个潜在的完整路径,检查该路径解决了多少个其他三元组,即对于每个路径候选生成图解决的所有三元组,并检查这些三元组是否仍在 job_queue 中。 .
  • (0-1-2)解决另外两个三元组 '001' (4-2-1) or (2-0-1)'100' (1-0-2) or (1-2-4) .
  • (0-1-4)只解决了三重'010' ,即它本身

  • 解决 job_queue 中剩余的最多三元组的边/路径是最佳解决方案(我没有证据)。

    你跑 2.以上并行复制图形到每个 worker 。因为您没有修改图形,只检查它解决了多少个三元组,所以您可以并行执行此操作。每个 worker 都应该有一个类似的签名
    check_graph(g, path, copy_of_job_queue):
    # do some magic
    return (n_paths_solved, paths_solved)
    path要么是 (0-1-2)(0-1-4) , copy_of_job_queue应该是 job_queue 上剩余路径的副本.对于 K 个 worker ,您创建 K 个队列副本。一旦工作池完成,您就知道哪条路径 (0-1-2)(0-1-4)解决最多的三元组。


    冲洗 - 重复直到作业队列为空。

    上面有一些明显的问题,其中之一是您对 job_queue 进行了大量复制和循环。 , 如果您要处理大位空间,例如 32 位,则 job_queue很长,所以你可能不想继续复制给所有的 worker 。

    对于上面 (2.) 的并行步骤,您可能需要 job_queue实际上是 dict其中关键是三元组,比如 '010' ,该值是一个 bool 标志,表示该三元组是否已在图中编码。

    关于python - 创建表示 3 位二进制字符串的所有组合的最小图形,我们在Stack Overflow上找到一个类似的问题:

