gpt4 book ai didi

python - 从重叠的池中挑选无序组合

转载 作者:行者123 更新时间:2023-12-03 00:51:45 27 4
gpt4 key购买 nike

我有值的池,我想通过从某些池中选择来生成每种可能的无序组合。

例如,我想从池0,池0和池1中进行选择:

>>> pools = [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
>>> part = (0, 0, 1)
>>> list(product(*(pools[i] for i in part)))
[(1, 1, 2), (1, 1, 3), (1, 1, 4), (1, 2, 2), (1, 2, 3), (1, 2, 4), (1, 3, 2), (1, 3, 3), (1, 3, 4), (2, 1, 2), (2, 1, 3), (2, 1, 4), (2, 2, 2), (2, 2, 3), (2, 2, 4), (2, 3, 2), (2, 3, 3), (2, 3, 4), (3, 1, 2), (3, 1, 3), (3, 1, 4), (3, 2, 2), (3, 2, 3), (3, 2, 4), (3, 3, 2), (3, 3, 3), (3, 3, 4)]

通过从池0,池0和池1中进行选择,将生成每种可能的组合。

但是顺序对我来说并不重要,因此许多组合实际上是重复的。例如,由于我使用的是笛卡尔积,因此会生成 (1, 2, 4)(2, 1, 4)

我想出了一种缓解此问题的简单方法。对于从单个池中挑选的成员,我选择时不使用 combinations_with_replacement进行排序。我计算从每个池中抽奖的次数。代码如下:
cnt = Counter()
for ind in part: cnt[ind] += 1
blocks = [combinations_with_replacement(pools[i], cnt[i]) for i in cnt]
return [list(chain(*combo)) for combo in product(*blocks)]

如果我碰巧多次从同一个池中进行选择,这将减少重复项的排序。但是,所有池都有很多重叠,并且在合并的多个池上使用 combinations_with_replacement会生成一些无效的组合。有没有更有效的方法来生成无序组合?

编辑:有关输入的额外信息:零件和池的数量很小(〜5和〜20),为简单起见,每个元素都是整数。我已经解决了实际的问题,所以这只是出于学术目的。假设每个池中有成千上万个整数,但是有些池很小,只有几十个。因此,某种结合或交叉点似乎是可行的方法。

最佳答案

这并不是一个“答案”,而是促使人们更努力地思考的问题;-)为了具体起见,我将在操作上略微打扰的OP的代码包装在一个也可以消除重复项的函数中:

def gen(pools, ixs):
from itertools import combinations_with_replacement as cwr
from itertools import chain, product
from collections import Counter

assert all(0 <= i < len(pools) for i in ixs)
seen = set()
cnt = Counter(ixs) # map index to count
blocks = [cwr(pools[i], count) for i, count in cnt.items()]
for t in product(*blocks):
t = tuple(sorted(chain(*t)))
if t not in seen:
seen.add(t)
yield t

我不担心在这里排序-它的内存效率很高,对于小元组来说,它可能比创建 Counter对象所涉及的所有开销都要快。

但是无论如何,这里的重点是强调通过重新构造问题以使用 combinations_with_replacement( cwr)来获得OP的实际值(value)。考虑以下输入:
N = 64
pools = [[0, 1]]
ixs = [0] * N

只有65个唯一结果,该函数会立即生成它们,而根本没有内部重复。另一方面,基本相同
pools = [[0, 1]] * N
ixs = range(N)

也有相同的65个唯一结果,但本质上是永远运行的(例如,到目前为止给出的其他答案),困扰着2 ** 64种可能性。 cwr在这里无济于事,因为每个池索引仅出现一次。

因此,对于“仅”从完整的笛卡尔积中淘汰重复项的任何解决方案,都有很大的改进空间,其中一些可以通过执行OP已经完成的操作来赢得。

在我看来,最有前途的方法是编写一个自定义生成器(不是一个主要依赖 itertools函数的生成器),该生成器首先以字典顺序生成所有可能性(因此,从构造上讲,不会创建任何重复项)。但这首先需要对输入池进行一些“全局”分析,而且我开始编写的代码变得比现在花时间去解决的复杂得多。

