- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我如何在分布式计算中计算大型向量(系列)的算术平均值,我在多个节点上对数据进行分区。我不想使用 map reduce 范例。除了对每个节点上的单个总和进行简单计算然后将结果带到主节点并除以向量(系列)的大小之外,是否有任何分布式算法可以有效地计算平均值。
最佳答案
分布式平均共识是一种替代方法。
master 的 map-reduce 简单方法的问题是,如果你有大量的数据,本质上是为了让一切都相互依赖,计算数据可能需要很长时间,通过哪个时间的信息非常过时,因此是错误的,除非您锁定整个数据集——这对于大量分布式数据集来说是不切实际的。使用分布式平均共识(相同的方法适用于均值的替代算法),您可以在不锁定数据的情况下实时获得更新、更好的均值当前值猜测。这是一篇关于它的论文的链接,但它的数学很重: http://web.stanford.edu/~boyd/papers/pdf/lms_consensus.pdf你可以用谷歌搜索很多关于它的论文。
一般概念是这样的:假设在每个节点上都有一个套接字监听器。您评估本地总和和平均值,然后将其发布到其他节点。每个节点监听其他节点,并在合理的时间范围内接收它们的总和和平均值。然后,您可以通过 (sumForAllNodes(storedAverage[node] * storedCount[node])/(sumForAllNodes(storedCount[node])) 评估总平均值的良好猜测。如果您有一个真正大的数据集,您可以只听新的存储在节点中的值,并修改本地计数和平均值,然后发布它们。
如果这花费的时间太长,您可以对每个节点中的随机数据子集进行平均。
这里有一些 c# 代码可以让您了解(使用 fleck 可以在比仅适用于 Windows 10 的 Microsoft WebSockets 实现更多版本的 Windows 上运行)。在两个节点上运行它,一个带有
<appSettings>
<add key="thisNodeName" value="UK" />
</appSettings>
在 app.config 中,在另一个中使用“EU-North”。这是一些示例代码。这两个实例交换意味着使用 websockets。您只需要添加数据库的后端枚举。
using Fleck;
namespace WebSocketServer
{
class Program
{
static List<IWebSocketConnection> _allSockets;
static Dictionary<string,decimal> _allMeans;
static Dictionary<string,decimal> _allCounts;
private static decimal _localMean;
private static decimal _localCount;
private static decimal _localAggregate_count;
private static decimal _localAggregate_average;
static void Main(string[] args)
{
_allSockets = new List<IWebSocketConnection>();
_allMeans = new Dictionary<string, decimal>();
_allCounts = new Dictionary<string, decimal>();
var serverAddresses = new Dictionary<string,string>();
//serverAddresses.Add("USA-WestCoast", "ws://127.0.0.1:58951");
//serverAddresses.Add("USA-EastCoast", "ws://127.0.0.1:58952");
serverAddresses.Add("UK", "ws://127.0.0.1:58953");
serverAddresses.Add("EU-North", "ws://127.0.0.1:58954");
//serverAddresses.Add("EU-South", "ws://127.0.0.1:58955");
foreach (var serverAddress in serverAddresses)
{
_allMeans.Add(serverAddress.Key, 0m);
_allCounts.Add(serverAddress.Key, 0m);
}
var thisNodeName = ConfigurationSettings.AppSettings["thisNodeName"]; //for example "UK"
var serverSocketAddress = serverAddresses.First(x=>x.Key==thisNodeName);
serverAddresses.Remove(thisNodeName);
var websocketServer = new Fleck.WebSocketServer(serverSocketAddress.Value);
websocketServer.Start(socket =>
{
socket.OnOpen = () =>
{
Console.WriteLine("Open!");
_allSockets.Add(socket);
};
socket.OnClose = () =>
{
Console.WriteLine("Close!");
_allSockets.Remove(socket);
};
socket.OnMessage = message =>
{
Console.WriteLine(message + " received");
var parameters = message.Split('~');
var remoteHost = parameters[0];
var remoteMean = decimal.Parse(parameters[1]);
var remoteCount = decimal.Parse(parameters[2]);
_allMeans[remoteHost] = remoteMean;
_allCounts[remoteHost] = remoteCount;
};
});
while (true)
{
//evaluate my local average and count
Random rand = new Random(DateTime.Now.Millisecond);
_localMean = 234.00m + (rand.Next(0, 100) - 50)/10.0m;
_localCount = 222m + rand.Next(0, 100);
//evaluate my local aggregate average using means and counts sent from all other nodes
//could publish aggregate averages to other nodes, if you wanted to monitor disagreement between nodes
var total_mean_times_count = 0m;
var total_count = 0m;
foreach (var server in serverAddresses)
{
total_mean_times_count += _allCounts[server.Key]*_allMeans[server.Key];
total_count += _allCounts[server.Key];
}
//add on local mean and count which were removed from the server list earlier, so won't be processed
total_mean_times_count += (_localMean * _localCount);
total_count = total_count + _localCount;
_localAggregate_average = (total_mean_times_count/total_count);
_localAggregate_count = total_count;
Console.WriteLine("local aggregate average = {0}", _localAggregate_average);
System.Threading.Thread.Sleep(10000);
foreach (var serverAddress in serverAddresses)
{
using (var wscli = new ClientWebSocket())
{
var tokSrc = new CancellationTokenSource();
using (var task = wscli.ConnectAsync(new Uri(serverAddress.Value), tokSrc.Token))
{
task.Wait();
}
using (var task = wscli.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(thisNodeName+"~"+_localMean + "~"+_localCount)),
WebSocketMessageType.Text,
false,
tokSrc.Token
))
{
task.Wait();
}
}
}
}
}
}
}
不要忘记通过在给定时间同步来添加静态锁或单独的事件。 (为简单起见未显示)
关于distributed-computing - 如何计算分布式数据的均值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42428424/
如果我有 distributed.Client我可以用它来关闭远程集群吗?即杀死所有 worker 并关闭调度程序? 如果使用 Client 无法做到这一点例如,除了手动杀死每个远程进程之外,还有其他
我对使用 Dask Distributed 作为任务执行器很感兴趣。 在 Celery 中,可以将任务分配给特定的 worker 。如何使用 Dask Distributed? 最佳答案 有2个选项:
我正在创建我的第一个应用程序,并且对 Ad Hoc 配置文件和开发配置文件有些困惑。我知道这个问题是在此之前提出的,但需要一些澄清和确认。 查看之前关于 stackoverflow 的答案,我认为存在
我正在尝试确定如何计算两个 torch.distribution.Distribution 对象的 KL 散度。到目前为止,我找不到执行此操作的功能。这是我尝试过的: import torch as
这听起来(比方说)幼稚,但我不知道我应该为移动 (iOS) 应用使用哪个证书。 显而易见的选项是 iOS 分发证书,但在 Apple 分发证书上,解释是: ...For use with Xcode
问题: 我的数字范围是 1 到 20,000。我想从范围内采样 8 个不同数字的均匀分布,1000 次。每个分布不应有重复的数字。此外,1000 个分布中的每个分布都必须是唯一的(在对所有获得的分布进
我对 dask 文档中的并发 future 要点有疑问:https://gist.github.com/mrocklin/ef9ccd29a6ec5f4de84d6192be95042a 当我们实例化
完成 DASK 代码后,我不断收到“distributed.utils_perf - 警告 - 完整垃圾收集最近占用了 19% CPU 时间”警告消息。我正在使用 DASK 进行大型地震数据计算。计算
场景:S3 存储桶有 1000 个文件。我有两台机器。每台机器都有两个驱动器/dev/sda 和/dev/sdb。限制:没有一个单独的驱动器可以容纳所有 1000 个文件。没有一台机器可以容纳所有 1
我已将一个项目导入 android studio 3.5,但在与 Gradle 文件同步时遇到此错误 指定的Gradle发行版'https://services.gradle.org/distribu
在 Android Studio 中创建项目时,我收到以下错误消息。 Failed to import new Gradle project: Could not install Gradle dis
在android studio 2.3.2 中运行项目时显示 Error:Could not run build action using Gradle distribution 'https://s
我正在将项目中的 gradle 版本从 1.7 升级到 4.2.1。我已将 Intellij 设置为导入 gradle 项目,但是当我单击“刷新所有 Gradle 项目”时,出现以下错误 Gradle
对于一个独特的商品销售数据库,如果我们使用顺序一致性,我们就可以保证,例如,这个独特的商品永远不会被重复卖给不同的人。因果一致性能保证我们做到这一点吗? 如果有一些销售同时开始/结束,系统会中断吗?
关闭。这个问题是off-topic .它目前不接受答案。 想改善这个问题吗? Update the question所以它是 on-topic对于堆栈溢出。 9年前关闭。 Improve this q
我正在使用J2ME为Nokia手机编写应用程序。 我想知道如何分发我的诺基亚应用程序。 最佳答案 分发有两个步骤 1.)使您的应用程序签名 您需要对您的应用进行签名,以便可以将其安装在诺基亚手机上。签
顺序一致性 The result of any execution is the same as if the operations of all the processors were execut
我知道三阶段提交是为了解决“两阶段提交”的问题,当在第二阶段协调器和群组同时失败时,不可能知道协调器是否决定了提交消息。 显然,三阶段提交旨在通过添加一个额外的阶段来解决这个问题。但是,如果协调器和队
我一直在研究 a project ,它是应用服务器和对象数据库的组合,目前仅在单台机器上运行。前段时间看了a paper它描述了一个分布式关系数据库,并获得了一些关于如何将该论文中的想法应用到我的项目
我想堆叠这种类型的数据集: PATIENT_ID AA BB CC DD EE 1 22 33 44 55 66 2 77 88 99 10 11 ..
我是一名优秀的程序员,十分优秀!