- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在处理 RR 峰值,并希望导出 HRV 的频域测量值,以重新创建 Physionet(WFDB 工具)的 native C 软件包的结果。信号处理和频谱分析对我来说都是新领域,但经过漫长的一周尝试错误后,我已经根据 Astropy 编写了一些代码。尝试了其他几种解决方案后。
from astropy.stats import LombScargle
import random
dy = 0.1 * random.randint(1,100)
t = drive01["time"].values
y = drive01["intervals"].values
frequency, power = LombScargle(t, y,dy).autopower(minimum_frequency=0.0,maximum_frequency=4)
plt.plot(frequency, power)
这将创建一个看起来与 Physionets 包中的图非常相似的图。
Physionets HRV 工具使用代码 get_hrv 绘制了此图
然后通过计算常见的频域测量,我得到了截然不同的结果。
Pxx = np.nan_to_num(power)
Fxx = np.nan_to_num(frequency)
ulf = 0.003
vlf = 0.04
lf = 0.15
hf = 0.4
Fs = 15.5 # the sampling rate of the drive file
# find the indexes corresponding to the VLF, LF, and HF bands
vlf_freq_band = (Fxx >= ulf) & (Fxx <= vlf)
lf_freq_band = (Fxx >= vlf) & (Fxx <= lf)
hf_freq_band = (Fxx >= lf) & (Fxx <= hf)
tp_freq_band = (Fxx >= 0) & (Fxx <= hf)
# Calculate the area under the given frequency band
dy = 1.0 / Fs
VLF = np.trapz(y=abs(Pxx[vlf_freq_band]), x=None, dx=dy)
LF = np.trapz(y=abs(Pxx[lf_freq_band]), x=None, dx=dy)
HF = np.trapz(y=abs(Pxx[hf_freq_band]), x=None, dx=dy)
TP = np.trapz(y=abs(Pxx[tp_freq_band]), x=None, dx=dy)
LF_HF = float(LF) / HF
Python
'HF': 0.10918703853414605,
'LF': 0.050074418080717789,
'LF/HF': 0.45861137689028925,
'TP': 0.20150514290250854,
'VLF': 0.025953350304821571
来自 Physionet 包:
TOT PWR = 0.0185973
VLF PWR = 0.00372733
LF PWR = 0.00472635
HF PWR = 0.0101436
LF/HF = 0.465944
比较结果时,结果如下所示:
Python Physionet
TP 0.201505143 0.0185973 Quite similar + decimal dif
HF 0.109187039 0.0101436 Quite similar + decimal dif
LF 0.050074418 0.00472635 Quite similar + decimal dif
VLF 0.02595335 0.00372733 Not similar
LF/HF 0.458611377 0.465944 Quite similar
Python中的计算基于另一个Stackoverflow post的代码但他从受访者那里得到的修复是基于一个 python 模块,我无法工作,而且他没有使用 Lomb periodgram。我也非常愿意尝试其他东西,只要它能处理不均匀的样本。我正在使用的数据是 drivedb from Physionet我使用 Physionet 包制作了一个包含 RR 峰值和时间的文本文件,该文件被读入 Pandas DataFrame。 The textfile can be found here
最佳答案
LombScargle 基于 Astropy 计算功率,与 Physionet 的 C 软件包(WFDB 工具)不同。我再次用 python 编写 lombscargle,结果与 Physionet(WFDB 工具)的 C 包相同。
import numpy as np
import os
import math
import csv
from itertools import zip_longest
import time
DATA_PATH = '/home/quangpc/Desktop/Data/PhysionetData/mitdb/'
class FreqDomainClass:
@staticmethod
def power(freq, mag):
lo = [0, 0.0033, 0.04, 0.15]
hi = [0.0033, 0.04, 0.15, 0.4]
pr = np.zeros(4)
nbands = 4
for index in range(0, len(freq)):
pwr = np.power(mag[index], 2)
for n in range(0, nbands):
if (freq[index] >= lo[n]) and freq[index] <= hi[n]:
pr[n] += pwr
break
return pr
@staticmethod
def avevar(y):
var = 0.0
ep = 0.0
ave = np.mean(y)
for i in range(len(y)):
s = y[i] - ave
ep += s
var += s * s
var = (var - ep * ep / len(y)) / (len(y) - 1)
return var
def lomb(self, t, h, ofac, hifac):
period = max(t) - min(t)
z = h - np.mean(h)
f = np.arange(1 / (period * ofac), hifac * len(h) / (2 * period), 1 / (period * ofac))
f = f[:int(len(f) / 2) + 1]
f = np.reshape(f, (len(f), -1))
w = 2 * np.pi * f
lenght_t = len(t)
t = np.reshape(t, (lenght_t, -1))
t = np.transpose(t)
tau = np.arctan2(np.sum(np.sin(2 * w * t), axis=1), np.sum(np.cos(2 * w * t), axis=1)) / (2 * w)
tau = np.diag(tau)
tau = np.reshape(tau, (len(tau), -1))
tau = np.tile(tau, (1, lenght_t))
cos = np.cos(w * (t - tau))
sin = np.sin(w * (t - tau))
pc = np.power(np.sum(z * cos, axis=1), 2)
ps = np.power(np.sum(z * sin, axis=1), 2)
cs = pc / np.sum(np.power(cos, 2), axis=1)
ss = ps / np.sum(np.power(sin, 2), axis=1)
p = cs + ss
pwr = self.avevar(h)
nout = len(h)
p = p / (2 * pwr)
p = p / (nout / (2.0 * pwr))
return f, np.sqrt(p)
def lomb_for(self, t, h, ofac, hifac):
period = max(t) - min(t)
f = np.arange(1 / (period * ofac), hifac * len(h) / (2 * period), 1 / (period * ofac))
f = f[:int(len(f) / 2) + 1]
z = h - np.mean(h)
p = np.zeros(len(f))
for i in range(len(f)):
w = 2 * np.pi * f[i]
if w > 0:
twt = 2 * w * t
y = sum(np.sin(twt))
x = sum(np.cos(twt))
tau = math.atan2(y, x) / (2 * w)
wtmt = w * (t - tau)
cs = np.power(sum(np.multiply(z, np.cos(wtmt))), 2) / sum(np.power((np.cos(wtmt)), 2))
ss = np.power(sum(np.multiply(z, np.sin(wtmt))), 2) / sum(np.power((np.sin(wtmt)), 2))
p[i] = cs + ss
else:
p[i] = np.power(sum(np.multiply(z, t)), 1) / sum(np.power(t), 1)
pwr = self.avevar(h)
nout = len(h)
p = p / (2 * pwr)
p = p / (nout / (2.0 * pwr))
return f, np.sqrt(p)
def freq_domain(self, time, rr_intervals):
frequency, mag0 = self.lomb(time, rr_intervals, 4.0, 2.0)
frequency = np.round(frequency, 8)
mag0 = mag0 / 2.0
mag0 = np.round(mag0, 8)
result = self.power(frequency, mag0)
return result[0], result[1], result[2], result[3], result[0] + result[1] + result[2] + result[3], \
result[2] / result[3]
def time_domain(time, rr_intervals, ann):
sum_rr = 0.0
sum_rr2 = 0.0
rmssd = 0.0
totnn = 0
totnnn = 0
nrr = 1
totrr = 1
nnx = 0
nnn = 0
lastann = ann[0]
lastrr = int(rr_intervals[0])
lenght = 300
t = float(time[0])
end = t + lenght
i = 0
ratbuf = np.zeros(2400)
avbuf = np.zeros(2400)
sdbuf = np.zeros(2400)
for x in range(1, len(ann)):
t = float(time[x])
while t > (end+lenght):
i += 1
end += lenght
if t >= end:
if nnn > 1:
ratbuf[i] = nnn/nrr
sdbuf[i] = np.sqrt(((sdbuf[i] - avbuf[i]*avbuf[i]/nnn) / (nnn-1)))
avbuf[i] /= nnn
i += 1
nnn = nrr = 0
end += lenght
nrr += 1
totrr += 1
if ann[x] == 'N' and ann[x-1] == 'N':
rr_intervals[x] = int(rr_intervals[x])
nnn += 1
avbuf[i] += rr_intervals[x]
sdbuf[i] += (rr_intervals[x] * rr_intervals[x])
sum_rr += rr_intervals[x]
sum_rr2 += (rr_intervals[x] * rr_intervals[x])
totnn += 1
if lastann == 'N':
totnnn += 1
rmssd += (rr_intervals[x] - lastrr) * (rr_intervals[x] - lastrr)
# nndif[0] = NNDIF
if abs(rr_intervals[x] - lastrr) - 0.05 > (10 ** -10):
nnx += 1
lastann = ann[x-1]
lastrr = rr_intervals[x]
if totnn == 0:
return 0, 0, 0, 0
sdnn = np.sqrt((sum_rr2 - sum_rr * sum_rr / totnn) / (totnn - 1))
rmssd = np.sqrt(rmssd/totnnn)
pnn50 = nnx / totnnn
if nnn > 1:
ratbuf[i] = nnn / nrr
sdbuf[i] = np.sqrt((sdbuf[i] - avbuf[i] * avbuf[i] / nnn) / (nnn - 1))
avbuf[i] /= nnn
nb = i + 1
sum_rr = 0.0
sum_rr2 = 0.0
k = 0
h = 0
while k < nb:
if ratbuf[k] != 0:
h += 1
sum_rr += avbuf[k]
sum_rr2 += (avbuf[k] * avbuf[k])
k += 1
sdann = np.sqrt((sum_rr2 - sum_rr * sum_rr / h) / (h - 1))
return sdnn, sdann, rmssd, pnn50
def get_result_from_get_hrv(filename):
with open(filename, 'r') as f:
csv_reader = csv.reader(f, delimiter=',')
index = 0
for row in csv_reader:
if index > 0:
output = [s.strip() for s in row[0].split('=') if s]
# print('output = ', output)
if output[0] == 'SDNN':
sdnn = output[1]
if output[0] == 'SDANN':
sdann = output[1]
if output[0] == 'rMSSD':
rmssd = output[1]
if output[0] == 'pNN50':
pnn50 = output[1]
if output[0] == 'ULF PWR':
ulf = output[1]
if output[0] == 'VLF PWR':
vlf = output[1]
if output[0] == 'LF PWR':
lf = output[1]
if output[0] == 'HF PWR':
hf = output[1]
if output[0] == 'TOT PWR':
tp = output[1]
if output[0] == 'LF/HF':
ratio_lf_hf = output[1]
index += 1
return float(sdnn), float(sdann), float(rmssd), float(pnn50), float(ulf), float(vlf), \
float(lf), float(hf), float(tp), float(ratio_lf_hf)
def save_file():
extension = "atr"
result_all = []
file_process = ['File']
sdnn_l = ['sdnn']
sdann_l = ['sdann']
rmssd_l = ['rmssd']
pnn50_l = ['pnn50']
ulf_l = ['ulf']
vlf_l = ['vlf']
lf_l = ['lf']
hf_l = ['hf']
tp_l = ['tp']
ratio_lf_hf_l = ['ratio_lf_hf']
sdnn_l_p = ['sdnn']
sdann_l_p = ['sdann']
rmssd_l_p = ['rmssd']
pnn50_l_p = ['pnn50']
ulf_l_p = ['ulf']
vlf_l_p = ['vlf']
lf_l_p = ['lf']
hf_l_p = ['hf']
tp_l_p = ['tp']
ratio_lf_hf_l_p = ['ratio_lf_hf']
test_file = ['103', '113', '117', '121', '123', '200', '202', '210', '212', '213',
'219', '221', '213', '228', '231', '233', '234',
'101', '106', '108', '112', '114', '115', '116', '119', '122', '201', '203',
'205', '208', '209', '215', '220', '223', '230',
'105', '100']
file_dis = ['109', '111', '118', '124', '207', '214', '232']
for root, dirs, files in os.walk(DATA_PATH):
files = np.sort(files)
for name in files:
if extension in name:
if os.path.basename(name[:-4]) not in test_file:
continue
print('Processing file...', os.path.basename(name))
cur_dir = os.getcwd()
os.chdir(DATA_PATH)
os.system('rrlist {} {} -M -s >{}.rr'.format(extension, name.split('.')[0], name.split('.')[0]))
time_m = []
rr_intervals = []
ann = []
with open(name.split('.')[0] + '.rr', 'r') as rr_file:
for line in rr_file:
time_m.append(line.split(' ')[0])
rr_intervals.append(line.split(' ')[1])
ann.append(line.split(' ')[2].split('\n')[0])
time_m = np.asarray(time_m, dtype=float)
rr_intervals = np.asarray(rr_intervals, dtype=float)
sdnn, sdann, rmssd, pnn50 = time_domain(time_m, rr_intervals, ann)
if sdnn == 0 and sdann == 0 and rmssd == 0 and pnn50 == 0:
print('No result hrv')
file_dis.append(os.path.basename(name[:-4]))
continue
print('sdnn', sdnn)
print('rmssd', rmssd)
print('pnn50', pnn50)
print('sdann', sdann)
time_m = time_m - time_m[0]
time_m = np.round(time_m, 3)
time_nn = []
nn_intervals = []
for i in range(1, len(ann)):
if ann[i] == 'N' and ann[i - 1] == 'N':
nn_intervals.append(rr_intervals[i])
time_nn.append(time_m[i])
time_nn = np.asarray(time_nn, dtype=float)
nn_intervals = np.asarray(nn_intervals, dtype=float)
fc = FreqDomainClass()
ulf, vlf, lf, hf, tp, ratio_lf_hf = fc.freq_domain(time_nn, nn_intervals)
sdnn_l.append(sdnn)
sdann_l.append(sdann)
rmssd_l.append(rmssd)
pnn50_l.append(pnn50)
ulf_l.append(ulf)
vlf_l.append(vlf)
lf_l.append(lf)
hf_l.append(hf)
tp_l.append(tp)
ratio_lf_hf_l.append(ratio_lf_hf)
print('ULF PWR: ', ulf)
print('VLF PWR: ', vlf)
print('LF PWR: ', lf)
print('HF PWR: ', hf)
print('TOT PWR: ', tp)
print('LF/HF: ', ratio_lf_hf)
if os.path.exists('physionet_hrv.txt'):
os.remove('physionet_hrv.txt')
os.system('get_hrv -R ' + name.split('.')[0] + '.rr >> ' + 'physionet_hrv.txt')
sdnn, sdann, rmssd, pnn50, ulf, vlf, lf, hf, tp, ratio_lf_hf = \
get_result_from_get_hrv('physionet_hrv.txt')
file_process.append(os.path.basename(name[:-4]))
sdnn_l_p.append(sdnn)
sdann_l_p.append(sdann)
rmssd_l_p.append(rmssd)
pnn50_l_p.append(pnn50)
ulf_l_p.append(ulf)
vlf_l_p.append(vlf)
lf_l_p.append(lf)
hf_l_p.append(hf)
tp_l_p.append(tp)
ratio_lf_hf_l_p.append(ratio_lf_hf)
os.chdir(cur_dir)
result_all.append(file_process)
result_all.append(sdnn_l)
result_all.append(sdnn_l_p)
result_all.append(sdann_l)
result_all.append(sdann_l_p)
result_all.append(rmssd_l)
result_all.append(rmssd_l_p)
result_all.append(pnn50_l)
result_all.append(pnn50_l_p)
result_all.append(ulf_l)
result_all.append(ulf_l_p)
result_all.append(vlf_l)
result_all.append(vlf_l_p)
result_all.append(lf_l)
result_all.append(lf_l_p)
result_all.append(hf_l)
result_all.append(hf_l_p)
result_all.append(tp_l)
result_all.append(tp_l_p)
result_all.append(ratio_lf_hf_l)
result_all.append(ratio_lf_hf_l_p)
print(file_dis)
with open('hrv2.csv', 'w+') as f:
writer = csv.writer(f)
for values in zip_longest(*result_all):
writer.writerow(values)
def main():
extension = "atr"
for root, dirs, files in os.walk(DATA_PATH):
files = np.sort(files)
for name in files:
if extension in name:
if name not in ['101.atr']:
continue
cur_dir = os.getcwd()
os.chdir(DATA_PATH)
os.system('rrlist {} {} -M -s >{}.rr'.format(extension, name.split('.')[0], name.split('.')[0]))
time_m = []
rr_intervals = []
ann = []
with open(name.split('.')[0] + '.rr', 'r') as rr_file:
for line in rr_file:
time_m.append(line.split(' ')[0])
rr_intervals.append(line.split(' ')[1])
ann.append(line.split(' ')[2].split('\n')[0])
time_m = np.asarray(time_m, dtype=float)
rr_intervals = np.asarray(rr_intervals, dtype=float)
sdnn, sdann, rmssd, pnn50 = time_domain(time_m, rr_intervals, ann)
if sdnn == 0 and sdann == 0 and rmssd == 0 and pnn50 == 0:
print('No result hrv')
return 0
print('sdnn', sdnn)
print('rmssd', rmssd)
print('pnn50', pnn50)
print('sdann', sdann)
time_m = time_m - time_m[0]
time_m = np.round(time_m, 3)
time_nn = []
nn_intervals = []
for i in range(1, len(ann)):
if ann[i] == 'N' and ann[i - 1] == 'N':
nn_intervals.append(rr_intervals[i])
time_nn.append(time_m[i])
time_nn = np.asarray(time_nn, dtype=float)
nn_intervals = np.asarray(nn_intervals, dtype=float)
start = time.time()
fc = FreqDomainClass()
ulf, vlf, lf, hf, tp, ratio_lf_hf = fc.freq_domain(time_nn, nn_intervals)
end = time.time()
print('ULF PWR: ', ulf)
print('VLF PWR: ', vlf)
print('LF PWR: ', lf)
print('HF PWR: ', hf)
print('TOT PWR: ', tp)
print('LF/HF: ', ratio_lf_hf)
print('finish', end - start)
os.chdir(cur_dir)
关于python - 在 Python 中使用 LombScargle 对 HRV 数据进行频谱分析,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44863146/
我对复值频谱执行 iFFT 并通过让第一个样本归零来更改相应的时域信号。最后,我通过 FFT 将其转换回频域。 我想知道这里使用两侧(对称)频谱或仅使用一侧频谱(仅正频率)之间的(物理)差异在哪里,因
我正在尝试提高使用 cython 计算 Jonswap 频谱的性能。但它似乎比原始代码慢得多。我该如何改进? 赛通代码: from libc.math cimport exp from libc.st
我目前可以使用 JavaScript Web Audio API 播放音轨。在播放此轨道时,我可以提取 FFT 频谱数据、峰值、RMS 值等。 但是,出于我的应用程序的目的,我需要能够在开始播放轨道之
我正在尝试将频谱保存在我的 FMOD_DSP_PARAMETER_FFT 中,但我只收到充满零的频谱,如果你能看到我的错误我会同意,我认为我没有很好地将 DSP 连接到 channel 或类似的东西,
我有一个复杂的信号,我想对其进行 FFT。使用 Mathematica 我得到以下结果: (* Some acquisition params *) fS = 100. 10^6; time = 10
我是一名优秀的程序员,十分优秀!