gpt4 book ai didi

java - 并发查询和删除,导致Neo4j 2.0.3社区内存泄漏

转载 作者:搜寻专家 更新时间:2023-11-01 03:23:08 26 4
gpt4 key购买 nike

这个程序模拟一个服务器,它根据用户输入并发查询节点并删除它们。每个用户请求(查询和删除)都在单独的线程上处理。

没有编译或运行时问题,但是,DiffSets 的内存泄漏会导致最终崩溃。 请建议修复/解决方法。

在程序执行期间监视堆:

enter image description here

enter image description here

源代码:

package net.ahm.graph;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.kernel.impl.util.StringLogger;

public class DeleteLab {
private static final int CHILDREN = 10000;
private static final Logger LOG = Logger.getLogger(DeleteLab.class);

public static void main(String[] args) throws Exception {
FileUtils.deleteRecursively(new File("graphdb"));
final GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("graphdb")
.setConfig(GraphDatabaseSettings.use_memory_mapped_buffers, "true").setConfig(GraphDatabaseSettings.cache_type, "strong")
.newGraphDatabase();
registerShutdownHook(graphDb);
LOG.info(">>>> STARTED GRAPHDB");
createIndex("Parent", "name", graphDb);
createIndex("Child", "name", graphDb);
final Node parent;

try (Transaction tx = graphDb.beginTx()) {
parent = graphDb.createNode(DynamicLabel.label("Parent"));
parent.setProperty("name", "parent");
tx.success();
}

try (Transaction tx = graphDb.beginTx()) {
for (int i = 0; i < CHILDREN; i++) {
Node child = graphDb.createNode(DynamicLabel.label("Child"));
child.setProperty("name", "child" + i);
child.setProperty("count", i);
parent.createRelationshipTo(child, RelationshipTypes.PARENT_CHILD);
}
tx.success();
}
LOG.info(">>>> CREATED NODES");
final ExecutionEngine engine = new ExecutionEngine(graphDb, StringLogger.SYSTEM);
ExecutorService es = Executors.newFixedThreadPool(50);
final CountDownLatch cdl = new CountDownLatch(CHILDREN);

for (int i = 0; i < CHILDREN; i++) {
final int count = i;
es.execute(new Runnable() {
@Override
public void run() {
try (Transaction tx = graphDb.beginTx()) {
tx.acquireWriteLock(parent);
Map<String, Object> params = new HashMap<String, Object>();
params.put("cCount", count);
ExecutionResult result = engine.execute(
"match (n:Parent)-[:PARENT_CHILD]->(m:Child) where m.count={cCount} return m.name", params);
for (Map<String, Object> row : result) {
String cName = (String) row.get("m.name");
Node child = findNode("Child", "name", cName, graphDb);
Relationship r = child.getSingleRelationship(RelationshipTypes.PARENT_CHILD, Direction.INCOMING);
r.delete();
child.delete();
}
tx.success();
} finally {
cdl.countDown();
}
}
});
}
cdl.await();
LOG.info(">>>> DELETED NODES");
es.shutdown();
}

private static void createIndex(String label, String propertyName, GraphDatabaseService graphDb) {
IndexDefinition indexDefinition;
try (Transaction tx = graphDb.beginTx()) {
Schema schema = graphDb.schema();
indexDefinition = schema.indexFor(DynamicLabel.label(label)).on(propertyName).create();
tx.success();
}
try (Transaction tx = graphDb.beginTx()) {
Schema schema = graphDb.schema();
schema.awaitIndexOnline(indexDefinition, 10, TimeUnit.SECONDS);
tx.success();
}
}

private static void registerShutdownHook(final GraphDatabaseService graphDb) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LOG.info("### GRAPHDB SHUTDOWNHOOK INVOKED !!!");
graphDb.shutdown();
}
});
}

private enum RelationshipTypes implements RelationshipType {
PARENT_CHILD
}

public static Node findNode(String labelName, String propertyName, Object propertyValue, GraphDatabaseService graphDb) {
if (propertyValue != null) {
Label label = DynamicLabel.label(labelName);
ResourceIterable<Node> ri = graphDb.findNodesByLabelAndProperty(label, propertyName, propertyValue);
if (ri != null) {
try {
ResourceIterator<Node> iter = ri.iterator();
try {
if (iter != null && iter.hasNext()) {
return iter.next();
}
} finally {
iter.close();
}
} catch (Exception e) {
LOG.error("ERROR WHILE FINDING ID: " + propertyValue + " , LABEL: " + labelName + " , PROPERTY: " + propertyName, e);
}
}
}
return null;
}
}

