
java - Kafka Streams does not run with dynamically generated classes

Reposted. Author: 行者123. Last updated: 2023-11-30 12:03:28

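I want to start a stream that deserializes a dynamically created class. This bean is created using reflection and a URLClassLoader, from a class passed in as a String parameter, but the Kafka Streams API does not recognize my new class.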

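The stream works perfectly with pre-created beans, but shuts down automatically when a dynamic bean is used. The deserializer is built with Jackson and also works on its own.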

This is the class-resolver code:

    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.nio.file.Files;
    import java.util.Arrays;
    import javax.tools.JavaCompiler;
    import javax.tools.JavaFileObject;
    import javax.tools.StandardJavaFileManager;
    import javax.tools.StandardLocation;
    import javax.tools.ToolProvider;

    public static Class<?> getClassFromSource(String className, String sourceCode)
            throws IOException, ClassNotFoundException {

        // Create an empty source file; javac requires the file name to match
        // the name of the public class it contains.
        File parentDirectory = Files.createTempDirectory("dynamic-classes").toFile();
        File sourceFile = new File(parentDirectory, className + ".java");
        sourceFile.deleteOnExit();

        // Write the source code into the source file.
        try (FileWriter writer = new FileWriter(sourceFile)) {
            writer.write(sourceCode);
        }

        // Compile the source file, emitting the .class file into the same directory.
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("No system Java compiler available (a JDK is required)");
        }
        try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) {
            fileManager.setLocation(StandardLocation.CLASS_OUTPUT, Arrays.asList(parentDirectory));

            Iterable<? extends JavaFileObject> compilationUnits =
                    fileManager.getJavaFileObjectsFromFiles(Arrays.asList(sourceFile));

            boolean compiled = compiler.getTask(null, fileManager, null, null, null, compilationUnits).call();
            if (!compiled) {
                throw new IllegalStateException("Compilation of " + className + " failed");
            }
        }

        // Load the compiled class from the output directory. The loader is left
        // open so that classes referenced by the generated class can still be
        // resolved after this method returns.
        URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { parentDirectory.toURI().toURL() });
        return classLoader.loadClass(className);
    }

First I instantiate the Serdes, which receive the class as a parameter:

    // Class generated at runtime from a source string
    Class<?> clazz = getClassFromSource("DynamicClass", source);

    // Serializer and deserializer for the generated class; they implement
    // org.apache.kafka.common.serialization.Serializer and Deserializer
    DynamicDeserializer deserializer = new DynamicDeserializer(clazz);
    DynamicSerializer serializer = new DynamicSerializer();
    Serde<?> encryptedSerde = Serdes.serdeFrom(serializer, deserializer);

Then I start the stream topology that uses this Serde:

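    StreamsBuilder builder = new StreamsBuilder();

    // Count the records per key read from the input topic
    KTable<String, Long> dynamicStream = builder
            .stream(topicName, Consumed.with(Serdes.String(), encryptedSerde))
            .groupByKey()
            .count();

    // Convert the KTable to a KStream before writing it to the output topic
    dynamicStream.toStream().to(outputTopicName, Produced.with(Serdes.String(), Serdes.Long()));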

The stream topology should run normally, but it always produces this error:

2019-09-01 14:54:16 WARN  ConsumerConfig:355 - The configuration 'log4j.appender.stdout.Target' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN ConsumerConfig:355 - The configuration 'log4j.appender.stdout.layout' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN ConsumerConfig:355 - The configuration 'log4j.appender.stdout.layout.ConversionPattern' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN ConsumerConfig:355 - The configuration 'stream.restart.application' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN ConsumerConfig:355 - The configuration 'aes.key.path' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN ConsumerConfig:355 - The configuration 'path.to.listening' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN ConsumerConfig:355 - The configuration 'log4j.appender.stdout' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN ConsumerConfig:355 - The configuration 'admin.retries' was supplied but isn't a known config.
2019-09-01 14:54:16 WARN ConsumerConfig:355 - The configuration 'log4j.rootLogger' was supplied but isn't a known config.
2019-09-01 14:54:16 INFO AppInfoParser:117 - Kafka version: 2.3.0
2019-09-01 14:54:16 INFO AppInfoParser:118 - Kafka commitId: fc1aaa116b661c8a
2019-09-01 14:54:16 INFO AppInfoParser:119 - Kafka startTimeMs: 1567360456724
2019-09-01 14:54:16 INFO KafkaStreams:800 - stream-client [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72] Started Streams client
2019-09-01 14:54:16 INFO StreamThread:740 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Starting
2019-09-01 14:54:16 INFO StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from CREATED to RUNNING
2019-09-01 14:54:16 INFO KafkaConsumer:1027 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Subscribed to pattern: 'DynamicBean|streamingbean-test-20190901145412544-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition'
2019-09-01 14:54:17 INFO Metadata:266 - [Producer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-producer] Cluster ID: tp7OBhwVRQqT2NpPlL55_Q
2019-09-01 14:54:17 INFO Metadata:266 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Cluster ID: tp7OBhwVRQqT2NpPlL55_Q
2019-09-01 14:54:17 INFO AbstractCoordinator:728 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Discovered group coordinator AcerDerick:9092 (id: 2147483647 rack: null)
2019-09-01 14:54:17 INFO ConsumerCoordinator:476 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Revoking previously assigned partitions []
2019-09-01 14:54:17 INFO StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
2019-09-01 14:54:17 INFO KafkaStreams:257 - stream-client [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72] State transition from RUNNING to REBALANCING
2019-09-01 14:54:17 INFO KafkaConsumer:1068 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2019-09-01 14:54:17 INFO StreamThread:324 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] partition revocation took 0 ms.
suspended active tasks: []
suspended standby tasks: []
2019-09-01 14:54:17 INFO AbstractCoordinator:505 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] (Re-)joining group
2019-09-01 14:54:17 ERROR StreamsPartitionAssignor:354 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer] DynamicClass is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.
2019-09-01 14:54:17 INFO AbstractCoordinator:469 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Successfully joined group with generation 1
2019-09-01 14:54:17 INFO ConsumerCoordinator:283 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-consumer, groupId=streamingbean-test-20190901145412544] Setting newly assigned partitions:
2019-09-01 14:54:17 INFO StreamThread:1164 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Informed to shut down
2019-09-01 14:54:17 INFO StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN
2019-09-01 14:54:17 INFO StreamThread:1178 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Shutting down
2019-09-01 14:54:17 INFO KafkaConsumer:1068 - [Consumer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2019-09-01 14:54:17 INFO KafkaProducer:1153 - [Producer clientId=streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-09-01 14:54:17 INFO StreamThread:207 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2019-09-01 14:54:17 INFO StreamThread:1198 - stream-thread [streamingbean-test-20190901145412544-15574162-7649-4c98-acd2-7a68ced01d72-StreamThread-1] Shutdown complete

Best answer

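After some time I solved this with a simple solution, though probably not the most elegant one. First, I read the data from the topic with a JSON string deserializer, and then I pass it to a second deserializer that converts it into my dynamic object.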
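The two-step approach described above can be sketched without any Kafka or JSON dependency. This is only an illustration under stated assumptions, not the answerer's actual code: the class name `TwoStageDecoding`, the helpers `bytesToText`, `twoStage` and `bindViaStringConstructor`, and the `binder` parameter are all hypothetical. In a real topology the first step would presumably be `Serdes.String()`, and the binder would be a JSON mapper call such as Jackson's `ObjectMapper.readValue(json, clazz)`. Here a reflection-based stand-in that calls a single-`String` constructor keeps the example self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;
import java.util.function.Function;

public class TwoStageDecoding {

    // Step 1: decode the raw record value into text, as a plain string
    // deserializer would. No knowledge of the dynamic class is needed here.
    public static String bytesToText(byte[] payload) {
        return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
    }

    // Chains step 1 with a caller-supplied step 2. The binder (hypothetical
    // parameter) turns text into an instance of the target class; a JSON
    // mapper would normally play this role.
    public static Function<byte[], Object> twoStage(Class<?> targetType,
                                                    BiFunction<String, Class<?>, Object> binder) {
        Function<byte[], String> first = TwoStageDecoding::bytesToText;
        return first.andThen(text -> text == null ? null : binder.apply(text, targetType));
    }

    // Stand-in binder for this sketch only: instantiate the target class
    // through a public constructor that takes a single String.
    public static Object bindViaStringConstructor(String text, Class<?> targetType) {
        try {
            return targetType.getConstructor(String.class).newInstance(text);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot bind text to " + targetType.getName(), e);
        }
    }

    public static void main(String[] args) {
        // StringBuilder stands in for the dynamically loaded class.
        Class<?> dynamicClass = StringBuilder.class;
        Function<byte[], Object> decoder =
                twoStage(dynamicClass, TwoStageDecoding::bindViaStringConstructor);

        byte[] record = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        Object bean = decoder.apply(record);
        System.out.println(bean.getClass().getName() + " -> " + bean);
    }
}
```

The point of the split is that the stream itself only ever sees strings, so the serde handed to Kafka Streams does not depend on the generated class; the class is needed only in the second step, which can run inside an ordinary value-mapping function.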

Regarding "java - Kafka Streams does not run with dynamically generated classes", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57748378/
