gpt4 book ai didi

Java 8 - 将 Kairosdb 中的多个对象列表保存到 csv 文件中

转载 作者:行者123 更新时间:2023-11-30 08:36:39 25 4
gpt4 key购买 nike

我需要根据 Kairosdb 的所有指标值创建一个 csv 文件。

kairosdb UI 已有另存为功能,但导出文件中没有指标名称。此外,我们无法将多个指标导出到单个文件中。

我面临的问题是匹配来自多个指标的时间戳。例如,一个指标可能会返回 5 个时间戳值。另一个指标可能会返回 10 个时间戳值,这些值可能与之前的指标匹配,也可能不匹配。

所以我需要生成如下所示的 csv:

timestamp,metric1,metric2,metric3\n
0,1,,2\n
1,,2,\n
2,1,3,6\n
3,5,5,\n
4,,,5\n

查询返回的值可能超过 10000 个数据点。我该如何解决这个问题?我可以在 Spark 集群中运行这个程序吗?

我试过的代码:

package com.example;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.kairosdb.client.builder.DataPoint;
public class Test {
private static Map<MetricMap, String> metricMaps = new HashMap<>();

/**
 * Builds three hard-coded sample series ("m1", "m2", "m3"), indexes every data
 * point into the static {@code metricMaps} table via {@code createMap}, and
 * writes the table to /home/lr/Desktop/csv1.csv as comma-separated lines.
 *
 * NOTE(review): the outer write loop iterates the keys of {@code metricMaps},
 * and {@code createMap} stores one key per (metric name, time) pair. One line
 * is therefore written per data point rather than per distinct timestamp, in
 * HashMap iteration order (unspecified).
 *
 * NOTE(review): the lookups below build a fresh MetricMap and call
 * {@code metricMaps.get(map)}; this presumably relies on MetricMap overriding
 * equals/hashCode over name and time. MetricMap is not shown here, so confirm.
 *
 * @param args command-line arguments; not read
 */
public static void main(String args[]) {
// Sample input: metric name -> data points of that metric.
Map<String, List<DataPoint>> metriDps = new HashMap<>();
// Column order of the CSV output.
String[] metricNames = new String[] { "m1", "m2", "m3" };
// Series "m1". The two constructor arguments are presumably (timestamp, value);
// createMap reads them back through getTimestamp()/getValue().
List<DataPoint> dataPoints1 = new ArrayList<DataPoint>();
DataPoint dp1 = new DataPoint(0, 1);
DataPoint dp2 = new DataPoint(2, 1);
DataPoint dp3 = new DataPoint(3, 5);
dataPoints1.add(dp1);
dataPoints1.add(dp2);
dataPoints1.add(dp3);
metriDps.put("m1", dataPoints1);
// Series "m2".
List<DataPoint> dataPoints2 = new ArrayList<DataPoint>();
DataPoint dp21 = new DataPoint(1, 2);
DataPoint dp22 = new DataPoint(2, 3);
DataPoint dp23 = new DataPoint(3, 5);
dataPoints2.add(dp21);
dataPoints2.add(dp22);
dataPoints2.add(dp23);
metriDps.put("m2", dataPoints2);
// Series "m3".
List<DataPoint> dataPoints3 = new ArrayList<DataPoint>();
DataPoint dp31 = new DataPoint(0, 2);
DataPoint dp32 = new DataPoint(2, 6);
DataPoint dp33 = new DataPoint(4, 5);
dataPoints3.add(dp31);
dataPoints3.add(dp32);
dataPoints3.add(dp33);
metriDps.put("m3", dataPoints3);
try {
// NOTE(review): the writer is closed only on the success path below; if an
// IOException is thrown before writer.close(), it is never closed.
FileWriter writer = new FileWriter("/home/lr/Desktop/csv1.csv");
// Fill the static metricMaps table with one entry per data point of every series.
metriDps.keySet().stream().forEach(key -> createMap(metriDps.get(key), key));
String value;
// One output line per key of metricMaps (see the NOTE in the method comment).
for (MetricMap metricMap : metricMaps.keySet()) {
String time = metricMap.getTime();
// First column: the time stored in the current key.
writer.append(time);
writer.append(',');
// One column per metric; the bound 3 and the "i == 2" check below are
// hard-coded to the three entries of metricNames.
for (int i = 0; i < 3; i++) {
// Probe key for (metricNames[i], time).
MetricMap map = new MetricMap();
map.setName(metricNames[i]);
map.setTime(time);
value = metricMaps.get(map);
// No entry for this metric at this time -> empty cell.
if (value != null)
writer.append(metricMaps.get(map));
else
writer.append("");
// Last column ends the line, other columns are followed by a separator.
if (i == 2)
writer.append('\n');
else
writer.append(',');
}
}
// generate whatever data you want

writer.flush();
writer.close();
} catch (IOException e) {
// I/O failures are only printed; the method then returns normally.
e.printStackTrace();
}
}

/**
 * Adds every data point of one metric to the static {@code metricMaps} table.
 *
 * For each point a new {@code MetricMap} key is created carrying the metric
 * name and the point's timestamp (as a string); the stored value is the
 * point's value converted with {@code String.valueOf}.
 *
 * @param list data points of a single metric
 * @param key  name of the metric the points belong to
 */
private static void createMap(List<DataPoint> list, String key) {
    list.forEach(dp -> {
        MetricMap entryKey = new MetricMap();
        entryKey.setName(key);
        entryKey.setTime(String.valueOf(dp.getTimestamp()));
        metricMaps.put(entryKey, String.valueOf(dp.getValue()));
    });
}

非常感谢您的帮助。

最佳答案

要使您的算法正常工作,您需要构建一个映射:以时间戳为键,以该时间点上各指标的(指标名称, 值)为值。以下代码就是这样做的:

// Column order of the CSV output.
String[] metricNames = { "m1", "m2", "m3" };

// Sample input: metric name -> data points of that metric.
Map<String, List<DataPoint>> metriDps = new HashMap<>();

List<DataPoint> m1Points = new ArrayList<>();
m1Points.add(new DataPoint(0, 1));
m1Points.add(new DataPoint(2, 1));
m1Points.add(new DataPoint(3, 5));
metriDps.put("m1", m1Points);

List<DataPoint> m2Points = new ArrayList<>();
m2Points.add(new DataPoint(1, 2));
m2Points.add(new DataPoint(2, 3));
m2Points.add(new DataPoint(3, 5));
metriDps.put("m2", m2Points);

List<DataPoint> m3Points = new ArrayList<>();
m3Points.add(new DataPoint(0, 2));
m3Points.add(new DataPoint(2, 6));
m3Points.add(new DataPoint(4, 5));
metriDps.put("m3", m3Points);

// Pivot table kept sorted by timestamp:
//   time1 -> {metricName=value, metricName=value, ..}
//   time2 -> {metricName=value, metricName=value, ..}
//   ..
SortedMap<Long, Map<String, String>> rowsByTime = new TreeMap<>();

for (Map.Entry<String, List<DataPoint>> series : metriDps.entrySet()) {
    String metric = series.getKey();
    for (DataPoint point : series.getValue()) {
        Long timestamp = point.getTimestamp();
        Object value = point.getValue();
        if (value == null) {
            // points without a value contribute nothing to the table
            continue;
        }
        // file (metricName, value) under the row of this timestamp
        rowsByTime.computeIfAbsent(timestamp, unused -> new HashMap<>())
                .put(metric, value.toString());
    }
}

StringWriter csv = new StringWriter();

// header
csv.append("timestamp,");
csv.append(String.join(",", metricNames));
csv.append('\n');

// content: the map is sorted, so iterating its entries yields ascending timestamps
for (Map.Entry<Long, Map<String, String>> row : rowsByTime.entrySet()) {
    // time
    csv.append(String.valueOf(row.getKey()));
    csv.append(',');

    // one cell per known metric name, so that missing metrics print as empty ",,"
    Map<String, String> cells = row.getValue();
    for (int i = 0; i < metricNames.length; i++) {
        if (i > 0) {
            csv.append(',');
        }
        String cell = cells.get(metricNames[i]);
        if (cell != null) {
            csv.append(cell);
        }
    }
    csv.append('\n');
}
System.out.println(csv);

打印

timestamp,m1,m2,m3
0,1,,2
1,,2,
2,1,3,6
3,5,5,
4,,,5

对于已排序的输入列表,您可以改进算法:为每个序列保留一个迭代器(本例中共 3 个),每次推进指向最早时间戳的那个迭代器。这样就可以并排地同时遍历所有序列。由于不必先构建 map 再逐个处理各个列表,这样还能节省一些内存。


使用下面的工具类

/**
 * Iterator over {@code DataPoint}s that carries a name and remembers the
 * element most recently returned by {@link #next()}.
 *
 * The remembered element stays available through {@link #current()} until
 * {@link #consume()} clears it.
 */
static class NamedKeeparator implements Iterator<DataPoint> {
    private final String name;
    private final Iterator<DataPoint> delegate;
    /** Last element handed out by next(); null before the first call and after consume(). */
    private DataPoint current;

