
java - Flink Streaming throws an exception when computing a sum

Reposted. Author: 可可西里. Updated: 2023-11-01 16:30:04

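I am working with the Flink Streaming example for rack temperature events, and I want to compute the sum of temperatures grouped by rack ID. Here is my code: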

static Properties properties = new Properties();

public static Properties getProperties()
{
    properties.setProperty("bootstrap.servers", "54.210.139.57:9092");
    properties.setProperty("zookeeper.connect", "54.210.139.57:2181");
    //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
    //properties.setProperty("group.id", "akshay");
    properties.setProperty("auto.offset.reset", "earliest");
    return properties;
}

@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception
{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties props = Program.getProperties();
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    /*DataStream<String> dstream=env.addSource(new FlinkKafkaConsumer09<String>("TemperatureEvent",new SimpleStringSchema(), props));
    dstream.filter(dstream -> dstream.)*/
    DataStream<TemperatureEvent> dstream = env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props));

    DataStream<TemperatureEvent> ds1 = dstream.keyBy("rackId").sum(1);

    ds1.print();
    env.execute("Temperature Consumer");
}

When I try to execute this code, it throws an exception. The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:78)
at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:39)
at org.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:292)
at com.yash.main.Program.main(Program.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)

I am using sum(1) because my 0th field is rackId and my 1st field is temperature, as defined in the POJO TemperatureEvent shown below:

public class TemperatureEvent
{
    private int rackId;
    private double temperature;
    private long timeStamp;

    // No-argument constructor (Flink POJO types require one)
    public TemperatureEvent()
    {
    }

    public TemperatureEvent(int rackId, double temperature, long timeStamp) {
        super();
        this.rackId = rackId;
        this.temperature = temperature;
        this.timeStamp = timeStamp;
    }

    public int getRackId() {
        return rackId;
    }

    public void setRackId(int rackId) {
        this.rackId = rackId;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }

    @Override
    public String toString() {
        //return String.format("TemperatureEvent [rackId=%s, temperature=%s, timeStamp=%s]",rackId, temperature, timeStamp);
        String str = getRackId() + "," + temperature + "," + getTimeStamp();
        return str;
    }
}

What is the solution to this problem? How can I compute the sum of temperatures grouped by rackId?

Best answer

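You can only use index-based arguments on these methods if your type is a tuple type. TemperatureEvent is a POJO, not a tuple, so the field has to be referenced by name. In your case, it should work with .sum("temperature").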
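To make the expected result concrete, here is a minimal plain-Java sketch of what a keyed rolling sum produces: one running total per rack ID, emitted for every incoming event. It does not use Flink at all. The class RollingSumSketch, the Reading type, and the method rollingSumByRack are hypothetical names introduced only for this illustration, and the sample values are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names come from Flink.
class RollingSumSketch {

    // Hypothetical, simplified stand-in for the TemperatureEvent POJO.
    static final class Reading {
        final int rackId;
        final double temperature;

        Reading(int rackId, double temperature) {
            this.rackId = rackId;
            this.temperature = temperature;
        }
    }

    // For each event, add its temperature to the total of its rack
    // and emit "rackId,runningTotal" for that rack.
    static List<String> rollingSumByRack(List<Reading> events) {
        Map<Integer, Double> totals = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Reading e : events) {
            double total = totals.merge(e.rackId, e.temperature, Double::sum);
            out.add(e.rackId + "," + total);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> events = new ArrayList<>();
        events.add(new Reading(1, 20.0));
        events.add(new Reading(2, 30.0));
        events.add(new Reading(1, 5.5));
        for (String line : rollingSumByRack(events)) {
            System.out.println(line);
        }
    }
}
```

Each rack keeps its own total, so the second event for rack 1 is added to rack 1's previous total and leaves rack 2 untouched. In the Flink job itself, the grouping is done by keyBy("rackId") and the summed field is selected by name with .sum("temperature").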

Regarding java - Flink Streaming throwing an exception when computing a sum, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/37924110/
