- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试将我的 Flink 应用程序部署到 AWS Kinesis Data Analytics 中。此应用程序使用 Apache Avro 对传入消息进行反序列化/序列化。我的应用程序在我的本地机器上运行良好,但是当我将它部署到 AWS 时,出现异常(在 CloudWatch Logs 中): Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase;本地类不兼容:流 classdesc serialVersionUID = 4445917349737100331,本地类 serialVersionUID = -1463700717714793795
日志详细信息:
{
"locationInformation": "org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:913)",
"logger": "org.apache.flink.runtime.taskmanager.Task",
"message": "Source: Custom Source -> Sink: Unnamed (1/1) (a72ff69f9dc0f9e56d1104ce21456a5d) switched from RUNNING to FAILED.",
"throwableInformation": [
"org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.",
"\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:160)",
"\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:380)",
"\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)",
"\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)",
"\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:275)",
"\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)",
"\tat java.lang.Thread.run(Thread.java:748)",
"Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795",
"\tat java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)",
"\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
"\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
"\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
"\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
"\tat java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)",
"\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)",
"\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
"\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readCurrentLayout(AvroSerializer.java:465)",
"\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readObject(AvroSerializer.java:432)",
"\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
"\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
"\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
"\tat java.lang.reflect.Method.invoke(Method.java:498)",
"\tat java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)",
"\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)",
"\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)",
"\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)",
"\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
"\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)",
"\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)",
"\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)",
"\tat org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)",
"\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:158)",
"\t... 6 more"
],
"threadName": "Source: Custom Source -> Sink: Unnamed (1/1)",
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:829044228870:application/poc-kda",
"applicationVersionId": "8",
"messageSchemaVersion": "1",
"messageType": "INFO"
}
我使用库版本:
请注意,如果我使用 Apache Flink - 1.8、1.6,也会出现同样的问题
KDA Flink代码:
public class KinesisExampleKDA {
private static final String REGION = "us-east-1";
public static void main(String[] args) throws Exception {
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, REGION);
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(50000);
DataStream<EventAttributes> consumerStream = env.addSource(new FlinkKinesisConsumer<>(
"dev-events", new KinesisSerializer(), consumerConfig));
consumerStream
.addSink(getProducer());
env.execute("kinesis-example");
}
private static FlinkKinesisProducer<EventAttributes> getProducer(){
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, REGION);
outputProperties.setProperty("AggregationEnabled", "false");
FlinkKinesisProducer<EventAttributes> sink = new FlinkKinesisProducer<>(new KinesisSerializer(), outputProperties);
sink.setDefaultStream("dev-result");
sink.setDefaultPartition("0");
return sink;
}
}
class KinesisSerializer implements DeserializationSchema<EventAttributes>, SerializationSchema<EventAttributes> {
@Override
public EventAttributes deserialize(byte[] bytes) throws IOException {
return EventAttributes.fromByteBuffer(ByteBuffer.wrap(bytes));
}
@Override
public boolean isEndOfStream(EventAttributes eventAttributes) {
return false;
}
@Override
public byte[] serialize(EventAttributes eventAttributes) {
try {
return eventAttributes.toByteBuffer().array();
} catch (IOException e) {
e.printStackTrace();
}
return new byte[1];
}
@Override
public TypeInformation<EventAttributes> getProducedType() {
return TypeInformation.of(EventAttributes.class);
}
}
Kinesis 生产者代码:
public class KinesisProducer {
private static String streamName = "dev-events";
public static void main(String[] args) throws InterruptedException, JsonMappingException {
AmazonKinesis kinesisClient = getAmazonKinesisClient("us-east-1");
try {
sendData(kinesisClient, streamName);
} catch (IOException e) {
e.printStackTrace();
}
}
private static AmazonKinesis getAmazonKinesisClient(String regionName) {
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
clientBuilder.setEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration("kinesis.us-east-1.amazonaws.com",
regionName));
clientBuilder.withCredentials(DefaultAWSCredentialsProviderChain.getInstance());
clientBuilder.setClientConfiguration(new ClientConfiguration());
return clientBuilder.build();
}
private static void sendData(AmazonKinesis kinesisClient, String streamName) throws IOException {
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(streamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int i = 0; i < 50; i++) {
PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
EventAttributes eventAttributes = EventAttributes.newBuilder().setName("Jon.Doe").build();
putRecordsRequestEntry.setData(eventAttributes.toByteBuffer());
putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
putRecordsRequestEntryList.add(putRecordsRequestEntry);
}
putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
System.out.println("Put Result" + putRecordsResult);
}
.avdl 格式的 Avro 架构:
@version("0.1.0")
@namespace("com.naya.avro")
protocol UBXEventProtocol{
record EventAttributes{
union{null, string} name=null;
}
}
Avro 自动生成的实体类:
@org.apache.avro.specific.AvroGenerated
public class EventAttributes extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = 2780976157169751219L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"EventAttributes\",\"namespace\":\"com.naya.avro\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
private static SpecificData MODEL$ = new SpecificData();
private static final BinaryMessageEncoder<EventAttributes> ENCODER =
new BinaryMessageEncoder<EventAttributes>(MODEL$, SCHEMA$);
private static final BinaryMessageDecoder<EventAttributes> DECODER =
new BinaryMessageDecoder<EventAttributes>(MODEL$, SCHEMA$);
…
Github 链接:
有人可以添加更多详细信息吗?为什么它不能在 AWS 上运行?
提前致谢
最佳答案
查看堆栈跟踪,它似乎并没有在它尝试读取消息时发生,但实际上是在运算符本身的初始化阶段发生的。
Flink 的工作方式 - 它序列化(使用 Java 序列化)每个需要执行的运算符,然后以序列化的形式在集群中分发它们。这意味着 KinesisSerializer 将自行序列化(作为一个类)以通过网络发送。
现在的问题是,Kinesis 序列化程序引用了 EventAttributes
模型,这意味着对 EventAttributes(类本身,而不是特定实例)的引用将与它一起序列化。作为序列化元数据的一部分,它有望扩展/实现。在你的情况下,它需要 SpecificRecordBase
这不是你的可分发的一部分,而是 Avro 库的一部分。
因此运算符本身的完整序列化链是 KinesisConsumer
-> KinesisSerializer
-> EventAttributes
-> SpecificRecordBase
(Avro 库的一部分)。
然而,AWS 使用 Flink 1.8,它使用 Avro 1.8.2,所有基本 avro 类也来自 1.8.2。您编译您的应用程序并将其链接到 1.9 的 avro 二进制文件。因此,当 Flink 尝试序列化您的运算符并将它们发送到集群时 - 它会将 reference 序列化为 1.9 版的 SpecificRecordBase。但是当 Flink 实际尝试反序列化它时 - 它发现版本与它实际可用的类 (1.8.2) 不匹配,并且链接失败。
这里有 2 个选项:
关于java - 用于 Java 应用程序的 Amazon Kinesis Data Analytics : Avro issue in deserialization incoming messages,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59662118/
最近,我收到了一个项目要求,即从某个页面将数据发送到Google Analytics(分析)。我不知道该怎么做。 帐户ID和所有内容均已创建,我只想知道如何在加载某个网页时发送数据。 我一直在根据自己
我试图在此站点和其他一些站点上找到此问题的答案。但这似乎并不适合我自己。以下网址显示了有关如何同时使用GA和UA的说明。 How to use both ga.js and analytics.js?
从谷歌的文档: The analytics.js snippet is part of Universal Analytics, which is currently in public beta.
根据google的新analytics.js文档,您可以设置多个跟踪器,并通过在单独的send调用中按名称明确提及跟踪器来向其发送事件: https://developers.google.com/a
有什么办法可以让 Google Analytics 的“In-Page Analytics”显示外部链接流量? 实际上,外部链接的综合浏览量会显示在流量报告中,但不会显示在页内分析中。 我们正在使用这
我正在尝试编写一个 Google Analytics API 查询,它只返回去年每个月的每月唯一身份访问者。 This is the data I see in the Google Analytic
我们在我们的应用程序中使用 Google Analytics,但现在我们需要更改它并改用 Adobe Analytics。 在对这两种工具进行比较研究时,我现在意识到了这两种工具的优缺点和特点,
我需要您有关 Google Analytics (analytics.js) 的帮助。我在头部有第一个通用部分,效果很好: (function(i,s,o,g,r,a,m
这个问题在这里已经有了答案: Why use protocol-relative URLs at all? (5 个答案) 关闭 5 年前。 我正在阅读 https://developers.goo
将目标从Analytics(分析)导入到AdWords中,然后在Analytics(分析)中更改目标条件时,是否可以通过更改将目标“重新导入”到AdWords,还是可以自动选择? 最佳答案 更改目标值
Google最近更新了他们对开发人员的政策。 https://play.google.com/about/privacy-security/personal-sensitive/ If your ap
我正在使用google analytics api来获取数据。我正在获取数据,但我想验证两个参数,它们在特定日期范围内始终为0。我正在获取['ga:transactions']和['ga:goalCo
我使用Google API从Google Analytics(分析)获取数据,但指标与Google Analytics(分析)的网络界面不同。 即:我在2015年3月1日获得数据-它返回综合浏览量79
我安装了 Google Analytics (UA) 并将跟踪代码添加到 html 页面。我从浏览器文件中运行 html 页面:///C:/test.html 并使用谷歌调试器进行调试,它成功运行并显
我正在遵循 https://developers.google.com/analytics/devguides/collection/amp-analytics/ 的简单指南 尝试添加 Pagevie
我计划管理大约。通过为每个属性创建带有主机名过滤器的专用 View ,可以在一个属性下创建 400 个差异站点。是否有任何流程可以在不手动创建 View 和制作过滤器的情况下完成此任务? 例如:我们有
我想使用 Google Analytics API 访问 User Explorer 数据,以获取 JSON 值形式的报告。使用此 JSON 值,我可以创建用于分析的 Web 应用程序仪表板。我在此
我正在尝试使用此代码来跟踪 Google Analytics 中的事件 _trackEvent(category, action, opt_label, opt_value, opt_noni
我目前正在使用访问 token 和刷新 token 从 Google Analytics Reporting API (v4) 中提取数据。当我致力于自动从 Google Analytics 中提取数
我正在尝试根据此相关问题实现 anchor (index.html#anchor)跟踪: How to track anchor tags with Google Analytics 我使用 anch
我是一名优秀的程序员,十分优秀!