- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我编写了一个小的 Java 程序,它应该监视目录中的新文件并将它们以二进制 Avro 格式发送到 Kafka 主题。我是 Avro 的新手,我使用 Avro 文档和在线示例编写了这篇文章。监控部分运行良好,但程序在进入 Avro 序列化时在运行时失败。我得到这个错误堆栈:
Exception in thread "main" java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer
at org.apache.avro.generic.GenericDatumWriter.writeBytes(GenericDatumWriter.java:260)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:116)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)
at producers.AvroBinaryProducer.buildAvroData(AvroBinaryProducer.java:90)
at producers.AvroBinaryProducer.start(AvroBinaryProducer.java:120)
at producers.AvroBinaryProducer.main(AvroBinaryProducer.java:140)
C:\Users\guys\AppData\Local\NetBeans\Cache\8.1\executor-snippets\run.xml:53: Java returned: 1
BUILD FAILED (total time: 7 seconds)
此行失败:writer.write(datum,encoder);
它似乎期待一个 ByteBuffer,而文档和示例说我应该通过 GenericRecord。我做错了什么?
这是我的代码(还有另一个名为 Config 的实用程序类,它从文件中读取配置参数,但我没有在此处包含它):
package producers;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchService;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import static java.nio.file.StandardWatchEventKinds.*;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
/**
*
* @author guys
*/
public class AvroBinaryProducer {
String mySchema;
Schema avroSchema;
Config myConf;
Producer<String, byte[]> producer;
String topic, bootstrapServers, watchDir;
Path path;
ByteArrayOutputStream out;
BinaryEncoder encoder;
public AvroBinaryProducer(String configPath) throws IOException
{
// Read initial configuration
myConf=new Config(configPath);
// first setting the kafka producer stuff
Properties props = new Properties();
props.put("bootstrap.servers",myConf.get("bootstrap.servers"));
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
producer = new KafkaProducer<>(props);
topic=myConf.get("topic");
watchDir=myConf.get("watchdir");
path=FileSystems.getDefault().getPath(watchDir);
// Now define the Avro schema
mySchema="{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"photo\",\n" +
" \"fields\": [\n" +
" {\"name\": \"name\", \"type\": \"string\"},\n" +
" {\"name\": \"data\", \"type\": \"bytes\"}\n" +
" ]\n" +
"}";
Schema.Parser parser = new Schema.Parser();
avroSchema=parser.parse(mySchema);
out = new ByteArrayOutputStream();
encoder = EncoderFactory.get().binaryEncoder( out, null );
}
private byte[] buildAvroData(String name, byte[] data) throws IOException
{
out.reset();
GenericRecord datum=new GenericData.Record(avroSchema);
datum.put("name", name);
datum.put("data",data);
DatumWriter<GenericRecord> writer=new GenericDatumWriter<>(avroSchema);
writer.write(datum,encoder);
encoder.flush();
return out.toByteArray();
}
private void start() throws IOException, InterruptedException
{
String fileName;
byte[] fileData;
WatchService watcher = FileSystems.getDefault().newWatchService();
WatchKey key=path.register(watcher, ENTRY_CREATE);
while (true)
{
key = watcher.take();
// The code gets beyond this point only when a filesystem event occurs
for (WatchEvent<?> event: key.pollEvents())
{
WatchEvent.Kind<?> kind = event.kind();
if (kind==ENTRY_CREATE)
{
WatchEvent<Path> ev = (WatchEvent<Path>)event;
Path filename = ev.context();
fileName=filename.toString();
System.out.println("New file "+fileName+" found !");
// We need this little delay to make sure the file is closed before we read it
Thread.sleep(500);
fileData=Files.readAllBytes(FileSystems.getDefault().getPath(watchDir+File.separator+fileName));
publishMessage(buildAvroData(fileName,fileData));
}
}
key.reset();
}
}
private void publishMessage(byte[] bytes)
{
ProducerRecord <String, byte[]> data =new ProducerRecord<>(topic, bytes);
producer.send(data);
}
public static void main (String args[])
{
AvroBinaryProducer abp;
try {
abp=new AvroBinaryProducer(args[0]);
try {
abp.start();
} catch (InterruptedException ex) {
Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex);
}
} catch (IOException ex) {
Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
谢谢!
最佳答案
我就是这样解决的。如果它需要 ByteBuffer,让我们给它 ByteBuffer。我将函数更改为:
private byte[] buildAvroData(String name, byte[] data) throws IOException
{
out.reset();
GenericRecord datum=new GenericData.Record(avroSchema);
datum.put("name", name);
datum.put("data",ByteBuffer.wrap(data));
DatumWriter<GenericRecord> writer=new GenericDatumWriter<>(avroSchema);
writer.write(datum,encoder);
encoder.flush();
return out.toByteArray();
我只是用 ByteBuffer 包装了数据,这很有效。您必须记住从消费者端的 ByteBuffer 中提取字节数组。
关于java - 尝试序列化 avro 记录时,B 无法转换为 java.nio.ByteBuffer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39330089/
我花了相当多的时间尝试优化文件哈希算法,以尽可能地提高性能。 查看我之前的 SO 主题: Get File Hash Performance/Optimization FileChannel Byte
我不太明白它们之间有什么不同,所以我对这两个包有一些疑问。 在 Google 上浏览了一下之后,似乎 Oracle 决定使用更新和增强的 NIO.2 包来更新 NIO 包,作为 JDK7 版本的一部分
在 Java 1.4 之前,通过在不同的输入流/输出流之间移动字节来处理文件是常见的做法。 自 Java 1.4 起,其中 NIO已添加,建议使用 Channels 执行相同操作。 与 NIO2在 J
关闭。这个问题需要debugging details .它目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and th
我需要重写一些应该在 Java 6 VM 上运行的 Java 7 文件 IO 代码。 该实现使用了方便的 Java 7 功能,例如自动关闭、Paths 和 Files。 更具体地说,我需要处理像 /t
当我查看java中Scanner的源代码时,我发现一些让我困惑的事情 import java.nio.file.Path; import java.nio.*; 它们之间有什么区别,为什么它们不直接导
我的 Java 代码中几乎所有文件 I/O 操作都使用 java.nio.*。然而,在今天调试一段代码时,我注意到调试器 (Intellij IDEA 14) 显示了以下关于 java.nio.fil
奇怪的是,我无法在 Google 中找到 NIO.2 异步 IO 性能与通过 java.nio.channels.Selector 使用 NIO 的多路复用 IO 的明确答案。 所以,我的问题是:NI
我是初级 Java 程序员。 今天,我练习了如何在 java 中复制文件并尝试按照本教程进行操作 http://www.journaldev.com/861/4-ways-to-copy-file-i
我有一个指向绝对路径的 java.nio.Path: /home/user/project/resources/configuration.xml 我有第二个 java.nio.Path 指向项目的根
我开始使用java.nio.*,现在我想知道:为什么java.nio.Paths.get(..)不使用java.nio.Path 对象作为输入? 现在我总是做这样的事情: final Path bas
我是新手,正在学习 Java。我尝试在 Netbeans 7 中运行以下应用程序。 import java.io.*; import java.nio.file.*; import java.nio.
我的 Java 程序(见下文)有时会在 java.nio.File.move() 方法执行中因 java.nio.file.AccessDeniedException 崩溃。 我不明白为什么会抛出这个
所以我在这里阅读我最喜欢的软件模式书籍之一(面向模式的软件架构 - 并发和网络对象的模式),特别是关于 Proactor/Reactor 异步 IO 模式的部分。我可以看到通过使用可选 channel
我有一个方法如下,它已经正常运行了很长时间: private String loadFromFile(){ RandomAccessFile inFile = null; FileCh
我在 IntellijIDEA Community Edition 2017.3 版本中收到以下错误。该项目使用java版本“1.8.0-ea”。请给我一些解决问题的想法 Error:Internal
一 点睛 在 scatter-and-gather 场景下,可以将数据写入多个 Buffer 中。在 NIO 中,也能够同时操作多个缓冲区。在很多 Channel 实现类中,都提供了多个重载的 rea
I/O简介 在 Java 编程中,直到最近一直使用 流 的方式完成 I/O。所有 I/O 都被视为单个的字节的移动,通过一个称为 Stream 的对象一次移动一个字节。流 I/O 用于与外部世界接
一 点睛 给某一个文件加锁,防止并发访问时引起的数据不安全。 在 JUC 中,可以使用 synchronized、Lock 给共享的资源加锁,或者使用 volatile、CAS 算法等防止并发冲突。在
一 点睛 给某一个文件加锁,防止并发访问时引起的数据不安全。 在 JUC 中,可以使用 synchronized、Lock 给共享的资源加锁,或者使用 volatile、CAS 算法等防止并发冲突。在
我是一名优秀的程序员,十分优秀!