Java: Writing to and reading from a file simultaneously

Reposted. Author: 行者123. Updated: 2023-11-30 01:57:02

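This is really a design question. I'm not sure that writing to and reading from a file is the ideal solution here. Nevertheless, I'll outline below what I'm trying to do. I have the following static method which, once `cs.reqStreamingData` is called, starts continuously retrieving data from the client server at a rate of one record every 150 milliseconds.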

    public static void streamingDataOperations(ClientSocket cs) throws InterruptedException, IOException {
        // Call: retrieve streaming data constantly from the client server
        // and write a line to the CSV file at a rate of one every 150 milliseconds,
        // using a BufferedWriter and a PrintWriter (print method).
        // Note that the flush method of the BufferedWriter is never called,
        // so I assume the data is actually being written to the in-memory buffer,
        // not to the actual file.
        cs.reqStreamingData(output_file); // <- this method comes from the client's API.

        // I would like another thread (a.k.a. the data processing thread) that repeats itself every 15 minutes.
        // I am aware I can do that by creating a class that extends TimerTask and fixing a schedule.
        // When this thread runs, there are several things I want to do:
        // 1. Flush the last 15 minutes of data to output_file (note that no synchronized methods or statements are used here, so no object is locked).
        // 2. Process the data in R.
        // 3. Wait for the output from R to come back.
        // 4. Clear the file contents, so that the file only ever stores data from the last 15 minutes.
    }
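The comments above plan to run the processing step from a `TimerTask` on a fixed schedule. A minimal sketch of that scheduling with `java.util.Timer` follows; the class `PeriodicProcessing` and its `schedule` method are hypothetical names, and the 100 ms period stands in for the 15 minutes from the question so the demo finishes quickly.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs a "processing" step on a fixed schedule using
// java.util.Timer, as the question's comments propose.
final class PeriodicProcessing {

    /** Schedules {@code step} to run every {@code periodMillis}, starting immediately. */
    static Timer schedule(Runnable step, long periodMillis) {
        Timer timer = new Timer("data-processing", true); // daemon thread: won't keep the JVM alive
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                step.run(); // an uncaught exception here kills the Timer thread for good
            }
        }, 0, periodMillis);
        return timer;
    }

    public static void main(String[] args) throws InterruptedException {
        // In the question the period would be 15 minutes: TimeUnit.MINUTES.toMillis(15).
        // A short period is used here so the demo finishes quickly.
        CountDownLatch runs = new CountDownLatch(3);
        Timer timer = schedule(() -> {
            System.out.println("processing batch");
            runs.countDown();
        }, 100);
        boolean done = runs.await(5, TimeUnit.SECONDS);
        timer.cancel();
        System.out.println("three runs completed: " + done);
    }
}
```

One caveat with `Timer`: if the task throws an unchecked exception, the timer thread dies and no further runs happen, which is one reason `ScheduledExecutorService` is often preferred.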

Now, I'm not very familiar with multithreading. My concerns are:

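  1. The data request thread and the data processing thread read and write the file at the same time, but at different rates. Since data processing is far more computationally heavy than requesting data, I'm not sure whether the processing thread will significantly delay the request thread. And given that they are two separate threads, can any errors or exceptions occur here?
  2. I'm not keen on the idea of writing to and reading from the same file at the same time, but because I have to use R to process the data in real time and store it in an R data frame, I can't think of another way to approach this. Are there better alternatives?
  3. Is there a better design for this problem?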
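Concern 1 can be checked directly. Reading a file that another thread is still writing does not throw; the reader simply sees whatever has reached the file so far. The sketch below (hypothetical class `BufferVisibility`) shows that lines sitting in an unflushed `BufferedWriter` are invisible to a reader and appear after `flush()`. The assumption in the question's code comments holds only while the data fits in the buffer: once the internal buffer fills (a few thousand characters by default), data is written out without an explicit flush, so a reader may see a file that ends mid-line.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical demo for concern 1: a concurrent reader sees only flushed data.
// No exception is thrown, but the reader can observe an incomplete batch.
final class BufferVisibility {

    /** Writes lines without flushing, then returns what a reader sees on disk. */
    static List<String> linesVisibleBeforeFlush(Path file, List<String> lines) throws IOException {
        try (PrintWriter writer = new PrintWriter(Files.newBufferedWriter(file))) {
            lines.forEach(writer::println);
            // Nothing flushed yet: the small batch is still in the writer's buffer.
            return Files.readAllLines(file);
        }
    }

    /** Writes lines, flushes, then returns what a reader sees on disk. */
    static List<String> linesVisibleAfterFlush(Path file, List<String> lines) throws IOException {
        try (PrintWriter writer = new PrintWriter(Files.newBufferedWriter(file))) {
            lines.forEach(writer::println);
            writer.flush(); // pushes the buffered characters to the file
            return Files.readAllLines(file);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("stream", ".csv");
        List<String> rows = List.of("t1,1.0", "t2,2.0");
        System.out.println("before flush: " + linesVisibleBeforeFlush(file, rows));
        System.out.println("after flush:  " + linesVisibleAfterFlush(file, rows));
        Files.delete(file);
    }
}
```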

I know this is a long question. Please let me know if you need more information.

Best answer

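The lines (CSV or any other text) can be written to a temporary file. When processing is ready to start, the only synchronization needed is at the moment the temporary file is replaced with a new one. This guarantees that the producer never writes to the file the consumer is processing.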
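The handoff described here does not depend on files: producer and consumer share one lock, held only for a single append or for the instant one batch is exchanged for a fresh one, and the slow processing then happens on the old batch with no lock held. A minimal in-memory sketch of that pattern, using a hypothetical `SwappableBatch` class (an illustration, not the answer's code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the handoff: the lock is held only for
// a single append or a single reference swap, never during processing.
final class SwappableBatch {
    private final Object lock = new Object();
    private List<String> current = new ArrayList<>();

    /** Producer side: append one line to the batch currently being filled. */
    void add(String line) {
        synchronized (lock) {
            current.add(line);
        }
    }

    /** Consumer side: take the filled batch and give the producer an empty one. */
    List<String> swap() {
        synchronized (lock) {
            List<String> filled = current;
            current = new ArrayList<>();
            return filled; // the producer no longer references this list
        }
    }

    public static void main(String[] args) {
        SwappableBatch batch = new SwappableBatch();
        batch.add("t1,1.0");
        batch.add("t2,2.0");
        List<String> toProcess = batch.swap(); // process this outside any lock
        batch.add("t3,3.0");                   // producer continues with the new batch
        System.out.println(toProcess + " then " + batch.swap());
    }
}
```

Because the swapped-out list is no longer reachable from the producer, the consumer can take as long as it needs with it, for example while R is running.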

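After the swap, the producer continues appending lines to the new file, while the consumer flushes and closes the old file and then moves it to the file the R application expects.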
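For that last step, moving the finished file to where R expects it, an atomic rename means R never opens a half-written copy. A minimal sketch, assuming a hypothetical helper `MoveIntoPlace.publish`: it tries `StandardCopyOption.ATOMIC_MOVE` and falls back to a non-atomic replacing move when the file system reports `AtomicMoveNotSupportedException` (for example, when source and target are on different file stores). Per the `Files.move` Javadoc, with `ATOMIC_MOVE` all other options are ignored and whether an existing target is replaced is implementation-specific, so it is worth checking on the target platform.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for publishing a closed batch file to the path R reads.
final class MoveIntoPlace {

    /** Atomic rename if supported; otherwise a plain (non-atomic) replacing move. */
    static void publish(Path finishedFile, Path target) throws IOException {
        try {
            Files.move(finishedFile, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // e.g. source and target on different file stores
            Files.move(finishedFile, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("batches");
        Path batch = Files.writeString(dir.resolve("batch.tmp"), "t1,1.0\n");
        Path target = dir.resolve("r.out");
        publish(batch, target);
        System.out.println(Files.readString(target).trim());
    }
}
```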

To clarify the approach further, here is a sample implementation:

    import java.io.IOException;
    import java.io.PrintWriter;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Enclosing class; the name is arbitrary.
    public class FileBasedTextBatchDemo {

        public static void main(String[] args) throws IOException {
            final String workingDirectory = "./data/tmp";
            final String outputDirectory = "./data/csv";
            // Create the directories if needed; keep them on the same file system
            // so that the final move can be atomic.
            Files.createDirectories(Paths.get(workingDirectory));
            Files.createDirectories(Paths.get(outputDirectory));

            final String outputFilename = "r.out";
            final int addIntervalSeconds = 1;
            final int drainIntervalSeconds = 5;

            final FileBasedTextBatch batch = new FileBasedTextBatch(Paths.get(workingDirectory));
            // Two threads, so a slow drain never delays the producer.
            final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

            final ScheduledFuture<?> producer = executor.scheduleAtFixedRate(
                    () -> batch.add(
                            // adding formatted date/time to imitate another CSV line
                            LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)
                    ),
                    0, addIntervalSeconds, TimeUnit.SECONDS);

            final ScheduledFuture<?> consumer = executor.scheduleAtFixedRate(
                    () -> batch.drainTo(Paths.get(outputDirectory, outputFilename)),
                    0, drainIntervalSeconds, TimeUnit.SECONDS);

            try {
                // A periodic task never completes normally, so this waits
                // 30 seconds for demonstration and then times out.
                producer.get(30, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                System.err.println("Producer failed: " + e);
            }
            catch (TimeoutException e) {
                System.out.println("Finishing producer/consumer...");
                producer.cancel(true);
                consumer.cancel(true);
            }
            executor.shutdown();
        }

        static class FileBasedTextBatch {
            private final Object lock = new Object();
            private final Path workingDir;
            private Output output;

            public FileBasedTextBatch(Path workingDir) throws IOException {
                this.workingDir = workingDir;
                output = new Output(this.workingDir);
            }

            /**
             * Adds another line of text to the batch.
             */
            public void add(String textLine) {
                synchronized (lock) {
                    output.writer.println(textLine);
                }
            }

            /**
             * Moves the currently collected batch to the file at the specified path.
             * The file will be overwritten if it exists.
             */
            public void drainTo(Path targetPath) {
                try {
                    final long startNanos = System.nanoTime();
                    final Output output = getAndSwapOutput();
                    final long elapsedMillis =
                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                    System.out.printf("Replaced the output in %d millis%n", elapsedMillis);
                    // The producer no longer sees the old output, so it can be
                    // flushed, closed and moved without holding the lock.
                    output.close();
                    // With ATOMIC_MOVE the other options are ignored; whether an existing
                    // target is replaced is platform-specific (see the Files.move Javadoc).
                    Files.move(
                            output.file,
                            targetPath,
                            StandardCopyOption.ATOMIC_MOVE,
                            StandardCopyOption.REPLACE_EXISTING
                    );
                }
                catch (IOException e) {
                    System.err.println("Failed to drain: " + e);
                    // Note: an exception thrown from a periodic task suppresses its later runs.
                    throw new IllegalStateException(e);
                }
            }

            /**
             * Replaces the current output with a new one, returning the old one.
             * The method is meant to execute very quickly to avoid delaying the producer thread.
             */
            private Output getAndSwapOutput() throws IOException {
                synchronized (lock) {
                    final Output prev = this.output;
                    this.output = new Output(this.workingDir);
                    return prev;
                }
            }
        }

        static class Output {
            final Path file;
            final PrintWriter writer;

            Output(Path workingDir) throws IOException {
                // performs very well on local file systems when the working directory is empty;
                // if too slow, it may be replaced with UUID-based name generation
                this.file = Files.createTempFile(workingDir, "csv", ".tmp");
                this.writer = new PrintWriter(Files.newBufferedWriter(this.file));
            }

            void close() {
                // writer is final and never null: flush pending lines, then close the file
                this.writer.flush();
                this.writer.close();
            }
        }
    }
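The sample ends once a batch has been moved to `./data/csv/r.out`. Steps 2 and 3 from the question (process the data in R and wait for the result) could then run on the consumer thread right after `drainTo` returns. A minimal sketch, assuming R is invoked through the `Rscript` command-line tool with a hypothetical script `process.R` that takes the CSV path as its argument; the class `RStep` and method `runAndWait` are likewise hypothetical:

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical consumer-side step: run an external R script on the drained
// batch file and block until it finishes (question steps 2 and 3).
final class RStep {

    /** Runs {@code command}, waits up to {@code timeoutSeconds}, returns the exit code. */
    static int runAndWait(List<String> command, long timeoutSeconds)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .inheritIO() // script output goes to this console, so no pipe can fill up
                .start();    // throws IOException if the program cannot be started
        if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new IOException("Timed out: " + command);
        }
        return process.exitValue();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Assumes Rscript is on the PATH; process.R is a hypothetical script name.
        Path batch = Path.of("./data/csv/r.out");
        int exit = runAndWait(List.of("Rscript", "process.R", batch.toString()), 600);
        System.out.println("R finished with exit code " + exit);
    }
}
```

If this call is made inside the consumer task, `scheduleAtFixedRate` guarantees that the next drain does not start until it returns, so R is never handed a file that is replaced mid-read, while the producer keeps writing to the new temporary file in the meantime.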

A similar question about writing to and reading from a file simultaneously in Java can be found on Stack Overflow: https://stackoverflow.com/questions/54064129/
