gpt4 book ai didi

python - 在 Python 中使用 LombScargle 对 HRV 数据进行频谱分析

转载 作者:太空宇宙 更新时间:2023-11-03 15:03:29 28 4
gpt4 key购买 nike

我正在处理 RR 峰值,并希望导出 HRV 的频域测量值,以重新创建 Physionet(WFDB 工具)的 native C 软件包的结果。信号处理和频谱分析对我来说都是新领域,但经过漫长的一周尝试错误后,我已经根据 Astropy 编写了一些代码。尝试了其他几种解决方案后。

from astropy.stats import LombScargle
import random
dy = 0.1 * random.randint(1,100)
t = drive01["time"].values
y = drive01["intervals"].values
frequency, power = LombScargle(t, y,dy).autopower(minimum_frequency=0.0,maximum_frequency=4)

plt.plot(frequency, power)

这将创建一个看起来与 Physionets 包中的图非常相似的图。 Matplot lib

Physionets HRV 工具使用代码 get_hrv 绘制了此图 enter image description here

然后通过计算常见的频域测量,我得到了截然不同的结果。

Pxx = np.nan_to_num(power)
Fxx = np.nan_to_num(frequency)

ulf = 0.003
vlf = 0.04
lf = 0.15
hf = 0.4
Fs = 15.5 # the sampling rate of the drive file
# find the indexes corresponding to the VLF, LF, and HF bands
vlf_freq_band = (Fxx >= ulf) & (Fxx <= vlf)
lf_freq_band = (Fxx >= vlf) & (Fxx <= lf)
hf_freq_band = (Fxx >= lf) & (Fxx <= hf)
tp_freq_band = (Fxx >= 0) & (Fxx <= hf)
# Calculate the area under the given frequency band
dy = 1.0 / Fs
VLF = np.trapz(y=abs(Pxx[vlf_freq_band]), x=None, dx=dy)
LF = np.trapz(y=abs(Pxx[lf_freq_band]), x=None, dx=dy)
HF = np.trapz(y=abs(Pxx[hf_freq_band]), x=None, dx=dy)
TP = np.trapz(y=abs(Pxx[tp_freq_band]), x=None, dx=dy)
LF_HF = float(LF) / HF

Python

 'HF': 0.10918703853414605,
'LF': 0.050074418080717789,
'LF/HF': 0.45861137689028925,
'TP': 0.20150514290250854,
'VLF': 0.025953350304821571

来自 Physionet 包:

TOT PWR  = 0.0185973
VLF PWR = 0.00372733
LF PWR = 0.00472635
HF PWR = 0.0101436
LF/HF = 0.465944

比较结果时,结果如下所示:

    Python  Physionet   
TP 0.201505143 0.0185973 Quite similar + decimal dif
HF 0.109187039 0.0101436 Quite similar + decimal dif
LF 0.050074418 0.00472635 Quite similar + decimal dif
VLF 0.02595335 0.00372733 Not similar
LF/HF 0.458611377 0.465944 Quite similar

Python中的计算基于另一个Stackoverflow post的代码但他从受访者那里得到的修复是基于一个 python 模块,我无法工作,而且他没有使用 Lomb periodgram。我也非常愿意尝试其他东西,只要它能处理不均匀的样本。我正在使用的数据是 drivedb from Physionet我使用 Physionet 包制作了一个包含 RR 峰值和时间的文本文件,该文件被读入 Pandas DataFrame。 The textfile can be found here

最佳答案

LombScargle 基于 Astropy 计算功率,与 Physionet 的 C 软件包(WFDB 工具)不同。我再次用 python 编写 lombscargle,结果与 Physionet(WFDB 工具)的 C 包相同。

import numpy as np
import os
import math
import csv
from itertools import zip_longest
import time


DATA_PATH = '/home/quangpc/Desktop/Data/PhysionetData/mitdb/'


class FreqDomainClass:

@staticmethod
def power(freq, mag):
lo = [0, 0.0033, 0.04, 0.15]
hi = [0.0033, 0.04, 0.15, 0.4]
pr = np.zeros(4)
nbands = 4
for index in range(0, len(freq)):
pwr = np.power(mag[index], 2)
for n in range(0, nbands):
if (freq[index] >= lo[n]) and freq[index] <= hi[n]:
pr[n] += pwr
break
return pr

@staticmethod
def avevar(y):
var = 0.0
ep = 0.0
ave = np.mean(y)
for i in range(len(y)):
s = y[i] - ave
ep += s
var += s * s
var = (var - ep * ep / len(y)) / (len(y) - 1)
return var

