gpt4 book ai didi

memcached - 如何在netty中异步访问memcached

转载 作者:行者123 更新时间:2023-12-04 15:34:58 25 4
gpt4 key购买 nike

我正在用 netty 编写一个服务器,我需要在其中调用 memcached。我正在使用 spymemcached 并且可以轻松地进行同步 memcached 调用。我希望这个 memcached 调用是异步的。那可能吗? netty 提供的示例似乎没有帮助。

我尝试使用回调:创建了一个 ExecutorService我的处理程序中的池并将回调工作程序提交到此池。像这样:

public class MyHandler extends ChannelInboundMessageHandlerAdapter<MyPOJO> implements CallbackInterface{

   ...
private static ExecutorService pool = Executors.newFixedThreadPool(20);


@Override
public void messageReceived(ChannelHandlerContext ctx, MyPOJO pojo) {
...
CallingbackWorker worker = new CallingbackWorker(key, this);
pool.submit(worker);
...
}
public void myCallback() {
//get response
this.ctx.nextOutboundMessageBuf().add(response);
}

}


CallingbackWorker好像:

public class CallingbackWorker implements Callable {

  public CallingbackWorker(String key, CallbackInterface c) {
this.c = c;
this.key = key;
}
public Object call() {
//get value from key
c.myCallback(value);
}


但是,当我这样做时, this.ctx.nextOutboundMessageBuf()myCallback卡住。

所以,总的来说,我的问题是:如何在 Netty 中进行异步 memcached 调用?

最佳答案

这里有两个问题:一个与您尝试编码的方式有关的小问题,以及一个具有许多提供异步服务调用的库的大问题,但没有在异步框架中充分利用它们的好方法,例如内蒂。这会迫使用户进行像这样的次优黑客攻击,或者是一种不太糟糕但仍然不是我稍后会介绍的理想方法。

首先是编码问题。问题是您试图从与处理程序关联的线程以外的线程调用 ChannelHandlerContext 方法,这是不允许的。这很容易修复,如下所示。您可以通过其他几种方式对其进行编码,但这可能是最直接的方式:

private static ExecutorService pool = Executors.newFixedThreadPool(20);

public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
//...

final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder());

// first wait for the response on a pool thread
pool.execute(new Runnable() {
public void run() {
String value;
Exception err;
try {
value = future.get(3, TimeUnit.SECONDS); // or whatever timeout you want
err = null;
} catch (Exception e) {
err = e;
value = null;
}
// put results into final variables; compiler won't let us do it directly above
final fValue = value;
final fErr = err;

// now process the result on the ChannelHandler's thread
ctx.executor().execute(new Runnable() {
public void run() {
handleResult(fValue, fErr);
}
});
}
});
// note that we drop through to here right after calling pool.execute() and
// return, freeing up the handler thread while we wait on the pool thread.
}

private void handleResult(String value, Exception err) {
// handle it
}

这将起作用,并且可能足以满足您的应用程序。但是你有一个固定大小的线程池,所以如果你要处理超过 20 个并发连接,那将成为一个瓶颈。您可以增加池大小,或使用无限制的池大小,但此时,您还不如在 Tomcat 下运行,因为内存消耗和上下文切换开销开始成为问题,并且您失去了吸引人的可扩展性首先是Netty!

问题是,Spymemcached 是基于 NIO 的、事件驱动的,并且仅使用一个线程来完成其所有工作,但无法充分利用其事件驱动的特性。我希望他们很快就会解决这个问题,就像 Netty 4 和 Cassandra 最近通过在 Future 对象上提供回调(监听器)方法一样。

同时,与您在同一条船上,我研究了替代方案,并且对我的发现不太满意,我写了(昨天) a Future tracker class它可以以可配置的速率轮询多达数千个 Futures,并在它们完成时在您选择的线程(Executor)上回调您。它只使用一个线程来做到这一点。我已经 put it up on GitHub如果您想尝试一下,但请注意它仍然是湿的,正如他们所说。我在过去的一天测试了很多,即使有 10000 个并发模拟 Future 对象,每毫秒轮询一次,它的 CPU 使用率可以忽略不计,虽然它开始超过 10000。使用它,上面的例子看起来像这样:
// in some globally-accessible class:

public static final ForeignFutureTracker FFT = new ForeignFutureTracker(1, TimeUnit.MILLISECONDS);

// in a handler class:

public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
// ...

final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder());

// add a listener for the Future, with a timeout in 2 seconds, and pass
// the Executor for the current context so the callback will run
// on the same thread.
Global.FFT.addListener(future, 2, TimeUnit.SECONDS, ctx.executor(),
new ForeignFutureListener<String,GetFuture<String>>() {

public void operationSuccess(String value) {
// do something ...
ctx.fireChannelRead(someval);
}
public void operationTimeout(GetFuture<String> f) {
// do something ...
}
public void operationFailure(Exception e) {
// do something ...
}
});
}

在任何时候,您都不希望有超过一两个 FFT 实例处于事件状态,否则它们可能会消耗 CPU。但是单个实例可以处理数千个未完成的 Futures;拥有第二个调用的唯一原因是以较慢的轮询速率(例如 10-20 毫秒)处理更高延迟的调用,例如 S3。

轮询方法的一个缺点是它增加了少量延迟。例如,每毫秒轮询一次,平均会增加 500 微秒的响应时间。对于大多数应用程序来说,这不会成为问题,而且我认为与线程池方法相比,内存和 CPU 节省不仅仅是抵消了这一点。

我预计在一年左右的时间内这将不是问题,因为更多的异步客户端提供回调机制,让您充分利用 NIO 和事件驱动模型。

关于memcached - 如何在netty中异步访问memcached,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17661382/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com