gpt4 book ai didi

node.js - 我如何实现将数据从mongodb流传输到elasticsearch的 Node JS worker?

转载 作者:行者123 更新时间:2023-12-03 01:14:00 25 4
gpt4 key购买 nike

我正在构建一个基于CDC的应用程序,该应用程序使用Mongo Change Streams侦听更改事件并以接近实时的方式索引elasticsearch中的更改。
到目前为止,我已经实现了一个工作程序,该工作程序调用一个函数来捕获事件,对其进行转换并在为1个mongo集合实现流时在elasticsearch中对其进行索引没有问题:

function syncChangeEvents() {
const stream = ModelA.watch()
while (!stream.isClosed()) {
if (await stream.hasNext()) {
const event = stream.next()
// transform event
// index to elasticsearch
}
}
}
我已经使用无限循环实现了它(可能是一种不好的方法),但是我不确定当我必须使变更流永远存活时有什么替代方法。
问题出在我必须为另一个模型实现变更流时。由于第一个函数有一个while循环正在阻塞,因此工作人员无法调用第二个函数来启动第二个更改流。
我想知道最好的方法是启动一个可以触发x no的 worker 。变更流的数量,而不会影响每个变更流的性能。 worker 线程是正确的方法吗?

最佳答案

在Node.js中使用三种主要方法来处理变更流。

  • 您可以使用EventEmitter的on()函数监视变更流。
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
    const changeStream = collection.watch(pipeline);

    // ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
    // We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream.
    // See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs.
    changeStream.on('change', (next) => {
    console.log(next);
    });

    // Wait the given amount of time and then close the change stream
    await closeChangeStream(timeInMs, changeStream);
  • 您可以使用hasNext()监视变更流。
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
    const changeStream = collection.watch(pipeline);

    // Set a timer that will close the change stream after the given amount of time
    // Function execution will continue because we are not using "await" here
    closeChangeStream(timeInMs, changeStream);

    // We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
    // If the change stream is closed, hasNext() will return false so the while loop will exit.
    // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html for the ChangeStream docs.
    while (await changeStream.hasNext()) {
    console.log(await changeStream.next());
    }
  • 您可以使用Stream API监视更改流
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
    const changeStream = collection.watch(pipeline);

    // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html#pipe for the pipe() docs
    changeStream.pipe(
    new stream.Writable({
    objectMode: true,
    write: function (doc, _, cb) {
    console.log(doc);
    cb();
    }
    })
    );

    // Wait the given amount of time and then close the change stream
    await closeChangeStream(timeInMs, changeStream);

  • 如果您的MongoDB数据库托管在Atlas( https://cloud.mongodb.com)上,则最简单的方法是创建 Trigger。 Atlas可以为您编写变更流代码,因此您只需编写将转换事件并在Elasticsearch中对其进行索引的代码。
    in my blog post提供了有关使用变更流和触发器的更多信息。上面所有片段的完整代码示例可在 GitHub上找到。

    关于node.js - 我如何实现将数据从mongodb流传输到elasticsearch的 Node JS worker?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63300167/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com