def lomb(self, t, h, ofac, hifac):
period = max(t) - min(t)
z = h - np.mean(h)
f = np.arange(1 / (period * ofac), hifac * len(h) / (2 * period), 1 / (period * ofac))
f = f[:int(len(f) / 2) + 1]
f = np.reshape(f, (len(f), -1))

w = 2 * np.pi * f
lenght_t = len(t)
t = np.reshape(t, (lenght_t, -1))
t = np.transpose(t)
tau = np.arctan2(np.sum(np.sin(2 * w * t), axis=1), np.sum(np.cos(2 * w * t), axis=1)) / (2 * w)
tau = np.diag(tau)
tau = np.reshape(tau, (len(tau), -1))
tau = np.tile(tau, (1, lenght_t))
cos = np.cos(w * (t - tau))
sin = np.sin(w * (t - tau))
pc = np.power(np.sum(z * cos, axis=1), 2)
ps = np.power(np.sum(z * sin, axis=1), 2)
cs = pc / np.sum(np.power(cos, 2), axis=1)
ss = ps / np.sum(np.power(sin, 2), axis=1)
p = cs + ss

pwr = self.avevar(h)
nout = len(h)
p = p / (2 * pwr)
p = p / (nout / (2.0 * pwr))

return f, np.sqrt(p)

def lomb_for(self, t, h, ofac, hifac):
period = max(t) - min(t)
f = np.arange(1 / (period * ofac), hifac * len(h) / (2 * period), 1 / (period * ofac))
f = f[:int(len(f) / 2) + 1]
z = h - np.mean(h)
p = np.zeros(len(f))
for i in range(len(f)):
w = 2 * np.pi * f[i]
if w > 0:
twt = 2 * w * t
y = sum(np.sin(twt))
x = sum(np.cos(twt))
tau = math.atan2(y, x) / (2 * w)
wtmt = w * (t - tau)
cs = np.power(sum(np.multiply(z, np.cos(wtmt))), 2) / sum(np.power((np.cos(wtmt)), 2))
ss = np.power(sum(np.multiply(z, np.sin(wtmt))), 2) / sum(np.power((np.sin(wtmt)), 2))
p[i] = cs + ss
else:
p[i] = np.power(sum(np.multiply(z, t)), 1) / sum(np.power(t), 1)

pwr = self.avevar(h)
nout = len(h)
p = p / (2 * pwr)
p = p / (nout / (2.0 * pwr))

return f, np.sqrt(p)

def freq_domain(self, time, rr_intervals):
frequency, mag0 = self.lomb(time, rr_intervals, 4.0, 2.0)
frequency = np.round(frequency, 8)
mag0 = mag0 / 2.0
mag0 = np.round(mag0, 8)
result = self.power(frequency, mag0)
return result[0], result[1], result[2], result[3], result[0] + result[1] + result[2] + result[3], \
result[2] / result[3]


def time_domain(time, rr_intervals, ann):
sum_rr = 0.0
sum_rr2 = 0.0
rmssd = 0.0
totnn = 0
totnnn = 0
nrr = 1
totrr = 1
nnx = 0
nnn = 0
lastann = ann[0]
lastrr = int(rr_intervals[0])
lenght = 300
t = float(time[0])
end = t + lenght
i = 0
ratbuf = np.zeros(2400)
avbuf = np.zeros(2400)
sdbuf = np.zeros(2400)
for x in range(1, len(ann)):
t = float(time[x])
while t > (end+lenght):
i += 1
end += lenght
if t >= end:
if nnn > 1:
ratbuf[i] = nnn/nrr
sdbuf[i] = np.sqrt(((sdbuf[i] - avbuf[i]*avbuf[i]/nnn) / (nnn-1)))
avbuf[i] /= nnn
i += 1
nnn = nrr = 0
end += lenght
nrr += 1
totrr += 1
if ann[x] == 'N' and ann[x-1] == 'N':
rr_intervals[x] = int(rr_intervals[x])
nnn += 1
avbuf[i] += rr_intervals[x]
sdbuf[i] += (rr_intervals[x] * rr_intervals[x])
sum_rr += rr_intervals[x]
sum_rr2 += (rr_intervals[x] * rr_intervals[x])
totnn += 1
if lastann == 'N':
totnnn += 1
rmssd += (rr_intervals[x] - lastrr) * (rr_intervals[x] - lastrr)
# nndif[0] = NNDIF
if abs(rr_intervals[x] - lastrr) - 0.05 > (10 ** -10):
nnx += 1
lastann = ann[x-1]
lastrr = rr_intervals[x]
if totnn == 0:
return 0, 0, 0, 0
sdnn = np.sqrt((sum_rr2 - sum_rr * sum_rr / totnn) / (totnn - 1))
rmssd = np.sqrt(rmssd/totnnn)
pnn50 = nnx / totnnn
if nnn > 1:
ratbuf[i] = nnn / nrr
sdbuf[i] = np.sqrt((sdbuf[i] - avbuf[i] * avbuf[i] / nnn) / (nnn - 1))
avbuf[i] /= nnn
nb = i + 1
sum_rr = 0.0
sum_rr2 = 0.0
k = 0
h = 0
while k < nb:
if ratbuf[k] != 0:
h += 1
sum_rr += avbuf[k]
sum_rr2 += (avbuf[k] * avbuf[k])
k += 1
sdann = np.sqrt((sum_rr2 - sum_rr * sum_rr / h) / (h - 1))
return sdnn, sdann, rmssd, pnn50


