- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我每秒从 Kafka 接收多行数据。对于每批数据,我都会插入到我的数据库中。
我的应用程序不断读取每批的最后一个消息
和id
。这里的问题是,promise 不是串行运行的,而是在一批完成后并发运行的,并且它们不断读取相同的 message
和 id
。我希望每个 Promise 都有自己的 message
和 id
,按照它们从第一个函数的 for 循环进入的顺序定义。
我认为我需要使用闭包,但是我不确定如何在这里应用它们。我不想使用计时器!
谢谢!
// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
for (var i = 0; i < batchOfRows.rows.length; i++) {
validate(batchOfRows.rows[i])
.then(result => console.log(result))
.catch(error => console.log(error));
}
});
// For each row received, give it an ID and then insert into the DB
function validate(data) {
return new Promise((resolve, reject) => {
message = data;
id = message.date + message.location
DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
.then(result => {
// Insert into the table at this ID
insertIntoDB(message, id)
.then(result => resolve(result))
.catch(error => reject(error));
})
.catch(error => {
reject(error);
});
});
}
// Inserting into DB
function insertIntoDB(message, id) {
return new Promise((resolve, reject) => {
query = "insert into table2 where id = ? and messageBody = ?";
DB.execute(query, [id, JSON.Stringify(message)])
.then(result => resolve("Successfully inserted message ID " + id))
.catch(error => reject("Error inserting!"));
});
}
编辑(danh的解决方案):
var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
client, [{
topic: 'my_topic',
partition: 0,
offset: 0
}], {
fromOffset: false
}
);
let results = [];
let promises = Promise.resolve();
function processQueue() {
queue.forEach(element => {
promises = promises.then(element.map(processElement)).then(elementResult => {
// results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
results = [];
queue.shift();
});
});
}
batchOfRows.on('message', function (data) {
console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
queue.push(batchOfRows.rows);
processQueue();
});
function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
.then(result => {
// Pushing the result here
results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
console.log("Test") // On the first batch prints "Test" 72 times right away
});
}
编辑我通过添加 element.map(processUpdate) 稍微修改了 processQueue 函数,因为从 batchOfRows 接收的批处理实际上是数组,并且我需要对该数组内的每个项目执行数据库查询。
我还删除了 results.push(elementResult) 因为 elementResult 实际上由于某种原因未定义。我已将 results.push(elementResult) 移至 insertIntoDB 并将其命名为 results.push(result)。这可能是错误的根源(我不知道如何将 insertIntoDB 的结果返回到调用 Promise 函数 processQueue)。
如果你看一下 insertIntoDB,如果我 console.log("test") 它将打印 test 的次数与 batchOfRows 数组中元素的次数相同,这表明它已经解决了该批处理中的所有 promise 。因此,在第一批/消息中,如果有 72 行,它将打印“Test”72 次。但是,如果我将 console.log("Test") 更改为简单的 results.push(result),甚至 results.push("test"),然后打印 results.length,即使我预计长度为 72,它仍然会给我 0,直到第二批完成。
最佳答案
稍微抽象一下这些想法,并在数据中明确地表示它们(而不是在 promise 中隐式保留数据)可能会有所帮助。从队列开始:
let queue = [];
使用 queue.push(element)
将内容添加到队列,并使用 element =queue.shift()
按到达顺序获取和删除
我们的目标是按顺序处理队列中的所有内容,并按顺序保存结果。处理本身是异步的,我们希望在开始下一个队列项目之前完成一个队列项目,因此我们需要一系列 promise (称为 promise
)来处理队列:
let results = [];
let promises = Promise.resolve();
function processQueue() {
queue.forEach(element => {
promises = promises.then(processElement(element)).then(elementResult => {
results.push(elementResult);
queue.shift();
});
});
}
我们可以说服自己这是正确的,甚至不需要考虑 processElement()
做了什么,只要它返回一个 promise 。 (在OP情况下,该 promise 是处理“行”数组的 promise )。 processElement()
会做它的事情,结果(OP 情况下的结果数组)将被推送到 results
。
确信操作顺序是有意义的,当新批处理到达时,将其添加到队列中,然后处理队列中的所有内容:
batchOfRows.on('message', function (data) {
queue.push(batchOfRows.rows);
processQueue();
});
我们只需要定义processElement()
。在这里使用 @YuryTarabanko 的有用建议(并在我看来,将他的答案标记为正确)
function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
}
这样做的一个很好的副作用是您可以衡量进度。如果输入到达太快,则表达式:
queue.length - results.length
...会随着时间的推移而增长。
编辑查看较新的代码,我很困惑为什么要对每一行(batchOfRows.rows
中的每个元素)进行查询。由于该查询的结果被忽略,因此不要这样做...
function processElement(data) {
const id = data.date + data.location
// we know everything we need to know to call insert (data and id)
// just call it and return what it returns :-)
return insertIntoDB(data, id);
}
我现在明白这将是一项长期运行的任务,并且它不应该累积结果(即使是线性的)。更干净的修复方法是删除我建议的对 results
数组的所有引用。 insert 的最小版本只是插入并返回插入的结果...
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)]);
}
我认为您添加了一些代码来记录结果(一个更好的测试是通过一些外部进程检查数据库,但如果您想记录,只需记住在记录后传递结果值。
anyPromise.then(result => {
console.log(result);
return result; // IMPORTANT
})
关于javascript - 循环内的 Promise 闭包,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51001750/
如何从 promise 中退出 promise ? perl6 文档没有提供简单的方法。例如: my $x = start { loop { # loop forever until "qui
我的用户 Controller 中有一个索引操作,其中我试图连续做两件事,并且在它们都有机会完成之前不执行所需的 res.json() 方法。 我有一个加入用户的友谊加入模型。一列是 friender
请帮我解释一下为什么日志结果有两种不同: 方式 1:每 1 秒顺序记录一次 方式 2:1 秒后记录所有元素。 // Way 1 let sequence = Promise.resolve(); [1
我的问题很简单。 Promise.all() 方法可以返回 Promise 吗?让我解释一下: function simpleFunction() { let queue = [];
我正在使用 Promise 从存储中读取文件并转换为 base64 字符串。我有图像数组,使用 RNFS 读取图像 const promise_Images = _Images.map(async (
如果使用非空数组调用 Promise.all 或 Promise.race,它们将返回一个待处理的 Promise: console.log(Promise.all([1])); // prints
Promise.all 是否可以在没有包装 promise 的情况下返回链的最后一个值? 如果不使用 await,它在我的上下文中不起作用 没有包装的例子: function sum1(x){ r
我一直在玩 promise,通常能想出如何处理好它们,但在这种情况下,我不知道如何删除一个 promise-wrapping level。 代码如下: let promise2 = promise1.
考虑以下嵌套的Promises结构: const getData = async() => { const refs = [{ name: "John33", age: 3
我已经阅读了 Promise/A+ 规范,但据我了解,还有诸如 Promise/A 和 Promise 之类的东西。它们之间有什么区别? Promise 和 Promise/A 规范也是如此吗?如果是
当我运行以下代码时: my $timer = Promise.in(2); my $after = $timer.then({ say "2 seconds are over!"; 'result'
以下简单的 promise 是发誓的,我不允许打破它。 my $my_promise = start { loop {} # or sleep x; 'promise re
我正在尝试扩展Promise: class PersistedPromise extends Promise { } 然后在派生类上调用静态resolve以直接创建一个已解决的Promise: Per
我有两个返回 promise 的函数,我独立使用它们作为: getLocal().then(...) 和 getWeb().then(...) 但是现在我遇到了一个奇怪的问题: 1) 我需要第三个
我不知道 promise.all 解决方案中的 promise.all 是否是一个好的实践。我不确定。 我需要从一组用户获取信息,然后通过此信息响应,我需要发送消息通知。 let userList =
我一直在尝试使用 queueMicrotask() 函数,但我没有弄清楚当回调是微任务时回调的优先级如何。查看以下代码: function tasksAndMicroTasks() { const
我一直在尝试使用 queueMicrotask() 函数,但我没有弄清楚当回调是微任务时回调的优先级如何。查看以下代码: function tasksAndMicroTasks() { const
今年早些时候,我在 Pharo Smalltalk 参与了一个 promise 项目。这个想法是为了实现以下行为: ([ 30 seconds wait. 4 ]promiseValue )then:
大家好,提前感谢您的帮助。 下面是我正在尝试做的事情 function1(){ throw some error(); } function2() { // dosomething suc
我有以下未解析的代码。f2 解决了,所以我不会添加该代码,它是 f1 我有问题。 我调用函数,它到达最里面如果,它调用函数“find”,它执行函数 findId,完美返回 Id,然后执行 editId
我是一名优秀的程序员,十分优秀!