- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Confluence-3.2.1 作为 Kafka 流媒体。我正在尝试汇总我的 KGroupedStream<String, MyClass1>
进入KTable<Windowed<String>,MsgAggr>
。在使用聚合时,我还使用 TimeWindows.of(TimeUnit.SECONDS.toMillis(5))
。我使用用户定义的“Serdes”作为聚合的参数。用户定义“Serdes”的代码是,
Map<String, Object> serdeProps = new HashMap<>();
final Serializer<MsgAggr> pageViewSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);
pageViewSerializer.configure(serdeProps, false);
final Deserializer<MsgAggr> pageViewDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", MsgAggr.class);
pageViewDeserializer.configure(serdeProps, false);
final Serde<MsgAggr> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);`
流媒体代码是
KGroupedStream<String, MyClass1> msg_grp = message
.groupByKey();
KTable<Windowed<String>,MsgAggr> msg_win = msg_grp
//.reduce(new Reduced(), arg1, arg2);
.aggregate(new Init(),
new Aggr(),
TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),
pageViewSerde,
"MySample_out");
当我运行代码时,出现错误:
[2017-05-23 18:16:45,648] ERROR stream-thread [StreamThread-1] Streams application error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:249)
java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Exception in thread "StreamThread-1" java.lang.ClassCastException: my.kafka.strm.MyClass1 cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
最佳答案
问题出在 message.groupByKey();
上。它使用 String Serde 作为您的自定义类 MyClass1
。请为 MyClass1
实现自定义序列化器和反序列化器,并在 groupByKey
的重载版本中使用相同的序列化器和反序列化器 - https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde)
关于java - 卡夫卡流媒体 : Issue with user defined 'Serdes' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44138527/
export class UserListComponent implements OnInit{ users; constructor(private userService: UserS
我最近在我的系统中遇到了 Java 语言环境的问题,我试图用这个配置运行一个项目: -Duser.language=pt_BR -Duser.country=BR 谷歌搜索后,我找到了this sit
1 当我希望出现注册错误时,我的代码出现问题:管理器不可用; 'auth.User' 已替换为 'users.User' ,我尝试解决其他问题,与 Manager 不可用相同; 'auth.User'
Loopback 非常酷,但这是我迄今为止遇到的一个缺点,我真的不确定如何解决它。内置用户模型在我的 MongoDB 数据库中生成一个名为“User”的集合,当我尝试根据 Loopback.js 自己
我在 aws cognito 中有以下用户组。行政成员付费成员(member) 我想在所有用户注册我的应用程序时将所有用户默认分配到 Member 用户组,这样我就可以为该用户组分配不同的 IAM A
blogsIndex.blade.php @extends('layouts.default') @section('details')
我正在尝试在Rails 3开发环境中使用sqlite3而不是MySQL,但是遇到了问题。尝试执行rake db:migrate时,我得到: SQLite3::SQLException: no such
尝试使用 构建 API Phoenix v1.3 按照本教程: https://dreamconception.com/tech/phoenix-full-fledged-api-in-five-mi
我正在使用通过模板 cookie-cutter 创建的 Django。当我尝试在本地使用 docker 运行项目时,出现以下错误。 FATAL: password authentication fai
我正在尝试使用 node.js/adonis 创建新用户 我创建了这两个函数: const User = use("App/Models/User") async store ({ request,
我想安排一些事情,例如 GET 请求 http://example.com/user/foo@bar.com 内部调用脚本 /var/www/example.com/rest/user/GET.php
我是一名具有可用性工程背景的软件开发人员。当我在研究生院学习可用性工程时,其中一位教授有一句口头禅:“你不是用户”。我们的想法是,我们需要将 UI 设计基于实际的用户研究,而不是我们自己关于 UI 应
您好,我正在制作一个使用互联网发送消息的消息传递应用程序。我需要从用户 a 向用户 b 发出通知。 我使用这段代码: if (toUser!= nil){ parseMessage[@
在 ruby/ror 中你可以这样做: user = User.new(params[:user]) 它使用发布表单中的值填充新对象。 使用 django/python 可以完成类似的事情吗? 最
每当我编辑用户的角色时,用户都需要注销并重新登录以查看更改。提升用户时没有问题,因为他们在再次登录之前不会看到额外的权限。但是,当降级发生时,用户仍将保留其现有角色,这会带来安全风险。想象一下,撤销一
我的核心数据有线问题。使用 iOS 10 中的 Swift3,每次使用 获取或存储数据时,我都会获得托管对象上下文 func getContext () -> NSManagedObjectCont
我发现当我使用 users_path(user) 时它返回 /users.id 其中 id 是用户的 ID 但我希望它返回 /用户/ID。我的配置 routes.rb 如下所示。 # config/r
我的应用程序在我的测试设备上正常运行(当我通过 ADT 安装它时,当我通过导出的 APK 文件安装它时)但它在 Play Store 测试设备上失败并出现以下错误: Permission Denial
创建模型的第一个条目会抛出错误 我执行了以下命令进行迁移 manage.py makemigrations manage.py migrate 在我执行这些命令以在数据库中创建第一个“数据”之后,一切
我正在尝试实现一个 getter,但它在下面代码 fragment 的最后一行向我显示了这个错误。 代码是—— class AuthRepository extends BaseAuthReposit
我是一名优秀的程序员,十分优秀!