一个基于@ user2357112的答案

结合 cwr和@ user2357112的增量重复数据消除,可以得到一个简短的算法,该算法可以在我拥有的所有测试用例上快速运行。例如,上面的 [0, 1] ** 64示例的两个拼写基本上都是瞬时的,并在@Joseph Wood的答案结尾处运行该示例,其速度几乎与他所说的C++代码运行的速度相同(在Python 3.7.0下,我的代码框运行了0.35秒,并且,是的,找到162295个结果):
def gen(pools, ixs):
from itertools import combinations_with_replacement as cwr
from collections import Counter

assert all(0 <= i < len(pools) for i in ixs)
result = {()}
for i, count in Counter(ixs).items():
result = {tuple(sorted(old + new))
for new in cwr(pools[i], count)
for old in result}
return result

为了使其他Pythonista使用者更容易尝试上一个示例,以下是可执行Python的输入:
pools = [[1, 10, 14, 6],
[7, 2, 4, 8, 3, 11, 12],
[11, 3, 13, 4, 15, 8, 6, 5],
[10, 1, 3, 2, 9, 5, 7],
[1, 5, 10, 3, 8, 14],
[15, 3, 7, 10, 4, 5, 8, 6],
[14, 9, 11, 15],
[7, 6, 13, 14, 10, 11, 9, 4]]
ixs = range(len(pools))

但是,OP后来补充说,它们通常有大约20个池,每个池包含数千个元素。 1000 ** 20 = 1e60对于构建完整笛卡尔积的任何方法都无法实现,无论它多么巧妙地淘汰重复项。但是,仍然很清楚,他们希望有多少重复项,也不清楚,这种“增量重复数据删除”是否足够好以至于实用。

理想情况下,我们将有一个生成器,按字典顺序一次生成一个结果。

一次一次的懒惰词典编纂

在增量重复数据删除的基础上,假设我们有一个严格增加的(词典)排序的元组序列,将相同的元组 T附加到每个元组,然后再次对每个元组进行排序。然后,所导出的序列仍严格按升序排列。例如,在左列中,我们有 range(4)中的10个唯一对,在右列中,我们向每个附加(并再次排序)2个后会发生什么:
00 002
01 012
02 022
03 023
11 112
12 122
13 123
22 222
23 223
33 233

它们以排序顺序开始,派生的三元组也以排序顺序。我将跳过简单的证明(略过:如果 t1t2是相邻的元组,则 t1 < t2,并让 it1[i] != t2[i]的最小索引。然后是 t1[i] < t2[i](“lexicographic <”的含义)。然后,如果将 x放入两个元组中,按案例进行: x <= t1[i]吗?在 t1[i]t2[i]之间?是 x >= t2[i]吗?在每种情况下,很容易看到第一个派生元组严格小于第二个派生元组。)

因此,假设我们有一定数量的池中所有唯一的已排序元组的排序序列 result,当我们将新池 P的元素添加到元组时会发生什么?好吧,如上所述
[tuple(sorted(old + (P[0],))) for old in result]

也被排序了
[tuple(sorted(old + (P[i],))) for old in result]

对于 i中的所有 range(len(P))。可以通过 heapq.merge()合并这些已保证已排序的序列,并在合并结果上运行另一个生成器(以下为 killdups())以动态清除重复项。无需保留到目前为止所看到的所有元组的集合。因为合并的输出是非递减的,所以仅检查下一个结果是否与最后一个结果输出相同就足够了。

懒惰地使它工作很微妙。添加的新池的每个元素都必须访问整个结果范围序列,但是我们不想一口气将整个事情具体化。相反, itertools.tee()允许下一个池的每个元素以自己的速度遍历结果-远距离序列,并在所有新池元素完成后自动释放每个结果项的内存。

