
apache-kafka - Kafka time difference between the last two records: KSQL or other?

Reposted. Author: 行者123. Updated: 2023-12-03 17:03:22

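I am evaluating Kafka. In our use case we would have to create new topics containing the "elapsed time" from one event to the next, essentially because a sensor reports "on" or "off" to Kafka. So, given the timestamp, the sensor name, and the state, we want to create new topics that carry the duration of each "on" and "off" state.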

  • Is this doable in KSQL?
  • Or should this really be left to a consumer or a stream processor?

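My data looks like this:

    { 2019:02:15 00:00:00, sensor1, off}
    { 2019:02:15 00:00:30, sensor1, on}

to get the result

    { 2019:02:15 00:30:00, sensor1, off, 30sec }.

Essentially, the states of multiple sensors have to be combined to determine the combined state of a machine. There are hundreds or even thousands of sensors in a factory.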

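Best answer

This is easy in Kafka Streams, so I would go with your second option, a stream processor.

First, you have to model your input data properly. Your example uses local time, which makes it impossible to reliably calculate the duration between two timestamps. Use something like epoch time.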
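To illustrate why local time is a problem, here is a minimal sketch that parses timestamps in the question's format and compares a naive local-time difference with the difference between the corresponding instants. The class name `EpochTimeExample`, the helper `toInstant`, and the choice of `Europe/Berlin` as the sensors' time zone are assumptions made for this illustration; they do not come from the question.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class EpochTimeExample {
    // Matches the question's sample timestamps, e.g. "2019:02:15 00:00:30"
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("uuuu:MM:dd HH:mm:ss");

    /** Interprets a local timestamp in the given zone and converts it to an Instant. */
    static Instant toInstant(String localTimestamp, ZoneId zone) {
        return LocalDateTime.parse(localTimestamp, FORMAT).atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Europe/Berlin"); // assumed zone, for illustration only

        // The two sample events from the question are 30 seconds apart
        Instant off = toInstant("2019:02:15 00:00:00", zone);
        Instant on = toInstant("2019:02:15 00:00:30", zone);
        System.out.println(Duration.between(off, on).getSeconds()); // prints 30

        // Across the daylight-saving switch, local clock time and elapsed time disagree
        String before = "2019:03:31 01:59:00";
        String after = "2019:03:31 03:01:00";
        Duration naive = Duration.between(
            LocalDateTime.parse(before, FORMAT), LocalDateTime.parse(after, FORMAT));
        Duration real = Duration.between(toInstant(before, zone), toInstant(after, zone));
        System.out.println(naive.toMinutes()); // prints 62
        System.out.println(real.toMinutes());  // prints 2

        // An epoch-based value that could be carried in the record instead
        System.out.println(off.toEpochMilli());
    }
}
```

Converting once at the edge (where the zone is known) and carrying an `Instant` or epoch milliseconds in the record keeps the later duration arithmetic unambiguous.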

Start with a source data model like

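    import java.time.Instant;

    interface SensorState {
      String getId();       // sensor name, used as the lookup key later on
      Instant getTime();    // absolute point in time, not a local time
      State getState();

      enum State {
        OFF,
        ON
      }
    }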

and a target

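    import java.time.Duration;

    // Note: the transformer further down also calls a static builder() factory
    // on this type; it is assumed to be supplied by the implementation.
    interface SensorStateWithDuration {
      SensorState getEvent();
      Duration getDuration();
    }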

Now that you have defined your input and output streams (but see "Data Types and Serialization"), you just need to transform the values ("Applying processors and transformers") by defining a ValueTransformer.

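It has to do two things:
  • Check the state store for historical data of the sensor and update it with the new data, if necessary
  • When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration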

    import java.time.Duration;
    import java.util.Optional;
    import org.apache.kafka.streams.kstream.ValueTransformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
      /** Name of the state store; must match the name used when building the topology. */
      static final String SENSOR_STATES = "SensorStates";

      KeyValueStore<String, SensorState> store;

      @Override
      @SuppressWarnings("unchecked")
      public void init(ProcessorContext context) {
        this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
      }

      @Override
      public SensorStateWithDuration transform(SensorState sensorState) {
        // Nothing to do
        if (sensorState == null) {
          return null;
        }

        // Check for the previous state, update if necessary
        var oldState = checkAndUpdateSensorState(sensorState);

        // When we have historical data, return duration so far. Otherwise return null
        return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
      }

      @Override
      public void close() {}

      /**
       * Checks the state store for historical state based on sensor ID and updates it, if necessary.
       *
       * @param sensorState The new sensor state
       * @return The old sensor state
       */
      Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
        // The Sensor ID is our index
        var index = sensorState.getId();

        // Get the historical state (might be null)
        var oldState = store.get(index);
        if (needToUpdate(oldState, sensorState)) {
          // Update the state store to the new state
          store.put(index, sensorState);
        }
        return Optional.ofNullable(oldState);
      }

      /**
       * Check if we need to update the state in the state store.
       *
       * <p>Either we have no historical data, or the state has changed.
       *
       * @param oldState The old sensor state
       * @param sensorState The new sensor state
       * @return Flag whether we need to update
       */
      boolean needToUpdate(SensorState oldState, SensorState sensorState) {
        return oldState == null || oldState.getState() != sensorState.getState();
      }

      /**
       * Wrap the old state with a duration how long it lasted.
       *
       * @param oldState The state of the sensor so far
       * @param sensorState The new state of the sensor
       * @return Wrapped old state with duration
       */
      SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
        var duration = Duration.between(oldState.getTime(), sensorState.getTime());
        return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
      }
    }
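The two responsibilities of the transformer can be tried out without a Kafka cluster. The following is a rough sketch, not the answer's actual code: it swaps the Kafka Streams state store for a plain `HashMap` and uses Java records (Java 16 or later) in place of the interfaces above. The class name `DurationLogicSketch` and the record shapes are assumptions made for this illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DurationLogicSketch {
    enum State { OFF, ON }

    // Simplified stand-ins for the SensorState and SensorStateWithDuration interfaces
    record SensorState(String id, Instant time, State state) {}
    record SensorStateWithDuration(SensorState event, Duration duration) {}

    // Stand-in for the persistent key-value state store, keyed by sensor ID
    private final Map<String, SensorState> store = new HashMap<>();

    /** Returns the previously stored state with its duration so far, if there is one. */
    Optional<SensorStateWithDuration> transform(SensorState current) {
        SensorState old = store.get(current.id());
        // Store the new event only if there is no history or the state changed
        if (old == null || old.state() != current.state()) {
            store.put(current.id(), current);
        }
        return Optional.ofNullable(old)
            .map(o -> new SensorStateWithDuration(o, Duration.between(o.time(), current.time())));
    }

    public static void main(String[] args) {
        var sketch = new DurationLogicSketch();
        var t0 = Instant.parse("2019-02-15T00:00:00Z");

        // First event: no history yet, so nothing is emitted
        var first = sketch.transform(new SensorState("sensor1", t0, State.OFF));
        System.out.println(first.isPresent()); // prints false

        // State change after 30 seconds: the OFF state is emitted with its duration
        var second = sketch.transform(new SensorState("sensor1", t0.plusSeconds(30), State.ON));
        System.out.println(second.get().event().state() + " " + second.get().duration()); // prints OFF PT30S

        // Repeated ON report 15 seconds later: the stored ON event is kept,
        // and its duration so far is emitted
        var third = sketch.transform(new SensorState("sensor1", t0.plusSeconds(45), State.ON));
        System.out.println(third.get().event().state() + " " + third.get().duration()); // prints ON PT15S
    }
}
```

As in the transformer above, a repeated report of the same state does not overwrite the stored event, so the emitted duration always counts from the moment the state last changed.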

Putting everything together ("Connecting Processors and State Stores") in a simple Topology:

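    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.Stores;

    // storeSerde, inputSerde and resultSerde are the serdes for SensorState and
    // SensorStateWithDuration; they are defined elsewhere (see "Data Types and Serialization").
    var builder = new StreamsBuilder();

    // Our state store
    var storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("SensorStates"),
            Serdes.String(),
            storeSerde);

    // Register the store builder
    builder.addStateStore(storeBuilder);

    // Read the input, attach the transformer to the named store, write the result
    builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
        .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
        .to("result-topic", Produced.with(Serdes.String(), resultSerde));

    var topology = builder.build();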

The complete application is available at github.com/melsicon/kafka-sensors.

Regarding "apache-kafka - Kafka time difference between the last two records: KSQL or other?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54713760/
