- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们的目标是实现以下架构。最重要的是能够读取主题 T1 的所有数据(来自所有分区)。
我们面临的问题是我们无法在从不同构建器创建的两个节点之间进行连接(每个实例中有两个不同的构建器)。在每个实例中,我们都创建了两个构建器(B1、B2)。 B1创建一个源处理器,从T1主题的所有分区读取数据,因此每个实例都有唯一的ID。 B2从T2的一个分区的一个分区读取数据。后来,当我们加入时,我们收到此错误拓扑无效:StateStore聚合流存储尚未添加,因为B1和B2具有不同的APP_ID。
这是我们的代码:
StrmApp 类
public class StrmApp extends StrmProc {
private StreamsBuilder myBuilder;
private Validator<String, Data> dataValidator;
private Properties ownBuilderProps;
private KafkaStreams ownStreams;
public StrmApp(ValidDataService dataService, ProcessConfig config, ProcessListener listener) {
super(dataService, config, listener);
myBuilder = new StreamsBuilder();
dataValidator = getValidDataService().getValidator(String.class, Data.class);
ownBuilderProps = new Properties();
ownBuilderProps.putAll(getProperties());
// Unique ID for each instance (different consumer group)
ownBuilderProps.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID());
}
private KTable<String, TheDataList> globalStream() {
// KStream of records from T1 topic using String and TheDataSerde deserializers
KStream<String, Data> trashStream = getOwnBuilder().stream("T1", Consumed.with(Serdes.String(), SerDes.TheDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> kGroupedStream = trashStream.groupByKey();
// Describe how a StateStore should be materialized (as a KTable).
// In our case we are using the default RocksDB back-ends by providing "vdp-aggregated-stream-store" as a state store name
Materialized<String, TheDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("aggregated-stream-store");
materialized = materialized.withValueSerde(SerDes.TheDataListSerde);
// Return a KTable
return kGroupedStream.aggregate(() -> new TheDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}
private Data tombstone(String Vid) {
Data d = new Data();
d.setVid(Vid);
d.setValideData(false);
d.setTimestamp(System.currentTimeMillis());
return d;
}
@Override
public void run() {
/* read from topic 2 (T2) - we want to only read one partition */
KStream<String, Data> inStream = getBuilder()
.stream(getProcessConfig().getTopicConfig().getTopicIn(), Consumed.with(Serdes.String(), SerDes.TheDataSerde))
.filter(getValidDataService().getValidator(String.class, Data.class));
/* Read all partitions from topic 1 (T1) - we want to read from all partitions (P1, P2 and P3) */
KTable<String, TheDataList> ft = globalStream();
// ERROR: Invalid topology: StateStore vdp-aggregated-stream-store is not added yet.
// When it comes to do the join it raises this error
// I think because two builders have different APP_ID
logger.warn("##JOIN:");
/* join bteween data coming from T1 with data coming from T2 */
KStream<String, TheDataList> validated = inStream.join(ft,
new ValueJoiner<Data, TheDataList, TheDataList>() {
@Override
public TheDataList apply(Data valid, TheDataList ivalids) {
ivalids.getList().forEach((c) -> {
dataValidator.validate(c, valid);
});
return ivalids;
}
});
// ...... some code
ownStreams = StreamTools.startKStreams(getOwnBuilder(), getOwnBuilderProps(), this, this);
super.startStreams();
}
private Properties getOwnBuilderProps() {
return ownBuilderProps;
}
private StreamsBuilder getOwnBuilder() {
// return getBuilder();
return myBuilder;
}
// .......
}
StrmProc 类
public abstract class StrmProc extends AProcess {
private final StreamsBuilder builder;
public StrmProc(ValidDataService dataService, ProcessConfig config, ProcessListener listener) {
super(dataService, config, listener);
this.builder = new StreamsBuilder();
}
protected final StreamsBuilder getBuilder() {
return builder;
}
protected final KafkaStreams startStreams() {
return StreamTools.startKStreams(getBuilder(), getProperties(), this, this);
}
// ........
}
AProcess 类
public abstract class AProcess implements Process {
private final Properties propertie;
private final ProcessConfig config;
private final ValidDataService dataService;
private final ProcessListener listener;
protected AProcess(ValidDataService dataService, ProcessConfig config, ProcessListener listener) {
super();
this.dataService = dataService;
this.propertie = getProperties(config);
this.config = config;
this.listener = listener;
}
private Properties getProperties(ProcessConfig config) {
Properties kafkaProperties = new Properties();
kafkaProperties = new Properties();
kafkaProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, config.getApp());
kafkaProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServerUrl());
kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return kafkaProperties;
}
protected Properties getProperties() {
return propertie;
}
protected ProcessConfig getProcessConfig() {
return config;
}
protected ValidDataService getValidDataService() {
return dataService;
}
// .......
}
请问如何使用 Kafka 流实现此目的?
最佳答案
为了在 Kafka Streams 上使用 join,您需要使用单个 StreamsBuilder
实例,而不是两个(在您的情况下,其中两个 - 变量 inStream
和 ft
)。
如果 KeyValueStore 未添加到 StreamsBuilder
实例中,通常 Kafka Streams 会抛出异常 TopologyException: Invalid Topology: StateStore is not尚未添加
: streamsBuilder.addStateStore(storeBuilder)
.
关于java - 卡夫卡流: Invalid topology: StateStore is not added yet,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53722235/
我正在使用 Groovy 进行一个项目,我想采用一个员工数组,这样在数组中没有经理跟随他们的下属。原因是我需要将人员添加到数据库中,我不希望分两次完成。 所以,我基本上有: 12
要缓存的数据: 100 Gb 数据 大小为 500-5000 字节的对象 平均每分钟更新/插入 1000 个对象(峰值 5000) 需要对生产和测试中的 Coherence 拓扑提出建议(与备份一起分
在所有边都处于错误方向的图上反转拓扑排序的结果是否会产生有效的拓扑顺序,就好像边在排序之前被反转一样? a -> b a -> c b -> d c -> d 可以给出 a b c d 的拓扑排序。反
使用JTS,如何找到多边形边界上距外部点最近的点? 最佳答案 查看 DistanceOp 。它返回一个坐标数组: 坐标[] pts = DistanceOp.closestPoints(poly, O
我正在尝试使用充当 Kafka 消费者的 Storm 喷口将我的数据从 Kafka 主题发送到 HBase,并将数据发送到 HBase 我在 Storm 拓扑中面临异常.... java.lang.R
我已经配置了我的机器 zookeeper、nimbus、supervisor 正常运行并且我的拓扑在 LocalCluster 中工作 LocalCluster cluster = new Local
我正在构建一个 REST API,但每次加载我的网站时,我都会收到 MongoError:拓扑已损坏。有人可以帮我解决这个问题吗?我感觉异步运行有问题。 const client = new Mong
我使用一些带有 zb 堆栈的 xbee (s2) 模块进行网状网络评估。因此,必须创建多跳环境。问题是,固件自己处理关联,并且没有像 api 提供的那样更深入地了解堆栈。为了强制数据的路径,而不干扰路
我有一个在 Node.js 中使用 Restify 和 Mongoose 构建的 REST 服务,以及一个包含大约 30.000 个常规大小文档的集合的 mongoDB。我的 Node 服务通过 pm
当我尝试创建时出现此错误: Error (E_UNKNOWN) :: Encountered an unexpected error MongoError: topology was destroye
我正在使用 storm jar 类将拓扑提交给 `nimbus'。它在本地工作正常但在远程集群上它说它无法加载主类。下面是错误 stderr: SLF4J: Class path contains m
当我尝试创建此错误时: Error (E_UNKNOWN) :: Encountered an unexpected error MongoError: topology was destroyed
我有一个内置在 node.js 中的 REST 服务,带有 Restify 和 Mongoose,还有一个 mongoDB,其中包含大约 30.000 个常规大小的文档。我的 Node 服务通过 pm
我正在寻找一种方法来设置“cassandra-topology.properties”的路径,以便 Cassandra 可以从给定路径获取此文件。有什么办法可以做到这一点吗? 谢谢,巴蒂亚 最佳答案
在尝试提交 Storm 拓扑时, ./storm jar /home/winoria1/Desktop/stormtopology.jar com.storm.StormTopology 我收到以下错
Apache Storm 如何设置新拓扑和存在一次的日志级别? 在java中我写道: import org.slf4j.Logger; import org.slf4j.LoggerFactory;
在同一台计算机上有两个节点的本地(测试)设置中(使用端口范围 47500..47501 的静态 IP 配置),“第二个”节点将不会加入集群;它发出 TcpDiscoveryJoinRequestMes
我是一名前端开发人员,试图在新的 Next 项目中拓展自己的视野,第一次学习 Node、Mongo 和 GraphQL 的服务器端。 Apollo 对我来说是最简单的入门方式,因为我已经在之前的项目中
我已将我的 MongoDB 数据库配置为单节点副本集。我可以通过 api 访问它(向它写入数据),也可以从 shell 访问它: rs0:PRIMARY> rs.status() { "set
我目前在 Windows 机器上使用 Netbeans 开发拓扑。当我以本地模式部署时: LocalCluster 集群 = new LocalCluster(); cluster.submitTop
我是一名优秀的程序员,十分优秀!