内存泄漏的一个合理解决方案是不在执行 cypher 的同一事务中执行写锁定。只需捕获 NotFoundException 并重试查询似乎就可以正常工作。

具有合理修复的源代码:

package net.ahm.graph;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.kernel.impl.util.StringLogger;

public class DeleteLab {
private static final int CHILDREN = 10000;
private static final Logger LOG = Logger.getLogger(DeleteLab.class);

public static void main(String[] args) throws Exception {
FileUtils.deleteRecursively(new File("graphdb"));
final GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("graphdb")
.setConfig(GraphDatabaseSettings.use_memory_mapped_buffers, "true").setConfig(GraphDatabaseSettings.cache_type, "strong")
.newGraphDatabase();
registerShutdownHook(graphDb);
LOG.info(">>>> STARTED GRAPHDB");
createIndex("Parent", "name", graphDb);
createIndex("Child", "name", graphDb);
final Node parent;

try (Transaction tx = graphDb.beginTx()) {
parent = graphDb.createNode(DynamicLabel.label("Parent"));
parent.setProperty("name", "parent");
tx.success();
}

try (Transaction tx = graphDb.beginTx()) {
for (int i = 0; i < CHILDREN; i++) {
Node child = graphDb.createNode(DynamicLabel.label("Child"));
child.setProperty("name", "child" + i);
child.setProperty("count", i);
parent.createRelationshipTo(child, RelationshipTypes.PARENT_CHILD);
}
tx.success();
}
LOG.info(">>>> CREATED NODES");
final ExecutionEngine engine = new ExecutionEngine(graphDb, StringLogger.SYSTEM);
ExecutorService es = Executors.newFixedThreadPool(50);
final CountDownLatch cdl = new CountDownLatch(CHILDREN);

for (int i = 0; i < CHILDREN; i++) {
final int count = i;
es.execute(new Runnable() {
@Override
public void run() {
String cName = null;
boolean success = false;
try (Transaction tx = graphDb.beginTx()) {
while (!success) {
try {
Map<String, Object> params = new HashMap<String, Object>();
params.put("cCount", count);
ExecutionResult result = engine.execute(
"match (n:Parent)-[:PARENT_CHILD]->(m:Child) where m.count={cCount} return m.name", params);
for (Map<String, Object> row : result) {
cName = (String) row.get("m.name");
break;
}
success = true;
} catch (org.neo4j.graphdb.NotFoundException e) {
LOG.info(">>>> RETRY QUERY ON NotFoundException: " + count);
try {
Thread.sleep((long) Math.random() * 100);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}

try (Transaction tx = graphDb.beginTx()) {
if (cName != null) {
tx.acquireWriteLock(parent);
Node child = findNode("Child", "name", cName, graphDb);
Relationship r = child.getSingleRelationship(RelationshipTypes.PARENT_CHILD, Direction.INCOMING);
r.delete();
child.delete();
LOG.info(">>>> DELETING NODES: " + cName);
}
tx.success();
} finally {
cdl.countDown();
}
}
});
}
cdl.await();
LOG.info(">>>> DELETED NODES");
es.shutdown();
}

private static void createIndex(String label, String propertyName, GraphDatabaseService graphDb) {
IndexDefinition indexDefinition;
try (Transaction tx = graphDb.beginTx()) {
Schema schema = graphDb.schema();
indexDefinition = schema.indexFor(DynamicLabel.label(label)).on(propertyName).create();
tx.success();
}
try (Transaction tx = graphDb.beginTx()) {
Schema schema = graphDb.schema();
schema.awaitIndexOnline(indexDefinition, 10, TimeUnit.SECONDS);
tx.success();
}
}

private static void registerShutdownHook(final GraphDatabaseService graphDb) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LOG.info("### GRAPHDB SHUTDOWNHOOK INVOKED !!!");
graphDb.shutdown();
}
});
}

private enum RelationshipTypes implements RelationshipType {
PARENT_CHILD
}

public static Node findNode(String labelName, String propertyName, Object propertyValue, GraphDatabaseService graphDb) {
if (propertyValue != null) {
Label label = DynamicLabel.label(labelName);
ResourceIterable<Node> ri = graphDb.findNodesByLabelAndProperty(label, propertyName, propertyValue);
if (ri != null) {
try {
ResourceIterator<Node> iter = ri.iterator();
try {
if (iter != null && iter.hasNext()) {
return iter.next();
}
} finally {
iter.close();
}
} catch (Exception e) {
LOG.error("ERROR WHILE FINDING ID: " + propertyValue + " , LABEL: " + labelName + " , PROPERTY: " + propertyName, e);
}
}
}
return null;
}
}

最佳答案