    /**
     * @param name     label returned by {@link #getName()}
     * @param delegate iterator that supplies the elements
     */
    public NamedKeeparator(String name, Iterator<DataPoint> delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    /** @return whether the wrapped iterator has more elements */
    @Override
    public boolean hasNext() {
        return this.delegate.hasNext();
    }

    /**
     * Fetches the next element from the wrapped iterator and remembers it.
     *
     * @return the element just fetched
     */
    @Override
    public DataPoint next() {
        DataPoint fetched = this.delegate.next();
        this.current = fetched;
        return fetched;
    }

    /** @return the remembered element, or null if there is none */
    public DataPoint current() {
        return this.current;
    }

    /** Forgets the remembered element. */
    public void consume() {
        this.current = null;
    }

    /** @return the name given at construction */
    String getName() {
        return this.name;
    }
}

一个潜在的实现可能是

StringWriter csv = new StringWriter();

// header
csv.append("timestamp,");
csv.append(String.join(",", metricNames));
csv.append('\n');

// one named cursor per metric series
List<NamedKeeparator> cursors = new ArrayList<>();
for (Map.Entry<String, List<DataPoint>> series : metriDps.entrySet()) {
    cursors.add(new NamedKeeparator(series.getKey(), series.getValue().iterator()));
}

// cursors whose current point carries the smallest timestamp of this round
List<NamedKeeparator> earliest = new ArrayList<>();
while (true) {
    earliest.clear();
    long minTime = Long.MAX_VALUE;

    for (NamedKeeparator cursor : cursors) {
        // advance until the cursor holds a point (or is exhausted)
        while (cursor.current() == null && cursor.hasNext()) {
            cursor.next();
        }
        DataPoint head = cursor.current();
        if (head == null) {
            continue;
        }
        long headTime = head.getTimestamp();
        if (headTime > minTime) {
            continue;
        }
        if (headTime < minTime) {
            // strictly earlier point found: restart the candidate set
            minTime = headTime;
            earliest.clear();
        }
        earliest.add(cursor);
    }

    // nothing collected -> all cursors are done
    if (earliest.isEmpty()) {
        break;
    }

    // every cursor in 'earliest' now points at timestamp minTime
    csv.append(String.valueOf(minTime)).append(',');

    // one cell per metric name; metrics without a point at this time stay empty
    for (int i = 0; i < metricNames.length; i++) {
        if (i > 0) {
            csv.append(',');
        }
        for (NamedKeeparator cursor : earliest) {
            if (cursor.getName().equals(metricNames[i])) {
                csv.append(String.valueOf(cursor.current().getValue()));
                break;
            }
        }
    }
    csv.append('\n');

    // the points just written are used up
    for (NamedKeeparator cursor : earliest) {
        cursor.consume();
    }
}
System.out.println(csv);

http://ideone.com/pVCfNB

关于Java 8 - 将 Kairosdb 中的多个对象列表保存到 csv 文件中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37609373/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com