gpt4 book ai didi

Python:如何导入 .csv 并通过代码运行内容?

转载 作者:行者123 更新时间:2023-12-01 00:26:29 24 4
gpt4 key购买 nike

我对 Python 还很陌生,我一直在尝试运行“代码”(见下文)

尽管它生成随机数据,但该代码运行良好。

我在 csv 文件中有自己的数据,我想运行它并查看我的手动计算是否一致。所以,我所做的是:

我已从代码中删除了 import numpy.random as nrand 并添加了两行以查看是否可以手动输入 csv 列中的范围:

numpy.arrange(15)
numpy.array([0,1,2,3,4])

然后将原始代码(代码)中的nrand替换为numpy

不幸的是,这产生了一个错误:

enter image description here

如果有人可以向我展示如何将示例 csv 文件(只包含 1 列数据)从 C:\ 驱动器上的某个位置导入到 Python 中并用它运行代码(无论该列中有多少个数据点,都能全部读取),我将非常感激。有人可以帮忙吗?

代码

import math
import numpy
import numpy.random as nrand

"""
Note - for some of the metrics the absolute value is returned. This is because if the risk (loss) is higher we want to
discount the expected excess return from the portfolio by a higher amount. Therefore risk should be positive.
"""


def vol(returns):
    """Return the volatility of ``returns``.

    Volatility here is ``numpy.std`` with its default settings, i.e. the
    population standard deviation of the series.
    """
    volatility = numpy.std(returns)
    return volatility


def beta(returns, market):
    """Return the beta of ``returns`` with respect to ``market``.

    Beta is Cov(returns, market) / Var(market).  The earlier version divided
    by the standard deviation of the market instead of its variance and mixed
    a sample covariance (ddof=1) with a population deviation (ddof=0), so a
    series that was exactly twice the market did not get a beta of 2.

    :param returns: 1-D sequence of portfolio returns.
    :param market: 1-D sequence of market returns, same length as ``returns``.
    :return: the regression slope of ``returns`` on ``market``.
    """
    # numpy.cov accepts the two series directly; numpy.matrix is not needed.
    covariance = numpy.cov(returns, market)[0][1]
    # numpy.cov uses ddof=1 by default, so the variance must use ddof=1 too.
    return covariance / numpy.var(market, ddof=1)


def lpm(returns, threshold, order):
    """Return the lower partial moment of ``returns``.

    For every observation the shortfall ``threshold - return`` is taken,
    negative shortfalls are replaced by 0, each shortfall is raised to
    ``order`` and the results are averaged over all observations.
    """
    n_obs = len(returns)
    # Array of the same length as returns, filled with the threshold
    floor = numpy.empty(n_obs)
    floor.fill(threshold)
    # Only observations below the threshold contribute
    shortfall = (floor - returns).clip(min=0)
    return numpy.sum(shortfall ** order) / n_obs


def hpm(returns, threshold, order):
    """Return the higher partial moment of ``returns``.

    For every observation the excess ``return - threshold`` is taken,
    negative excesses are replaced by 0, each excess is raised to ``order``
    and the results are averaged over all observations.
    """
    n_obs = len(returns)
    # Array of the same length as returns, filled with the threshold
    hurdle = numpy.empty(n_obs)
    hurdle.fill(threshold)
    # Only observations above the threshold contribute
    excess = (returns - hurdle).clip(min=0)
    return numpy.sum(excess ** order) / n_obs


def var(returns, alpha):
    """Return the historical-simulation value at risk of ``returns``.

    The returns are sorted ascending and the element at position
    ``int(alpha * len(returns))`` is reported as an absolute value, so the
    result is never negative.
    """
    ordered = numpy.sort(returns)
    # Position in the sorted series that corresponds to alpha
    cutoff = int(alpha * len(ordered))
    loss_quantile = ordered[cutoff]
    # VaR is reported as a positive number
    return abs(loss_quantile)


def cvar(returns, alpha):
    """Return the conditional value at risk (expected shortfall) of ``returns``.

    The returns are sorted ascending and the worst
    ``int(alpha * len(returns))`` observations are averaged; the absolute
    value of that average is returned.

    For small samples ``int(alpha * len(returns))`` truncates to 0.  The
    earlier version then divided by zero and returned ``inf``; now the tail
    always contains at least the single worst observation.

    :param returns: 1-D sequence of returns.
    :param alpha: tail probability, e.g. 0.05.
    :raises ValueError: if ``returns`` is empty.
    """
    sorted_returns = numpy.sort(returns)
    if len(sorted_returns) == 0:
        raise ValueError("cvar() requires at least one return")
    # Never let the tail be empty: keep at least the worst observation.
    tail_size = max(int(alpha * len(sorted_returns)), 1)
    # CVaR is reported as a positive number
    return abs(numpy.mean(sorted_returns[:tail_size]))


