gpt4 book ai didi

java - ConcurrentModificationException - Memcached 设置并从多线程获取

转载 作者:行者123 更新时间:2023-12-04 13:52:16 26 4
gpt4 key购买 nike

当我使用 Memcached 从多个线程同时设置和获取值时,出现 java.util.ConcurrentModificationException。

异常堆栈跟踪:

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextEntry(HashMap.java:894)
at java.util.HashMap$EntryIterator.next(HashMap.java:934)
at java.util.HashMap$EntryIterator.next(HashMap.java:932)
at java.util.HashMap.writeObject(HashMap.java:1098)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
at com.danga.MemCached.MemCachedClient.set(MemCachedClient.java:763)
at com.danga.MemCached.MemCachedClient.set(MemCachedClient.java:542)
at com.cacheengine.MemcachedThreadUtil.addToMemCache(MemcachedThreadUtil.java:129)
at com.cacheengine.MemcachedThreadUtil.memcacheDataProcessing(MemcachedThreadUtil.java:59)

示例程序:

内存缓存实用程序:

public class MemcachedThreadUtil {

    /**
     * Loads the map cached under {@code key}, puts {@code objUpdateTermBean} into it under that
     * same key, and writes the map back to the cache.
     *
     * <p>The method is {@code synchronized}, so this read-modify-write sequence is executed by
     * one thread at a time per {@code MemcachedThreadUtil} instance.
     *
     * @param key cache key to read and write; also used as the map key for the bean
     * @param objUpdateTermBean bean to store in the cached map
     */
    public synchronized void memcacheDataProcessing(String key, TermBean objUpdateTermBean) {
        System.out.println(Thread.currentThread().getName() + " Start");

        Object retrivedValue = retrieveFromMemCache(key);
        // Assumes whatever is cached under the key is the map written by addToMemCache.
        @SuppressWarnings("unchecked")
        HashMap<String, TermBean> hmData = (HashMap<String, TermBean>) retrivedValue;
        if (hmData == null) {
            // The lookup returned nothing for this key: start from an empty map
            // instead of failing with a NullPointerException on put().
            hmData = new HashMap<String, TermBean>();
        }

        hmData.put(key, objUpdateTermBean); // Data update

        addToMemCache(key, hmData);

        System.out.println(Thread.currentThread().getName() + " End");
    }

    /**
     * Looks up the value cached under {@code key}. The body is elided in this snippet; the
     * remaining comment shows the intended client call.
     */
    private Object retrieveFromMemCache(String key) {
        // MemCachedManager.getMcc().get(sKey);
    }

    /**
     * Stores {@code hmData} in the cache under {@code key}. The body is elided in this snippet;
     * the remaining comment shows the intended client call.
     */
    private void addToMemCache(String key, HashMap<String, TermBean> hmData) {
        // MemCachedManager.getMcc().set(sKey, obj);
    }
}

可运行线程:

    /**
     * Runnable that performs one {@code memcacheDataProcessing} call with the key, bean and
     * utility instance it was constructed with.
     */
    public class SendNForgotMemcacheHelperRunnable implements Runnable {
        private String cacheKey;
        private TermBean termBean;
        private MemcachedThreadUtil cacheUtil;

        /**
         * @param key cache key passed on to the utility
         * @param data bean passed on to the utility
         * @param memcacheThreadUtil utility instance whose method is invoked in {@link #run()}
         */
        public SendNForgotMemcacheHelperRunnable(String key, TermBean data, MemcachedThreadUtil memcacheThreadUtil) {
            cacheKey = key;
            termBean = data;
            cacheUtil = memcacheThreadUtil;
        }

        /** Delegates to the utility; any exception is reported on stdout by message only. */
        @Override
        public void run() {
            try {
                cacheUtil.memcacheDataProcessing(cacheKey, termBean);
            } catch (Exception failure) {
                System.out.println("Exception in thread: " + failure.getMessage());
            }
        }
    }

主要方法:

/**
 * Demo entry point: starts 10 threads, each running a SendNForgotMemcacheHelperRunnable,
 * all sharing one MemcachedThreadUtil instance.
 */
public static void main(String[] args) {
// Single utility instance handed to every runnable created below.
MemcachedThreadUtil memcachedThreadUtil = new MemcachedThreadUtil();

for (int threadCount = 0; threadCount < 10; threadCount++) {
// The expressions producing the key and the value are elided ("..") in this snippet.
String key = ..//Get key
TermBean objTermBean = ..// get value
SendNForgotMemcacheHelperRunnable runnable = new SendNForgotMemcacheHelperRunnable(key, objTermBean, memcachedThreadUtil);
// Thread names are "Thread0" .. "Thread9"; the utility prints them in its Start/End messages.
Thread t = new Thread(runnable, "Thread"+threadCount);
t.start();
}
}

