java - Distributing subscribers across JVMs

Reposted. Author: 行者123. Updated: 2023-12-02 02:18:10

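As I understand it, RxJava works within a single JVM. Is there a wrapper/lib/API that supports clustered environments, combined with a distributed cache, JMS, or any other queue, so that subscribers can scale across a distributed environment? I wanted to check here before reinventing the wheel.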

Best answer

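You can deploy Vert.x instances in a cluster and use RxJava on top of them. The idea is to use the EventBus as the transport layer and subscribe to its messages with RxJava. It is not a pure RxJava solution.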

A very simple runnable example:

package com.example;

import java.util.concurrent.TimeUnit;

import io.reactivex.Flowable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.eventbus.EventBus;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

public class MainVerticle extends AbstractVerticle {

    // Event bus address shared by every node in the cluster.
    static final String CENTRAL = "CENTRAL";

    @Override
    public void start() throws Exception {

        EventBus eventBus = vertx.eventBus();

        // The node ID arrives through the deployment config set in main().
        JsonObject config = config();
        String nodeID = config.getString("nodeID");

        // Print every message published on CENTRAL, except this node's own.
        eventBus.consumer(CENTRAL).toFlowable()
            .map(msg -> (JsonObject) msg.body())
            .filter(msgBody -> !msgBody.getString("sender", "").equals(nodeID))
            .subscribe(msgBody -> {
                System.out.println(msgBody);
            });

        // Publish a greeting to all nodes once per second.
        Flowable.interval(1, TimeUnit.SECONDS)
            .subscribe(tick -> {
                JsonObject msg = new JsonObject()
                    .put("sender", nodeID)
                    .put("msg", "Hello world");
                eventBus.publish(CENTRAL, msg);
            });
    }

    public static void main(String[] args) {
        ClusterManager clusterManager = new HazelcastClusterManager();

        VertxOptions options = new VertxOptions().setClusterManager(clusterManager);

        // Start a clustered Vert.x instance, then deploy the verticle on it.
        Vertx.rxClusteredVertx(options)
            .subscribe(vertx -> {
                if (vertx.isClustered()) {
                    System.out.println("Vertx is running clustered");
                }
                String nodeID = clusterManager.getNodeID();
                System.out.println("Node ID : " + nodeID);

                String mainVerticle = MainVerticle.class.getCanonicalName();

                DeploymentOptions deploymentOptions = new DeploymentOptions();
                deploymentOptions.setConfig(new JsonObject().put("nodeID", nodeID));

                vertx.rxDeployVerticle(mainVerticle, deploymentOptions).subscribe();
            }, Throwable::printStackTrace); // report a failed cluster start

    }

}
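
The sender filter in the verticle can be tried without a cluster. The following is a minimal sketch using only the JDK, assuming an in-memory stand-in for the event bus: `LocalBus`, `join`, and `hello` are hypothetical names introduced for illustration, not Vert.x API, and a plain `Map` replaces `JsonObject`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class SenderFilterSketch {

    static final String CENTRAL = "CENTRAL";

    // Hypothetical in-memory stand-in for the clustered EventBus:
    // publish() hands the message to every consumer on the address.
    static final class LocalBus {
        private final Map<String, List<Consumer<Map<String, String>>>> handlers = new HashMap<>();

        void consumer(String address, Consumer<Map<String, String>> handler) {
            handlers.computeIfAbsent(address, key -> new ArrayList<>()).add(handler);
        }

        void publish(String address, Map<String, String> message) {
            for (Consumer<Map<String, String>> handler : handlers.getOrDefault(address, List.of())) {
                handler.accept(message);
            }
        }
    }

    // Registers a "node" that records every message except its own,
    // mirroring the filter on the "sender" field in MainVerticle.
    static List<String> join(LocalBus bus, String nodeID) {
        List<String> received = new ArrayList<>();
        bus.consumer(CENTRAL, msg -> {
            if (!msg.getOrDefault("sender", "").equals(nodeID)) {
                received.add(msg.get("sender") + ": " + msg.get("msg"));
            }
        });
        return received;
    }

    // Builds the same two-field message the verticle publishes.
    static Map<String, String> hello(String nodeID) {
        return Map.of("sender", nodeID, "msg", "Hello world");
    }

    public static void main(String[] args) {
        LocalBus bus = new LocalBus();
        List<String> nodeA = join(bus, "node-A");
        List<String> nodeB = join(bus, "node-B");

        bus.publish(CENTRAL, hello("node-A"));
        bus.publish(CENTRAL, hello("node-B"));

        System.out.println("node-A saw " + nodeA);
        System.out.println("node-B saw " + nodeB);
    }
}
```

Each node ends up with only the other node's greeting, which is what the clustered example prints once two JVMs have joined the cluster.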

Maven dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rxjava2-clustered</artifactId>
    <version>0.42</version>
    <packaging>jar</packaging>

    <name>rxjava2-clustered</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.5.0</version>
        </dependency>

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java2</artifactId>
            <version>3.5.0</version>
        </dependency>

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-hazelcast</artifactId>
            <version>3.5.0</version>
        </dependency>

    </dependencies>
</project>

In this example I use the Hazelcast ClusterManager. Implementations also exist for Infinispan, Apache Ignite, and Apache Zookeeper. Refer to the Vert.x documentation for the full reference.
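
One point worth noting about the example: it uses `publish`, which broadcasts, so every node consuming `CENTRAL` sees every message. If the goal is to spread work across subscribers on different JVMs instead, the Vert.x event bus also offers `send`, which routes each message to just one of the consumers registered on the address. The following is a minimal in-memory sketch of that difference, not Vert.x code: `LocalAddress` and `run` are hypothetical names, and the strict round-robin selection is a simplification made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SendVsPublishSketch {

    // Hypothetical stand-in for one event bus address; not the Vert.x API.
    static final class LocalAddress {
        private final List<Consumer<String>> handlers = new ArrayList<>();
        private int next = 0;

        void consumer(Consumer<String> handler) {
            handlers.add(handler);
        }

        // Broadcast: every registered consumer receives the message.
        void publish(String message) {
            for (Consumer<String> handler : handlers) {
                handler.accept(message);
            }
        }

        // Point-to-point: exactly one consumer receives the message.
        // Strict round-robin is a simplification made by this sketch.
        void send(String message) {
            if (handlers.isEmpty()) {
                return; // nothing registered, so this sketch drops the message
            }
            handlers.get(next).accept(message);
            next = (next + 1) % handlers.size();
        }
    }

    // Registers `consumers` inboxes, delivers `messages` messages,
    // and returns what each inbox received.
    static List<List<String>> run(boolean broadcast, int consumers, int messages) {
        LocalAddress central = new LocalAddress();
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            central.consumer(inbox::add);
        }
        for (int i = 0; i < messages; i++) {
            String message = "task-" + i;
            if (broadcast) {
                central.publish(message);
            } else {
                central.send(message);
            }
        }
        return inboxes;
    }

    public static void main(String[] args) {
        System.out.println("publish: " + run(true, 2, 4));
        System.out.println("send:    " + run(false, 2, 4));
    }
}
```

With two consumers and four messages, the broadcast variant gives each inbox all four tasks, while the point-to-point variant gives each inbox two. In the clustered example, the analogous change would be to call `send` rather than `publish` on the event bus and to drop the sender filter if a node may process its own work.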

Regarding "java - Distributing subscribers across JVMs", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48950478/