def prices(returns, base):
    """Convert a series of periodic returns into a price path.

    The path starts at ``base`` and each subsequent price is the previous
    price grown by that period's return.  The earlier version applied every
    return to ``base`` itself, so the returns were never compounded.

    :param returns: 1-D sequence of periodic returns (0.1 means +10%).
    :param base: starting price.
    :return: numpy array with ``len(returns) + 1`` prices.
    """
    # Cumulative growth factor after each period
    growth = numpy.cumprod(1.0 + numpy.asarray(returns, dtype=float))
    return numpy.concatenate(([base], base * growth))


def dd(returns, tau):
    """Return the draw-down of ``returns`` over a horizon of ``tau`` periods.

    A price path is built with ``prices(returns, 100)``; for every pair of
    prices ``tau`` steps apart the relative change is computed, and the
    absolute value of the smallest change is returned.  When the path is too
    short to hold a single such pair the result is ``inf``.
    """
    values = prices(returns, 100)
    worst_change = float('+inf')
    # Slide the window from the end of the path back to its start
    for start in range(len(values) - 1 - tau, -1, -1):
        change = (values[start + tau] / values[start]) - 1
        if change < worst_change:
            worst_change = change
    # Drawdown is reported as a positive number
    return abs(worst_change)


def max_dd(returns):
    """Return the largest ``dd(returns, tau)`` over tau = 0 .. len(returns) - 1.

    For an empty series there is no tau to try and the result is ``inf``
    (the absolute value of the ``-inf`` seed).
    """
    # Seed with -inf so every computed draw-down is compared against it
    candidates = [float('-inf')]
    candidates.extend(dd(returns, tau) for tau in range(len(returns)))
    # Max draw-down is reported as a positive number
    return abs(max(candidates))


def average_dd(returns, periods):
    """Return the mean of ``periods`` draw-downs of ``returns``.

    ``dd(returns, tau)`` is evaluated for every tau in
    0 .. len(returns) - 1, the results are sorted ascending and the first
    ``periods`` of them are averaged.
    """
    # NOTE(review): dd() returns magnitudes, so an ascending sort puts the
    # smallest draw-downs first - confirm that this is intended.
    ranked = sorted(dd(returns, lag) for lag in range(len(returns)))
    accumulated = abs(ranked[0])
    for rank in range(1, periods):
        accumulated += abs(ranked[rank])
    return accumulated / periods


def average_dd_squared(returns, periods):
    """Return the mean of ``periods`` squared draw-downs of ``returns``.

    ``dd(returns, tau)`` squared is evaluated for every tau in
    0 .. len(returns) - 1, the results are sorted ascending and the first
    ``periods`` of them are averaged.
    """
    # NOTE(review): squares are non-negative, so an ascending sort puts the
    # smallest draw-downs first - confirm that this is intended.
    ranked = sorted(math.pow(dd(returns, lag), 2.0) for lag in range(len(returns)))
    accumulated = abs(ranked[0])
    for rank in range(1, periods):
        accumulated += abs(ranked[rank])
    return accumulated / periods


def treynor_ratio(er, returns, market, rf):
    """Return the excess return ``er - rf`` divided by ``beta(returns, market)``."""
    excess = er - rf
    systematic_risk = beta(returns, market)
    return excess / systematic_risk


def sharpe_ratio(er, returns, rf):
    """Return the excess return ``er - rf`` divided by ``vol(returns)``."""
    excess = er - rf
    total_risk = vol(returns)
    return excess / total_risk


def information_ratio(returns, benchmark):
    """Return the mean of ``returns - benchmark`` divided by its volatility.

    The subtraction is element-wise, so both arguments are expected to
    support array arithmetic (e.g. numpy arrays of equal length).
    """
    active = returns - benchmark
    mean_active = numpy.mean(active)
    return mean_active / vol(active)


