gpt4 book ai didi

python - 在具有时间限制的事件数据中查找子序列的出现

转载 作者:太空狗 更新时间:2023-10-30 00:08:53 26 4
gpt4 key购买 nike

我有以下格式的事件数据:

event     A A A A A C B C D A A A B
timestamp 0 3 4 4 5 5 6 7 7 8 8 9 10

给定序列 S 和事件 E 的列表,我如何才能高效地找到 S 中的非重叠出现E 在时间窗口 W 内,并且发生的每个事件都在前一个事件的间隔 L 内?

S = {A, AA, AAA, AAB, BB, CA}, W=3, L=2 的示例结果:

occurrences:
A: [0, 3, 4, 4, 5, 8, 8, 9]
AA: [(3,4), (4,5), (8,8)]
AAA: [(3,4,4), (8,8,9)]
AAB: [(4,5,6), (8,9,10)]
BB: []
CA: [(7,8)]

如您所见,事件不一定是连续的(即序列中的所有元素都出现在一个系列中)。时间戳仅以整数表示。

最佳答案

如果您跟踪有效的到目前为止不完整的子序列,并在它们完成或不可能再完成时立即忘记它们,则可以通过一次传递数据来完成。为此,我编写了一个 Sequence 类来跟踪

  • 序列名称
  • 其事件发生的索引,以确定它是否与之前完成的序列重叠
  • 事件的时间,因为那是我们的输出,我们需要它们来检查我们的约束
  • 我们当前在序列名称中的位置,以便我们知道下一个事件应该是什么以及序列何时完成,以及
  • 如果超出我们的窗口/长度限制,则忘记序列的标志。

代码

events = 'AAAAACBCDAAAB'
timestamps = [0, 3, 4, 4, 5, 5, 6, 7, 7, 8, 8, 9, 10]

SEQUENCES = {'A', 'AA', 'AAA', 'AAB', 'BB', 'CA'}
WINDOW = 3
LENGTH = 2

class Sequence:
def __init__(self, seq, starting_index, starting_time):
self.sequence = seq
self.pos = 0
self.indices = [starting_index]
self.times = [starting_time]
self.has_expired = False

def is_next_event_acceptable(self, event, time):
if self.sequence[self.pos+1] != event:
return False
else:
if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
self.has_expired = True
return False
return True

def add_event_if_acceptable(self, event, index, time):
if self.is_next_event_acceptable(event, time):
self.pos += 1
self.indices.append(index)
self.times.append(time)

def is_complete(self):
return len(self.sequence) == self.pos + 1

def __repr__(self):
seq = list(self.sequence)
seq.insert(self.pos, '[')
seq.insert(self.pos + 2, ']')
return ''.join(seq)


def find_non_overlapping_subsequences(events, timestamps):
working_sequences = []
results = {s: {'seq': [], 'last_index': -1} for s in SEQUENCES}

for index, (event, time) in enumerate(zip(events, timestamps)):
# First work with any present sequences in the queue
# and then introduce any new ones
for Seq in working_sequences:
Seq.add_event_if_acceptable(event, index, time)
for seq in SEQUENCES:
if seq.startswith(event):
working_sequences.append(Sequence(seq, index, time))
# Any successfully completed sequences, or sequences
# that can't be completed anymore are to be removed
seq_idx_to_remove = []
for i, Seq in enumerate(working_sequences):
if Seq.has_expired:
seq_idx_to_remove.append(i)
elif Seq.is_complete():
seq_idx_to_remove.append(i)
# Only add the sequence to the results if the indices
# aren't overlapping with the previous one
sequence, times, indices = Seq.sequence, Seq.times, Seq.indices
if results[sequence]['last_index'] < indices[0]:
results[sequence]['seq'].append(times)
results[sequence]['last_index'] = indices[-1]
# We must remove the items in reverse order so that
# we don't disturb the 'forward' ordering
for i in seq_idx_to_remove[::-1]:
del working_sequences[i]

return results

results = find_non_overlapping_subsequences(events, timestamps)
for key, value in sorted(results.items()):
print(key, value['seq'])

输出

