Kafka: time difference between the last two records, ksql or something else?

izkcnapc · posted 2021-06-06 in Kafka
Follow (0) | Answers (2) | Views (384)

So I'm evaluating Kafka. In our use case, new topics containing the time elapsed from one event to the next have to be created, essentially because sensors will report "on" or "off" to Kafka. So given the timestamp, sensor name and state, new topics containing the durations of the "on" and "off" states can be created.
Is this possible in ksql, and if so, how?
Or should a consumer or a stream processor really handle this instead?
My data looks like this:

{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on}

and the desired result would be

{ 2019:02:15 00:00:30, sensor1, off, 30sec }

Basically, the states of multiple sensors have to be combined to determine the combined state of a machine. There are hundreds of sensors in the factory.


yks3o0rb #1

This is reasonably easy to do in Kafka Streams, so I would go with option 2.
First, you have to model the input data correctly. Your example uses local time, which makes it impossible to calculate the duration between two timestamps. Use something like epoch time.
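
For illustration, here is a minimal sketch (not part of the original answer) of turning the question's local timestamps into epoch-based java.time.Instant values; the UTC offset is an assumption and has to match the sensors' clock:

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class EpochTimeExample {
  public static void main(String[] args) {
    // The question's timestamp format, e.g. "2019:02:15 00:00:30"
    var formatter = DateTimeFormatter.ofPattern("yyyy:MM:dd HH:mm:ss");
    var local = LocalDateTime.parse("2019:02:15 00:00:30", formatter);
    // Assumption: the sensors report UTC; adjust the offset if they do not
    var instant = local.toInstant(ZoneOffset.UTC);
    System.out.println(instant.toEpochMilli()); // 1550188830000
  }
}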
Start with a source data model like

interface SensorState {
  String getId();
  Instant getTime();
  State getState();
  enum State {
    OFF,
    ON
  }
}

and a target model

interface SensorStateWithDuration {
  SensorState getEvent();
  Duration getDuration();
}
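
For quick local experimentation, a plain implementation of the source interface could look like the sketch below (a hypothetical helper; the builder call further down suggests the actual project generates immutable implementations of these types):

import java.time.Instant;

final class SimpleSensorState implements SensorState {
  private final String id;
  private final Instant time;
  private final State state;

  SimpleSensorState(String id, Instant time, State state) {
    this.id = id;
    this.time = time;
    this.state = state;
  }

  public String getId() { return id; }
  public Instant getTime() { return time; }
  public State getState() { return state; }
}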

Now that you have defined the input and output models (but see "Data Types and Serialization" in the Kafka Streams documentation), you just have to transform the data by defining a ValueTransformer.
It has to do two things:
Check the state store for the sensor's historical data, and update it with the new data if necessary
When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration

import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
  // Name of the state store, also used when registering the store in the topology
  static final String SENSOR_STATES = "SensorStates";

  KeyValueStore<String, SensorState> store;

  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
  }

  public SensorStateWithDuration transform(SensorState sensorState) {
    // Nothing to do
    if (sensorState == null) {
      return null;
    }

    // Check for the previous state, update if necessary
    var oldState = checkAndUpdateSensorState(sensorState);

    // When we have historical data, return duration so far. Otherwise return null
    return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
  }

  public void close() {}

  /**
   * Checks the state store for historical state based on sensor ID and updates it, if necessary.
   *
   * @param sensorState The new sensor state
   * @return The old sensor state
   */
  Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
    // The Sensor ID is our index
    var index = sensorState.getId();

    // Get the historical state (might be null)
    var oldState = store.get(index);
    if (needToUpdate(oldState, sensorState)) {
      // Update the state store to the new state
      store.put(index, sensorState);
    }
    return Optional.ofNullable(oldState);
  }

  /**
   * Check if we need to update the state in the state store.
   *
   * <p>Either we have no historical data, or the state has changed.
   *
   * @param oldState The old sensor state
   * @param sensorState The new sensor state
   * @return Flag whether we need to update
   */
  boolean needToUpdate(SensorState oldState, SensorState sensorState) {
    return oldState == null || oldState.getState() != sensorState.getState();
  }

  /**
   * Wrap the old state with a duration of how long it lasted.
   *
   * @param oldState The state of the sensor so far
   * @param sensorState The new state of the sensor
   * @return Wrapped old state with duration
   */
  SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
    var duration = Duration.between(oldState.getTime(), sensorState.getTime());
    return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
  }
}

Putting everything together in a simple topology (see "Connecting Processors and State Stores"):

var builder = new StreamsBuilder();

// Our state store
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(DurationProcessor.SENSOR_STATES),
        Serdes.String(),
        storeSerde);

// Register the store builder
builder.addStateStore(storeBuilder);

builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));

var topology = builder.build();
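
The storeSerde, inputSerde and resultSerde above are left undefined in the snippet. As one possible sketch (an assumption, not the original project's code), Jackson-based JSON serdes can be built with the stock Serdes.serdeFrom factory:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

class JsonSerdes {
  // Builds a Serde that maps between objects of the given class and JSON bytes
  static <T> Serde<T> jsonSerde(ObjectMapper mapper, Class<T> type) {
    return Serdes.serdeFrom(
        (topic, data) -> {
          try {
            return data == null ? null : mapper.writeValueAsBytes(data);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        },
        (topic, data) -> {
          try {
            return data == null ? null : mapper.readValue(data, type);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });
  }
}

Note that Jackson needs concrete, deserializable classes here, not the plain interfaces above.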

The complete application can be found at github.com/melsicon/kafka-sensors.
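
To try the topology without a broker, Kafka's kafka-streams-test-utils module can drive it in-process. A minimal sketch, assuming the topology and serdes from above plus two hypothetical SensorState values, offState at t0 and onState 30 seconds later:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;

var props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sensor-duration-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (var driver = new TopologyTestDriver(topology, props)) {
  var input = driver.createInputTopic(
      "input-topic", Serdes.String().serializer(), inputSerde.serializer());
  var output = driver.createOutputTopic(
      "result-topic", Serdes.String().deserializer(), resultSerde.deserializer());

  input.pipeInput("sensor1", offState); // OFF at t0, no history yet
  input.pipeInput("sensor1", onState);  // ON at t0 + 30s

  // The first record has no history and yields a null value; the second
  // wraps the OFF state together with a 30-second duration.
  System.out.println(output.readKeyValuesToList());
}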


u4dcyp6a #2

Following up on https://github.com/confluentinc/ksql/issues/2562, and in order to use a self-join, I came up with the following solution:
Create the data


# kafka-topics --bootstrap-server localhost:9092  --delete --topic temptest

echo '{"temp": 3.0, "counter": 1}' | kafkacat -b localhost -t temptest
echo '{"temp": 4.0, "counter": 2}' | kafkacat -b localhost -t temptest
echo '{"temp": 6.0, "counter": 3}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.0, "counter": 4}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 6}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 5}' | kafkacat -b localhost -t temptest

Here we assume that consecutive events already carry a counter attribute. Such a counter could also be added with ksql itself by simply aggregating the event count over time.
Computing the difference

-- import the topic into ksql
CREATE STREAM temp_json (ingesttime BIGINT, row VARCHAR, temp DOUBLE, counter INTEGER) WITH (kafka_topic='temptest', value_format='JSON', KEY='counter');

-- change the format to Avro and repartition
CREATE STREAM temp WITH (VALUE_FORMAT='AVRO') AS SELECT temp, counter, CAST(counter AS VARCHAR) as counter_key FROM temp_json PARTITION BY counter_key;

-- create a second stream with a shifted counter
CREATE STREAM temp_shift AS SELECT temp, counter AS counter_orig, counter + 1 AS counter FROM temp PARTITION BY counter;

-- join the streams by counter
CREATE STREAM temp_diff AS SELECT
  prev.temp - cur.temp AS temp_difference, cur.temp AS temp, prev.temp AS prev_temp, cur.counter AS counter
  FROM temp cur
  LEFT JOIN temp_shift prev WITHIN 2 HOURS
  ON cur.counter = prev.counter;

Test it

ksql> SELECT * FROM temp_diff LIMIT 4;
1574321370281 | 1 | null | 3.0 | null | 1
1574321372307 | 2 | -1.0 | 4.0 | 3.0 | 2
1574321372319 | 3 | -2.0 | 6.0 | 4.0 | 3
1574321372331 | 4 | 3.0 | 3.0 | 6.0 | 4

The sensor itself was ignored to keep the solution simple. However, it could easily be added by using a composite key for partitioning, as described in https://www.confluent.io/stream-processing-cookbook/ksql-recipes/creating-composite-key
