gpt4 book ai didi

java - Akka 分布式 pub sub : Java implementation not working

转载 作者:搜寻专家 更新时间:2023-11-01 03:49:52 25 4
gpt4 key购买 nike

订阅者的主类:Application.java

package com.mynamespace;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

import com.mynamespace.actors.SubscriberActor;

@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

public static void main(String[] args) throws InterruptedException {

ApplicationContext ctx = SpringApplication.run(Application.class, args);
// get hold of the actor system
ActorSystem system = ctx.getBean(ActorSystem.class);
ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
ActorRef subscriber = system.actorOf(
Props.create(SubscriberActor.class), "subscriber");
// subscribe to the topic named "content"
mediator.tell(new DistributedPubSubMediator.Put(subscriber), subscriber);
// subscriber.tell("init", null);
System.out.println("Running.");
Thread.sleep(5000l);
}
}

订阅者 Actor :SubscriberActor.java

package com.mynamespace.actors;

import java.util.ArrayList;
import java.util.List;

import akka.actor.UntypedActor;

import com.mynamespace.message.CategoryServiceRequest;
import com.mynamespace.message.CategoryServiceResponse;

public class SubscriberActor extends UntypedActor {

@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof CategoryServiceRequest) {
System.out.println("Request received for GetCategories.");
CategoryServiceResponse response = new CategoryServiceResponse();
List<String> categories = new ArrayList<>();
categories.add("Food");
categories.add("Fruits");
response.setCatgories(categories);
getSender().tell(response, getSelf());
} else if (msg instanceof String && msg.equals("init")) {
System.out.println("init called");
} else {
System.out
.println("Unhandelled message received for getCategories.");
}
}

}

订阅者的Application.conf

akka {
loglevel = INFO
stdout-loglevel = INFO
loggers = ["akka.event.slf4j.Slf4jLogger"]
extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}

remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}

cluster {
seed-nodes = [
"akka.tcp://mynamespace-actor-system@127.0.0.1:2551",
"akka.tcp://mynamespace-actor-system@127.0.0.1:2552"]

auto-down-unreachable-after = 10s
}

}

发布者的主类:Application.java

package com.mynamespace;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

import com.mynamespace.actors.PublisherActor;

@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

public static void main(String[] args) throws InterruptedException {

ApplicationContext ctx = SpringApplication.run(Application.class, args);
// get hold of the actor system
ActorSystem system = ctx.getBean(ActorSystem.class);
ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
ActorRef publisher = system.actorOf(Props.create(PublisherActor.class),
"publisher");
mediator.tell(new DistributedPubSubMediator.Put(publisher), publisher);
Thread.sleep(5000);
publisher.tell("hi", publisher);
System.out.println("Running.");
}
}

PublisherActor.java

package com.mynamespace.actors;

import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import akka.util.Timeout;

import com.mynamespace.message.CategoryServiceRequest;
import com.mynamespace.message.CategoryServiceResponse;

public class PublisherActor extends UntypedActor {

// activate the extension
ActorRef mediator = DistributedPubSubExtension.get(getContext().system())
.mediator();

public void onReceive(Object msg) {
if (msg instanceof String) {
Timeout timeOut = new Timeout(50000l);
mediator.tell(new DistributedPubSubMediator.Send(
"/user/subscriber", new CategoryServiceRequest()),
getSelf());
Future<Object> response = Patterns.ask(mediator,
new DistributedPubSubMediator.Send("/user/subscriber",
new CategoryServiceRequest()), timeOut);
Future<CategoryServiceResponse> finalresponse = response.map(
new Mapper<Object, CategoryServiceResponse>() {

@Override
public CategoryServiceResponse apply(Object parameter) {
CategoryServiceResponse responseFromRemote = (CategoryServiceResponse) parameter;
System.out.println("received:: list of size:: "
+ responseFromRemote.getCatgories().size());
return responseFromRemote;
}

}, getContext().system().dispatcher());
} else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
System.out.println("subscribbed.......");

} else {
unhandled(msg);
}
}
}

发布者的应用程序配置与订阅者的相同。两者都在同一系统的不同端口上运行。

我在本地系统上定义并运行了两个种子节点。不知何故,我无法通过 DistributedPubSub 调解器向生产者(均在不同节点上运行)询问/告知订阅者。

在运行 Subscriber 然后是 publisher 之后:我没有在 stdout/logs 中打印任何异常或任何死信引用。

是否可以查看我的调解员持有哪些 Actor 引用资料?

需要帮助来发现问题或可能的问题。

最佳答案

我遇到了同样的问题,在@spam 的评论和我自己的实验之后,我可以推荐的是对组使用发布/订阅和 sendOneMessageToEachGroup=true

是否假设 Send 仅在本地有效?如果是这样,文档没有明确说明。但我也可以通过那里的代码看出文档的这个特定部分显然被忽略了(如更改类名但不调用那些,调用前面示例中的前面的)

希望这对遇到此问题的任何人有所帮助,因为文档显然有点误导

关于java - Akka 分布式 pub sub : Java implementation not working,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31536080/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com