需要函数 build1()(或一些类似方法)来确保在正确的时间访问正确的值。例如,如果将 build1()的主体以内联的方式粘贴到调用的位置,则代码将严重失败(该主体将访问绑定(bind)到 rcopynew的最终值,而不是创建生成器表达式时绑定(bind)的最终值) 。

总之,由于延迟的生成器调用和堆合并的层,这当然会稍慢一些。作为返回,它以字典顺序返回结果,可以非常迅速地开始提供结果,并且如果没有其他原因导致最终结果序列根本没有实现,则峰值内存负担较低(直到调用者迭代结束才完成)返回的生成器)。

技术说明:不要在这里担心 sorted()。通过 old + new进行附加是有原因的: old已经排序,并且 new通常为1元组。在这种情况下,Python的排序是线性时间,而不是 O(N log N)
def gen(pools, ixs):
from itertools import combinations_with_replacement as cwr
from itertools import tee
from collections import Counter
from heapq import merge

def killdups(xs):
last = None
for x in xs:
if x != last:
yield x
last = x

def build1(rcopy, new):
return (tuple(sorted(old + new)) for old in rcopy)

assert all(0 <= i < len(pools) for i in ixs)
result = [()]
for i, count in Counter(ixs).items():
poolelts = list(cwr(pools[i], count))
xs = [build1(rcopy, new)
for rcopy, new in zip(tee(result, len(poolelts)),
poolelts)]
result = killdups(merge(*xs))
return result

2路输入

事实证明,对于2输入的情况,有一种从集合代数派生的简单方法。如果 xy相同,则 cwr(x, 2)是答案。如果 xy不相交,则 product(x, y)。否则 cx的交集 y是非空的,答案是从3个成对不相交的集合 cx-cy-c中获得的4个叉积的级联: cwr(c, 2)product(x-c, c)product(y-c, c)product(x-c, y-c)。证明很简单但是很乏味,因此我将跳过它。例如, cwr(c, 2)product(x-c, c)之间没有重复项,因为后者中的每个元组都包含 x-c中的元素,但是前者中的每个元组仅包含 c中的元素,并且 x-cc在构造上不相交。 product(x-c, y-c)中没有重复项,因为两个输入是不相交的(如果它们包含一个共同的元素,那将是 xy的交集,这与 x-c的交集中没有任何元素相矛盾)。等等。

las,除了2个输入之外,我还没有找到一种方法可以概括这一点,这让我感到惊讶。它可以单独使用,也可以在其他方法中用作构建块。例如,如果输入很多,则可以搜索具有大交集的对,并且这种2输入方案用于直接处理整个产品的那些部分。

即使只有3个输入,我仍然不清楚如何获得正确的结果
[1, 2], [2, 3], [1, 3]

完整的笛卡尔积具有2 ** 3 = 8个元素,其中只有一个重复: (1, 2, 3)出现两次(分别为 (1, 2, 3)和再次为 (2, 3, 1))。每对输入都有一个1元素的交集,但所有3的交集都是空的。

这是一个实现:
def pair(x, y):
from itertools import product, chain
from itertools import combinations_with_replacement
x = set(x)
y = set(y)
c = x & y
chunks = []
if c:
x -= c
y -= c
chunks.append(combinations_with_replacement(c, 2))
if x:
chunks.append(product(x, c))
if y:
chunks.append(product(y, c))
if x and y:
chunks.append(product(x, y))
return chain.from_iterable(chunks)

基于最大匹配的概念验证

这融合了@Leon的草图中的思想和@JosephWoods在评论中绘制的方法。它没有打磨,显然可以加快速度,但是在我尝试过的所有情况下,速度都相当快。因为它相当复杂,所以以一种已经很难理解的未优化形式发布它可能会更有用!

这并没有尝试确定“空闲”池的集合(如@Leon的草图所示)。主要是因为我没有代码可言,部分原因是目前尚不清楚如何有效地完成该任务。我确实有代码在周围寻找二分图中的匹配项,在这种情况下只需要进行一些更改即可。

因此,这会按照字典顺序尝试可行的结果前缀,就像@JosephWood的草图一样,并且每个人都通过检查是否存在二部图匹配来查看是否实际上可能构造。