def get_result_from_get_hrv(filename):
with open(filename, 'r') as f:
csv_reader = csv.reader(f, delimiter=',')
index = 0
for row in csv_reader:
if index > 0:
output = [s.strip() for s in row[0].split('=') if s]
# print('output = ', output)
if output[0] == 'SDNN':
sdnn = output[1]
if output[0] == 'SDANN':
sdann = output[1]
if output[0] == 'rMSSD':
rmssd = output[1]
if output[0] == 'pNN50':
pnn50 = output[1]
if output[0] == 'ULF PWR':
ulf = output[1]
if output[0] == 'VLF PWR':
vlf = output[1]
if output[0] == 'LF PWR':
lf = output[1]
if output[0] == 'HF PWR':
hf = output[1]
if output[0] == 'TOT PWR':
tp = output[1]
if output[0] == 'LF/HF':
ratio_lf_hf = output[1]

index += 1

return float(sdnn), float(sdann), float(rmssd), float(pnn50), float(ulf), float(vlf), \
float(lf), float(hf), float(tp), float(ratio_lf_hf)


def save_file():
extension = "atr"

result_all = []
file_process = ['File']
sdnn_l = ['sdnn']
sdann_l = ['sdann']
rmssd_l = ['rmssd']
pnn50_l = ['pnn50']
ulf_l = ['ulf']
vlf_l = ['vlf']
lf_l = ['lf']
hf_l = ['hf']
tp_l = ['tp']
ratio_lf_hf_l = ['ratio_lf_hf']

sdnn_l_p = ['sdnn']
sdann_l_p = ['sdann']
rmssd_l_p = ['rmssd']
pnn50_l_p = ['pnn50']
ulf_l_p = ['ulf']
vlf_l_p = ['vlf']
lf_l_p = ['lf']
hf_l_p = ['hf']
tp_l_p = ['tp']
ratio_lf_hf_l_p = ['ratio_lf_hf']

test_file = ['103', '113', '117', '121', '123', '200', '202', '210', '212', '213',
'219', '221', '213', '228', '231', '233', '234',
'101', '106', '108', '112', '114', '115', '116', '119', '122', '201', '203',
'205', '208', '209', '215', '220', '223', '230',
'105', '100']
file_dis = ['109', '111', '118', '124', '207', '214', '232']
for root, dirs, files in os.walk(DATA_PATH):
files = np.sort(files)

for name in files:
if extension in name:
if os.path.basename(name[:-4]) not in test_file:
continue
print('Processing file...', os.path.basename(name))

cur_dir = os.getcwd()
os.chdir(DATA_PATH)
os.system('rrlist {} {} -M -s >{}.rr'.format(extension, name.split('.')[0], name.split('.')[0]))

time_m = []
rr_intervals = []
ann = []
with open(name.split('.')[0] + '.rr', 'r') as rr_file:
for line in rr_file:
time_m.append(line.split(' ')[0])
rr_intervals.append(line.split(' ')[1])
ann.append(line.split(' ')[2].split('\n')[0])

time_m = np.asarray(time_m, dtype=float)
rr_intervals = np.asarray(rr_intervals, dtype=float)

sdnn, sdann, rmssd, pnn50 = time_domain(time_m, rr_intervals, ann)

if sdnn == 0 and sdann == 0 and rmssd == 0 and pnn50 == 0:
print('No result hrv')
file_dis.append(os.path.basename(name[:-4]))
continue

print('sdnn', sdnn)
print('rmssd', rmssd)
print('pnn50', pnn50)
print('sdann', sdann)

time_m = time_m - time_m[0]
time_m = np.round(time_m, 3)
time_nn = []
nn_intervals = []
for i in range(1, len(ann)):
if ann[i] == 'N' and ann[i - 1] == 'N':
nn_intervals.append(rr_intervals[i])
time_nn.append(time_m[i])
time_nn = np.asarray(time_nn, dtype=float)
nn_intervals = np.asarray(nn_intervals, dtype=float)

