gpt4 book ai didi

java - 将 Kafka 与 Apache Calcite 集成

转载 作者:行者123 更新时间:2023-11-30 06:52:44 28 4
gpt4 key购买 nike

我正在尝试将方解石与 Kafka 集成,我引用了 CsvStreamableTable。

每个 ConsumerRecord 都使用下面的代码转换为 Object[]:

static class ArrayRowConverter extends RowConverter<Object[]> {
private List<Schema.Field> fields;

public ArrayRowConverter(List<Schema.Field> fields) {
this.fields = fields;
}

@Override
Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
Object[] objects = new Object[fields.size()+1];
int i = 0 ;
objects[i++] = consumerRecord.timestamp();
for(Schema.Field field : this.fields) {
Object obj = consumerRecord.value().get(field.name());
if( obj instanceof Utf8 ){
objects[i ++] = obj.toString();
}else {
objects[i ++] = obj;
}
}
return objects;
}
}

Enumerator的实现如下,一个线程不断地从kafka轮询记录并将它们放入队列中,getRecord()方法从该队列轮询:

public E current() {
return current;
}

public boolean moveNext() {
for(;;) {
if(cancelFlag.get()) {
return false;
}
ConsumerRecord<String, GenericRecord> record = getRecord();
if(record == null) {
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
current = rowConvert.convertRow(record);
return true;
}
}

我测试了SELECT STREAM * FROM Kafka.clicks,它工作正常。rowtime是显式添加的第一列,值为Kafka的记录时间戳。

但是当我尝试

SELECT STREAM FLOOR(rowtime TO HOUR) 
AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks GROUP BY FLOOR(rowtime TO HOUR), ip

抛出异常

java.sql.SQLException: Error while executing SQL "SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line 1, column 119: Streaming aggregation requires at least one monotonic expression in GROUP BY clause
at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
at org.apache.calcite.avatica.Helper.createException(Helper.java:41)

最佳答案

您需要声明“ROWTIME”列是单调的。在 MockCatalogReader 中,请注意如何在“ORDERS”和“SHIPMENTS”流中将“ROWTIME”声明为单调的。这就是为什么 SqlValidatorTest.testStreamGroupBy() 中的某些查询有效而其他查询无效的原因。 validator 依赖的关键方法是SqlValidatorTable.getMonotonicity(String columnName)

关于java - 将 Kafka 与 Apache Calcite 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42423559/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com