gpt4 book ai didi

hadoop - 如何为 BucketingSink 函数 Flink 设置动态基本路径?

转载 作者:行者123 更新时间:2023-12-02 20:28:13 26 4
gpt4 key购买 nike

我正在从文件中获取一些 json 记录。我想解析json,然后根据json中的一个字段,更新bucketing函数的基本路径。

例如:Json 记录中有一个字段名称'user-id',基于此我想将我的基本路径更新为 BucketingSink("/data/app/users/"+user-id-field-value+"/")

我该怎么做?

代码:
数据流输入 = env.readTextFile("/home/user/Desktop/jsonFile");

    DataStream<String> parsedJson = input.map((inputMsg)->{

String json="";
try{

json=jsonParser.parse(inputMsg).getAsString();

}catch (Exception e){
e.printStackTrace();
}
return json;

});

parsedJson.addSink(new BucketingSink<>(""));

}

最佳答案

使用 BucketingSink.setBucketer()方法来设置您创建的实现 Bucketer 的类接口(interface),并使用 user-id字段值作为子存储桶路径。

关于hadoop - 如何为 BucketingSink 函数 Flink 设置动态基本路径?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54760702/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com