java - How to write multiple values from a PCollection to a Redshift table

Reposted by: 行者123 · Updated: 2023-12-02 03:01:14

I have a template that writes a single string as a record to a Redshift table.

public static void main(String[] args) throws Exception {
    // Step 1: Create Options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    // Step 2: Create Pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Step 3: Create PCollection from array of random words <Strings>
    PCollection<String> collection = pipeline
        .apply(Create.of(Arrays.asList("start", "test", "case", "single", "end")))
        .setCoder(StringUtf8Coder.of());

    // Step 4: Execute transforms on the collection. This transform writes the string value to a table named 'test'
    collection.apply(JdbcIO.<String>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("com.amazon.redshift.jdbc42.Driver", options.getRedshiftUrl())
            .withUsername(options.getUser()).withPassword(options.getPassword()))
        .withStatement("insert into example_schema.test values (?)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
            public void setParameters(String element, PreparedStatement query) throws SQLException {
                query.setString(1, element);
            }
        }));

    pipeline.run().waitUntilFinish();
}

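I want to adapt it to write multiple fields consisting of an integer, a double, and a string.

I have found many problems with my approach, but I feel I may be randomly trying things to reach the correct implementation without fully understanding the process.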

public static void main(String[] args) throws Exception {
    // Step 1: Create Options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    String insertQuery = "insert into sample.mytable (item_int, item_string, item_double" +
        "values (?, ?, ?)";

    CustomObj custom_obj = new CustomObj(1, "", 0.5);

    // Step 2: Create Pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Step 3: Create PCollection from array of random words <Strings>
    PCollection<CustomObj> collection = pipeline
        .apply(Create.of());

    // Step 4: Execute transforms on the collection. This transform writes the string value to a table named 'test'
    collection.apply(JdbcIO.<CustomObj>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("com.amazon.redshift.jdbc42.Driver", options.getRedshiftUrl())
            .withUsername(options.getUser()).withPassword(options.getPassword()))
        .withStatement(insertQuery)
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<CustomObj>() {
            public void setParameters(CustomObj element, PreparedStatement query) throws SQLException {
                query.setInt(1, element.intVal);
                query.setString(2, element.stringVal);
                query.setDouble(3, element.doubleVal);
            }
        }));
    pipeline.run().waitUntilFinish();
}


public static class CustomObj {
    private Integer intVal;
    private String stringVal;
    private Double doubleVal;

    public CustomObj(Integer intVal, String stringVal, Double doubleVal) {
        this.intVal = intVal;
        this.stringVal = stringVal;
        this.doubleVal = doubleVal;
    }
}

So far, I know that I need to set an appropriate coder for my PCollection, but given the type of object I am using, I am not sure which coder that should be.
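One possible direction for the coder question, offered as a sketch rather than a definitive recommendation: Beam's SerializableCoder encodes elements with standard Java serialization, so if CustomObj implements java.io.Serializable it can be set explicitly with .setCoder(SerializableCoder.of(CustomObj.class)). The JDK-only example below does not use Beam at all. It only checks the precondition, namely that a Serializable version of CustomObj survives a serialization round trip. The class name CustomObjSerializationSketch and the helper roundTrip are hypothetical names introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CustomObjSerializationSketch {

    // Same three fields as the CustomObj in the question, plus Serializable.
    // Fields are public here only to keep the sketch short.
    public static class CustomObj implements Serializable {
        private static final long serialVersionUID = 1L;
        public final Integer intVal;
        public final String stringVal;
        public final Double doubleVal;

        public CustomObj(Integer intVal, String stringVal, Double doubleVal) {
            this.intVal = intVal;
            this.stringVal = stringVal;
            this.doubleVal = doubleVal;
        }
    }

    // Hypothetical helper: serialize to bytes and read the object back,
    // which is the same mechanism a Java-serialization-based coder relies on.
    public static CustomObj roundTrip(CustomObj original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CustomObj) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("CustomObj is not serializable as written", e);
        }
    }

    public static void main(String[] args) {
        CustomObj copy = roundTrip(new CustomObj(1, "", 0.5));
        System.out.println(copy.intVal + " | " + copy.stringVal + " | " + copy.doubleVal);
    }
}
```

If the round trip fails, a serialization-based coder would be expected to fail on the same class, so this is a cheap check to run before wiring the class into a pipeline.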

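I have also failed to use PreparedStatementSetter correctly, and when I looked for clarification, the examples I found used completely different approaches.

I know my question may be a bit vague, but I would appreciate being pointed to a source that explains the approach shown above more clearly.

The output produced is: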

 no suitable method found for of(no arguments)
[ERROR] method org.apache.beam.sdk.transforms.Create.<T>of(java.lang.Iterable<T>) is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method org.apache.beam.sdk.transforms.Create.<T>of(T,T...) is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method org.apache.beam.sdk.transforms.Create.<K,V>of(java.util.Map<K,V>) is not applicable
[ERROR] (cannot infer type-variable(s) K,V
[ERROR] (actual and formal argument lists differ in length))
[ERROR]
[ERROR] -> [Help 1]

Best answer

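The error says that the compiler failed to choose a matching overload of Create.of(). If you look at the documentation, Create has no overload that takes zero arguments: you have to pass an iterable, a map, or varargs with a non-optional first argument. You probably meant Create.of(custom_obj), which should work as you expect (in this case it creates a PCollection<CustomObj> containing a single element).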
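To make the overload rule concrete, here is a small JDK-only sketch. It does not call Beam; the class CreateOverloadSketch and its two of methods are hypothetical stand-ins that only mirror the shapes of Create.<T>of(Iterable<T>) and Create.<T>of(T, T...) listed in the compiler output, and they return plain lists instead of Beam transforms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CreateOverloadSketch {

    // Stand-in for the Iterable overload named in the error message.
    public static <T> List<T> of(Iterable<T> elems) {
        List<T> out = new ArrayList<>();
        for (T e : elems) {
            out.add(e);
        }
        return out;
    }

    // Stand-in for the varargs overload: the first argument is mandatory,
    // only the remaining ones are optional.
    @SafeVarargs
    public static <T> List<T> of(T first, T... rest) {
        List<T> out = new ArrayList<>();
        out.add(first);
        out.addAll(Arrays.asList(rest));
        return out;
    }

    public static void main(String[] args) {
        // of();  // would not compile: no overload accepts zero arguments

        List<String> single = of("only");                     // varargs overload, empty rest
        List<String> several = of("start", "test", "end");    // varargs overload
        List<String> fromList = of(Arrays.asList("a", "b"));  // Iterable overload

        System.out.println(single.size() + " " + several.size() + " " + fromList.size());
    }
}
```

With signatures of this shape, a call with one element compiles because the varargs tail may be empty, while a call with no arguments matches nothing, which is the situation the compiler reports for Create.of().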

The statement setter should also work the way you have it. Here is an example that does the same thing: https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java#L479
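The setter logic can be checked without a database or a Beam runner. The sketch below is JDK-only and makes two assumptions that are not part of the original post: Setter<T> is a hypothetical stand-in with the same method shape as JdbcIO.PreparedStatementSetter<T>, and the PreparedStatement is a recording proxy rather than a real Redshift connection. It also uses a corrected form of the question's insertQuery, because the two string pieces as posted concatenate to "...item_doublevalues (?, ?, ?)", which lacks the closing parenthesis of the column list and the space before "values".

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.TreeMap;

public class StatementSetterSketch {

    // Corrected statement: ")" closes the column list and a space precedes "values".
    public static final String INSERT_QUERY =
        "insert into sample.mytable (item_int, item_string, item_double) "
            + "values (?, ?, ?)";

    public static class CustomObj {
        public final Integer intVal;
        public final String stringVal;
        public final Double doubleVal;

        public CustomObj(Integer intVal, String stringVal, Double doubleVal) {
            this.intVal = intVal;
            this.stringVal = stringVal;
            this.doubleVal = doubleVal;
        }
    }

    // Hypothetical stand-in mirroring the method shape of JdbcIO.PreparedStatementSetter<T>.
    public interface Setter<T> {
        void setParameters(T element, PreparedStatement query) throws SQLException;
    }

    // Same three bindings as in the question: one placeholder per field, indexed from 1.
    public static final Setter<CustomObj> SETTER = (element, query) -> {
        query.setInt(1, element.intVal);
        query.setString(2, element.stringVal);
        query.setDouble(3, element.doubleVal);
    };

    // Runs the setter against a proxy that records every two-argument setXxx(index, value) call.
    public static Map<Integer, Object> bind(CustomObj element) {
        Map<Integer, Object> bound = new TreeMap<>();
        PreparedStatement recorder = (PreparedStatement) Proxy.newProxyInstance(
            StatementSetterSketch.class.getClassLoader(),
            new Class<?>[] {PreparedStatement.class},
            (proxy, method, methodArgs) -> {
                if (method.getName().startsWith("set")
                        && methodArgs != null && methodArgs.length == 2) {
                    bound.put((Integer) methodArgs[0], methodArgs[1]);
                }
                return null; // the recorded setters all return void
            });
        try {
            SETTER.setParameters(element, recorder);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return bound;
    }

    public static void main(String[] args) {
        long placeholders = INSERT_QUERY.chars().filter(c -> c == '?').count();
        Map<Integer, Object> bound = bind(new CustomObj(1, "", 0.5));
        System.out.println(placeholders + " placeholders, bound parameters: " + bound);
    }
}
```

The point of the check is that the number of "?" placeholders in the statement matches the number of parameter indices the setter binds, and that each index receives the field of the matching SQL type.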

This post on writing multiple values from a PCollection to a Redshift table is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/57050780/
