gpt4 book ai didi

大型数据库插入的时间戳之间的 Java 计数

转载 作者:行者123 更新时间:2023-11-29 14:37:45 25 4
gpt4 key购买 nike

我有一个连接到 MQTT 代理的 java 程序。我需要为来自代理的每条传入消息插入一行。

消息表架构

   Column   |              Type              |                     Modifiers
------------+--------------------------------+------------------------------------
content | character(255) |
user_id | character(255) |
sent_at | timestamp(6) without time zone | default ('now'::text)::timestamp(6) with time zone
message_id | character(255) |
status | character(1) | default 'w'::bpchar

我需要在一个时间间隔内跟踪消息。

我的主要 Java 应用程序建立了一个数据库连接并包含一个 MQTT 监听器,它为每个传入的新消息插入一行。

@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
//System.out.println("New Msg");
//System.out.println(s);
insertMessage(mqttMessage);

}

消息插入方式

/***
*
* @param mqttMessage
*/
private static void insertMessage(MqttMessage mqttMessage) {
arrived++ ;
try {
String mysql = "insert into messages (content, message_id, user_id, sent_at, status) values ('" + mqttMessage.getPayload() + "', " + arrived + ", " + arrived + ", " + " CURRENT_TIMESTAMP (6) " + ", " + "'w'" + ") RETURNING sent_at";
//System.out.println (mysql);
ResultSet resultSet = statement.executeQuery(mysql);
if (resultSet.next()) {
// Log the last timestamp
System.out.println(resultSet.getTimestamp(1));
}
} catch (SQLException e) {
//System.out.println("Failed !");
e.printStackTrace();
}
//System.out.println(arrived);
}

在同一个程序中,我实现了一个 java 类,它具有 db 连接 并保持 latestTimestamp

我使用 Executors.newScheduledThreadPool 检查每 10 秒插入的消息数并更新最新的时间戳。获取最后插入时间戳的方法是:

/**
* Used to update the latest value from the db
*/
private void updateTimestamp() throws SQLException {
//timestamp = new Timestamp(System.currentTimeMillis());
resultSet = statement.executeQuery( "select sent_at from messages order by sent_at desc limit 1 ;");
if (resultSet.next()) {
// Supposed to be the latest inserted row and the latest timestamp in the db
latestTimestamp = resultSet.getTimestamp(1);
System.out.print("new timestamp ==> ");
System.out.println(timestamp);
} else {
timestamp = Timestamp.valueOf(Constants.MIN_TIMESTAMP_VALUE);
}
}

然后当我需要获取在最新更新日期之后插入的消息计数时,我使用比较时间戳的查询。

/**
* This function get all messages that have been sent from latest timestamp
*
* @return
* @throws SQLException
*/
private ResultSet getMQTTMessagesDelayed() throws SQLException {
oldTimeStamp = latestTimestamp ;
// Update the new timestamp to reduce losing time in execution
updateTimestamp();
mysql = "Select count(*) as cn from messages where sent_at > '" + oldTimeStamp + "' ;";
System.out.println(mysql);
return statement.executeQuery(mysql);

}

现在的问题是,对于从 ~5000 条开始的大量消息,我希望在计算 select count 的总和时得到正确的一些消息,例如,如果我发送大量 5000 毫秒,当计划的线程这次执行并获得 2500 作为计数,我需要在下一个纪元时间(接下来的 10 秒)获得 2500,情况并非如此,我得到了一些不正确的结果(大约 45/20有区别!)。

注意事项

  • 使用 Mysql 和 postgres 测试

  • 8 GB 内存

  • Windows 10

  • Java 8

最佳答案

如果两个线程并行运行,一个插入,另一个从同一个表中选择,您几乎永远不会得到可预测的结果,并且您的性能可能会随着消息表的增长而下降。我的理解是您只想保留在两个给定日期之间插入的消息数。这些日期都在相当短的时间间隔内(10 秒)。因此,我认为如果您使用内存中的列表来跟踪传入的消息,工作线程会按预定的时间间隔丢弃最旧的元素,这样会好得多。

此外,您不需要从 INSERT 中检索结果集。相反,在客户端生成 sent_at Date 字段然后使用 PreparedStatement 参数或 STR_TO_DATE MySQL 函数或 { ts 'YYYY-MM-DD HH:mm:SS' } 标准将其传递到 INSERT SQL 语句中会快得多日期的 JDBC 转义语法。

您的 insertMessage 将变成

private static void insertMessage(MqttMessage mqttMessage) {
arrived++ ;
try {
Date now = new Date();
SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH.mm.ss");
String mysql = "insert into messages (content, message_id, user_id, sent_at, status) values ('" + mqttMessage.getPayload() + "', " + arrived + ", " + arrived + ", { ts '" + fmt.format(now) + "' }, " + "'w'" + ")";
statement.executeUpdate(mysql);
messageList.add(now);
} catch (SQLException e) {
e.printStackTrace();
}
}

和(假设您只有一个编写器线程)一个用于跟踪传入消息的列表的示例实现,例如

import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.Collections;

public class MessageList implements AutoCloseable {

private List<Date> messages;
private CleanUp cleaner;

private final long MAX_KEEP_TRACK = 20l;
private final long RUN_EVERY_SECS = 10l;

public MessageList() {
messages = Collections.synchronizedList(new LinkedList<Date>());
cleaner = new CleanUp(messages, MAX_KEEP_TRACK, RUN_EVERY_SECS);
cleaner.start();
}

@Override
public void close() throws Exception {
cleaner.stop();
}

public void add(Date messageDate) {
messages.add(messageDate);
}

public int countBetween(Date start, Date end) {
int count =0;
for (Date d : messages) {
if (d.compareTo(end)>0) {
break;
} else if (d.compareTo(start)>=0) {
count++;
}
}
return count;
}

private class CleanUp extends Thread {

private List<Date> msgs;
private long maxKeepMilis;
private long runEveryMilis;
private boolean stop;

public CleanUp(List<Date> messages, long maxKeepSecs, long runEverySecs) {
msgs = messages;
maxKeepMilis = maxKeepSecs * 1000l;
runEveryMilis = runEverySecs * 1000l;
stop = false;
}

@Override
public void run() {
Date d;
while(!stop) {
long now = new Date().getTime();
while ((d=msgs.get(0))!=null)
if (now-d.getTime()>maxKeepMilis)
msgs.remove(0);
try {
Thread.sleep(runEveryMilis);
} catch (InterruptedException e) { }
}
}
}
}

然后您只需调用 messageList.countBetween() 即可获取两个日期之间收到的消息数。

关于大型数据库插入的时间戳之间的 Java 计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41891652/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com