- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
统计累加器允许执行增量计算。例如,为了计算任意时间给定的数字流的算术平均值,可以创建一个对象来跟踪给定项目的当前数量 n
及其总和 sum
。当请求均值时,该对象仅返回 sum/n
。
像这样的累加器允许您在某种意义上进行增量计算,即当给定一个新数字时,您不需要重新计算整个总和和计数。
可以为其他统计信息编写类似的累加器(对于 C++ 实现,请参见 boost library)。
您将如何在 Python 中实现累加器? The code I came up with是:
class Accumulator(object):
"""
Used to accumulate the arithmetic mean of a stream of
numbers. This implementation does not allow to remove items
already accumulated, but it could easily be modified to do
so. also, other statistics could be accumulated.
"""
def __init__(self):
# upon initialization, the numnber of items currently
# accumulated (_n) and the total sum of the items acumulated
# (_sum) are set to zero because nothing has been accumulated
# yet.
self._n = 0
self._sum = 0.0
def add(self, item):
# the 'add' is used to add an item to this accumulator
try:
# try to convert the item to a float. If you are
# successful, add the float to the current sum and
# increase the number of accumulated items
self._sum += float(item)
self._n += 1
except ValueError:
# if you fail to convert the item to a float, simply
# ignore the exception (pass on it and do nothing)
pass
@property
def mean(self):
# the property 'mean' returns the current mean accumulated in
# the object
if self._n > 0:
# if you have more than zero items accumulated, then return
# their artithmetic average
return self._sum / self._n
else:
# if you have no items accumulated, return None (you could
# also raise an exception)
return None
# using the object:
# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated
# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0
# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5
# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')
有趣的设计问题出现了:
最佳答案
对于通用的、线程安全的高级函数,您可以将类似以下内容与 Queue.Queue
类和其他一些位结合使用:
from Queue import Empty
def Accumulator(f, q, storage):
"""Yields successive values of `f` over the accumulation of `q`.
`f` should take a single iterable as its parameter.
`q` is a Queue.Queue or derivative.
`storage` is a persistent sequence that provides an `append` method.
`collections.deque` may be particularly useful, but a `list` is quite acceptable.
>>> from Queue import Queue
>>> from collections import deque
>>> from threading import Thread
>>> def mean(it):
... vals = tuple(it)
... return sum(it) / len(it)
>>> value_queue = Queue()
>>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
>>> def add_to_queue(it, queue):
... for value in it:
... value_queue.put(value)
>>> putting_thread = Thread(target=add_to_queue,
... args=(range(0, 12, 2), value_queue))
>>> putting_thread.start()
>>> list(LastThreeAverage)
[0, 1, 2, 4, 6, 8]
"""
try:
while True:
storage.append(q.get(timeout=0.1))
q.task_done()
yield f(storage)
except Empty:
pass
这个生成器函数通过将它委托(delegate)给其他实体来逃避它声称的大部分责任:
Queue.Queue
以线程安全的方式提供其源元素collections.deque
对象可以作为storage
参数的值传入;除其他外,这提供了一种仅使用最后一个 n
(在本例中为 3)个值mean
)作为参数传递。在某些情况下,这会导致代码效率不高,但很容易应用于各种情况。请注意,如果您的生产者线程每个值花费的时间超过 0.1 秒,则累加器可能会超时。这很容易通过传递更长的超时或通过完全删除超时参数来补救。在后一种情况下,该函数将无限期地阻塞在队列的末尾;这种用法在子线程(通常是 daemon
线程)中使用时更有意义。当然,您也可以将传递给 q.get
的参数参数化为 Accumulator
的第四个参数。
如果你想从生产者线程(这里是 putting_thread
)传递队列的末端,即没有更多的值要来,你可以传递并检查一个哨兵值或使用一些其他方法。 this thread 中有更多信息;我选择编写一个名为 CloseableQueue 的 Queue.Queue 子类提供了一个 close
方法。
还有许多其他方法可以自定义此类函数的行为,例如通过限制队列大小;这只是一个用法示例。
如上所述,由于需要重新计算,这会降低一些效率,而且我认为,这并不能真正回答您的问题。
生成器函数也可以通过其 send
方法接受值。所以你可以写一个均值生成器函数,比如
def meangen():
"""Yields the accumulated mean of sent values.
>>> g = meangen()
>>> g.send(None) # Initialize the generator
>>> g.send(4)
4.0
>>> g.send(10)
7.0
>>> g.send(-2)
4.0
"""
sum = yield(None)
count = 1
while True:
sum += yield(sum / float(count))
count += 1
这里的 yield 表达式将值——send
的参数——带入函数,同时将计算出的值作为 send
的返回值传递出去。
您可以将调用该函数返回的生成器传递给更可优化的累加器生成器函数,如下所示:
def EfficientAccumulator(g, q):
"""Similar to Accumulator but sends values to a generator `g`.
>>> from Queue import Queue
>>> from threading import Thread
>>> value_queue = Queue()
>>> g = meangen()
>>> g.send(None)
>>> mean_accumulator = EfficientAccumulator(g, value_queue)
>>> def add_to_queue(it, queue):
... for value in it:
... value_queue.put(value)
>>> putting_thread = Thread(target=add_to_queue,
... args=(range(0, 12, 2), value_queue))
>>> putting_thread.start()
>>> list(mean_accumulator)
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
"""
try:
while True:
yield(g.send(q.get(timeout=0.1)))
q.task_done()
except Empty:
pass
关于python - Python 中的统计累加器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3774315/
我想用 c 编写基本计算器:我有累加器的问题(带有“+”和“-”运算符) int main(void) { float num1,num2,res; char operator;
我已经解决了 4clojure.com 上的 45 个问题,并且在尝试使用递归和累加器解决一些问题的方式中,我注意到一个反复出现的问题。 我会尽我所能解释我正在做的事情,以最终得到模糊的解决方案,希望
my_Stream 是我想要累积并分配给变量以供进一步处理的数据。我的问题:一旦流完成,如何将变量 the_string 的内容获取到 console.log? my_Stream.onValue(f
我很好奇,从这个代码片段中得到的平均值是多少?累加器旨在为空。 boost::accumulators::accumulator_set > Accumulator; int Mean = boost
在累积 struct timespec 增量的程序中,我正在执行以下逻辑: struct timespec accu, start, stop; for (...) { // record s
我正在尝试在数组上使用 foldLeft。例如: var x = some array x.foldLeft(new Array[Int](10))((a, c) => a(c) = a(c)+1)
由于没有找到在 C++ 中重置累加器的“boost ”方法,我遇到了一段似乎可以重置 boost 累加器的代码。但是不明白它是如何实现的。代码如下- #include #include #incl
这个问题在这里已经有了答案: Does a sequential stream in Java 8 use the combiner parameter on calling collect? (1
我正在实现一个需要递归调用才能获取所有数据的 API。我已经实现了一个具有 recursive transformer 的 Bloc 组件。但是,转换器似乎一直在递归调用中返回空累加器。 commen
我永远找不到 F# 核心库的源代码。我知道它应该是开放的,但谷歌在帮助我找到它时对我并不友好,如果是这样,我会查找 Seq.fold 的实现 - 但问题就在这里。 有没有人看到以下代码段有任何问题:
最近我学习了很多 Haskell,并想尝试一些它在 Python 中的巧妙技巧。据我了解,Python的reduce会自动将函数中的迭代变量和累加器设置为reduce中给出的列表的前两个值。在 Has
documentation boost 累加器的 error_of 特性说明它通过以下公式计算平均值的误差: 平方(方差/(计数 - 1)), 其中方差的计算方式是: variance = 1/cou
我正在使用 LongAccumulator 来计算我在 Cassandra 中保存的记录数。 object Main extends App { val conf = args(0) val
Spark 有一个有用的 API,用于以线程安全的方式积累数据 https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.
我想从任意长度的列表中选择任意数量的项目。下拉列表 (QComboBox) 不允许选中项目。如果有很多项目,可检查项目的列表会变得笨拙。 我找到了 this question在用户体验 SE 子站点和
是否可以在分组时通过集合收集字符串?这就是它在 Java 8 中的工作方式: Map discountOptions = p.getDiscountOptions().Stream() .
我是一名优秀的程序员,十分优秀!