- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有两个简单的Flink流作业,这些作业从Kafka读取,进行一些转换并将结果放入Cassandra Sink中。他们从不同的Kafka主题中阅读并保存到不同的Cassandra表中。
当我单独运行两个作业中的任何一个时,一切正常。触发并完成检查点,并将数据保存到Cassandra。
但是,无论何时我运行两个作业(或其中两个执行两次),第二个作业在启动时都会失败,并出现以下异常:com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))
。
我找不到有关此错误的太多信息,它可能是由以下任何一种引起的:
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
Cluster cluster = null;
try {
cluster = builder
.addContactPoint("localhost")
.withPort(9042)
.withClusterName("Test Cluster")
.withoutJMXReporting()
.withProtocolVersion(ProtocolVersion.V4)
.withoutMetrics()
.build();
// register codecs from datastax extras.
cluster.getConfiguration().getCodecRegistry()
.register(LocalTimeCodec.instance);
} catch (ConfigurationException e) {
e.printStackTrace();
} catch (NoHostAvailableException nhae) {
nhae.printStackTrace();
}
return cluster;
}
};
我尝试使用不同的PoolingOptions和SocketOptions设置,但未成功。
CassandraSink.addSink(dataRows)
.setQuery("insert into table_name_(16 columns names) " +
"values (16 placeholders);")
.enableWriteAheadLog()
.setClusterBuilder(builder)
.setFailureHandler(new CassandraFailureHandler() {
@Override
public void onFailure(Throwable throwable) {
LOG.error("A {} occurred.", "Cassandra Failure", throwable);
}
})
.build()
.setParallelism(1)
.name("Cassandra Sink For Unique Count every N minutes.");
来自flink作业管理器的完整跟踪日志:
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:143)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
任何帮助表示赞赏。
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.abcde.ai</groupId>
<artifactId>analytics-etl</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.2</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.abcde.analytics.etl.KafkaUniqueCountsStreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
编辑:
flink-connector-cassandra
标记为提供的错误并且我只是从本地maven存储库(〜/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar)到Flink lib文件夹。我的问题已解决,但根本原因仍然是个谜。
最佳答案
我可能是错的,但最有可能是由Netty客户端版本冲突引起的。错误状态为NoHostAvailableException
,但是潜在的错误是TransportException
和Error writing
错误消息。 Cassandra (Cassandra)的确运转良好。
有一种类似的stackoverflow情况-Cassandra - error writing,具有非常相似的症状-单个项目运行良好,并且在添加一个以上项目时,带有AllNodesFailedException
消息的TransportException
和Error writing
成为根本原因。作者能够通过统一网络客户来解决此问题。
在您的情况下,我不确定为什么会有这么多的依赖项,因此我将尝试摆脱所有附加功能和库,而只保留Flink(v 1.10.0-scala_2.12)和Flink Cassandra Connector(flink- connector-cassandra_2.12:jar:1.10.0)库。它们必须已经包括必要的驱动程序,净值等。应跳过所有其他驱动程序(至少对于初始迭代而言,以确保这可以解决问题并且与库冲突)。
关于cassandra - 使用CassandrSink进行Flink作业失败,并写入错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64336044/
我有这个代码 var myChart = new FusionCharts("../themes/clean/charts/hbullet.swf", "myChartId", "400", "75
既然写入是立即进行的(复制到内核缓冲区并返回),那么使用 io_submit 进行写入有什么好处? 事实上,它 (aio/io_submit) 看起来更糟,因为您必须在堆上分配写入缓冲区并且不能使用基
我正在使用 mootool 的 Request.JSON 从 Twitter 检索推文。收到它后,我将写入目标 div 的 .innerHTML 属性。当我在本地将其作为文件进行测试时,即 file:
最终,我想将 Vertica DB 中的数据抓取到 Spark 中,训练机器学习模型,进行预测,并将这些预测存储到另一个 Vertica DB 中。 当前的问题是确定流程最后部分的瓶颈:将 Spark
我使用 WEKA 库编写了一个 Java 程序, 训练分类算法 使用经过训练的算法对未标记的数据集运行预测 将结果写入 .csv 文件 问题在于它当前写出离散分类结果(即算法猜测一行属于哪个类别)。我
背景 - 我正在考虑使用 clickonce 通过 clickonce(通过网站)部署 WinForms 应用程序。相对简单的应用程序的要素是: - 它是一个可执行文件和一个数据库文件(sqlite)
是否有更好的解决方案来快速初始化 C 数组(在堆上创建)?就像我们使用大括号一样 double** matrix_multiply(const double **l_matrix, const dou
我正在读取 JSON 文件,取出值并进行一些更改。 基本上我向数组添加了一些值。之后我想将其写回到文件中。当我将 JSONArray 写回文件时,会被写入字符串而不是 JSONArray 对象。怎样才
我为两个应用程序使用嵌入式数据库,其中一个是服务器,另一个是客户端。客户端应用程序。可以向服务器端发送获取数据请求以检索数据并显示在表格(或其他)中。问题是这样的:如何将获取的数据保存(写入)到页面文
是否有更好的解决方案来快速初始化 C 数组(在堆上创建)?就像我们使用大括号一样 double** matrix_multiply(const double **l_matrix, const dou
从问题得出问题:找到所有 result = new ArrayList(); for (int i = 2; i >(i%8) & 0x1) == 0) { result.add(i
由于某种原因,它没有写入 CSV。谁能明白为什么它不写吗? def main(): list_of_emails = read_email_csv() #read input file, cr
关闭。 这个问题是 not reproducible or was caused by typos 。它目前不接受答案。 这个问题是由于错别字或无法再重现的问题引起的。虽然类似的问题可能在这里出现,
我目前正在开发一个保存和加载程序,但我无法获得正确的结果。 编写程序: #include #include #define FILENAME "Save" #define COUNT 6 type
import java.io.*; public class Main2 { public static void main(String[] args) throws Exception {
我需要使用预定义位置字符串“Office”从所有日历中检索所有 iOS 事件,然后将结果写入 NSLog 和 UITextView。 到目前为止,这是我的代码: #import "ViewCo
我正在尝试将 BOOL 值写入 PFInstallation 中的列,但会不停地崩溃: - (IBAction)pushSwitch:(id)sender { NSUserDefaults *push
我以前在学校学过一些简单的数据库编程,但现在我正在尝试学习最佳实践,因为我正在编写更复杂的应用程序。写入 MySQL 数据库并不难,但我想知道让分布式应用程序写入 Amazon EC2 上的远程数据库
是否可以写回到ResourceBundle?目前我正在使用 ResourceBundle 来存储信息,在运行时使用以下内容读取信息 while(ResourceBundle.getBundle("bu
关闭。这个问题是not reproducible or was caused by typos .它目前不接受答案。 这个问题是由于错别字或无法再重现的问题引起的。虽然类似的问题可能是on-topi
我是一名优秀的程序员,十分优秀!