gpt4 book ai didi

Apache Beam 不支持 Kotlin Iterable?

转载 作者:行者123 更新时间:2023-12-04 15:38:49 26 4
gpt4 key购买 nike

Apache Beam 似乎拒绝识别 Kotlin 的 Iterable .这是一个示例代码:

@ProcessElement
fun processElement(
@Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
val output = input.key + "|" + input.value.toString()
println("output: $output")
receiver.output(output)
}

我收到以下奇怪的错误:
java.lang.IllegalArgumentException:
...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
@Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>

果然,如果我替换 Iterablejava.lang.Iterable ,相同的代码工作得很好。我究竟做错了什么?

依赖版本:
  • kotlin-jvm:1.3.21
  • org.apache.beam: 2.11.0

  • 这是一个包含完整代码和堆栈跟踪的要点:
  • https://gist.github.com/marcoslin/e1e19afdbacac9757f6974592cfd8d7f#file-apache-beam-iterable-notworking-kt

  • 更新 :

    经过一番反复试验,我发现虽然 List<String>抛出类似的异常,但 MutableList<String>实际工作:
    class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
    @ProcessElement
    fun processElement(
    @Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
    ) {
    val output = input.key + "|" + input.value.toString()
    println("output: $output")
    receiver.output(output)
    }
    }

    所以,这让我想起了 Kotlin 的 Immutable 集合实际上只是接口(interface),而底层集合仍然是可变的。但是,尝试替换 IterableMutableIterable继续提出错误。

    更新 2 :

    我使用 MutableList 部署了我的 Kotlin 数据流作业根据上述和作业失败:
    java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
    org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)

    我不得不切换回使用 java.lang.Iterable .

    最佳答案

    我在使用 ParDo 时也遇到了这个问题。关注 GroupByKey .原来是 @JvmWildcard Iterable 中需要注释编写接受 GroupByKey 结果的转换时的泛型类型.

    请参阅下面的人为示例,该示例读取文件并按每行的第一个字符进行分组。

    class BeamPipe {
    class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
    @ProcessElement
    fun processElement(@Element input: KV<String, Iterable<@JvmWildcard String>>, receiver: OutputReceiver<KV<String, String>>) {
    receiver.output(KV.of(input.key, input.value.joinToString("\n")))
    }
    }

    fun pipe(options: PipelineOptions) {
    val file =
    "testFile.txt"
    val p = Pipeline.create(options)
    p.apply(TextIO.read().from(file))
    .apply("Key lines by first character",
    WithKeys.of { line: String -> line[0].toString() }
    .withKeyType(TypeDescriptors.strings()))
    .apply("Group lines by first character", GroupByKey.create<String, String>())
    .apply("Concatenate lines", ParDo.of(ConcatLines()))
    .apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
    .by { it.key }
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
    .to("whatever")
    .withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
    )
    p.run()
    }
    }

    关于Apache Beam 不支持 Kotlin Iterable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55908999/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com