- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
LongAdder通过创建多个副本对象,解决了多线程使用CAS更新同一个对象造成的CPU阻塞,加快了对线程处理的速度。当多个线程同一时刻更新一个AtomicLong类型的变量时,只有一个线程能够更新成功,其他线程则更新失败,继续尝试更新。
当使用LongAdder类型的变量时,由于副本数组的存在,线程不一定直接更新变量的本身而是更新副本数组,这样多线程请求的对象变多了,从而减少了更新时间,当需要使用变量值时,返回的值是基础变量的值加上数组内每一个副本的值的和。
LongAdder继承自Striped64并实现了Serializable接口,而在Striped64类中有一个Cell类
首先从LongAdder类的add方法入手
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
从上面的代码可以看到LongAdder的实现主要依靠的是cells数组,如果cells数组为空的话,则尝试使用cas更新基础变量base,如果成功了,则add成功,方法结束,如果cas更新base失败了,则证明此时有其他线程参与base变量的更新,此后的处理与cells不为空一致(如果cells不为空,则在此次方法执行前就已经有多线程参与了更新)。
当cells数组不为空或者更新base变量失败后,则转而更新cells数组中的副本,此时先判断cells数组是否为空或长度为0,如果为空或长度为0则说明这是第一次操作cells数组,应先初始化cells数组,因此调用方法longAccumulate(x, null, true);
如果cells数组不为空,则尝试直接访问数组中的副本,getProbe方法代码:
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
从上面代码可以看到getProbe方法获取了当前线程内的PROBE变量,而PROBE定义在Striped64类中
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
从上面的代码可以看出PROBE来自于Thread类的threadLocalRandomProbe变量,是一个线程级变量。getProbe() & m
则是获取当前要更新的cell,如果cell为空的话则调用longAccumulate(x, null, true);
方法设置cell的值,如果cell不为空的话则使用cas直接更新cell的值,并将更新结果保存在uncontended
中,如果uncontended
的值为false(即cas更新失败了,此时应该有多个线程同时访问了一个cell),那么继续调用longAccumulate(x, null, false);
方法。
从上面的add方法可以看到,getProbe()
取得了Thread类中的threadLocalRandomProbe变量,而threadLocalRandomProbe变量的初始值为0,因为getProbe()
方法参与了多线程访问哪一个cell的定位,因此getProbe()的值不可能为0,那么threadLocalRandomProbe变量是在哪里赋值的呢?
在add方法中观察到,没当方法进行不下去时(base变量更新失败,cells为空,cell更新失败),都会调用longAccumulate
方法,因此longAccumulate
一定是涉及了cells数组的初始化和扩容,观察longAccumulate
方法代码:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
// 此处进行数组的扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// 此处进行数组的初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
longAccumulate代码的设计非常复杂,刚进入方法的代码进行了threadLocalRandomProbe变量的初始化
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
如果getProbe()
取得的值为0,说明threadLocalRandomProbe变量并未被初始化过,此时调用ThreadLocalRandom.current();
方法进行初始化,并且将参数wasUncontended设置为true。ThreadLocalRandom.current()
方法代码:
public static ThreadLocalRandom current() {
if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe);
}
此时发现,threadLocalRandomProbe变量的初始化实在ThreadLocalRandom类中进行的,使用ThreadLocalRandom类的好处是为每一个线程维护一个随机数种子,不涉及多线程竞争种子的问题。而在longAccumulate
方法中初始化threadLocalRandomProbe变量是一种延迟初始化的操作,如果cells为空,即使threadLocalRandomProbe变量有值也是没有意义的。
从上面add
方法调用longAccumulate
处可以发现,cells数组为null或当前线程要更新的cell为null时wasUncontended的值为true,如果更新cell失败,则cell的值为false,那么wasUncontended的值一定是cells数组进行扩容的依据。
因为是多个线程同时操作cells数组,那么对数组的初始化和扩容一定只能由一个线程来完成,因此定义了cellsBusy变量,当检测到cellsBusy的值为0并使用casCellsBusy()
方法成功将其设置为1后才可以进行初始化和扩容操作。数组的初始化将cells长度设置为2,并且初始化将要访问的cell,另一个cell则保持默认值null,完成后将cellsBusy的值重新设置为0,方便其他线程之后进行扩容(此处设置并不是cas操作,因为当前初始化代码只有一个线程能执行到)。
而扩容要检查当前cells数组的长度小于cpu的个数时才可以进行(当数组当都等于cpu个数时效率才最高),扩容操作完成后调用advanceProbe()
方法从新计算threadLocalRandomProbe变量的值,以减少访问celll冲突的个数。
另外,在定义Cell类时使用了@sun.misc.Contended
注解,这样保证了一个Cell类对象占满一个缓存行,从而避免了伪共享问题,提升了性能。
我正在尝试读取一个大型日志文件,该文件已使用不同的分隔符(遗留更改)进行了解析。 此代码有效 import os, subprocess, time, re import pandas as pd f
我试图理解在 Linux 下以 Turbo 模式(特别是 fpc -Mtp -vw)编译的 Free Pascal 中看到的有点神奇的行为。代码来自 Jack Crenshaw 的“让我们构建一个编译
我有一个具有以下结构的 txt 文件: NAME DATA1 DATA2 a 10 1,2,3 b 6 8,9 c 2
我试图理解在 Linux 下以 Turbo 模式(特别是 fpc -Mtp -vw)编译的 Free Pascal 中看到的有点神奇的行为。代码来自 Jack Crenshaw 的“让我们构建一个编译
public class Bug1 { private String s; public void Bug1(){ s = "hello"; } public Stri
我们有这样一种情况,我们的应用程序需要处理一系列文件,而不是同步执行此功能,我们希望采用多线程将工作负载分配给不同的线程。 每一项工作是: 1.以只读方式打开文件 2.处理文件中的数据 3.将处理后的
我正在尝试读取 .php 文件并替换十六进制字符。php文件格式如下: 问题是它弄乱了转义字符 (\") 到目前为止我的代码: while(i=48 && str[i+2]=97 && str[i+
我正在用 C# 开发一个程序,我需要一些帮助。我正在尝试创建一个数组或项目列表,显示在某个网站上。我想要做的是阅读 anchor 文本,它是 href。例如,这是 HTML:
我有一个偏好设置,它控制我的应用程序是否在用户单击按钮时播放声音(这种情况经常发生,想想计算器)。每次用户单击按钮时,都会调用以下方法: private void playButtonClickSou
我正在尝试在我的标签末尾创建一个阅读更多按钮。我希望它默认显示 3 行。我正在用 swift 而不是 objective c 编写代码。只有当用户点击标签的阅读更多部分时,标签才会展开。它的外观和工作
当您获得第三方库(c、c++)、开源(LGPL 说)但没有很好的文档时,了解它以便能够集成到您的应用程序中的最佳方法是什么? 该库通常有一些示例程序,我最终使用 gdb 浏览了代码。还有其他建议/最佳
同时从 2 个或更多不同线程对同一个文件描述符使用 pread 是否有问题? 最佳答案 pread 本身是线程安全的,因为它不在 list of unsafe functions 上.所以调用它是安全
当您使用命令 pd.read_csv 读取 csv 时,如何跳过连续包含特定值的行?如果在第 50、55 行,第一列的值为 100,那么我想在读取 csv 文件时跳过这些行。我如何将这些命令放入像 p
我迫切需要在 C# 中使用 T4 生成 HTML 输出。 我正在使用 Runtime-T4-Files 并选择“TextTemplatingFilePreprocessor”而不是“TextTempl
今年夏天我在实习期间一直在学习 ERP 应用程序。由于我是一名即将毕业的程序员,我希望有一个可靠的软件分支可以帮助我完成工作,直到我确定下一步该做什么(直到我对大局有一个很好的了解)。到现在为止,我刚
将包含列(例如“a”、“b”)的数据帧保存为 parquet,然后在稍后的时间点读取 parquet 不会提供相同的列顺序(可能是“b”、“a”fe)文件保存为。 不幸的是,我无法弄清楚订单是如何受到
我正在开发一个使用谷歌表格作为数据库的应用程序,但我不知道如何让 Swift 从谷歌表格中读取。我浏览了 API 网站和一些问题,但刚开始我需要一些帮助。到目前为止,我有; 私有(private)让范
我打算阅读swing concept,如果值得一读,请推荐一些学习 Material 最佳答案 自 AWT 崩溃以来,Java 的 GUI 工具包太多了。即使是 Swing 也被评论家严重低估,但他们
我已经使用 J 几个月了,我发现阅读不熟悉的代码(例如,不是我自己写的)是该语言最具挑战性的方面之一,尤其是在默认情况下。过了一会儿,我想出了这个策略: 1)将代码段复制到word文档中 2)从(1)
很难说出这里问的是什么。这个问题是含糊的、模糊的、不完整的、过于宽泛的或修辞性的,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开它,visit the help center 。 已关
我是一名优秀的程序员,十分优秀!