java - Can Apache Flink write files named by key?


In Apache Flink, is it possible to write to multiple text files depending on a key? For example, I have some data like this:

key1, foo, bar
key2, baz, foo
key3, etc, etc

The values of key are not known at compile time; new keys will come in, and I'd like to write the results for each key to a separate file, apart from the results for the other keys.

I'd expect to see 3 files, named "key1.txt", "key2.txt", and "key3.txt".

Is this something Flink can do out of the box?

Best Answer

You can try the following sink implementation, which works with a KeyedStream:

KeyedStream<Tuple2<String, String>, Tuple> keyedDataStream = dataStream.keyBy(0);

StreamKeyPartitionerSink<Tuple2<String, String>> sinkFunction =
    new StreamKeyPartitionerSink<Tuple2<String, String>>(
        "../data/key_grouping", "f0"); // f0 is the key field name
keyedDataStream.addSink(sinkFunction);

For more information on state management in Flink, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#keyed-state , since I use keyed state to manage the state per key.
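
As a standalone illustration of that mechanism (a minimal sketch, not part of the sink below; the PerKeyCounter name and the Tuple2<String, String> input type are assumptions for this example), keyed ValueState gives every key its own independent state slot:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts records per key. Because the state is keyed, each key sees its own
// independent counter -- the same property the sink below relies on for per-key file paths.
public class PerKeyCounter extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, Long>> {

  private transient ValueState<Long> count;

  @Override
  public void open(Configuration config) {
    count = getRuntimeContext().getState(
        new ValueStateDescriptor<>("count", TypeInformation.of(new TypeHint<Long>() {})));
  }

  @Override
  public void flatMap(Tuple2<String, String> value, Collector<Tuple2<String, Long>> out)
      throws Exception {
    // value() returns null the first time a key is seen.
    long current = count.value() == null ? 0L : count.value();
    count.update(current + 1);
    out.collect(Tuple2.of(value.f0, current + 1));
  }
}

The full sink implementation: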

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Flink sink that writes tuples to files partitioned by their keys, writing the records in
 * batches.
 *
 * @param <IN> Input tuple type
 *
 * @author ehabqadah
 */
public class StreamKeyPartitionerSink<IN> extends RichSinkFunction<IN> {

  private transient ValueState<String> outputFilePath;
  private transient ValueState<List<IN>> inputTupleList;
  /**
   * Number of records to hold before writing.
   */
  private int writeBatchSize;
  /**
   * The output directory path.
   */
  private String outputDirPath;
  /**
   * The name of the input tuple's key field.
   */
  private String keyFieldName;

  public StreamKeyPartitionerSink(String outputDirPath, String keyFieldName) {
    this(outputDirPath, keyFieldName, 1);
  }

  /**
   * @param outputDirPath the output directory
   * @param keyFieldName the name of the key field in the input tuple
   * @param writeBatchSize the number of records to hold before each write
   */
  public StreamKeyPartitionerSink(String outputDirPath, String keyFieldName, int writeBatchSize) {
    this.writeBatchSize = writeBatchSize;
    this.outputDirPath = outputDirPath;
    this.keyFieldName = keyFieldName;
  }

  @Override
  public void open(Configuration config) {
    // Initialize the keyed state holders; each key gets its own file path and batch buffer.
    ValueStateDescriptor<String> outputFilePathDesc =
        new ValueStateDescriptor<String>("outputFilePathDesc",
            TypeInformation.of(new TypeHint<String>() {}));

    ValueStateDescriptor<List<IN>> inputTupleListDesc =
        new ValueStateDescriptor<List<IN>>("inputTupleListDesc",
            TypeInformation.of(new TypeHint<List<IN>>() {}));

    outputFilePath = getRuntimeContext().getState(outputFilePathDesc);
    inputTupleList = getRuntimeContext().getState(inputTupleListDesc);
  }

  @Override
  public void invoke(IN value) throws Exception {
    List<IN> inputTuples =
        inputTupleList.value() == null ? new ArrayList<IN>() : inputTupleList.value();

    inputTuples.add(value);
    if (inputTuples.size() == writeBatchSize) {
      writeInputList(inputTuples);
      inputTuples = new ArrayList<IN>();
    }

    // Update the state with the (possibly emptied) batch buffer.
    inputTupleList.update(inputTuples);
  }

  /**
   * Writes the tuple list, each record on a separate line.
   *
   * @param tupleList the batch of tuples to append to the key's file
   */
  public void writeInputList(List<IN> tupleList) {
    String path = getOrInitFilePath(tupleList);
    try (PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)))) {
      for (IN tupleToWrite : tupleList) {
        outStream.println(tupleToWrite);
      }
    } catch (IOException e) {
      throw new RuntimeException("Exception occurred while writing file " + path, e);
    }
  }

  private String getOrInitFilePath(List<IN> tupleList) {
    IN firstInstance = tupleList.get(0);
    String path = null;
    try {
      path = outputFilePath.value();

      if (path == null) {
        // Read the key via reflection (e.g., the public field "f0" of a Flink Tuple).
        Field keyField = firstInstance.getClass().getField(keyFieldName);
        String keyValue = keyField.get(firstInstance).toString();
        path = Paths.get(outputDirPath, keyValue + ".txt").toString();

        setUpOutputFilePath(outputDirPath, path);
        // Save the computed path for this key.
        outputFilePath.update(path);
      }
    } catch (IOException | NoSuchFieldException | SecurityException | IllegalArgumentException
        | IllegalAccessException e) {
      throw new RuntimeException(
          "Exception occurred while fetching the value of key field " + path, e);
    }
    return path;
  }

  private void setUpOutputFilePath(String outputDirPath, String path) throws IOException {
    if (!Files.exists(Paths.get(outputDirPath))) {
      Files.createDirectories(Paths.get(outputDirPath));
    }
    // Create the file if it does not exist and truncate any existing content.
    Files.write(Paths.get(path), "".getBytes(), StandardOpenOption.CREATE,
        StandardOpenOption.TRUNCATE_EXISTING);
  }
}
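
For completeness, here is a minimal sketch of wiring the question's sample records to this sink (not part of the original answer; the job class name, the "../data/key_grouping" directory, and packing the non-key columns into f1 are illustrative assumptions):

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyPartitionedFilesJob {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // The question's sample records: key in f0, remaining columns in f1.
    DataStream<Tuple2<String, String>> dataStream = env.fromElements(
        Tuple2.of("key1", "foo, bar"),
        Tuple2.of("key2", "baz, foo"),
        Tuple2.of("key3", "etc, etc"));

    // keyBy(0) scopes the sink's ValueState to each key, so each key gets its own file path.
    KeyedStream<Tuple2<String, String>, Tuple> keyedDataStream = dataStream.keyBy(0);

    keyedDataStream.addSink(
        new StreamKeyPartitionerSink<Tuple2<String, String>>("../data/key_grouping", "f0"));

    env.execute("key-partitioned-files");
  }
}

With the default batch size of 1, each record is appended to its key's file as it arrives, so this job should produce key1.txt, key2.txt, and key3.txt under the output directory.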

Regarding "java - Can Apache Flink write files named by key?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/39276290/
