- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我每秒从 Kafka 接收多行数据。对于每批数据,我都会插入到我的数据库中。
我的应用程序不断读取每批的最后一个消息
和id
。这里的问题是,promise 不是串行运行的,而是在一批完成后并发运行的,并且它们不断读取相同的 message
和 id
。我希望每个 Promise 都有自己的 message
和 id
,按照它们从第一个函数的 for 循环进入的顺序定义。
我认为我需要使用闭包,但是我不确定如何在这里应用它们。我不想使用计时器!
谢谢!
// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
for (var i = 0; i < batchOfRows.rows.length; i++) {
validate(batchOfRows.rows[i])
.then(result => console.log(result))
.catch(error => console.log(error));
}
});
// For each row received, give it an ID and then insert into the DB
function validate(data) {
return new Promise((resolve, reject) => {
message = data;
id = message.date + message.location
DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
.then(result => {
// Insert into the table at this ID
insertIntoDB(message, id)
.then(result => resolve(result))
.catch(error => reject(error));
})
.catch(error => {
reject(error);
});
});
}
// Inserting into DB
function insertIntoDB(message, id) {
return new Promise((resolve, reject) => {
query = "insert into table2 where id = ? and messageBody = ?";
DB.execute(query, [id, JSON.Stringify(message)])
.then(result => resolve("Successfully inserted message ID " + id))
.catch(error => reject("Error inserting!"));
});
}
编辑(danh的解决方案):
var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
client, [{
topic: 'my_topic',
partition: 0,
offset: 0
}], {
fromOffset: false
}
);
let results = [];
let promises = Promise.resolve();
function processQueue() {
queue.forEach(element => {
promises = promises.then(element.map(processElement)).then(elementResult => {
// results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
results = [];
queue.shift();
});
});
}
batchOfRows.on('message', function (data) {
console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
queue.push(batchOfRows.rows);
processQueue();
});
function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
.then(result => {
// Pushing the result here
results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
console.log("Test") // On the first batch prints "Test" 72 times right away
});
}
编辑我通过添加 element.map(processUpdate) 稍微修改了 processQueue 函数,因为从 batchOfRows 接收的批处理实际上是数组,并且我需要对该数组内的每个项目执行数据库查询。
我还删除了 results.push(elementResult) 因为 elementResult 实际上由于某种原因未定义。我已将 results.push(elementResult) 移至 insertIntoDB 并将其命名为 results.push(result)。这可能是错误的根源(我不知道如何将 insertIntoDB 的结果返回到调用 Promise 函数 processQueue)。
如果你看一下 insertIntoDB,如果我 console.log("test") 它将打印 test 的次数与 batchOfRows 数组中元素的次数相同,这表明它已经解决了该批处理中的所有 promise 。因此,在第一批/消息中,如果有 72 行,它将打印“Test”72 次。但是,如果我将 console.log("Test") 更改为简单的 results.push(result),甚至 results.push("test"),然后打印 results.length,即使我预计长度为 72,它仍然会给我 0,直到第二批完成。
最佳答案
稍微抽象一下这些想法,并在数据中明确地表示它们(而不是在 promise 中隐式保留数据)可能会有所帮助。从队列开始:
let queue = [];
使用 queue.push(element)
将内容添加到队列,并使用 element =queue.shift()
按到达顺序获取和删除
我们的目标是按顺序处理队列中的所有内容,并按顺序保存结果。处理本身是异步的,我们希望在开始下一个队列项目之前完成一个队列项目,因此我们需要一系列 promise (称为 promise
)来处理队列:
let results = [];
let promises = Promise.resolve();
function processQueue() {
queue.forEach(element => {
promises = promises.then(processElement(element)).then(elementResult => {
results.push(elementResult);
queue.shift();
});
});
}
我们可以说服自己这是正确的,甚至不需要考虑 processElement()
做了什么,只要它返回一个 promise 。 (在OP情况下,该 promise 是处理“行”数组的 promise )。 processElement()
会做它的事情,结果(OP 情况下的结果数组)将被推送到 results
。
确信操作顺序是有意义的,当新批处理到达时,将其添加到队列中,然后处理队列中的所有内容:
batchOfRows.on('message', function (data) {
queue.push(batchOfRows.rows);
processQueue();
});
我们只需要定义processElement()
。在这里使用 @YuryTarabanko 的有用建议(并在我看来,将他的答案标记为正确)
function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
}
这样做的一个很好的副作用是您可以衡量进度。如果输入到达太快,则表达式:
queue.length - results.length
...会随着时间的推移而增长。
编辑查看较新的代码,我很困惑为什么要对每一行(batchOfRows.rows
中的每个元素)进行查询。由于该查询的结果被忽略,因此不要这样做...
function processElement(data) {
const id = data.date + data.location
// we know everything we need to know to call insert (data and id)
// just call it and return what it returns :-)
return insertIntoDB(data, id);
}
我现在明白这将是一项长期运行的任务,并且它不应该累积结果(即使是线性的)。更干净的修复方法是删除我建议的对 results
数组的所有引用。 insert 的最小版本只是插入并返回插入的结果...
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)]);
}
我认为您添加了一些代码来记录结果(一个更好的测试是通过一些外部进程检查数据库,但如果您想记录,只需记住在记录后传递结果值。
anyPromise.then(result => {
console.log(result);
return result; // IMPORTANT
})
关于javascript - 循环内的 Promise 闭包,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51001750/
我是 PHP 新手。我一直在脚本中使用 for 循环、while 循环、foreach 循环。我想知道 哪个性能更好? 选择循环的标准是什么? 当我们在另一个循环中循环时应该使用哪个? 我一直想知道要
我在高中的编程课上,我的作业是制作一个基本的小计和顶级计算器,但我在一家餐馆工作,所以制作一个只能让你在一种食物中读到。因此,我尝试让它能够接收多种食品并将它们添加到一个价格变量中。抱歉,如果某些代码
这是我正在学习的一本教科书。 var ingredients = ["eggs", "milk", "flour", "sugar", "baking soda", "baking powder",
我正在从字符串中提取数字并将其传递给函数。我想给它加 1,然后返回字符串,同时保留前导零。我可以使用 while 循环来完成此操作,但不能使用 for 循环。 for 循环只是跳过零。 var add
编辑:我已经在程序的输出中进行了编辑。 该程序要求估计给定值 mu。用户给出一个值 mu,同时还提供了四个不等于 1 的不同数字(称为 w、x、y、z)。然后,程序尝试使用 de Jaeger 公式找
我正在编写一个算法,该算法对一个整数数组从末尾到开头执行一个大循环,其中包含一个 if 条件。第一次条件为假时,循环可以终止。 因此,对于 for 循环,如果条件为假,它会继续迭代并进行简单的变量更改
现在我已经习惯了在内存非常有限的情况下进行编程,但我没有答案的一个问题是:哪个内存效率更高;- for(;;) 或 while() ?还是它们可以平等互换?如果有的话,还要对效率问题发表评论! 最佳答
这个问题已经有答案了: How do I compare strings in Java? (23 个回答) 已关闭 8 年前。 我正在尝试创建一个小程序,我可以在其中读取该程序的单词。如果单词有 6
这个问题在这里已经有了答案: python : list index out of range error while iteratively popping elements (12 个答案) 关
我正在尝试向用户请求 4 到 10 之间的整数。如果他们回答超出该范围,它将进入循环。当用户第一次正确输入数字时,它不会中断并继续执行 else 语句。如果用户在 else 语句中正确输入数字,它将正
我尝试创建一个带有嵌套 foreach 循环的列表。第一个循环是循环一些数字,第二个循环是循环日期。我想给一个日期写一个数字。所以还有另一个功能来检查它。但结果是数字多次写入日期。 Out 是这样的:
我想要做的事情是使用循环创建一个数组,然后在另一个类中调用该数组,这不会做,也可能永远不会做。解决这个问题最好的方法是什么?我已经寻找了所有解决方案,但它们无法编译。感谢您的帮助。 import ja
我尝试创建一个带有嵌套 foreach 循环的列表。第一个循环是循环一些数字,第二个循环是循环日期。我想给一个日期写一个数字。所以还有另一个功能来检查它。但结果是数字多次写入日期。 Out 是这样的:
我正在模拟一家快餐店三个多小时。这三个小时分为 18 个间隔,每个间隔 600 秒。每个间隔都会输出有关这 600 秒内发生的情况的统计信息。 我原来的结构是这样的: int i; for (i=0;
这个问题已经有答案了: IE8 for...in enumerator (3 个回答) How do I check if an object has a specific property in J
哪个对性能更好?这可能与其他编程语言不一致,所以如果它们不同,或者如果你能用你对特定语言的知识回答我的问题,请解释。 我将使用 c++ 作为示例,但我想知道它在 java、c 或任何其他主流语言中的工
这个问题不太可能帮助任何 future 的访问者;它只与一个小的地理区域、一个特定的时间点或一个非常狭窄的情况有关,这些情况并不普遍适用于互联网的全局受众。为了帮助使这个问题更广泛地适用,visit
我是 C 编程和编写代码的新手,以确定 M 测试用例的质因数分解。如果我一次只扫描一次,该功能本身就可以工作,但是当我尝试执行 M 次时却惨遭失败。 我不知道为什么 scanf() 循环有问题。 in
这个问题已经有答案了: JavaScript by reference vs. by value [duplicate] (4 个回答) 已关闭 3 年前。 我在使用 TSlint 时遇到问题,并且理
我尝试在下面的代码中添加 foreach 或 for 循环,以便为 Charts.js 创建多个数据集。这将允许我在此折线图上创建多条线。 我有一个 PHP 对象,我可以对其进行编码以稍后填充变量,但
我是一名优秀的程序员,十分优秀!