gpt4 book ai didi

python - 迭代性能

转载 作者:行者123 更新时间:2023-12-01 09:32:46 25 4
gpt4 key购买 nike

我创建了一个函数来通过实验评估以下问题,取自 A Primer for the Mathematics of Financial Engineering

问题:设 X 为您必须抛掷一枚均匀硬币直到正面朝上的次数。什么是 E[X](期望值)和 var(X)(方差)?

按照教科书的解决方案,以下代码得出正确答案:

from sympy import *

k = symbols('k')

Expected_Value = summation(k/2**k, (k, 1, oo)) # Both solutions work

Variance = summation(k**2/2**k, (k, 1, oo)) - Expected_Value**2

为了验证这个答案,我决定尝试制作一个函数来模拟这个实验。以下代码是我想出的。

def coin_toss(toss, probability=[0.5, 0.5]): 

"""Computes expected value and variance for coin toss experiment"""

flips = [] # Collects total number of throws until heads appear per experiment.

for _ in range(toss): # Simulate n flips

number_flips=[] # Number of flips until heads is tossed

while sum(number_flips) == 0: # Continue simulation while Tails are thrown

number_flips.append(np.random.choice(2, p=probability)) # Append result to number_flips

flips.append(len(number_flips)) #Append number of flips until lands heads to flips

Expected_Value, Variance = np.mean(flips), np.var(flips)

print('E[X]: {}'.format(Expected_Value),
'\nvar[X]: {}'.format(Variance)) # Return expected value

如果我使用以下代码模拟 1e6 实验,运行时间约为 35.9 秒。

   from timeit import Timer

t1 = Timer("""coin_toss(1000000)""", """from __main__ import coin_toss""")

print(t1.timeit(1))

为了加深我对 Python 的理解,这是解决此类问题的特别有效/Pythonic 的方法吗?如何利用现有库来提高效率/流程执行?

最佳答案

为了以高效且 Python 的方式进行编码,您必须查看 PythonSpeedNumPy 。下面是使用 numpy 的更快代码的一个示例。

python+numpy 中优化的abc就是向量化操作,在本例中这是相当困难的,因为有一个while实际上可能是无限的,硬币可以连续翻转 40 次。然而,而不是做 fortoss迭代,工作可以以 block 的形式完成。这是 coin_toss 之间的主要区别从问题和这个coin_toss_2d方法。

coin_toss_2d

coin_toss_2d的主要优点按 block 工作,这些 block 的大小有一些默认值,但可以修改它们(因为它们会影响速度)。因此,它只会迭代 while current_toss<toss多次toss%flips_at_a_time 。这是通过 numpy 实现的,它允许生成一个具有重复 flips_at_a_time 的结果的矩阵。乘以抛硬币的实验flips_per_try次。该矩阵将包含 0(尾部)和 1(正面)。

# i.e. doing only 5 repetitions with 3 flips_at_a_time
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# Out
[[0 0 0] # still no head, we will have to keep trying
[0 1 1] # head at the 2nd try (position 1 in python)
[1 0 0]
[1 0 1]
[0 0 1]]

一旦获得此结果, argmax 叫做。这会找到与每行(重复)的最大值(将为 1,头)相对应的索引,并且在多次出现的情况下,返回第一个,这正是所需要的,尾部序列之后的第一个头.

maxs = flip_events.argmax(axis=1)
# Out
[0 1 0 0 2]
# The first position is 0, however, flip_events[0,0]!=1, it's not a head!

但是必须考虑所有行都是0的情况。在这种情况下,最大值将为 0,并且它的第一次出现也将为 0,即第一列(尝试)。因此,我们检查第一次尝试时找到的所有最大值是否与第一次尝试时的头部相对应。

not_finished = (maxs==0) & (flip_events[:,0]!=1)
# Out
[ True False False False False] # first repetition is not finished

如果情况并非如此,我们会循环重复相同的过程,但仅限于任何尝试中都没有头的重复。