因此,虽然@Leon草图的详细信息在这里基本上未实现,但可见的行为却大体相同:它以字典顺序产生结果,它不需要检查重复项,它是一个惰性生成器,峰值内存使用与池长度的总和,显然可以通过多种方式并行化(将不同的流程设置为在结果空间的不同区域上工作),而使其更快的关键在于减少由以下步骤完成的大量冗余工作:图匹配功能(在每次调用中所做的很多工作仅重现了在先前调用中所做的工作)。
def matchgen(pools, ixs):
from collections import Counter
from collections import defaultdict
from itertools import chain, repeat, islice

elt2pools = defaultdict(set)
npools = 0
for i, count in Counter(ixs).items():
set_indices = set(range(npools, npools + count))
for elt in pools[i]:
elt2pools[elt] |= set_indices
npools += count
elt2count = {elt : len(ps) for elt, ps in elt2pools.items()}

cands = sorted(elt2pools.keys())
ncands = len(cands)

result = [None] * npools

# Is it possible to match result[:n] + [elt]*count?
# We already know it's possible to match result[:n], but
# this code doesn't exploit that.
def match(n, elt, count):

def extend(x, seen):
for y in elt2pools[x]:
if y not in seen:
seen.add(y)
if y in y2x:
if extend(y2x[y], seen):
y2x[y] = x
return True
else:
y2x[y] = x
return True
return False

y2x = {}
freexs = []
# A greedy pass first to grab easy matches.
for x in chain(islice(result, n), repeat(elt, count)):
for y in elt2pools[x]:
if y not in y2x:
y2x[y] = x
break
else:
freexs.append(x)
# Now do real work.
seen = set()
for x in freexs:
seen.clear()
if not extend(x, seen):
return False
return True

def inner(i, j): # fill result[j:] with elts from cands[i:]
if j >= npools:
yield tuple(result)
return
for i in range(i, ncands):
elt = cands[i]
# Find the most times `elt` can be added.
count = min(elt2count[elt], npools - j)
while count:
if match(j, elt, count):
break
count -= 1
# Since it can be added `count` times, it can also
# be added any number of times less than `count`.
for k in range(count):
result[j + k] = elt
while count:
yield from inner(i + 1, j + count)
count -= 1

return inner(0, 0)

编辑:请注意,这里有一个潜在的陷阱,由一对池 range(10_000)range(100_000)说明。生成 (9999, 99999)后,第一个位置增加到10000,然后持续很长时间,推断出第二个位置中10001 .. 99999中的任何可能性都不匹配;然后在第一位置的10001与第二位置的10002 .. 99999中的任何可能性都不匹配;等等。 @Leon的方案可能会注意到 range(10_000)是唯一剩下的唯一空闲池,它在第一个位置上选择了10000,然后立即注意到 range(10_000)不包含任何大于10000的值。显然,它需要对10001重新进行一次操作。 ,10002,...,99999在第一位置。这是线性时间而不是二次时间的浪费,但都是一样的浪费。这个故事的寓意:在您拥有实际的代码尝试之前,请不要信任任何东西;-)

还有一个基于@Leon的方案