def modigliani_ratio(er, returns, benchmark, rf):
    """Return ``(er - rf) * vol(returns - rf) / vol(benchmark - rf) + rf``.

    The risk-free rate is subtracted from every element of both series
    before their volatilities are compared.
    """
    # Constant series holding the risk-free rate, one entry per return
    risk_free = numpy.empty(len(returns))
    risk_free.fill(rf)
    portfolio_vol = vol(returns - risk_free)
    benchmark_vol = vol(benchmark - risk_free)
    return (er - rf) * (portfolio_vol / benchmark_vol) + rf


def excess_var(er, returns, rf, alpha):
    """Return the excess return ``er - rf`` divided by ``var(returns, alpha)``."""
    excess = er - rf
    value_at_risk = var(returns, alpha)
    return excess / value_at_risk


def conditional_sharpe_ratio(er, returns, rf, alpha):
    """Return the excess return ``er - rf`` divided by ``cvar(returns, alpha)``."""
    excess = er - rf
    expected_shortfall = cvar(returns, alpha)
    return excess / expected_shortfall


def omega_ratio(er, returns, rf, target=0):
    """Return ``er - rf`` divided by the first-order lower partial moment.

    The lower partial moment is ``lpm(returns, target, 1)``.
    """
    excess = er - rf
    downside = lpm(returns, target, 1)
    return excess / downside


def sortino_ratio(er, returns, rf, target=0):
    """Return ``er - rf`` divided by the square root of ``lpm(returns, target, 2)``."""
    excess = er - rf
    downside_deviation = math.sqrt(lpm(returns, target, 2))
    return excess / downside_deviation


def kappa_three_ratio(er, returns, rf, target=0):
    """Return ``er - rf`` divided by the cube root of ``lpm(returns, target, 3)``."""
    excess = er - rf
    third_moment = lpm(returns, target, 3)
    # Float literals keep the exponent an exact one-third
    return excess / math.pow(third_moment, 1.0 / 3.0)


def gain_loss_ratio(returns, target=0):
    """Return ``hpm(returns, target, 1)`` divided by ``lpm(returns, target, 1)``."""
    upside = hpm(returns, target, 1)
    downside = lpm(returns, target, 1)
    return upside / downside


def upside_potential_ratio(returns, target=0):
    """Return ``hpm(returns, target, 1)`` over the square root of ``lpm(returns, target, 2)``."""
    upside = hpm(returns, target, 1)
    downside_deviation = math.sqrt(lpm(returns, target, 2))
    return upside / downside_deviation


def calmar_ratio(er, returns, rf):
    """Return the excess return ``er - rf`` divided by ``max_dd(returns)``."""
    excess = er - rf
    worst_drawdown = max_dd(returns)
    return excess / worst_drawdown


def sterling_ration(er, returns, rf, periods):
    """Return the excess return ``er - rf`` divided by ``average_dd(returns, periods)``.

    The function name (``ration``) is kept as-is because callers use it.
    """
    excess = er - rf
    mean_drawdown = average_dd(returns, periods)
    return excess / mean_drawdown


def burke_ratio(er, returns, rf, periods):
    """Return ``er - rf`` over the square root of ``average_dd_squared(returns, periods)``."""
    excess = er - rf
    drawdown_risk = math.sqrt(average_dd_squared(returns, periods))
    return excess / drawdown_risk


def test_risk_metrics():
    """Print each risk metric for two random 50-point series.

    This is a demonstration routine only: it draws portfolio (r) and market
    (m) returns uniformly from [-1, 1) and prints one line per metric.
    """
    r = nrand.uniform(-1, 1, 50)
    m = nrand.uniform(-1, 1, 50)
    # Each entry is evaluated lazily so lines are printed in order, one by one
    report = (
        ("vol =", lambda: vol(r)),
        ("beta =", lambda: beta(r, m)),
        ("hpm(0.0)_1 =", lambda: hpm(r, 0.0, 1)),
        ("lpm(0.0)_1 =", lambda: lpm(r, 0.0, 1)),
        ("VaR(0.05) =", lambda: var(r, 0.05)),
        ("CVaR(0.05) =", lambda: cvar(r, 0.05)),
        ("Drawdown(5) =", lambda: dd(r, 5)),
        ("Max Drawdown =", lambda: max_dd(r)),
    )
    for label, compute in report:
        print(label, compute())


