- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有以下格式的事件数据:
event A A A A A C B C D A A A B
timestamp 0 3 4 4 5 5 6 7 7 8 8 9 10
给定序列 S
和事件 E
的列表,我如何才能高效地找到 S
在 中的非重叠出现E
在时间窗口 W
内,并且发生的每个事件都在前一个事件的间隔 L
内?
S = {A, AA, AAA, AAB, BB, CA}, W=3, L=2
的示例结果:
occurrences:
A: [0, 3, 4, 4, 5, 8, 8, 9]
AA: [(3,4), (4,5), (8,8)]
AAA: [(3,4,4), (8,8,9)]
AAB: [(4,5,6), (8,9,10)]
BB: []
CA: [(7,8)]
如您所见,事件不一定是连续的(即序列中的所有元素都出现在一个系列中)。时间戳仅以整数表示。
最佳答案
如果您跟踪有效的到目前为止不完整的子序列,并在它们完成或不可能再完成时立即忘记它们,则可以通过一次传递数据来完成。为此,我编写了一个 Sequence
类来跟踪
代码
events = 'AAAAACBCDAAAB'
timestamps = [0, 3, 4, 4, 5, 5, 6, 7, 7, 8, 8, 9, 10]
SEQUENCES = {'A', 'AA', 'AAA', 'AAB', 'BB', 'CA'}
WINDOW = 3
LENGTH = 2
class Sequence:
def __init__(self, seq, starting_index, starting_time):
self.sequence = seq
self.pos = 0
self.indices = [starting_index]
self.times = [starting_time]
self.has_expired = False
def is_next_event_acceptable(self, event, time):
if self.sequence[self.pos+1] != event:
return False
else:
if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
self.has_expired = True
return False
return True
def add_event_if_acceptable(self, event, index, time):
if self.is_next_event_acceptable(event, time):
self.pos += 1
self.indices.append(index)
self.times.append(time)
def is_complete(self):
return len(self.sequence) == self.pos + 1
def __repr__(self):
seq = list(self.sequence)
seq.insert(self.pos, '[')
seq.insert(self.pos + 2, ']')
return ''.join(seq)
def find_non_overlapping_subsequences(events, timestamps):
working_sequences = []
results = {s: {'seq': [], 'last_index': -1} for s in SEQUENCES}
for index, (event, time) in enumerate(zip(events, timestamps)):
# First work with any present sequences in the queue
# and then introduce any new ones
for Seq in working_sequences:
Seq.add_event_if_acceptable(event, index, time)
for seq in SEQUENCES:
if seq.startswith(event):
working_sequences.append(Sequence(seq, index, time))
# Any successfully completed sequences, or sequences
# that can't be completed anymore are to be removed
seq_idx_to_remove = []
for i, Seq in enumerate(working_sequences):
if Seq.has_expired:
seq_idx_to_remove.append(i)
elif Seq.is_complete():
seq_idx_to_remove.append(i)
# Only add the sequence to the results if the indices
# aren't overlapping with the previous one
sequence, times, indices = Seq.sequence, Seq.times, Seq.indices
if results[sequence]['last_index'] < indices[0]:
results[sequence]['seq'].append(times)
results[sequence]['last_index'] = indices[-1]
# We must remove the items in reverse order so that
# we don't disturb the 'forward' ordering
for i in seq_idx_to_remove[::-1]:
del working_sequences[i]
return results
results = find_non_overlapping_subsequences(events, timestamps)
for key, value in sorted(results.items()):
print(key, value['seq'])
输出
A [[0], [3], [4], [4], [5], [8], [8], [9]]
AA [[3, 4], [4, 5], [8, 8]]
AAA [[3, 4, 4], [8, 8, 9]]
AAB [[4, 5, 6], [8, 8, 10]]
BB []
CA [[7, 8]]
对于较长的事件系列,这可能需要很长时间,这取决于您在每个步骤中必须考虑的序列数量。这意味着您的序列生命周期越长,您在每次迭代中检查的内容就越多。
SEQUENCES
的长度越长,您在每一步引入的新序列就越多。虽然上述因素最终决定了每个迭代步骤的时长,但可以进行一些优化。在每一步中,我们都会遍历 working_sequences
中所有当前未完成的序列,并检查新事件对它们的影响。但是,如果我们重新编写 Sequence
类,每次更新序列时,我们都可以计算下一个事件是什么。然后,在每个步骤中,我们都可以根据该事实对这些序列进行分类。这样,如果下一个事件是“A”,我们将只检查接受该事件的任何序列。这也可以方便地拆分已经完成或过期的序列。
第二个影响较小的优化是提前计算以特定事件开始的所有序列,这样我们就不必每次都遍历 SEQUENCES
。
这应该可以避免任何不必要的检查并提高整体性能。然而,最坏的情况仍然与上面的简单版本相同。例如,如果您的 90% 的事件是“A”,并且 90% 的开始事件或序列的下一个事件是“A”,那么与之前相比,这仍然需要 90% 的时间。
代码中的以下更改反射(reflect)了这些优化。我还假设时间戳严格递增,因此可以简化任何依赖于 indices
属性的内容。
EXPIRED = '#'
COMPLETED = '='
class Sequence:
def __init__(self, seq, starting_time):
self.sequence = seq
self.pos = 0
self.times = [starting_time]
self.has_expired = False
self.next_event = self.next_event_query()
def is_next_event_acceptable(self, event, time):
if self.next_event != event:
return False
if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
self.has_expired = True
return False
return True
def update_sequence(self, event, time):
if self.is_next_event_acceptable(event, time):
self.pos += 1
self.times.append(time)
self.next_event = self.next_event_query()
def next_event_query(self):
if self.has_expired:
return EXPIRED
return COMPLETED if len(self.sequence) == self.pos + 1 else self.sequence[self.pos+1]
def __repr__(self):
seq = list(self.sequence)
seq.insert(self.pos, '[')
seq.insert(self.pos + 2, ']')
return ''.join(seq)
def find_non_overlapping_subsequences(events, timestamps):
unique_events = set(events)
starting_events = {}
for seq in SEQUENCES:
unique_events.update(seq)
first_event = seq[0]
if first_event not in starting_events:
starting_events[first_event] = []
starting_events[first_event].append(seq)
for e in unique_events:
if e not in starting_events:
starting_events[e] = []
all_symbols = ''.join(unique_events) + EXPIRED + COMPLETED
working_sequences = {event: [] for event in all_symbols}
next_event_lists = {event: [] for event in all_symbols}
results = {s: {'seq': [], 'last_time': timestamps[0]-1} for s in SEQUENCES}
for event, time in zip(events, timestamps):
next_event_lists[event] = []
for S in working_sequences[event]:
S.update_sequence(event, time)
next_event_lists[S.next_event].append(S)
for seq in starting_events[event]:
S = Sequence(seq, time)
next_event_lists[S.next_event].append(S)
for S in next_event_lists[COMPLETED]:
# Only add the sequence to the results if the timestamps
# don't overlap with the previous one
sequence, times = S.sequence, S.times
if results[sequence]['last_time'] < times[0]:
results[sequence]['seq'].append(times)
results[sequence]['last_time'] = times[-1]
next_event_lists[EXPIRED] = []
next_event_lists[COMPLETED] = []
working_sequences = next_event_lists.copy()
return results
关于python - 在具有时间限制的事件数据中查找子序列的出现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54463702/
我有一个 ServiceBusQueue(SBQ),它获取大量消息负载。我有一个具有 accessRights(manage) 的 ServiceBusTrigger(SBT),它不断轮询来自 SBQ
在下面给出的结果集中,有 2 个唯一用户 (id),并且查询中可能会出现更多此类用户: 这是多连接查询: select id, name, col1Code, col2Code, col2Va
我正在用 Python 2.7.3 编写一个带有 GRequests 的小脚本和 lxml 可以让我从各种网站收集一些收藏卡价格并进行比较。问题是其中一个网站限制了请求的数量,如果我超过它,就会发回
我想知道何时实际使用删除级联或删除限制以及更新级联或更新限制。我对使用它们或在我的数据库中应用感到很困惑。 最佳答案 在外键约束上使用级联运算符是一个热门话题。 理论上,如果您知道删除父对象也将自动删
下面是我的输出,我只想显示那些重复的名字。每个名字都是飞行员,数字是飞行员驾驶的飞机类型。我想显示驾驶不止一架飞机的飞行员的姓名。我正在使用 sql*plus PIL_PILOTNAME
我正在评估不同的移动框架,我认为 nativescript 是一个不错的选择。但我不知道开发过程是否存在限制。例如,我对样式有限制(这并不重要),但我想知道将来我是否可以有限制并且不能使用某些 nat
我正在尝试使用 grails 数据绑定(bind)将一些表单参数映射到我的模型中,但我认为在映射嵌入式集合方面可能存在一些限制。 例如,如果我提交一些这样的参数,那么映射工作正常: //this wo
是否可以将 django 自过滤器起的时间限制为 7 天。如果日期超过 7 天,则不应用过滤器 最佳答案 timesince 的源代码位于 django/django/utils/timesince.
我想在我的网站上嵌入一个 PayPal 捐赠按钮。但问题是我住在伊朗——这个国家受到制裁,人们不使用国际银行账户或主要信用卡。 有什么想法吗?请帮忙! 问候 沮丧 最佳答案 您可以在伊朗境内使用为伊朗
这是我的查询 select PhoneNumber as _data,PhoneType as _type from contact_phonenumbers where ContactID = 3
这个问题在这里已经有了答案: What is the maximum number of parameters passed to $in query in MongoDB? (4 个答案) 关闭
我的一个项目的 AndroidManifest.xml 变得越来越大(> 1000 行),因为我必须对某些文件类型使用react并且涵盖所有情况变得越来越复杂。我想知道 list 大小是否有任何限制。
在使用 Sybase、Infomix、DB2 等其他数据库产品多年后使用 MySQL 5.1 Enterprise 时;我遇到了 MySQL 不会做的事情。例如,它只能为 SELECT 查询生成 EX
这个问题在这里已经有了答案: What is the maximum number of parameters passed to $in query in MongoDB? (4 个回答) 关闭5年
通常我们是在{$apache}/conf/httpd.conf中设置Apache的参数,然而我们并没有发现可以设置日志文件大小的配置指令,通过参考http://httpd.apache.org/do
我正在搜索最大的 Android SharedPreferences 键值对,但找不到任何好的答案。其次,我想问一下,如果我有一个键,它的字符串值限制是多少。多少字符可以放入其中。如果我需要频繁更改值
我目前正在试验 SoundCloud API,并注意到我对/tracks 资源的 GET 请求一次从不返回超过 200 个结果。关于这个的几个问题: 这个限制是故意的吗? 有没有办法增加这个限制? 如
我正在与一家名为 Dwolla 的金融技术公司合作,该公司提供了一个 API,用于将银行信息附加到用户并收取/发送 ACH 付款。 他们需要我将我的 TLS 最低版本升级到 1.2(禁用 TLS 1.
我在 PHP 中有一个多维数组,如下所示: $array = Array ( [0] => Array ( [bill] => 1 ) [1] => Array ( [
我在获取下一个查询的第一行时遇到了问题: Select mar.Title MarketTitle, ololo.NUMBER, ololo.Title from Markets mar JOIN(
我是一名优秀的程序员,十分优秀!