以下是@Leon的想法的忠实实现。我喜欢上面的代码比上面的“概念验证”(POC)代码更好,但惊讶地发现新代码的运行速度明显慢(在类似于@ JospephWood's randomized的各种情况下,运行速度慢了3到4倍)例如)相对于POC代码的“优化”变体。

主要原因似乎是对匹配函数的更多调用。 POC代码针对每个“合理”前缀调用一次。新代码不会生成任何不可能的前缀,但是对于它生成的每个前缀,可能需要进行多次 match()调用,以确定剩余的可能较小的空闲池集。也许有一种更聪明的方法可以做到这一点。

请注意,我添加了一个变种:如果到目前为止,空闲池的元素都小于前缀的最后一个元素,则相对于前缀而言,它仍然是“空闲池”,但是它没有用,因为其任何元素都不能出现在前缀中。考生。这与结果无关紧要,但是它意味着对于所有剩余的递归调用,该池仍保留在空闲池集中,这又意味着他们可以浪费时间确定它仍然是“空闲池”。因此,当空闲池不能再用于任何内容时,此版本会将其从空闲池集中删除。这大大提高了速度。

注意:有很多尝试匹配的方法,其中有些具有更好的理论 O()最坏情况行为。以我的经验,在典型情况下,简单的深度优先(如此处)搜索在现实生活中运行得更快。但这在很大程度上取决于当前应用程序中“典型”图形的特征​​。我没有在这里尝试其他方法。

底线,忽略“2输入”特殊情况的代码:
  • 如果您拥有RAM,那么这里没有什么比提高重复数据删除速度更快。但是没有什么比峰值内存负担更糟糕的了。
  • 没有什么比基于匹配的方法节省了节俭的内存负担。从这个角度来看,他们处于一个完全不同的世界。它们也是最慢的,尽管至少在同一宇宙中;-)

  • 代码:
    def matchgen(pools, ixs):
    from collections import Counter
    from collections import defaultdict
    from itertools import islice

    elt2pools = defaultdict(list)
    allpools = []
    npools = 0
    for i, count in Counter(ixs).items():
    indices = list(range(npools, npools + count))
    plist = sorted(pools[i])
    for elt in plist:
    elt2pools[elt].extend(indices)
    for i in range(count):
    allpools.append(plist)
    npools += count
    pools = allpools
    assert npools == len(pools)

    result = [None] * npools

    # Is it possible to match result[:n] not using pool
    # bady? If not, return None. Else return a matching,
    # a dict whose keys are pool indices and whose values
    # are a permutation of result[:n].
    def match(n, bady):

    def extend(x, seen):
    for y in elt2pools[x]:
    if y not in seen:
    seen.add(y)
    if y not in y2x or extend(y2x[y], seen):
    y2x[y] = x
    return True
    return False

    y2x = {}
    freexs = []
    # A greedy pass first to grab easy matches.
    for x in islice(result, n):
    for y in elt2pools[x]:
    if y not in y2x and y != bady:
    y2x[y] = x
    break
    else:
    freexs.append(x)

    # Now do real work.
    for x in freexs:
    if not extend(x, {bady}):
    return None
    return y2x

    def inner(j, freepools): # fill result[j:]
    from bisect import bisect_left
    if j >= npools:
    yield tuple(result)
    return
    if j:
    new_freepools = set()
    allcands = set()
    exhausted = set() # free pools with elts too small
    atleast = result[j-1]
    for pi in freepools:
    if pi not in new_freepools:
    m = match(j, pi)
    if not m: # match must use pi
    continue
    # Since `m` is a match to result[:j],
    # any pool in freepools it does _not_
    # use must still be free.
    new_freepools |= freepools - m.keys()
    assert pi in new_freepools
    # pi is free with respect to result[:j].
    pool = pools[pi]
    if pool[-1] < atleast:
    exhausted.add(pi)
    else:
    i = bisect_left(pool, atleast)
    allcands.update(pool[i:])
    if exhausted:
    freepools -= exhausted
    new_freepools -= exhausted
    else: # j == 0
    new_freepools = freepools
    allcands = elt2pools.keys()

    for result[j] in sorted(allcands):
    yield from inner(j + 1, new_freepools)

    return inner(0, set(range(npools)))

    注意:这有其自己的“坏情况”类。例如,传递128份 [0, 1]会花费大约2分钟(!)的时间来查找129个结果。 POC代码花了不到一秒钟的时间,而某些不匹配的方法则是瞬间出现的。

    我不会详细说明原因。只需说一句,因为所有池都是相同的,因此无论递归进行的深度如何,它们都保持为“空闲池”。 match()从来没有遇到困难,总是会在第一次(贪婪)遍历中找到前缀的完全匹配,但是即使这样,时间也与当前前缀长度的平方成正比(==当前递归深度)。

    实用混合

    还有一个如前所述,基于匹配的方法由于频繁地重复进行基本操作而遭受图匹配的开销,并且存在一些很容易被偶然发现的不幸情况。

    高度相似的池导致一组空闲池缓慢收缩(或完全不收缩)。但是在那种情况下,池是如此相似,以至于从哪个池中获取元素几乎没有关系。因此,下面的方法不会尝试准确跟踪空闲池,只要能够明显选择可用池,就选择任意池,并且仅在遇到阻塞时才诉诸图形匹配。看来运作良好。举一个极端的例子,来自128个 [0, 1]池的129个结果在不到十分之一秒而不是两分钟的时间内交付。事实证明,在这种情况下,不需要进行图匹配。

    POC代码的另一个问题(在其他基于匹配的方法中则较少)是在提供最后一个结果后很长时间转动轮子。一个实用的技巧可以完全解决一个问题;-)可以很容易地预先计算序列的最后一个元组,并且代码引发一个内部异常,以便在最后一个元组交付后立即结束所有操作。

    对我来说就是这样!对“两个输入”情况的一般化对我来说仍然非常有趣,但是我从其他方法获得的所有痕迹现在都已被刮掉。
    def combogen(pools, ixs):
    from collections import Counter
    from collections import defaultdict
    from itertools import islice

    elt2pools = defaultdict(set)
    npools = 0
    cands = []
    MAXTUPLE = []
    for i, count in Counter(ixs).items():
    indices = set(range(npools, npools + count))
    huge = None
    for elt in pools[i]:
    elt2pools[elt] |= indices
    for i in range(count):
    cands.append(elt)
    if huge is None or elt > huge:
    huge = elt
    MAXTUPLE.extend([huge] * count)
    npools += count
    MAXTUPLE = tuple(sorted(MAXTUPLE))
    cands.sort()
    ncands = len(cands)
    ALLPOOLS = set(range(npools))
    availpools = ALLPOOLS.copy()
    result = [None] * npools

    class Finished(Exception):
    pass

    # Is it possible to match result[:n]? If not, return None. Else
    # return a matching, a dict whose keys are pool indices and
    # whose values are a permutation of result[:n].
    def match(n):

    def extend(x, seen):
    for y in elt2pools[x]:
    if y not in seen:
    seen.add(y)
    if y not in y2x or extend(y2x[y], seen):
    y2x[y] = x
    return True
    return False

    y2x = {}
    freexs = []
    # A greedy pass first to grab easy matches.
    for x in islice(result, n):
    for y in elt2pools[x]:
    if y not in y2x:
    y2x[y] = x
    break
    else:
    freexs.append(x)

    # Now do real work.
    seen = set()
    for x in freexs:
    seen.clear()
    if not extend(x, seen):
    return None
    return y2x

    def inner(i, j): # fill result[j:] with cands[i:]
    nonlocal availpools
    if j >= npools:
    r = tuple(result)
    yield r
    if r == MAXTUPLE:
    raise Finished
    return
    restore_availpools = None
    last = None
    jp1 = j + 1
    for i in range(i, ncands):
    elt = cands[i]
    if elt == last:
    continue
    last = result[j] = elt
    pools = elt2pools[elt] & availpools
    if pools:
    pool = pools.pop() # pick one - arbitrary
    availpools.remove(pool)
    else:
    # Find _a_ matching, and if that's possible fiddle
    # availpools to pretend that's the one we used all
    # along.
    m = match(jp1)
    if not m: # the prefix can't be extended with elt
    continue
    if restore_availpools is None:
    restore_availpools = availpools.copy()
    availpools = ALLPOOLS - m.keys()
    # Find a pool from which elt was taken.
    for pool, v in m.items():
    if v == elt:
    break
    else:
    assert False
    yield from inner(i+1, jp1)
    availpools.add(pool)

    if restore_availpools is not None:
    availpools = restore_availpools

    try:
    yield from inner(0, 0)
    except Finished:
    pass

    关于python - 从重叠的池中挑选无序组合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51834467/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com