I am using Spark Streaming to read data from a topic, and I am hitting the following exception:
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord Serialization stack: - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = rawEventTopic, partition = 0, offset = 14098, CreateTime = 1556113016951, serialized key size = -1, serialized value size = 2916, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":null,"message":null,"eventDate":"","group":null,"category":"AD","userName":null,"inboundDataSource":"AD","source":"192.168.1.14","destination":"192.168.1.15","bytesSent":"200KB","rawData":"{username: vinit}","account_name":null,"security_id":null,"account_domain":null,"logon_id":null,"process_id":null,"process_information":null,"process_name":null,"target_server_name":null,"source_network_address":null,"logon_process":null,"authentication_Package":null,"network_address":null,"failure_reason":null,"workstation_name":null,"target_server":null,"network_information":null,"object_type":null,"object_name":null,"source_port":null,"logon_type":null,"group_name":null,"source_dra":null,"destination_dra":null,"group_admin":null,"sam_account_name":null,"new_logon":null,"destination_address":null,"destination_port":null,"source_address":null,"logon_account":null,"sub_status":null,"eventdate":null,"time_taken":null,"s_computername":null,"cs_method":null,"cs_uri_stem":null,"cs_uri_query":null,"c_ip":null,"s_ip":null,"s_supplier_name":null,"s_sitename":null,"cs_username":null,"cs_auth_group":null,"cs_categories":null,"s_action":null,"cs_host":null,"cs_uri":null,"cs_uri_scheme":null,"cs_uri_port":null,"cs_uri_path":null,"cs_uri_extension":null,"cs_referer":null,"cs_user_agent":null,"cs_bytes":null,"sc_status":null,"sc_bytes":null,"sc_filter_result":null,"sc_filter_category":null,"x_virus_id":null,"x_exception_id":null,"rs_content_type":null,"s_supplier_ip":null,"cs_cookie":null,"s_port":null,"cs_version":null,"creationTime":null,"operation":null,"workload":n
ull,"clientIP":null,"userId":null,"eventSource":null,"itemType":null,"userAgent":null,"eventData":null,"sourceFileName":null,"siteUrl":null,"targetUserOrGroupType":null,"targetUserOrGroupName":null,"sourceFileExtension":null,"sourceRelativeUrl":null,"resultStatus":null,"client":null,"loginStatus":null,"userDomain":null,"clientIPAddress":null,"clientProcessName":null,"clientVersion":null,"externalAccess":null,"logonType":null,"mailboxOwnerUPN":null,"organizationName":null,"originatingServer":null,"subject":null,"sendAsUserSmtp":null,"deviceexternalid":null,"deviceeventcategory":null,"devicecustomstring1":null,"customnumber2":null,"customnumber1":null,"emailsender":null,"sourceusername":null,"sourceaddress":null,"emailrecipient":null,"destinationaddress":null,"destinationport":null,"requestclientapplication":null,"oldfilepath":null,"filepath":null,"additionaldetails11":null,"applicationprotocol":null,"emailrecipienttype":null,"emailsubject":null,"transactionstring1":null,"deviceaction":null,"devicecustomdate2":null,"devicecustomdate1":null,"sourcehostname":null,"additionaldetails10":null,"filename":null,"bytesout":null,"additionaldetails13":null,"additionaldetails14":null,"accountname":null,"destinationhostname":null,"dataSourceId":2,"date":"","violated":false,"oobjectId":null,"eventCategoryName":"AD","sourceDataType":"AD"})) - element of array (index: 0) - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) ~[spark-core_2.11-2.3.0.jar:2.3.0] at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) ~[spark-core_2.11-2.3.0.jar:2.3.0] at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[spark-core_2.11-2.3.0.jar:2.3.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393) ~[spark-core_2.11-2.3.0.jar:2.3.0] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_151] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_151] at java.lang.Thread.run(Unknown Source) [na:1.8.0_151]
2019-04-24 19:07:00.025 ERROR 21144 --- [result-getter-1] o.apache.spark.scheduler.TaskSetManager : Task 1.0 in stage 48.0 (TID 97) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
The code that reads from the topics is below:
@Service
public class RawEventSparkConsumer {

    private final Logger logger = LoggerFactory.getLogger(RawEventSparkConsumer.class);

    @Autowired
    private DataModelServiceImpl dataModelServiceImpl;

    @Autowired
    private JavaStreamingContext streamingContext;

    @Autowired
    private JavaInputDStream<ConsumerRecord<String, String>> messages;

    @Autowired
    private EnrichEventKafkaProducer enrichEventKafkaProd;

    @PostConstruct
    private void sparkRawEventConsumer() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            messages.foreachRDD((rdd) -> {
                List<ConsumerRecord<String, String>> rddList = rdd.collect();
                Iterator<ConsumerRecord<String, String>> rddIterator = rddList.iterator();
                while (rddIterator.hasNext()) {
                    ConsumerRecord<String, String> rddRecord = rddIterator.next();
                    if (rddRecord.topic().equalsIgnoreCase("rawEventTopic")) {
                        ObjectMapper mapper = new ObjectMapper();
                        BaseDataModel csvDataModel = mapper.readValue(rddRecord.value(), BaseDataModel.class);
                        EnrichEventDataModel enrichEventDataModel = (EnrichEventDataModel) csvDataModel;
                        enrichEventKafkaProd.sendEnrichEvent(enrichEventDataModel);
                    } else if (rddRecord.topic().equalsIgnoreCase("enrichEventTopic")) {
                        System.out.println("************getting enrichEventTopic data ************************");
                    }
                }
            });

            streamingContext.start();
            try {
                streamingContext.awaitTermination();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
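The exception comes from `rdd.collect()`: collecting ships every `ConsumerRecord` from the executors to the driver via Java serialization, and `ConsumerRecord` does not implement `Serializable`. A common way out is to extract the `String` payload on the executors first, e.g. `rdd.map(ConsumerRecord::value).collect()`, so only plain strings cross the wire. The same failure mode can be sketched with the JDK alone (the `Record` class below is a made-up stand-in for Kafka's `ConsumerRecord`, not real Kafka API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SerializeDemo {

    // Stand-in for ConsumerRecord: note it does NOT implement Serializable.
    static class Record {
        private final String value;
        Record(String value) { this.value = value; }
        String value() { return value; }
    }

    // Returns true if o survives Java serialization,
    // false when writeObject throws NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Record> records = Arrays.asList(new Record("{\"category\":\"AD\"}"));

        // Fails: the element type is not Serializable, just like ConsumerRecord.
        System.out.println(canSerialize(new ArrayList<>(records)));  // false

        // Works: extract the plain String payload first, then ship that instead
        // (the moral equivalent of rdd.map(ConsumerRecord::value).collect()).
        List<String> values = new ArrayList<>();
        for (Record r : records) values.add(r.value());
        System.out.println(canSerialize(values));                    // true
    }
}
```

Note this is just a sketch of the mechanism; in the actual job the mapping to `value()` has to happen inside `foreachRDD` before anything is collected or shipped off the executors.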
This is the configuration code:
@Bean
public JavaInputDStream<ConsumerRecord<String, String>> getKafkaParam(JavaStreamingContext streamingContext) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "group1");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList(rawEventTopic, enrichEventTopic);

    return KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );
}
Please help; I am stuck at this point.
Best Answer
I found the solution to my problem under the following question:
org.apache.spark.SparkException: Task not serializable
Declare the inner class as a static variable:

static Function<Tuple2<String, String>, String> mapFunc = new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
};
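The `static` keyword is what matters here: a non-static inner (or anonymous) class holds a hidden reference to its enclosing instance, so when Spark serializes the function it tries to serialize the whole enclosing object too, which is typically not serializable. A plain-JDK illustration of that capture (all class names here are invented for the demo):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class InnerClassCapture {

    // Stand-in for the consumer class: NOT Serializable, like a Spring bean
    // holding a JavaStreamingContext.
    static class Outer {
        // Non-static inner class: instances keep a hidden reference to their
        // Outer, so serializing one drags the non-serializable Outer along.
        class Inner implements Serializable { }

        // Static nested class: no hidden outer reference, serializes on its own.
        static class Nested implements Serializable { }
    }

    // Returns true if o survives Java serialization.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Outer outer = new Outer();
        System.out.println(canSerialize(outer.new Inner()));   // false: captures outer
        System.out.println(canSerialize(new Outer.Nested()));  // true: no capture
    }
}
```

Moving the function into a static field, as in the answer above, breaks that capture in the same way the static nested class does.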
For java - Getting NotSerializableException when using Spark Streaming with Kafka, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55831626/