gpt4 book ai didi

python - 如何在 python 中使用 kNN 的动态时间扭曲

转载 作者:太空宇宙 更新时间:2023-11-04 01:54:56 25 4
gpt4 key购买 nike

我有一个包含两个标签(01)的时间序列数据集。我正在使用动态时间规整 (DTW) 作为使用 k 最近邻 (kNN) 进行分类的相似性度量,如这两篇精彩的博客文章中所述:

  • https://nbviewer.jupyter.org/github/markdregan/K-Nearest-Neighbors-with-Dynamic-Time-Warping/blob/master/K_Nearest_Neighbor_Dynamic_Time_Warping.ipynb
  • http://alexminnaar.com/2014/04/16/Time-Series-Classification-and-Clustering-with-Python.html

    Arguments
    ---------
    n_neighbors : int, optional (default = 5)
    Number of neighbors to use by default for KNN

    max_warping_window : int, optional (default = infinity)
    Maximum warping window allowed by the DTW dynamic
    programming function

    subsample_step : int, optional (default = 1)
    Step size for the timeseries array. By setting subsample_step = 2,
    the timeseries length will be reduced by 50% because every second
    item is skipped. Implemented by x[:, ::subsample_step]
    """

    def __init__(self, n_neighbors=5, max_warping_window=10000, subsample_step=1):
    self.n_neighbors = n_neighbors
    self.max_warping_window = max_warping_window
    self.subsample_step = subsample_step

    def fit(self, x, l):
    """Fit the model using x as training data and l as class labels

    Arguments
    ---------
    x : array of shape [n_samples, n_timepoints]
    Training data set for input into KNN classifer

    l : array of shape [n_samples]
    Training labels for input into KNN classifier
    """

    self.x = x
    self.l = l

    def _dtw_distance(self, ts_a, ts_b, d = lambda x,y: abs(x-y)):
    """Returns the DTW similarity distance between two 2-D
    timeseries numpy arrays.

    Arguments
    ---------
    ts_a, ts_b : array of shape [n_samples, n_timepoints]
    Two arrays containing n_samples of timeseries data
    whose DTW distance between each sample of A and B
    will be compared

    d : DistanceMetric object (default = abs(x-y))
    the distance measure used for A_i - B_j in the
    DTW dynamic programming function

    Returns
    -------
    DTW distance between A and B
    """

    # Create cost matrix via broadcasting with large int
    ts_a, ts_b = np.array(ts_a), np.array(ts_b)
    M, N = len(ts_a), len(ts_b)
    cost = sys.maxint * np.ones((M, N))

    # Initialize the first row and column
    cost[0, 0] = d(ts_a[0], ts_b[0])
    for i in xrange(1, M):
    cost[i, 0] = cost[i-1, 0] + d(ts_a[i], ts_b[0])

    for j in xrange(1, N):
    cost[0, j] = cost[0, j-1] + d(ts_a[0], ts_b[j])

    # Populate rest of cost matrix within window
    for i in xrange(1, M):
    for j in xrange(max(1, i - self.max_warping_window),
    min(N, i + self.max_warping_window)):
    choices = cost[i - 1, j - 1], cost[i, j-1], cost[i-1, j]
    cost[i, j] = min(choices) + d(ts_a[i], ts_b[j])

    # Return DTW distance given window
    return cost[-1, -1]

    def _dist_matrix(self, x, y):
    """Computes the M x N distance matrix between the training
    dataset and testing dataset (y) using the DTW distance measure

    Arguments
    ---------
    x : array of shape [n_samples, n_timepoints]

    y : array of shape [n_samples, n_timepoints]

    Returns
    -------
    Distance matrix between each item of x and y with
    shape [training_n_samples, testing_n_samples]
    """

    # Compute the distance matrix
    dm_count = 0

    # Compute condensed distance matrix (upper triangle) of pairwise dtw distances
    # when x and y are the same array
    if(np.array_equal(x, y)):
    x_s = np.shape(x)
    dm = np.zeros((x_s[0] * (x_s[0] - 1)) // 2, dtype=np.double)

    p = ProgressBar(shape(dm)[0])

    for i in xrange(0, x_s[0] - 1):
    for j in xrange(i + 1, x_s[0]):
    dm[dm_count] = self._dtw_distance(x[i, ::self.subsample_step],
    y[j, ::self.subsample_step])

    dm_count += 1
    p.animate(dm_count)

    # Convert to squareform
    dm = squareform(dm)
    return dm

    # Compute full distance matrix of dtw distnces between x and y
    else:
    x_s = np.shape(x)
    y_s = np.shape(y)
    dm = np.zeros((x_s[0], y_s[0]))
    dm_size = x_s[0]*y_s[0]

    p = ProgressBar(dm_size)

    for i in xrange(0, x_s[0]):
    for j in xrange(0, y_s[0]):
    dm[i, j] = self._dtw_distance(x[i, ::self.subsample_step],
    y[j, ::self.subsample_step])
    # Update progress bar
    dm_count += 1
    p.animate(dm_count)

    return dm

    def predict(self, x):
    """Predict the class labels or probability estimates for
    the provided data

    Arguments
    ---------
    x : array of shape [n_samples, n_timepoints]
    Array containing the testing data set to be classified

    Returns
    -------
    2 arrays representing:
    (1) the predicted class labels
    (2) the knn label count probability
    """

    dm = self._dist_matrix(x, self.x)

    # Identify the k nearest neighbors
    knn_idx = dm.argsort()[:, :self.n_neighbors]

    # Identify k nearest labels
    knn_labels = self.l[knn_idx]

    # Model Label
    mode_data = mode(knn_labels, axis=1)
    mode_label = mode_data[0]
    mode_proba = mode_data[1]/self.n_neighbors

    return mode_label.ravel(), mode_proba.ravel()

但是,对于使用 kNN 进行分类,这两篇文章使用了他们自己的 kNN 算法。

我想在我的分类中使用 sklearn 的选项,例如 gridsearchcv。因此,我想知道如何将动态时间规整 (DTW) 与 sklearn kNN 结合使用。

注意:我不仅限于 sklearn,也很高兴在其他库中收到答案

如果需要,我很乐意提供更多详细信息。

最佳答案

您可以为 KNN 使用自定义指标。因此,您只需要自己实现 DTW(或使用/改编 Python 中任何现有的 DTW 实现) [gist of this code] .

import numpy as np
from scipy.spatial import distance
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

#toy dataset
X = np.random.random((100,10))
y = np.random.randint(0,2, (100))
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

#custom metric
def DTW(a, b):
an = a.size
bn = b.size
pointwise_distance = distance.cdist(a.reshape(-1,1),b.reshape(-1,1))
cumdist = np.matrix(np.ones((an+1,bn+1)) * np.inf)
cumdist[0,0] = 0

for ai in range(an):
for bi in range(bn):
minimum_cost = np.min([cumdist[ai, bi+1],
cumdist[ai+1, bi],
cumdist[ai, bi]])
cumdist[ai+1, bi+1] = pointwise_distance[ai,bi] + minimum_cost

return cumdist[an, bn]

#train
parameters = {'n_neighbors':[2, 4, 8]}
clf = GridSearchCV(KNeighborsClassifier(metric=DTW), parameters, cv=3, verbose=1)
clf.fit(X_train, y_train)



#evaluate
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))

哪个产量

Fitting 3 folds for each of 3 candidates, totalling 9 fits        

[Parallel(n_jobs=1)]: Done 9 out of 9 | elapsed: 29.0s finished

precision recall f1-score support

0 0.57 0.89 0.70 18
1 0.60 0.20 0.30 15

avg / total 0.58 0.58 0.52 33

关于python - 如何在 python 中使用 kNN 的动态时间扭曲,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57015499/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com