def test_risk_adjusted_metrics():
    """Print each risk-adjusted ratio for two random 50-point series.

    Portfolio (r) and market (m) returns are drawn uniformly from [-1, 1);
    the expected return is the mean of r and the risk-free rate is 0.06.
    """
    r = nrand.uniform(-1, 1, 50)
    m = nrand.uniform(-1, 1, 50)
    e = numpy.mean(r)  # expected return
    f = 0.06  # risk-free rate
    # Each entry is evaluated lazily so lines are printed in order, one by one
    report = (
        # Risk-adjusted return based on volatility
        ("Treynor Ratio =", lambda: treynor_ratio(e, r, m, f)),
        ("Sharpe Ratio =", lambda: sharpe_ratio(e, r, f)),
        ("Information Ratio =", lambda: information_ratio(r, m)),
        # Risk-adjusted return based on value at risk
        ("Excess VaR =", lambda: excess_var(e, r, f, 0.05)),
        ("Conditional Sharpe Ratio =", lambda: conditional_sharpe_ratio(e, r, f, 0.05)),
        # Risk-adjusted return based on lower partial moments
        ("Omega Ratio =", lambda: omega_ratio(e, r, f)),
        ("Sortino Ratio =", lambda: sortino_ratio(e, r, f)),
        ("Kappa 3 Ratio =", lambda: kappa_three_ratio(e, r, f)),
        ("Gain Loss Ratio =", lambda: gain_loss_ratio(r)),
        ("Upside Potential Ratio =", lambda: upside_potential_ratio(r)),
        # Risk-adjusted return based on drawdown risk
        ("Calmar Ratio =", lambda: calmar_ratio(e, r, f)),
        ("Sterling Ratio =", lambda: sterling_ration(e, r, f, 5)),
        ("Burke Ratio =", lambda: burke_ratio(e, r, f, 5)),
    )
    for label, compute in report:
        print(label, compute())


if __name__ == "__main__":
    # When run as a script, execute both demonstration routines in order.
    for demo in (test_risk_metrics, test_risk_adjusted_metrics):
        demo()

最佳答案

好的,所以阅读您的评论时,您提到 r 可以具有与 m 相同的长度或更短。因此,我建议的解决方案是仅加载 2 个 CSV 文件,其中第一个文件包含您的 r 值,第二个文件包含您的 m 值。

确保您的 csv 文件没有标题,仅在列中列出值。

出于本次测试的目的,以下是我的 r CSV 文件。

3.223
1.313
1.023
0.333
23.311

还有我的 m CSV 文件:

1.233
0.3231
23.132
0.032
132.14

现在,您可以将它们加载到脚本中并将它们输入到您的函数中。将其放入您的 __name__ == '__main__' block 中:

import csv

# load r
# newline='' is the mode the csv module expects for files it reads
with open(r'C:\path\to\r_values.csv', newline='') as csvfile:  # change your filename here
    # "if x" skips blank rows (e.g. a trailing empty line), which would
    # otherwise fail on x[0]
    r = numpy.array([float(x[0]) for x in csv.reader(csvfile) if x])

# load m
with open(r'C:\path\to\m_values.csv', newline='') as csvfile:  # change your filename here
    m = numpy.array([float(x[0]) for x in csv.reader(csvfile) if x])

接下来,我将重新定义您的 test_risk_metrics 和 test_risk_adjusted_metrics 函数:

# Now you can feed them into your functions
def test_risk_metrics(r, m):
    """Print each risk metric for portfolio returns ``r`` and market returns ``m``."""
    # Each entry is evaluated lazily so lines are printed in order, one by one
    report = (
        ("vol =", lambda: vol(r)),
        ("beta =", lambda: beta(r, m)),
        ("hpm(0.0)_1 =", lambda: hpm(r, 0.0, 1)),
        ("lpm(0.0)_1 =", lambda: lpm(r, 0.0, 1)),
        ("VaR(0.05) =", lambda: var(r, 0.05)),
        ("CVaR(0.05) =", lambda: cvar(r, 0.05)),
        ("Drawdown(5) =", lambda: dd(r, 5)),
        ("Max Drawdown =", lambda: max_dd(r)),
    )
    for label, compute in report:
        print(label, compute())

