- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
所以我一直在尝试使用 Kafka 和 Avro 数据进行 Angel Conde 的结构化流 Structured-Streaming Avro
然而,我的数据似乎有点复杂,其中有嵌套数据。这是我的代码,
private static Injection<GenericRecord, byte[]> recordInjection;
private static StructType type;
private static final String SNOQTT_SCHEMA = "{"
+"\"type\": \"record\","
+"\"name\": \"snoqttv2\","
+"\"fields\": ["
+" { \"name\": \"src_ip\", \"type\": \"string\" },"
+" { \"name\": \"classification\", \"type\": \"long\" },"
+" { \"name\": \"device_id\", \"type\": \"string\" },"
+" { \"name\": \"alert_msg\", \"type\": \"string\" },"
+" { \"name\": \"src_mac\", \"type\": \"string\" },"
+" { \"name\": \"sig_rev\", \"type\": \"long\" },"
+" { \"name\": \"sig_gen\", \"type\": \"long\" },"
+" { \"name\": \"dest_mac\", \"type\": \"string\" },"
+" { \"name\": \"packet_info\", \"type\": {"
+" \"type\": \"record\","
+" \"name\": \"packet_info\","
+" \"fields\": ["
+" { \"name\": \"DF\", \"type\": \"boolean\" },"
+" { \"name\": \"MF\", \"type\": \"boolean\" },"
+" { \"name\": \"ttl\", \"type\": \"long\" },"
+" { \"name\": \"len\", \"type\": \"long\" },"
+" { \"name\": \"offset\", \"type\": \"long\" }"
+" ],"
+" \"connect.name\": \"packet_info\" }},"
+" { \"name\": \"timestamp\", \"type\": \"string\" },"
+" { \"name\": \"sig_id\", \"type\": \"long\" },"
+" { \"name\": \"ip_type\", \"type\": \"string\" },"
+" { \"name\": \"dest_ip\", \"type\": \"string\" },"
+" { \"name\": \"priority\", \"type\": \"long\" }"
+"],"
+"\"connect.name\": \"snoqttv2\" }";
private static Schema.Parser parser = new Schema.Parser();
private static Schema schema = parser.parse(SNOQTT_SCHEMA);
static {
recordInjection = GenericAvroCodecs.toBinary(schema);
type = (StructType) SchemaConverters.toSqlType(schema).dataType();
}
public static void main(String[] args) throws StreamingQueryException{
// Set log4j untuk development langsung dari java
LogManager.getLogger("org.apache.spark").setLevel(Level.WARN);
LogManager.getLogger("akka").setLevel(Level.ERROR);
// Set konfigurasi untuk streamcontext dan sparkcontext
SparkConf conf = new SparkConf()
.setAppName("Snoqtt-Avro-Structured")
.setMaster("local[*]");
// Inisialisasi spark session
SparkSession sparkSession = SparkSession
.builder()
.config(conf)
.getOrCreate();
// Reduce task number
sparkSession.sqlContext().setConf("spark.sql.shuffle.partitions", "3");
// Mulai data stream di kafka
Dataset<Row> ds1 = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "snoqttv2")
.option("startingOffsets", "latest")
.load();
// Mulai streaming query
sparkSession.udf().register("deserialize", (byte[] data) -> {
GenericRecord record = recordInjection.invert(data).get();
return RowFactory.create(
record.get("timestamp").toString(),
record.get("device_id").toString(),
record.get("ip_type").toString(),
record.get("src_ip").toString(),
record.get("dest_ip").toString(),
record.get("src_mac").toString(),
record.get("dest_mac").toString(),
record.get("alert_msg").toString(),
record.get("sig_rev").toString(),
record.get("sig_gen").toString(),
record.get("sig_id").toString(),
record.get("classification").toString(),
record.get("priority").toString());
}, DataTypes.createStructType(type.fields()));
ds1.printSchema();
Dataset<Row> ds2 = ds1
.select("value").as(Encoders.BINARY())
.selectExpr("deserialize(value) as rows")
.select("rows.*");
ds2.printSchema();
StreamingQuery query1 = ds2
.groupBy("sig_id")
.count()
.writeStream()
.queryName("Signature ID Count Query")
.outputMode("complete")
.format("console")
.start();
query1.awaitTermination();
}
这一切都很有趣和游戏,直到我收到第一批消息,它遇到了错误
18/01/22 14:29:00 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 8) org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (binary) => struct,timestamp:string,sig_id:bigint,ip_type:string,dest_ip:string,priority:bigint>) at ...
Caused by: com.twitter.bijection.InversionFailure: Failed to invert: [B@232f8415 at ...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25 at ...
我做错了吗?或者我的嵌套架构是我的代码中的邪恶根源?感谢你们的帮助
最佳答案
刚刚使用嵌套架构和新的 avro 数据源的示例更新了存储库。 Repo
在使用新数据源之前,我尝试使用双射库并遇到与您发布的相同错误,但修复了它,删除 Kafka 临时文件夹以重置旧的排队数据。
最好的
关于java - "Malformed data length is negative",当尝试使用带有 Avro 数据源的 kafka 的 Spark 结构化流时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48392301/
我试图通过预准备语句使用同一连接执行多个查询,但无法完全实现! 代码片段: public class PostPrReqDaoImpl implements PostPrReqDaoInterface
我目前有一个 2 列宽的 DataGridView,第一列是 DataGridViewTextBoxColumn,第二列是 DataGridViewComboBoxColumn。我还有一个预生成的通用
当我在一台机器上运行以下代码时,我得到了 org.apache.tomcat.dbcp.dbcp.BasicDataSource 的 tomcat 实现,当我在另一台机器上运行它时,我得到了 org.
不确定这是否可行,但这是我的设置。 我有一台带有双启动功能的笔记本电脑。 一个一个分区我有 WinXP 和 MSAccess 2000在另一个分区上,Ubuntu 10.04,带有 apache we
我试过: czmlDataSource.load(czmlurl).then(function(){ viewer.dataSource
我有一个 TableView 和一个数组源。当我在 viewDidLoad 方法中初始化数组时,tableview 显示数组中的数据。当我从 Internet 上的 XML 数据的 URL 填充数组时
我对 DataSource 和 SessionFactory 之间的区别感到困惑。 我认为SessionFactory是一个用于检索 session 的管理器(我猜这实际上是与数据库的连接)。 Dat
我想存储大量(~数千)个字符串并能够使用通配符执行匹配。 例如,这里是一个示例内容: Folder1 文件夹 1/Folder2 Folder1/* Folder1/Folder2/Folder3 文
我有一个 DataGridView 和一个从 SQL 表填充的一些对象的列表。我曾使用两种方法将列表绑定(bind)到网格。 1.直接使用列表到数据源 grdSomeList.DataSource =
我正在尝试在 DataGridView 中设置一些内容。看起来这应该很简单,但我遇到了麻烦。我想显示三列: 代码ID 代号 带有 TypeName 的 DisplayMember 和 TypeID 的
在我的 Config.groovy我把线: grails.config.locations = [ "classpath:app-config.properties"] 我在哪里设置数据源的定义。文件
为了这个问题,假设我有一个包含各种酒类的 Excel 数据源电子表格。 (Cell A) | (Cell B) Bacardi | Rum Smirnoff | Vodka Another Vodka
由于我经常使用第三方 API,我认为创建一些 Magento 模块以实现轻松连接和查询它们会很有帮助。理想情况下,您可以像这样查询 API... $data = Mage::getModel( 'to
将后台线程频繁更新的数据源与 GUI 主线程同步的最佳方法是什么? 我应该在每个方法调用周围放置一个 pthread 互斥体吗?这对我来说似乎也很重。 编辑:我正在寻找 10.5 解决方案 最佳答案
经过几个小时的点击和试用,在查看各种帖子寻求帮助后,这段代码终于起作用了。但我希望有人帮助我理解函数(i,dat),这意味着什么?下面是我的完整代码 - function get_assignedta
我使用的是 Wildfly 10.1 版本,有两个数据源,如下所示, jdbc:mysql://${dbhostn
我正在学习数据源,我想我开始理解它,但我不明白这一段。 据我所知,MySQL 和 PostgreSQL 等数据库供应商编写了自己的不同 DataSource 接口(interface)的实现。现在,这
我有一个关于 TomEE 和使用 tomee.xml 中指定的数据源的奇怪问题。值得注意的是,我使用的是 Netbeans、TomEE 和 MySQL。在 Ubuntu 13.04(Xubuntu 最
WWDC 2019 确实充满了 iOS 的新内容以及 TableViews 和 CollectionView 的新数据源,即 UITableViewDiffableDataSource . 我已成功将
我在独立模式下运行 jboss 并将 standalone.xml 中的数据源设置为以下内容: jdbc:sqlserver://myip:1433;databaseNam
我是一名优秀的程序员,十分优秀!