gpt4 book ai didi

javascript - 循环内的 Promise 闭包

转载 作者:太空宇宙 更新时间:2023-11-04 02:55:25 25 4
gpt4 key购买 nike

我每秒从 Kafka 接收多行数据。对于每批数据,我都会插入到我的数据库中。

我的应用程序不断读取每批的最后一个消息id。这里的问题是,promise 不是串行运行的,而是在一批完成后并发运行的,并且它们不断读取相同的 messageid。我希望每个 Promise 都有自己的 messageid,按照它们从第一个函数的 for 循环进入的顺序定义。

我认为我需要使用闭包,但是我不确定如何在这里应用它们。我不想使用计时器!

谢谢!

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
for (var i = 0; i < batchOfRows.rows.length; i++) {
validate(batchOfRows.rows[i])
.then(result => console.log(result))
.catch(error => console.log(error));
}
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
return new Promise((resolve, reject) => {
message = data;
id = message.date + message.location
DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
.then(result => {
// Insert into the table at this ID
insertIntoDB(message, id)
.then(result => resolve(result))
.catch(error => reject(error));
})
.catch(error => {
reject(error);
});
});
}

// Inserting into DB
function insertIntoDB(message, id) {
return new Promise((resolve, reject) => {
query = "insert into table2 where id = ? and messageBody = ?";

DB.execute(query, [id, JSON.Stringify(message)])
.then(result => resolve("Successfully inserted message ID " + id))
.catch(error => reject("Error inserting!"));
});
}

编辑(danh的解决方案):

var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
client, [{
topic: 'my_topic',
partition: 0,
offset: 0
}], {
fromOffset: false
}
);

let results = [];
let promises = Promise.resolve();

function processQueue() {
queue.forEach(element => {
promises = promises.then(element.map(processElement)).then(elementResult => {
// results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
results = [];
queue.shift();
});
});
}

batchOfRows.on('message', function (data) {
console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
queue.push(batchOfRows.rows);
processQueue();
});

function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
.then(result => {
// Pushing the result here
results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
console.log("Test") // On the first batch prints "Test" 72 times right away
});
}

编辑我通过添加 element.map(processUpdate) 稍微修改了 processQueue 函数,因为从 batchOfRows 接收的批处理实际上是数组,并且我需要对该数组内的每个项目执行数据库查询。

我还删除了 results.push(elementResult) 因为 elementResult 实际上由于某种原因未定义。我已将 results.push(elementResult) 移至 insertIntoDB 并将其命名为 results.push(result)。这可能是错误的根源(我不知道如何将 insertIntoDB 的结果返回到调用 Promise 函数 processQueue)。

如果你看一下 insertIntoDB,如果我 console.log("test") 它将打印 test 的次数与 batchOfRows 数组中元素的次数相同,这表明它已经解决了该批处理中的所有 promise 。因此,在第一批/消息中,如果有 72 行,它将打印“Test”72 次。但是,如果我将 console.log("Test") 更改为简单的 results.push(result),甚至 results.push("test"),然后打印 results.length,即使我预计长度为 72,它仍然会给我 0,直到第二批完成。

最佳答案

稍微抽象一下这些想法,并在数据中明确地表示它们(而不是在 promise 中隐式保留数据)可能会有所帮助。从队列开始:

let queue = [];

使用 queue.push(element) 将内容添加到队列,并使用 element =queue.shift() 按到达顺序获取和删除

我们的目标是按顺序处理队列中的所有内容,并按顺序保存结果。处理本身是异步的,我们希望在开始下一个队列项目之前完成一个队列项目,因此我们需要一系列 promise (称为 promise )来处理队列:

let results = [];
let promises = Promise.resolve();

function processQueue() {
queue.forEach(element => {
promises = promises.then(processElement(element)).then(elementResult => {
results.push(elementResult);
queue.shift();
});
});
}

我们可以说服自己这是正确的,甚至不需要考虑 processElement() 做了什么,只要它返回一个 promise 。 (在OP情况下,该 promise 是处理“行”数组的 promise )。 processElement() 会做它的事情,结果(OP 情况下的结果数组)将被推送到 results

确信操作顺序是有意义的,当新批处理到达时,将其添加到队列中,然后处理队列中的所有内容:

batchOfRows.on('message', function (data) {
queue.push(batchOfRows.rows);
processQueue();
});

我们只需要定义processElement()。在这里使用 @YuryTarabanko 的有用建议(并在我看来,将他的答案标记为正确)

function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
}

这样做的一个很好的副作用是您可以衡量进度。如果输入到达太快,则表达式:

queue.length - results.length

...会随着时间的推移而增长。

编辑查看较新的代码,我很困惑为什么要对每一行(batchOfRows.rows中的每个元素)进行查询。由于该查询的结果被忽略,因此不要这样做...

function processElement(data) {
const id = data.date + data.location
// we know everything we need to know to call insert (data and id)
// just call it and return what it returns :-)
return insertIntoDB(data, id);
}

我现在明白这将是一项长期运行的任务,并且它不应该累积结果(即使是线性的)。更干净的修复方法是删除我建议的对 results 数组的所有引用。 insert 的最小版本只是插入并返回插入的结果...

function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)]);
}

我认为您添加了一些代码来记录结果(一个更好的测试是通过一些外部进程检查数据库,但如果您想记录,只需记住在记录后传递结果值。

anyPromise.then(result => {
console.log(result);
return result; // IMPORTANT
})

关于javascript - 循环内的 Promise 闭包,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51001750/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com