def test_risk_adjusted_metrics(r, m):
    """Print each risk-adjusted ratio for portfolio returns ``r`` and market returns ``m``.

    The expected return is the mean of ``r`` and the risk-free rate is 0.06.
    """
    e = numpy.mean(r)  # expected return
    f = 0.06  # risk-free rate
    # Each entry is evaluated lazily so lines are printed in order, one by one
    report = (
        # Risk-adjusted return based on volatility
        ("Treynor Ratio =", lambda: treynor_ratio(e, r, m, f)),
        ("Sharpe Ratio =", lambda: sharpe_ratio(e, r, f)),
        ("Information Ratio =", lambda: information_ratio(r, m)),
        # Risk-adjusted return based on value at risk
        ("Excess VaR =", lambda: excess_var(e, r, f, 0.05)),
        ("Conditional Sharpe Ratio =", lambda: conditional_sharpe_ratio(e, r, f, 0.05)),
        # Risk-adjusted return based on lower partial moments
        ("Omega Ratio =", lambda: omega_ratio(e, r, f)),
        ("Sortino Ratio =", lambda: sortino_ratio(e, r, f)),
        ("Kappa 3 Ratio =", lambda: kappa_three_ratio(e, r, f)),
        ("Gain Loss Ratio =", lambda: gain_loss_ratio(r)),
        ("Upside Potential Ratio =", lambda: upside_potential_ratio(r)),
        # Risk-adjusted return based on drawdown risk
        ("Calmar Ratio =", lambda: calmar_ratio(e, r, f)),
        ("Sterling Ratio =", lambda: sterling_ration(e, r, f, 5)),
        ("Burke Ratio =", lambda: burke_ratio(e, r, f, 5)),
    )
    for label, compute in report:
        print(label, compute())

整个代码应如下所示:

import math
import numpy

"""
Note - for some of the metrics the absolute value is returned. This is because if the risk (loss) is higher we want to
discount the expected excess return from the portfolio by a higher amount. Therefore risk should be positive.
"""


def vol(returns):
    """Return the volatility of ``returns``.

    Volatility here is ``numpy.std`` with its default settings, i.e. the
    population standard deviation of the series.
    """
    volatility = numpy.std(returns)
    return volatility


def beta(returns, market):
    """Return the beta of ``returns`` with respect to ``market``.

    Beta is Cov(returns, market) / Var(market).  The earlier version divided
    by the standard deviation of the market instead of its variance and mixed
    a sample covariance (ddof=1) with a population deviation (ddof=0), so a
    series that was exactly twice the market did not get a beta of 2.

    :param returns: 1-D sequence of portfolio returns.
    :param market: 1-D sequence of market returns, same length as ``returns``.
    :return: the regression slope of ``returns`` on ``market``.
    """
    # numpy.cov accepts the two series directly; numpy.matrix is not needed.
    covariance = numpy.cov(returns, market)[0][1]
    # numpy.cov uses ddof=1 by default, so the variance must use ddof=1 too.
    return covariance / numpy.var(market, ddof=1)


def lpm(returns, threshold, order):
    """Return the lower partial moment of ``returns``.

    For every observation the shortfall ``threshold - return`` is taken,
    negative shortfalls are replaced by 0, each shortfall is raised to
    ``order`` and the results are averaged over all observations.
    """
    n_obs = len(returns)
    # Array of the same length as returns, filled with the threshold
    floor = numpy.empty(n_obs)
    floor.fill(threshold)
    # Only observations below the threshold contribute
    shortfall = (floor - returns).clip(min=0)
    return numpy.sum(shortfall ** order) / n_obs


def hpm(returns, threshold, order):
    """Return the higher partial moment of ``returns``.

    For every observation the excess ``return - threshold`` is taken,
    negative excesses are replaced by 0, each excess is raised to ``order``
    and the results are averaged over all observations.
    """
    n_obs = len(returns)
    # Array of the same length as returns, filled with the threshold
    hurdle = numpy.empty(n_obs)
    hurdle.fill(threshold)
    # Only observations above the threshold contribute
    excess = (returns - hurdle).clip(min=0)
    return numpy.sum(excess ** order) / n_obs


def var(returns, alpha):
    """Return the historical-simulation value at risk of ``returns``.

    The returns are sorted ascending and the element at position
    ``int(alpha * len(returns))`` is reported as an absolute value, so the
    result is never negative.
    """
    ordered = numpy.sort(returns)
    # Position in the sorted series that corresponds to alpha
    cutoff = int(alpha * len(ordered))
    loss_quantile = ordered[cutoff]
    # VaR is reported as a positive number
    return abs(loss_quantile)


