gpt4 book ai didi

java - 使用 Samza Runner 执行 Beam Pipeline 时出现 org.apache.beam.sdk.util.UserCodeException

转载 作者:行者123 更新时间:2023-12-05 07:04:53 29 4
gpt4 key购买 nike

我正在尝试从 here 运行 Wordcount 演示与 Samza Runner。这是我的build.gradle

plugins {
id 'eclipse'
id 'java'
id 'application'

// 'shadow' allows us to embed all the dependencies into a fat jar.
id 'com.github.johnrengelman.shadow' version '4.0.3'
}

mainClassName = 'samples.quickstart.WordCount'

maven {
url = uri('http://packages.confluent.io/maven/')
}
mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

ext.apacheBeamVersion = '2.22.0'

dependencies {
shadow "org.apache.beam:beam-sdks-java-core:$apacheBeamVersion"

runtime "org.apache.beam:beam-runners-direct-java:$apacheBeamVersion"
runtime "org.slf4j:slf4j-api:1.+"
runtime "org.slf4j:slf4j-jdk14:1.+"
compile group: 'org.apache.beam', name: 'beam-runners-samza', version: '2.22.0'
compile group: 'org.apache.samza', name: 'samza-api', version: '1.4.0'
compile group: 'org.apache.samza', name: 'samza-core_2.11', version: '1.4.0'
compile group: 'org.apache.samza', name: 'samza-kafka_2.11', version: '1.4.0'
compile group: 'org.apache.samza', name: 'samza-kv_2.11', version: '1.4.0'
compile group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: '1.4.0'
testCompile "junit:junit:4.+"
}
shadowJar {
zip64 true
baseName = 'WordCount' // Name of the fat jar file.
classifier = null // Set to null, otherwise 'shadow' appends a '-all' to the jar file name.
manifest {
attributes('Main-Class': mainClassName) // Specify where the main class resides.
}
}

我的wordcount.java如下

package samples.quickstart;

import org.apache.beam.runners.samza.SamzaRunner;
//import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;

public class WordCount {

private static final String jobName = "beamtest";

public static void main(String[] args) {
String inputsDir = "data/*";
String outputsPrefix = "outputs/part";

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(SamzaRunner.class);

Pipeline pipeline = Pipeline.create(options);

pipeline
.apply("Read lines", TextIO.read().from(inputsDir))
.apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
.apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
.apply("Count words", Count.perElement())
.apply("Write results", MapElements.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) ->
wordCount.getKey() + ": " + wordCount.getValue()))
.apply(TextIO.write().to(outputsPrefix));
pipeline.run().waitUntilFinish();
}
}


我使用的是 Beam 2.22.0 版。我尝试了以下组合。 Samza 1.4 与 Beam 2.22、Samza 1.0 与 Beam 2.11 和 Beam 2.22 以及 Samza 0.14.1 与 Beam 2.11.0。但是在执行时出现以下错误:

java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

我正在使用 Java 1.8。有没有人知道是什么导致了这个问题?

最佳答案

可以把使用Samza runner的build.gradle和修改后的wordcount.java贴在这里,方便我们排查是不兼容问题还是配置问题。感谢您试用 Samza runner!

关于java - 使用 Samza Runner 执行 Beam Pipeline 时出现 org.apache.beam.sdk.util.UserCodeException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62852415/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com