到目前为止我知道的唯一解决方案是:不要在执行密码的同一事务中执行写锁定。只是捕获 NotFoundException 并重试查询似乎工作正常。这种情况下不会发生内存泄漏。

固定代码如下:

package net.ahm.graph;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.kernel.impl.util.StringLogger;

public class DeleteLab {
private static final int CHILDREN = 10000;
private static final Logger LOG = Logger.getLogger(DeleteLab.class);

public static void main(String[] args) throws Exception {
FileUtils.deleteRecursively(new File("graphdb"));
final GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("graphdb")
.setConfig(GraphDatabaseSettings.use_memory_mapped_buffers, "true").setConfig(GraphDatabaseSettings.cache_type, "strong")
.newGraphDatabase();
registerShutdownHook(graphDb);
LOG.info(">>>> STARTED GRAPHDB");
createIndex("Parent", "name", graphDb);
createIndex("Child", "name", graphDb);
final Node parent;

try (Transaction tx = graphDb.beginTx()) {
parent = graphDb.createNode(DynamicLabel.label("Parent"));
parent.setProperty("name", "parent");
tx.success();
}

try (Transaction tx = graphDb.beginTx()) {
for (int i = 0; i < CHILDREN; i++) {
Node child = graphDb.createNode(DynamicLabel.label("Child"));
child.setProperty("name", "child" + i);
child.setProperty("count", i);
parent.createRelationshipTo(child, RelationshipTypes.PARENT_CHILD);
}
tx.success();
}
LOG.info(">>>> CREATED NODES");
final ExecutionEngine engine = new ExecutionEngine(graphDb, StringLogger.SYSTEM);
ExecutorService es = Executors.newFixedThreadPool(50);
final CountDownLatch cdl = new CountDownLatch(CHILDREN);

for (int i = 0; i < CHILDREN; i++) {
final int count = i;
es.execute(new Runnable() {
@Override
public void run() {
String cName = null;
boolean success = false;
try (Transaction tx = graphDb.beginTx()) {
while (!success) {
try {
Map<String, Object> params = new HashMap<String, Object>();
params.put("cCount", count);
ExecutionResult result = engine.execute(
"match (n:Parent)-[:PARENT_CHILD]->(m:Child) where m.count={cCount} return m.name", params);
for (Map<String, Object> row : result) {
cName = (String) row.get("m.name");
break;
}
success = true;
} catch (org.neo4j.graphdb.NotFoundException e) {
LOG.info(">>>> RETRY QUERY ON NotFoundException: " + count);
try {
Thread.sleep((long) Math.random() * 100);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}

try (Transaction tx = graphDb.beginTx()) {
if (cName != null) {
tx.acquireWriteLock(parent);
Node child = findNode("Child", "name", cName, graphDb);
Relationship r = child.getSingleRelationship(RelationshipTypes.PARENT_CHILD, Direction.INCOMING);
r.delete();
child.delete();
LOG.info(">>>> DELETING NODES: " + cName);
}
tx.success();
} finally {
cdl.countDown();
}
}
});
}
cdl.await();
LOG.info(">>>> DELETED NODES");
es.shutdown();
}

private static void createIndex(String label, String propertyName, GraphDatabaseService graphDb) {
IndexDefinition indexDefinition;
try (Transaction tx = graphDb.beginTx()) {
Schema schema = graphDb.schema();
indexDefinition = schema.indexFor(DynamicLabel.label(label)).on(propertyName).create();
tx.success();
}
try (Transaction tx = graphDb.beginTx()) {
Schema schema = graphDb.schema();
schema.awaitIndexOnline(indexDefinition, 10, TimeUnit.SECONDS);
tx.success();
}
}

private static void registerShutdownHook(final GraphDatabaseService graphDb) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LOG.info("### GRAPHDB SHUTDOWNHOOK INVOKED !!!");
graphDb.shutdown();
}
});
}

private enum RelationshipTypes implements RelationshipType {
PARENT_CHILD
}

public static Node findNode(String labelName, String propertyName, Object propertyValue, GraphDatabaseService graphDb) {
if (propertyValue != null) {
Label label = DynamicLabel.label(labelName);
ResourceIterable<Node> ri = graphDb.findNodesByLabelAndProperty(label, propertyName, propertyValue);
if (ri != null) {
try {
ResourceIterator<Node> iter = ri.iterator();
try {
if (iter != null && iter.hasNext()) {
return iter.next();
}
} finally {
iter.close();
}
} catch (Exception e) {
LOG.error("ERROR WHILE FINDING ID: " + propertyValue + " , LABEL: " + labelName + " , PROPERTY: " + propertyName, e);
}
}
}
return null;
}
}

关于java - 并发查询和删除,导致Neo4j 2.0.3社区内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23707321/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com