def cvar(returns, alpha):
    """Return the conditional value at risk (expected shortfall) of ``returns``.

    The returns are sorted ascending and the worst
    ``int(alpha * len(returns))`` observations are averaged; the absolute
    value of that average is returned.

    For small samples ``int(alpha * len(returns))`` truncates to 0.  The
    earlier version then divided by zero and returned ``inf``; now the tail
    always contains at least the single worst observation.

    :param returns: 1-D sequence of returns.
    :param alpha: tail probability, e.g. 0.05.
    :raises ValueError: if ``returns`` is empty.
    """
    sorted_returns = numpy.sort(returns)
    if len(sorted_returns) == 0:
        raise ValueError("cvar() requires at least one return")
    # Never let the tail be empty: keep at least the worst observation.
    tail_size = max(int(alpha * len(sorted_returns)), 1)
    # CVaR is reported as a positive number
    return abs(numpy.mean(sorted_returns[:tail_size]))


def prices(returns, base):
    """Convert a series of periodic returns into a price path.

    The path starts at ``base`` and each subsequent price is the previous
    price grown by that period's return.  The earlier version applied every
    return to ``base`` itself, so the returns were never compounded.

    :param returns: 1-D sequence of periodic returns (0.1 means +10%).
    :param base: starting price.
    :return: numpy array with ``len(returns) + 1`` prices.
    """
    # Cumulative growth factor after each period
    growth = numpy.cumprod(1.0 + numpy.asarray(returns, dtype=float))
    return numpy.concatenate(([base], base * growth))


def dd(returns, tau):
    """Return the draw-down of ``returns`` over a horizon of ``tau`` periods.

    A price path is built with ``prices(returns, 100)``; for every pair of
    prices ``tau`` steps apart the relative change is computed, and the
    absolute value of the smallest change is returned.  When the path is too
    short to hold a single such pair the result is ``inf``.
    """
    values = prices(returns, 100)
    worst_change = float('+inf')
    # Slide the window from the end of the path back to its start
    for start in range(len(values) - 1 - tau, -1, -1):
        change = (values[start + tau] / values[start]) - 1
        if change < worst_change:
            worst_change = change
    # Drawdown is reported as a positive number
    return abs(worst_change)


def max_dd(returns):
    """Return the largest ``dd(returns, tau)`` over tau = 0 .. len(returns) - 1.

    For an empty series there is no tau to try and the result is ``inf``
    (the absolute value of the ``-inf`` seed).
    """
    # Seed with -inf so every computed draw-down is compared against it
    candidates = [float('-inf')]
    candidates.extend(dd(returns, tau) for tau in range(len(returns)))
    # Max draw-down is reported as a positive number
    return abs(max(candidates))


def average_dd(returns, periods):
    """Return the mean of ``periods`` draw-downs of ``returns``.

    ``dd(returns, tau)`` is evaluated for every tau in
    0 .. len(returns) - 1, the results are sorted ascending and the first
    ``periods`` of them are averaged.
    """
    # NOTE(review): dd() returns magnitudes, so an ascending sort puts the
    # smallest draw-downs first - confirm that this is intended.
    ranked = sorted(dd(returns, lag) for lag in range(len(returns)))
    accumulated = abs(ranked[0])
    for rank in range(1, periods):
        accumulated += abs(ranked[rank])
    return accumulated / periods


def average_dd_squared(returns, periods):
    """Return the mean of ``periods`` squared draw-downs of ``returns``.

    ``dd(returns, tau)`` squared is evaluated for every tau in
    0 .. len(returns) - 1, the results are sorted ascending and the first
    ``periods`` of them are averaged.
    """
    # NOTE(review): squares are non-negative, so an ascending sort puts the
    # smallest draw-downs first - confirm that this is intended.
    ranked = sorted(math.pow(dd(returns, lag), 2.0) for lag in range(len(returns)))
    accumulated = abs(ranked[0])
    for rank in range(1, periods):
        accumulated += abs(ranked[rank])
    return accumulated / periods


def treynor_ratio(er, returns, market, rf):
    """Return the excess return ``er - rf`` divided by ``beta(returns, market)``."""
    excess = er - rf
    systematic_risk = beta(returns, market)
    return excess / systematic_risk


def sharpe_ratio(er, returns, rf):
    """Return the excess return ``er - rf`` divided by ``vol(returns)``."""
    excess = er - rf
    total_risk = vol(returns)
    return excess / total_risk