fc = FreqDomainClass()
ulf, vlf, lf, hf, tp, ratio_lf_hf = fc.freq_domain(time_nn, nn_intervals)

sdnn_l.append(sdnn)
sdann_l.append(sdann)
rmssd_l.append(rmssd)
pnn50_l.append(pnn50)
ulf_l.append(ulf)
vlf_l.append(vlf)
lf_l.append(lf)
hf_l.append(hf)
tp_l.append(tp)
ratio_lf_hf_l.append(ratio_lf_hf)

print('ULF PWR: ', ulf)
print('VLF PWR: ', vlf)
print('LF PWR: ', lf)
print('HF PWR: ', hf)
print('TOT PWR: ', tp)
print('LF/HF: ', ratio_lf_hf)

if os.path.exists('physionet_hrv.txt'):
os.remove('physionet_hrv.txt')
os.system('get_hrv -R ' + name.split('.')[0] + '.rr >> ' + 'physionet_hrv.txt')
sdnn, sdann, rmssd, pnn50, ulf, vlf, lf, hf, tp, ratio_lf_hf = \
get_result_from_get_hrv('physionet_hrv.txt')

file_process.append(os.path.basename(name[:-4]))
sdnn_l_p.append(sdnn)
sdann_l_p.append(sdann)
rmssd_l_p.append(rmssd)
pnn50_l_p.append(pnn50)
ulf_l_p.append(ulf)
vlf_l_p.append(vlf)
lf_l_p.append(lf)
hf_l_p.append(hf)
tp_l_p.append(tp)
ratio_lf_hf_l_p.append(ratio_lf_hf)

os.chdir(cur_dir)

result_all.append(file_process)
result_all.append(sdnn_l)
result_all.append(sdnn_l_p)
result_all.append(sdann_l)
result_all.append(sdann_l_p)
result_all.append(rmssd_l)
result_all.append(rmssd_l_p)
result_all.append(pnn50_l)
result_all.append(pnn50_l_p)
result_all.append(ulf_l)
result_all.append(ulf_l_p)
result_all.append(vlf_l)
result_all.append(vlf_l_p)
result_all.append(lf_l)
result_all.append(lf_l_p)
result_all.append(hf_l)
result_all.append(hf_l_p)
result_all.append(tp_l)
result_all.append(tp_l_p)
result_all.append(ratio_lf_hf_l)
result_all.append(ratio_lf_hf_l_p)
print(file_dis)
with open('hrv2.csv', 'w+') as f:
writer = csv.writer(f)
for values in zip_longest(*result_all):
writer.writerow(values)


def main():
extension = "atr"
for root, dirs, files in os.walk(DATA_PATH):
files = np.sort(files)
for name in files:
if extension in name:
if name not in ['101.atr']:
continue
cur_dir = os.getcwd()
os.chdir(DATA_PATH)
os.system('rrlist {} {} -M -s >{}.rr'.format(extension, name.split('.')[0], name.split('.')[0]))
time_m = []
rr_intervals = []
ann = []
with open(name.split('.')[0] + '.rr', 'r') as rr_file:
for line in rr_file:
time_m.append(line.split(' ')[0])
rr_intervals.append(line.split(' ')[1])
ann.append(line.split(' ')[2].split('\n')[0])
time_m = np.asarray(time_m, dtype=float)
rr_intervals = np.asarray(rr_intervals, dtype=float)
sdnn, sdann, rmssd, pnn50 = time_domain(time_m, rr_intervals, ann)
if sdnn == 0 and sdann == 0 and rmssd == 0 and pnn50 == 0:
print('No result hrv')
return 0
print('sdnn', sdnn)
print('rmssd', rmssd)
print('pnn50', pnn50)
print('sdann', sdann)

time_m = time_m - time_m[0]
time_m = np.round(time_m, 3)
time_nn = []
nn_intervals = []
for i in range(1, len(ann)):
if ann[i] == 'N' and ann[i - 1] == 'N':
nn_intervals.append(rr_intervals[i])
time_nn.append(time_m[i])
time_nn = np.asarray(time_nn, dtype=float)
nn_intervals = np.asarray(nn_intervals, dtype=float)

start = time.time()
fc = FreqDomainClass()
ulf, vlf, lf, hf, tp, ratio_lf_hf = fc.freq_domain(time_nn, nn_intervals)
end = time.time()

print('ULF PWR: ', ulf)
print('VLF PWR: ', vlf)
print('LF PWR: ', lf)
print('HF PWR: ', hf)
print('TOT PWR: ', tp)
print('LF/HF: ', ratio_lf_hf)
print('finish', end - start)
os.chdir(cur_dir)

关于python - 在 Python 中使用 LombScargle 对 HRV 数据进行频谱分析,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44863146/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com