- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在使用 Confluent 3.3.0。我的目的是使用 kafka-connect 将 Kafka 主题中的值插入到 Oracle 表中。我的连接器与我使用 avro console producer 制作的 avro 记录配合得很好,如下所示:
./kafka-avro-console-producer --broker-list 192.168.0.1:9092 --topic topic6 --property value.schema='{"type":"record","name":"flights3","fields":[{"name":"flight_id","type":"string"},{"name":"flight_to", "type": "string"}, {"name":"flight_from", "type": "string"}]}'
然后我插入如下值:
{"flight_id":"1","flight_to":"QWE","flight_from":"RTY"}
我想要实现的是使用 Java 应用程序使用对象插入相同的数据。以下是我的生产者代码:
public class Sender {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.1:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "serializers.custom.FlightSerializer");
props.put("schema.registry.url", "http://192.168.0.1:8081");
Producer<String, Flight> producer = new KafkaProducer<String, Flight>(props);
Flight myflight = new Flight("testflight1","QWE","RTY");
ProducerRecord<String, Flight> record = new ProducerRecord<String, Flight>("topic5","key",myflight);
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
下面是航类旁白:
package vo;
public class Flight {
String flight_id,flight_to,flight_from;
public Flight(String flight_id, String flight_to, String flight_from) {
this.flight_id = flight_id;
this.flight_to = flight_to;
this.flight_from = flight_from;
}
public Flight(){
}
public String getFlight_id() {
return flight_id;
}
public void setFlight_id(String flight_id) {
this.flight_id = flight_id;
}
public String getFlight_to() {
return flight_to;
}
public void setFlight_to(String flight_to) {
this.flight_to = flight_to;
}
public String getFlight_from() {
return flight_from;
}
public void setFlight_from(String flight_from) {
this.flight_from = flight_from;
}
}
最后是序列化程序:
package serializers.custom;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import vo.Flight;
import com.fasterxml.jackson.databind.ObjectMapper;
public class FlightSerializer implements Serializer<Flight> {
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> arg0, boolean arg1) {
}
@Override
public byte[] serialize(String arg0, Flight arg1) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
retVal = objectMapper.writeValueAsString(arg1).getBytes();
} catch (Exception e) {
e.printStackTrace();
}
return retVal;
}
}
但我的理解是,需要定义模式之类的东西,并使用一些 avro 序列化器来获取精确数据,就像我使用 avro console consumer 所做的那样.我浏览了一些示例代码,但没有一个对我有用。
我尝试了以下代码。但是 avro console consumer 什么也没有。
package producer.serialized.avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import vo.Flight;
import java.util.Properties;
public class Sender {
public static void main(String[] args) {
String flightSchema = "{\"type\":\"record\"," + "\"name\":\"flights\","
+ "\"fields\":[{\"name\":\"flight_id\",\"type\":\"string\"},{\"name\":\"flight_to\",\"type\":\"string\"},{\"name\":\"flight_from\",\"type\":\"string\"}]}";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://192.168.0.1:8081");
KafkaProducer producer = new KafkaProducer(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(flightSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("flight_id", "1");
avroRecord.put("flight_to", "QWE");
avroRecord.put("flight_from", "RTY");
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic6", avroRecord);
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}
最佳答案
架构未定义,因此当 KafkaAvroSerializer
必须联系架构注册表以提交架构时,它不会拥有它。
你必须为你的对象 Flight
创建一个模式
下面的 file.avdl(avro 扩展文件之一)示例就可以了:
@namespace("vo")
protocol FlightSender {
record Flight {
union{null, string} flight_id = null;
union{null, string} flight_to = null;
union{null, string} flight_from = null;
}
}
At compile time, when you use the avro-maven-plugin
,上面的 avro 架构将生成您的 java Flight
类,因此您必须删除之前创建的类。
当它涉及到你的主类时,你必须像下面这样设置两个属性:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
而您的制作人,您可以使用您生成的特定 Avro 类
Producer<String, Flight> producer = new KafkaProducer<String, Flight>(props);
希望对您有所帮助:-)
关于java - 如何像使用 avro console producer 一样生成 Kafka avro 记录?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48195856/
已经有几个关于捕获或重定向 console.log 的问题: redirect Javascript syntax errors and console.log to somewhere else C
console.log(String(console.log('Not undefined')) === 'undefined'); console.log(String(console.log('N
我知道这是一个新手错误,但我不知道如何修复它。 public static void main (String args[]){ Console kitty = System.console(); S
我正在使用 Visual Studio 2015。 我试图打印一些语句只是为了跟踪一个非常长时间运行的测试。当使用 VSTest.Console 和/Logger:trx 时,调试输出(无论我们使用
这个问题在这里已经有了答案: Accessing console and devtools of extension's background.js (8 个回答) 5年前关闭。 我的 Chrome
我在括号中收到此错误。 我想强调一个事实,这是我第二次打开 JS 文件。 正如我强调的那样,我还想强调一个事实,即我不知道 Eslint 和 node.js 是什么。 StackOverflow 和其
我按照文档中的描述安装了 Drupal Console Launcher: curl https://drupalconsole.com/installer -L -o drupal.phar mv
Console.WriteLine() 和有什么区别和Trace.WriteLine() ? 最佳答案 从“调试”的角度来看这些。 我们开始使用 Console.WriteLine() 进行调试 后来
我一直在尝试连接到 serial console of a Raspberry Pi 3 with Android Things使用USB to TTL cable从我的 Linux (Ubuntu)
namespace Pro { class ErrorLog { public ErrorLog(RenderWindow app) {
以下代码是一个众所周知的示例,用于显示调试版本和发布版本之间的区别: using System; using System.Threading; public static class Program
if (open_date) { open_date = get_date_from_string(open_date); window.console && cons
在 Xcode 中工作时,我通常只为控制台打开一个单独的窗口,以便我可以看到尽可能多的输出行。我今天刚刚更新到 Xcode 12,在更新之前,您可以将编辑器 Pane 和控制台 Pane 之间的分隔线
在 Google Play Console 上,在所有应用程序的第一页,它会显示已安装的受众和用户获取。 我知道已安装的受众是在他们的设备上安装我的应用程序的受众。但什么是用户获取?通常,用户获取的数
Qt Quick uses qDebug执行日志记录,其中标准 Javascript 日志记录方法映射到 Qt 日志类型 console.log() -> qDebug() console.deb
Qt Quick uses qDebug执行日志记录,其中标准 Javascript 日志记录方法映射到 Qt 日志类型 console.log() -> qDebug() console.deb
我有以下代码: bool loop = true; LongbowWorkerThread Worker = new LongbowWorkerThread(); Th
我遇到了这两个 API,用于在 C# 的简单控制台应用程序中读取用户的输入: System.Console.ReadLine() System.Console.In.ReadLine() 这是一个我试
我是编程和 js 的新手,我正在尝试学习 javascript 的关键。 var obj1 = { name: 'rawn', fn: function() { con
using System; namespace ConsoleApplication1 { class Program { static void Main(strin
我是一名优秀的程序员,十分优秀!