def information_ratio(returns, benchmark):
    """Return the mean of ``returns - benchmark`` divided by its volatility.

    The subtraction is element-wise, so both arguments are expected to
    support array arithmetic (e.g. numpy arrays of equal length).
    """
    active = returns - benchmark
    mean_active = numpy.mean(active)
    return mean_active / vol(active)


def modigliani_ratio(er, returns, benchmark, rf):
    """Return ``(er - rf) * vol(returns - rf) / vol(benchmark - rf) + rf``.

    The risk-free rate is subtracted from every element of both series
    before their volatilities are compared.
    """
    # Constant series holding the risk-free rate, one entry per return
    risk_free = numpy.empty(len(returns))
    risk_free.fill(rf)
    portfolio_vol = vol(returns - risk_free)
    benchmark_vol = vol(benchmark - risk_free)
    return (er - rf) * (portfolio_vol / benchmark_vol) + rf


def excess_var(er, returns, rf, alpha):
    """Return the excess return ``er - rf`` divided by ``var(returns, alpha)``."""
    excess = er - rf
    value_at_risk = var(returns, alpha)
    return excess / value_at_risk


def conditional_sharpe_ratio(er, returns, rf, alpha):
    """Return the excess return ``er - rf`` divided by ``cvar(returns, alpha)``."""
    excess = er - rf
    expected_shortfall = cvar(returns, alpha)
    return excess / expected_shortfall


def omega_ratio(er, returns, rf, target=0):
    """Return ``er - rf`` divided by the first-order lower partial moment.

    The lower partial moment is ``lpm(returns, target, 1)``.
    """
    excess = er - rf
    downside = lpm(returns, target, 1)
    return excess / downside


def sortino_ratio(er, returns, rf, target=0):
    """Return ``er - rf`` divided by the square root of ``lpm(returns, target, 2)``."""
    excess = er - rf
    downside_deviation = math.sqrt(lpm(returns, target, 2))
    return excess / downside_deviation


def kappa_three_ratio(er, returns, rf, target=0):
    """Return ``er - rf`` divided by the cube root of ``lpm(returns, target, 3)``."""
    excess = er - rf
    third_moment = lpm(returns, target, 3)
    # Float literals keep the exponent an exact one-third
    return excess / math.pow(third_moment, 1.0 / 3.0)


def gain_loss_ratio(returns, target=0):
    """Return ``hpm(returns, target, 1)`` divided by ``lpm(returns, target, 1)``."""
    upside = hpm(returns, target, 1)
    downside = lpm(returns, target, 1)
    return upside / downside


def upside_potential_ratio(returns, target=0):
    """Return ``hpm(returns, target, 1)`` over the square root of ``lpm(returns, target, 2)``."""
    upside = hpm(returns, target, 1)
    downside_deviation = math.sqrt(lpm(returns, target, 2))
    return upside / downside_deviation


def calmar_ratio(er, returns, rf):
    """Return the excess return ``er - rf`` divided by ``max_dd(returns)``."""
    excess = er - rf
    worst_drawdown = max_dd(returns)
    return excess / worst_drawdown


def sterling_ration(er, returns, rf, periods):
    """Return the excess return ``er - rf`` divided by ``average_dd(returns, periods)``.

    The function name (``ration``) is kept as-is because callers use it.
    """
    excess = er - rf
    mean_drawdown = average_dd(returns, periods)
    return excess / mean_drawdown


def burke_ratio(er, returns, rf, periods):
    """Return ``er - rf`` over the square root of ``average_dd_squared(returns, periods)``."""
    excess = er - rf
    drawdown_risk = math.sqrt(average_dd_squared(returns, periods))
    return excess / drawdown_risk


def test_risk_metrics(r, m):
    """Print each risk metric for portfolio returns ``r`` and market returns ``m``."""
    # Each entry is evaluated lazily so lines are printed in order, one by one
    report = (
        ("vol =", lambda: vol(r)),
        ("beta =", lambda: beta(r, m)),
        ("hpm(0.0)_1 =", lambda: hpm(r, 0.0, 1)),
        ("lpm(0.0)_1 =", lambda: lpm(r, 0.0, 1)),
        ("VaR(0.05) =", lambda: var(r, 0.05)),
        ("CVaR(0.05) =", lambda: cvar(r, 0.05)),
        ("Drawdown(5) =", lambda: dd(r, 5)),
        ("Max Drawdown =", lambda: max_dd(r)),
    )
    for label, compute in report:
        print(label, compute())