A [[0], [3], [4], [4], [5], [8], [8], [9]]
AA [[3, 4], [4, 5], [8, 8]]
AAA [[3, 4, 4], [8, 8, 9]]
AAB [[4, 5, 6], [8, 8, 10]]
BB []
CA [[7, 8]]

更新

对于较长的事件系列,这可能需要很长时间,这取决于您在每个步骤中必须考虑的序列数量。这意味着您的序列生命周期越长,您在每次迭代中检查的内容就越多。

  • 如果序列需要完成更多事件,则需要更多迭代才能完成。
  • SEQUENCES 的长度越长,您在每一步引入的新序列就越多。
  • 如果 window 或 length duration 时间较长,序列将在过期前存活更长时间。
  • 您拥有的独特事件越多(如果它们统一出现在系列中),完成给定序列所需的时间就越长。例如,如果在每次迭代中您只遇到 As 和 B,那么序列“AB”的完成速度会比字母表中的任何字母都快。

虽然上述因素最终决定了每个迭代步骤的时长,但可以进行一些优化。在每一步中,我们都会遍历 working_sequences 中所有当前未完成的序列,并检查新事件对它们的影响。但是,如果我们重新编写 Sequence 类,每次更新序列时,我们都可以计算下一个事件是什么。然后,在每个步骤中,我们都可以根据该事实对这些序列进行分类。这样,如果下一个事件是“A”,我们将只检查接受该事件的任何序列。这也可以方便地拆分已经完成或过期的序列。

第二个影响较小的优化是提前计算以特定事件开始的所有序列,这样我们就不必每次都遍历 SEQUENCES

这应该可以避免任何不必要的检查并提高整体性能。然而,最坏的情况仍然与上面的简单版本相同。例如,如果您的 90% 的事件是“A”,并且 90% 的开始事件或序列的下一个事件是“A”,那么与之前相比,这仍然需要 90% 的时间。

代码中的以下更改反射(reflect)了这些优化。我还假设时间戳严格递增,因此可以简化任何依赖于 indices 属性的内容。

EXPIRED = '#'
COMPLETED = '='

class Sequence:
def __init__(self, seq, starting_time):
self.sequence = seq
self.pos = 0
self.times = [starting_time]
self.has_expired = False
self.next_event = self.next_event_query()

def is_next_event_acceptable(self, event, time):
if self.next_event != event:
return False
if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
self.has_expired = True
return False
return True

def update_sequence(self, event, time):
if self.is_next_event_acceptable(event, time):
self.pos += 1
self.times.append(time)
self.next_event = self.next_event_query()

def next_event_query(self):
if self.has_expired:
return EXPIRED
return COMPLETED if len(self.sequence) == self.pos + 1 else self.sequence[self.pos+1]

def __repr__(self):
seq = list(self.sequence)
seq.insert(self.pos, '[')
seq.insert(self.pos + 2, ']')
return ''.join(seq)


def find_non_overlapping_subsequences(events, timestamps):
unique_events = set(events)
starting_events = {}
for seq in SEQUENCES:
unique_events.update(seq)
first_event = seq[0]
if first_event not in starting_events:
starting_events[first_event] = []
starting_events[first_event].append(seq)
for e in unique_events:
if e not in starting_events:
starting_events[e] = []

all_symbols = ''.join(unique_events) + EXPIRED + COMPLETED
working_sequences = {event: [] for event in all_symbols}
next_event_lists = {event: [] for event in all_symbols}
results = {s: {'seq': [], 'last_time': timestamps[0]-1} for s in SEQUENCES}

for event, time in zip(events, timestamps):
next_event_lists[event] = []
for S in working_sequences[event]:
S.update_sequence(event, time)
next_event_lists[S.next_event].append(S)
for seq in starting_events[event]:
S = Sequence(seq, time)
next_event_lists[S.next_event].append(S)
for S in next_event_lists[COMPLETED]:
# Only add the sequence to the results if the timestamps
# don't overlap with the previous one
sequence, times = S.sequence, S.times
if results[sequence]['last_time'] < times[0]:
results[sequence]['seq'].append(times)
results[sequence]['last_time'] = times[-1]
next_event_lists[EXPIRED] = []
next_event_lists[COMPLETED] = []
working_sequences = next_event_lists.copy()

return results

关于python - 在具有时间限制的事件数据中查找子序列的出现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54463702/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com