
java - Understanding how many StreamProcessor instances are created, and do stream tasks share the same stream processor instance?

Reposted. Author: 行者123. Updated: 2023-12-04 14:16:02

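I would like to understand in more detail the relationship between StreamThread and StreamTask, and how many StreamProcessor instances are created when we have:

  • A source Kafka topic with multiple partitions, say 6.
  • Only ONE StreamThread (num.stream.threads=1).

I am keeping a simple processor topology:

source_topic --> Processor1 --> Processor2 --> Processor3 --> sink_topic

Each processor simply forwards to the next processor in the chain. Here is a snippet of one of the processors. I am using the low-level Java API.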

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class Processor1 implements Processor<String, String> {

    private ProcessorContext context;

    public Processor1() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Keep the context so process() can forward records downstream.
        this.context = context;
    }

    @Override
    public void punctuate(long timestamp) {
        // No scheduled work in this example.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        // Pass the record unchanged to the next processor in the chain.
        context.forward(key, value);
    }
}

Main driver application snippet:

Topology topology = new Topology();

topology.addSource("SOURCE", "source-topic-data");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic-data", "Processor3");

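Properties settings = new Properties();
// application.id and bootstrap.servers are also required by StreamsConfig; they are omitted in this fragment.
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
StreamsConfig config = new StreamsConfig(settings);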
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();

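With this arrangement, I have the following questions:

  • How many instances of the processors (Processor1, Processor2, Processor3) will be created?
  • As per my understanding, there will be SIX stream tasks. Is a new processor instance created for each stream task, or do they "share" the same Processor instance?
  • When a stream thread is created, does it create a new processor instance?
  • Are stream tasks created as part of stream thread creation?

(New question added to the original list)

  • In this scenario, a single stream thread will have SIX stream tasks. Does the stream thread execute these stream tasks one by one, sort of "in a loop"? Do stream tasks run as separate "threads"? Basically, I cannot understand how a single stream thread runs multiple stream tasks at the same time/in parallel.

Below is the topology printed out: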


KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
StreamsThread appId: my-first-streams-application
StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
Active tasks:
Running: StreamsTask taskId: 0_0
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-0]
StreamsTask taskId: 0_1
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-1]
StreamsTask taskId: 0_2
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-2]
StreamsTask taskId: 0_3
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-3]
StreamsTask taskId: 0_4
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-4]
StreamsTask taskId: 0_5
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-5]

Suspended:
Restoring:
New:
Standby tasks:
Running:
Suspended:
Restoring:
New:


Best answer

How many instances of processors (Processor1, Processor2, Processor3) will be created?

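In your example, six of each. Each task instantiates a complete copy of the Topology. (See https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L355 ; note: a Topology is the logical representation of the program and is instantiated at runtime as a ProcessorTopology.)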

As per my understanding, there will be SIX stream tasks. Is a new instance of processor created for each Stream task or they "share" the same Processor instance?

Each task has its own Processor instances; they are not shared.
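To make the "six of each" count concrete, here is a minimal plain-Java sketch with no Kafka dependency. It assumes, as the answer states, that each task calls every supplier once when it builds its own copy of the topology. `PerTaskInstancesSketch`, `FakeProcessor`, and `instantiate` are hypothetical names used only for illustration, not Kafka Streams classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

class PerTaskInstancesSketch {

    /** Hypothetical stand-in for a user-defined processor such as Processor1. */
    static class FakeProcessor {
        final String name;

        FakeProcessor(String name) {
            this.name = name;
        }
    }

    /**
     * Simulates task creation: every task invokes each supplier once, so each
     * task gets its own copy of every processor node.
     * Returns the number of instances created per processor name.
     */
    static Map<String, Integer> instantiate(int numTasks, String... processorNames) {
        Map<String, Integer> created = new TreeMap<>();

        // One supplier per node, playing the role of the lambdas passed to addProcessor().
        Map<String, Supplier<FakeProcessor>> suppliers = new LinkedHashMap<>();
        for (String name : processorNames) {
            suppliers.put(name, () -> {
                created.merge(name, 1, Integer::sum);
                return new FakeProcessor(name);
            });
        }

        for (int task = 0; task < numTasks; task++) {
            for (Supplier<FakeProcessor> supplier : suppliers.values()) {
                supplier.get(); // a fresh instance that belongs to this task only
            }
        }
        return created;
    }

    public static void main(String[] args) {
        // Six partitions -> six tasks, three processor nodes each.
        System.out.println(instantiate(6, "Processor1", "Processor2", "Processor3"));
        // prints {Processor1=6, Processor2=6, Processor3=6}
    }
}
```

This is also why the topology takes a supplier such as `() -> new Processor1()` rather than a ready-made instance: the supplier is what allows a separate instance per task.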

When a Stream Thread is created, does it create a new instance of processor?

No. New Processor instances are created when a task is created.

Are Stream Tasks created as part of Stream Threads creation?

No. Tasks are created during a rebalance, according to the partition/task assignment. KafkaStreams registers a StreamsRebalanceListener on its internal consumer, and that listener calls TaskManager#createTasks().
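The timing described above can be sketched in plain Java as follows. This is a simplified simulation, not the actual Kafka Streams internals: `FakeStreamThread` and its `onPartitionsAssigned` method are hypothetical stand-ins for the thread and its rebalance callback. The task ids follow the `0_<partition>` pattern seen in the printed topology.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RebalanceTaskCreationSketch {

    /** Hypothetical simplified stream thread: it owns no tasks when constructed. */
    static class FakeStreamThread {
        final List<String> taskIds = new ArrayList<>();

        /** Simulated rebalance callback: one task per assigned partition. */
        void onPartitionsAssigned(List<Integer> partitions) {
            for (int partition : partitions) {
                // Sub-topology 0, one task per partition, as in the printout (0_0 .. 0_5).
                taskIds.add("0_" + partition);
            }
        }
    }

    public static void main(String[] args) {
        FakeStreamThread thread = new FakeStreamThread();
        System.out.println("after thread creation: " + thread.taskIds);
        // prints: after thread creation: []

        thread.onPartitionsAssigned(Arrays.asList(0, 1, 2, 3, 4, 5));
        System.out.println("after rebalance: " + thread.taskIds);
        // prints: after rebalance: [0_0, 0_1, 0_2, 0_3, 0_4, 0_5]
    }
}
```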

Update (as the question was extended):

In this scenario a single stream thread will have SIX stream tasks. Does a stream thread execute these stream tasks one-by-one, sort of "in-a-loop". Do stream tasks run as a separate "thread". Basically, not able to understand how a single stream thread run multiple stream tasks at the same time/parallely?

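Yes, the StreamThread executes the tasks in a loop. There are no other threads. Hence, tasks assigned to the same thread are not executed at the same time/in parallel, but one after another. (See https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java#L472 ; each StreamThread uses exactly one TaskManager, which internally uses AssignedStreamsTasks and AssignedStandbyTasks.)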
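The loop described above can be pictured with the following plain-Java sketch. It is a simplification under stated assumptions, not the real implementation: `FakeTask`, `demoTasks`, and `runLoop` are hypothetical names, and handling exactly one buffered record per task per pass is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SingleThreadTaskLoopSketch {

    /** Hypothetical stand-in for a stream task with its own buffered records. */
    static class FakeTask {
        final String id;
        final Deque<String> buffered = new ArrayDeque<>();

        FakeTask(String id) {
            this.id = id;
        }

        /** Processes at most one record; returns false if nothing was buffered. */
        boolean processOne(List<String> log) {
            String record = buffered.poll();
            if (record == null) {
                return false;
            }
            log.add(id + ":" + record + "@" + Thread.currentThread().getName());
            return true;
        }
    }

    /** Builds numTasks tasks (ids 0_0, 0_1, ...) with recordsPerTask records each. */
    static List<FakeTask> demoTasks(int numTasks, int recordsPerTask) {
        List<FakeTask> tasks = new ArrayList<>();
        for (int p = 0; p < numTasks; p++) {
            FakeTask task = new FakeTask("0_" + p);
            for (int r = 0; r < recordsPerTask; r++) {
                task.buffered.add("r" + r);
            }
            tasks.add(task);
        }
        return tasks;
    }

    /** One thread visits the tasks in turn until no task has work left. */
    static List<String> runLoop(List<FakeTask> tasks) {
        List<String> log = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (FakeTask task : tasks) {
                progressed |= task.processOne(log);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Six tasks with two records each, all handled by the calling thread.
        for (String entry : runLoop(demoTasks(6, 2))) {
            System.out.println(entry);
        }
    }
}
```

Every log entry carries the same thread name, and the entries for different tasks interleave: the tasks make progress in turn, never simultaneously. Parallelism comes only from running more stream threads or more application instances.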

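This question, "java - Understanding how many StreamProcessor instances are created, and do stream tasks share the same stream processor instance?", corresponds to a similar question on Stack Overflow: https://stackoverflow.com/questions/60133007/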