def test_risk_adjusted_metrics(r, m):
    """Print each risk-adjusted ratio for portfolio returns ``r`` and market returns ``m``.

    The expected return is the mean of ``r`` and the risk-free rate is 0.06.
    """
    e = numpy.mean(r)  # expected return
    f = 0.06  # risk-free rate
    # Each entry is evaluated lazily so lines are printed in order, one by one
    report = (
        # Risk-adjusted return based on volatility
        ("Treynor Ratio =", lambda: treynor_ratio(e, r, m, f)),
        ("Sharpe Ratio =", lambda: sharpe_ratio(e, r, f)),
        ("Information Ratio =", lambda: information_ratio(r, m)),
        # Risk-adjusted return based on value at risk
        ("Excess VaR =", lambda: excess_var(e, r, f, 0.05)),
        ("Conditional Sharpe Ratio =", lambda: conditional_sharpe_ratio(e, r, f, 0.05)),
        # Risk-adjusted return based on lower partial moments
        ("Omega Ratio =", lambda: omega_ratio(e, r, f)),
        ("Sortino Ratio =", lambda: sortino_ratio(e, r, f)),
        ("Kappa 3 Ratio =", lambda: kappa_three_ratio(e, r, f)),
        ("Gain Loss Ratio =", lambda: gain_loss_ratio(r)),
        ("Upside Potential Ratio =", lambda: upside_potential_ratio(r)),
        # Risk-adjusted return based on drawdown risk
        ("Calmar Ratio =", lambda: calmar_ratio(e, r, f)),
        ("Sterling Ratio =", lambda: sterling_ration(e, r, f, 5)),
        ("Burke Ratio =", lambda: burke_ratio(e, r, f, 5)),
    )
    for label, compute in report:
        print(label, compute())


if __name__ == "__main__":
    import csv

    def _load_column(path):
        """Read the first column of a header-less CSV file as a float array."""
        # newline='' is the mode the csv module expects for files it reads
        with open(path, newline='') as csvfile:
            # "if row" skips blank rows (e.g. a trailing empty line), which
            # would otherwise fail on row[0]
            return numpy.array([float(row[0]) for row in csv.reader(csvfile) if row])

    # load r
    r = _load_column(r'test.csv')  # change your filename here

    # load m
    m = _load_column(r'test2.csv')  # change your filename here

    test_risk_metrics(r, m)
    test_risk_adjusted_metrics(r, m)

这是我的测试文件的输出:

vol = 8.787591196681829
beta = 10.716740105069574
hpm(0.0)_1 = 5.8406
lpm(0.0)_1 = 0.0
VaR(0.05) = 0.333
test.py:69: RuntimeWarning: divide by zero encountered in double_scalars
return abs(sum_var / index)
CVaR(0.05) = inf
Drawdown(5) = 23.311
Max Drawdown = 0.684347620175231
Treynor Ratio = 0.5393991030225205
Sharpe Ratio = 0.6578139413429632
Information Ratio = -0.5991798008409744
Excess VaR = 17.35915915915916
Conditional Sharpe Ratio = 0.0
test.py:163: RuntimeWarning: divide by zero encountered in double_scalars
return (er - rf) / lpm(returns, target, 1)
Omega Ratio = inf
test.py:167: RuntimeWarning: divide by zero encountered in double_scalars
return (er - rf) / math.sqrt(lpm(returns, target, 2))
Sortino Ratio = inf
test.py:171: RuntimeWarning: divide by zero encountered in double_scalars
return (er - rf) / math.pow(lpm(returns, target, 3), float(1/3))
Kappa 3 Ratio = inf
test.py:175: RuntimeWarning: divide by zero encountered in double_scalars
return hpm(returns, target, 1) / lpm(returns, target, 1)
Gain Loss Ratio = inf
test.py:179: RuntimeWarning: divide by zero encountered in double_scalars
return hpm(returns, target, 1) / math.sqrt(lpm(returns, target, 2))
Upside Potential Ratio = inf
Calmar Ratio = 8.446876747404843
Sterling Ratio = 14.51982017208844
Burke Ratio = 12.583312697186637

关于Python:如何导入 .csv 并通过代码运行内容?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58560297/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com