n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
# Out
# flip_events
[[1 0 1]] # Now there is a head
# maxs2
[0]
# maxs
[3 1 0 0 2] # The value of the still unfinished repetition has been updated,
# taking into account that the first position in flip_events is the 4th,
# without affecting the rest

然后存储与第一个头出现相对应的索引(我们必须添加 1,因为 python 索引从零而不是 1 开始)。有一个try ... except ...阻止处理 toss 的情况不是 repetitions_at_a_time 的倍数.

def coin_toss_2d(toss, probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
# Initialize and preallocate data
current_toss = 0
flips = np.empty(toss)
# loop by chunks
while current_toss<toss:
# repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# store first head ocurrence
maxs = flip_events.argmax(axis=1)
# Check for all tails sequences, that is, repetitions were we have to keep trying to get a head
not_finished = (maxs==0) & (flip_events[:,0]!=1)
n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
# try except in case toss is not multiple of repetitions_at_a_time, in general, no error is raised, that is why a try is useful
try:
flips[current_toss:current_toss+repetitions_at_a_time] = maxs+1
except ValueError:
flips[current_toss:] = maxs[:toss-current_toss]+1
# Update current_toss and move to the next chunk
current_toss += repetitions_at_a_time
# Once all values are obtained, average and return them
Expected_Value, Variance = np.mean(flips), np.var(flips)
return Expected_Value, Variance

coin_toss_map

这里的代码基本相同,但是现在, intrinsec while 是在一个单独的函数中完成的,该函数是从包装函数 coin_toss_map 调用的。使用map

def toss_chunk(args):
probability,repetitions_at_a_time,flips_per_try = args
# repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# store first head ocurrence
maxs = flip_events.argmax(axis=1)
# Check for all tails sequences
not_finished = (maxs==0) & (flip_events[:,0]!=1)
n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
maxs2 = flip_events.argmax(axis=1)
maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
not_finished2 = (maxs2==0) & (flip_events[:,0]!=1)
not_finished[not_finished] = not_finished2
n = np.sum(not_finished)
return maxs+1
def coin_toss_map(toss,probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
n_chunks, remainder = divmod(toss,repetitions_at_a_time)
args = [(probability,repetitions_at_a_time,flips_per_try) for _ in range(n_chunks)]
if remainder:
args.append((probability,remainder,flips_per_try))
flips = np.concatenate(map(toss_chunk,args))
# Once all values are obtained, average and return them
Expected_Value, Variance = np.mean(flips), np.var(flips)
return Expected_Value, Variance

性能比较

在我的计算机中,我得到以下计算时间:

In [1]: %timeit coin_toss(10**6)
# Out
# ('E[X]: 2.000287', '\nvar[X]: 1.99791891763')
# ('E[X]: 2.000459', '\nvar[X]: 2.00692478932')
# ('E[X]: 1.998118', '\nvar[X]: 1.98881045808')
# ('E[X]: 1.9987', '\nvar[X]: 1.99508631')
# 1 loop, best of 3: 46.2 s per loop

In [2]: %timeit coin_toss_2d(10**6,repetitions_at_a_time=5*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 197 ms per loop

In [3]: %timeit coin_toss_map(10**6,repetitions_at_a_time=4*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 192 ms per loop

均值和方差的结果为:

In [4]: [coin_toss_2d(10**6,repetitions_at_a_time=10**5,flips_per_try=10) for _ in range(4)]
# Out
# [(1.999848, 1.9990739768960009),
# (2.000654, 2.0046035722839997),
# (1.999835, 2.0072329727749993),
# (1.999277, 2.001566477271)]

In [4]: [coin_toss_map(10**6,repetitions_at_a_time=10**5,flips_per_try=4) for _ in range(4)]
# Out
# [(1.999552, 2.0005057992959996),
# (2.001733, 2.011159996711001),
# (2.002308, 2.012128673136001),
# (2.000738, 2.003613455356)]

关于python - 迭代性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49816671/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com