gpt4 book ai didi

java - 如何将队列中的数据写入MongoDB

转载 作者:太空宇宙 更新时间:2023-11-04 09:37:15 24 4
gpt4 key购买 nike

我有一个正在运行的zookeeper + kafka,并且我已成功向kafka生产者发送推文。这些推文是从队列中获取的:

queue = new LinkedBlockingQueue<>(10000);
public void run() {
client.connect();
try (Producer<Long, String> producer = getProducer()) {
while (true) {
Tweet tweet = gson.fromJson(queue.take(), Tweet.class);
System.out.printf("Fetched tweet id %d\n", tweet.getId());
long key = tweet.getId();
String msg = tweet.toString();
ProducerRecord<Long, String> record = new ProducerRecord<>(KafkaConfiguration.TOPIC, key, msg);
producer.send(record, callback);


}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
client.stop();
}

我的问题是如何将已经接收到的对象(Tweet 类)写入 MongoDB?我在本地主机上有这样的配置设置:

//MongoDB config
int port_no = 27017;
String host_name = "localhost", db_name = "bigdata", db_coll_name = "twitter";

// Mongodb connection string.
String client_url = "mongodb://" + host_name + ":" + port_no + "/" + db_name;
MongoClientURI uri = new MongoClientURI(client_url);

// Connecting to the mongodb server using the given client uri.
MongoClient mongo_client = new MongoClient(uri);

// Fetching the database from the mongodb.
MongoDatabase db = mongo_client.getDatabase(db_name);

// Fetching the collection from the mongodb.
MongoCollection<Document> coll = db.getCollection(db_coll_name);

有没有办法用 JSON 反序列化它?任何建议将不胜感激。提前致谢。

最佳答案

我建议使用Kafka Connect MongoDB Sink Connector为了将数据从 Kafka 推送到 MongoDB

带有架构的 JSON 配置示例:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

如果您使用Confluent Hub ,您可以找到有关如何安装连接器的说明 here

关于java - 如何将队列中的数据写入MongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56356598/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com