- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
这是我的示例数据:
库存基于产品
Customer Product Quantity Inventory
1 A 100 800
2 A 1000 800
3 A 700 800
4 A 50 800
5 B 20 100
6 B 50 100
7 B 40 100
8 B 30 100
代码需要创建此数据:
data = {
'Customer':[1,2,3,4,5,6,7,8],
'Product':['A','A','A','A','B','B','B','B'],
'Quantity':[100,1000,700,50,20,50,40,30],
'Inventory':[800,800,800,800,100,100,100,100]
}
df = pd.DataFrame(data)
我需要一个新列,它是已知的可 promise ,它是通过从以前可 promise 的数量中减去数量来计算的,只有在以前可用的库存大于订单数量时才会计算.
这是我的预期输出:
Customer Product Quantity Inventory Available to Promise
1 A 100 800 700 (800-100 = 700)
2 A 1000 800 700 (1000 greater than 700 so same value)
3 A 700 800 0 (700-700 = 0)
4 A 50 800 0 (50 greater than 0)
5 B 20 100 80 (100-20 = 80)
6 B 50 100 30 (80-50 = 30)
7 B 40 100 30 (40 greater than 30)
8 B 30 100 0 (30 - 30 = 0)
我已经在 python pandas 中使用 for 循环和 itterows 实现了这一点
这是我的代码:
master_df = df[['Product','Inventory']].drop_duplicates()
master_df['free'] = df['Inventory']
df['available_to_promise']=np.NaN
for i,row in df.iterrows():
if i%1000==0:
print(i)
try:
available = master_df[row['Product']==master_df['Product']]['free'].reset_index(drop=True).iloc[0]
if available-row['Quantity']>=0:
df.at[i,'available_to_promise']=available-row['Quantity']
a = master_df.loc[row['Product']==master_df['Product']].reset_index()['index'].iloc[0]
master_df.at[a,'free'] = available-row['Quantity']
else:
df.at[i,'available_to_promise']=available
except Exception as e:
print(i)
print(e)
print((df.columns))
df = df.fillna(0)
由于 for 循环在 python 中非常慢,当有大量数据输入时,这个循环需要很长时间才能执行,因此我的 aws lambda 函数失败了
你们可以通过引入一个可以在几秒钟内执行的更好的循环替代方案来帮助我优化这段代码吗?
最佳答案
我不确定编写复制所需逻辑的矢量化和高性能代码是否简单。
但是,用Numba容易加速的方式写起来相对简单。
首先,让我们将您的代码编写为数据框的(纯)函数,返回值以最终放入 df["Available to Promise"]
。最终,很容易将其结果整合到原始数据框中:
df["Available to Promise"] = calc_avail_OP(df)
OP 的代码,除了异常处理和打印(以及合并到刚刚讨论的原始数据框中)等同于以下内容:
import numpy as np
import pandas as pd
def calc_avail_OP(df):
temp_df = df[["Product", "Inventory"]].drop_duplicates()
temp_df["free"] = df["Inventory"]
result = np.zeros(len(df), dtype=df["Inventory"].dtype)
for i, row in df.iterrows():
available = (
temp_df[row["Product"] == temp_df["Product"]]["free"]
.reset_index(drop=True)
.iloc[0]
)
if available - row["Quantity"] >= 0:
result[i] = available - row["Quantity"]
a = (
temp_df.loc[row["Product"] == temp_df["Product"]]
.reset_index()["index"]
.iloc[0]
)
temp_df.at[a, "free"] = available - row["Quantity"]
else:
result[i] = available
return result
现在,如果对输入进行排序以便唯一产品连续出现,则可以使用原生 NumPy 对象上的一些标量临时变量来实现相同的目的,并且可以使用 Numba 有效加速:
import numba as nb
@nb.njit
def _calc_avail_nb(products, quantities, stocks):
n = len(products)
avails = np.empty(n, dtype=stocks.dtype)
last_product = products[0]
avail = stocks[0]
for i in range(n):
if products[i] != last_product:
last_product = products[i]
avail = stocks[i]
qty = quantities[i]
if avail >= qty:
avail -= qty
avails[i] = avail
return avails
def calc_avail_nb(df):
return _calc_avail_nb(
df["Product"].to_numpy(dtype="U"),
df["Quantity"].to_numpy(),
df["Inventory"].to_numpy()
)
如果不能保证输入被排序,可以使用 dict()
跟踪库存信息:
import numba as nb
@nb.njit
def _calc_avail_dict_nb(products, quantities, stocks):
inventory = {products[0]: stocks[0]}
n = len(products)
avails = np.empty(n, dtype=stocks.dtype)
for i in range(n):
product = products[i]
avail = inventory.setdefault(products[i], stocks[i])
qty = quantities[i]
if avail >= qty:
avail -= qty
inventory[products[i]] = avail
avails[i] = avail
return avails
def calc_avail_dict_nb(df):
return _calc_avail_dict_nb(
df["Product"].to_numpy(dtype="U"),
df["Quantity"].to_numpy(),
df["Inventory"].to_numpy()
)
以下文本包括与其他答案中的一些方法的比较:
def stock(val):
s = val
q = yield
while True:
s = s - q if s >= q else s
q = yield s
def exaust_stock(df):
st = stock(df.iloc[0]['Inventory']).send
st(None)
return df['Quantity'].apply(st)
def calc_avail_gen(df):
return (
df
.groupby('Product')
.apply(exaust_stock)
.reset_index(level=0, drop=True)
.to_numpy()
)
@nb.njit
def _calc_avail_grouped_nb(quant, inv):
stock = inv[0]
n = len(quant)
out = np.zeros((n,), dtype=np.int_)
for i in range(n):
if stock > 0 and quant[i] <= stock:
stock -= quant[i]
out[i] = stock
else:
out[i] = stock
return out
def calc_avail_grouped_nb(df):
return (
df
.groupby('Product')
.apply(lambda x: _calc_avail_grouped_nb(x['Quantity'].to_numpy(), x['Inventory'].to_numpy()))
.explode()
.to_numpy(dtype=np.int_)
)
测试表明,虽然它们确实提供了相同的结果,但 calc_avail_nb()
和 calc_avail_dict_nb()
在测试输入上提供了大约 200 倍的速度提升。
data = {
'Customer':[1,2,3,4,5,6,7,8],
'Product':['A','A','A','A','B','B','B','B'],
'Quantity':[100,1000,700,50,20,50,40,30],
'Inventory':[800,800,800,800,100,100,100,100]
}
df = pd.DataFrame(data)
funcs = calc_avail_OP, calc_avail_nb, calc_avail_dict_nb, calc_avail_gen, calc_avail_grouped_nb
base = funcs[0](df)
timings = {}
n = len(df)
timings[n] = []
for func in funcs:
res = func(df)
is_good = np.allclose(base, res)
timed = %timeit -n 8 -r 8 -q -o func(df)
is_good = True
timing = timed.best * 1e6
timings[n].append(timing if is_good else None)
print(f"{func.__name__:>24} {is_good!s:5} {timing:10.3f} µs {timings[n][0] / timing:5.1f}x")
# calc_avail_OP True 11699.373 µs 1.0x
# calc_avail_nb True 52.821 µs 221.5x
# calc_avail_dict_nb True 57.198 µs 204.5x
# calc_avail_gen True 3360.806 µs 3.5x
# calc_avail_grouped_nb True 1099.665 µs 10.6x
对更大输入的类似测试似乎指向更大的速度增益。
时间计算如下:
import string
import random
def gen_df(n, m=None, max_stock=None):
if not m:
m = 2 + n // 16
if not max_stock:
max_stock = n
k = n.bit_length()
inventory = {
"".join(
random.choices(string.ascii_letters, k=random.randint(1, 2 + k))
): random.randint(max_stock // 2, max_stock)
for _ in range(m)
}
products = random.choices(list(inventory.keys()), k=n)
return pd.DataFrame(
{
"Customer": np.random.randint(1, int(1.1 * max_stock), n),
"Product": products,
"Quantity": np.random.randint(1, int(1.1 * max_stock), n),
"Inventory": [inventory[product] for product in products],
}
)
np.random.seed(0)
random.seed(0)
timings = {}
for i in range(3, 18, 3):
n = 2 ** i
print(f"i={i}, n={n}")
df = gen_df(n)
base = funcs[0](df)
timings[n] = []
for func in funcs:
res = func(df)
is_good = np.allclose(base, res)
timed = %timeit -n 1 -r 1 -q -o func(df)
is_good = True
timing = timed.best * 1e3
timings[n].append(timing if is_good else None)
print(f"{func.__name__:>24} {is_good!s:5} {timing:10.3f} ms {timings[n][0] / timing:5.1f}x")
并绘制成:
import pandas as pd
import matplotlib.pyplot as plt
df = pd.DataFrame(data=timings, index=[func.__name__ for func in funcs]).transpose()
df.plot(marker='o', xlabel='Input Size / #', ylabel='Best timing / µs', figsize=(6, 4))
fig = plt.gcf()
fig.patch.set_facecolor('white')
df = pd.DataFrame(data=timings, index=[func.__name__ for func in funcs]).transpose()
df = df[[funcs[0].__name__]].to_numpy() / df
df.plot(marker='o', xlabel='Input Size / #', ylabel='Speed increase / %x', figsize=(6, 4))
fig = plt.gcf()
fig.patch.set_facecolor('white')
分别获取:
和
关于python - 无需迭代即可遍历数据框的每一行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73332149/
我是一名优秀的程序员,十分优秀!