Memcached 版本是 1.4.17(参见 https://danga.com/ 和 http://memcached.org)

似乎 Memcached 库内部存在一些问题,未处理并发问题,或者可能只是 HashMap 迭代器问题。

能否请您告知:此异常是由多线程引起的,还是 HashMap 本身的普遍问题?

注意:我原以为是同步方法(synchronized)导致了问题,所以改用 ReentrantReadWriteLock 重写了并发控制,但仍然遇到同样的异常。

如何解决这个问题 - 请提出任何建议。

编辑:添加 MemCachedManager 代码

public class MemCachedManager {

// Shared client instance; assigned by init() and replaceable through setMcc().
protected static MemCachedClient mcc = null;

// Runs init() when this class is initialized.
static {
init();
}

/**
 * Configures the named SockIOPool and creates the shared MemCachedClient, taking every
 * setting from the "cacheconfig" properties through LocaleUtil.
 */
public static void init() {

    // Server addresses: array sized by the configured count, filled from the comma-separated list.
    String[] servers = new String[cacheConfigInt("MEMCACHED_SERVERS_COUNT")];
    StringTokenizer serverTokens = new StringTokenizer(cacheConfig("MEMCACHED_SERVERS_COMMA_SEPERATED"), ",");
    for (int slot = 0; serverTokens.hasMoreTokens(); slot++) {
        servers[slot] = serverTokens.nextToken();
    }

    // Per-server weights, parsed the same way.
    Integer[] weights = new Integer[cacheConfigInt("MEMCACHED_SERVERS_COUNT")];
    StringTokenizer weightTokens = new StringTokenizer(cacheConfig("MEMCACHED_SERVERS_MEMORY_COMMA_SEPERATED"), ",");
    for (int slot = 0; weightTokens.hasMoreTokens(); slot++) {
        weights[slot] = Integer.valueOf(weightTokens.nextToken());
    }

    String poolName = cacheConfig("MEMCACHED_POOL_NAME");
    SockIOPool pool = SockIOPool.getInstance(poolName);

    pool.setServers(servers);
    pool.setWeights(weights);

    pool.setInitConn(cacheConfigInt("POOL_INIT_CONN"));
    pool.setMinConn(cacheConfigInt("POOL_MIN_CONN"));
    pool.setMaxConn(cacheConfigInt("POOL_MAX_CONN"));
    pool.setMaxIdle(cacheConfigInt("POOL_MAX_IDLE")); // e.g. 1000 * 60 * 60 * 6 == 6 hrs

    pool.setMaintSleep(cacheConfigInt("POOL_MAINTAIN_THREAD_SLEEP"));

    pool.setNagle(cacheConfigFlag("POOL_NAGLE_ENABLE_FLAG"));
    pool.setSocketTO(cacheConfigInt("POOL_SOCKET_READ_TIMEOUT"));
    pool.setSocketConnectTO(cacheConfigInt("POOL_SOCKET_CONNECT_TIMEOUT"));
    pool.setFailover(cacheConfigFlag("POOL_FAILOVER_ENABLE_FLAG"));
    pool.setAliveCheck(cacheConfigFlag("POOL_ALIVE_CHECK_ENABLE_FLAG"));

    // The pool must be initialized before a client is created on top of it.
    pool.initialize();

    mcc = new MemCachedClient(poolName);
    mcc.setCompressEnable(cacheConfigFlag("MEMCACHED_COMPRESSION_ENABLE_FLAG"));
    mcc.setCompressThreshold(cacheConfigInt("MEMCACHED_COMPRESSION_THRESHOLD_SIZE")); // e.g. 64 * 1024 == 64 kb

    // Log level defaults to WARN unless the configured value is numeric.
    int logLevel = com.danga.MemCached.Logger.LEVEL_WARN;
    if (GenTools.isNumber(cacheConfig("MEMCACHED_LOG_LEVEL"))) {
        logLevel = cacheConfigInt("MEMCACHED_LOG_LEVEL");
    }
    setLogLevel(logLevel);
}

/** Reads one entry from the "cacheconfig" properties. */
private static String cacheConfig(String name) {
    return LocaleUtil.loadPropertyFile(name, "cacheconfig");
}

/** Reads one "cacheconfig" entry and parses it with Integer.parseInt. */
private static int cacheConfigInt(String name) {
    return Integer.parseInt(cacheConfig(name));
}

/** Reads one "cacheconfig" entry and parses it with Boolean.parseBoolean. */
private static boolean cacheConfigFlag(String name) {
    return Boolean.parseBoolean(cacheConfig(name));
}

/**
 * Returns the shared client held in the static {@code mcc} field.
 *
 * @return the current client instance
 */
public static MemCachedClient getMcc() {
return mcc;
}

/**
 * Replaces the shared client held in the static {@code mcc} field.
 *
 * @param mcc client instance to use from now on
 */
public static void setMcc(MemCachedClient mcc) {
MemCachedManager.mcc = mcc;
}

/**
 * Sets the level of the logger registered under the MemCachedClient class name.
 *
 * @param iLogLevel level value passed to the logger's setLevel
 */
public static void setLogLevel(int iLogLevel) {
    String loggerName = MemCachedClient.class.getName();
    com.danga.MemCached.Logger clientLogger = com.danga.MemCached.Logger.getLogger(loggerName);
    clientLogger.setLevel(iLogLevel);
}

/**
 * Calls {@code incr(sTabName)} on the shared client and returns its result.
 *
 * @param sTabName key passed to {@code incr}
 * @return the value returned by {@code incr}
 */
public static long getNormalCounter(String sTabName) {
    MemCachedClient client = getMcc();
    return client.incr(sTabName);
}

/**
 * Calls {@code incr(sTabName)} on the shared client and returns {@code sTabName} with the
 * resulting number appended.
 *
 * @param sTabName key passed to {@code incr}; also the prefix of the returned string
 * @return {@code sTabName} followed by the value returned by {@code incr}
 */
public static String getPrefixCounter(String sTabName) {
    // incr is invoked first, exactly as in the single-expression form.
    long counter = getMcc().incr(sTabName);
    return sTabName.concat(Long.toString(counter));
}

/** Empty entry point; performs no work of its own. */
public static void main(String[] args) {
}

最佳答案

嗯,我不确定,但可以尝试把 memcacheDataProcessing 方法声明为 static synchronized(静态同步方法),以获得类级别的锁。

关于java - ConcurrentModificationException - Memcached 设置并从多线程获取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37905851/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com