Repost: Flink accessing Kerberos-authenticated Kafka across clusters
Flink provides three modules for cluster security authentication: HadoopModule, JaasModule, and ZooKeeperModule. The related configuration parameters are defined in the SecurityOptions class.
HadoopModule configures authentication for frameworks that authenticate through UserGroupInformation (Kudu, the synchronous HBase client, HDFS, etc.).
JaasModule configures authentication for frameworks that authenticate through a JAAS configuration (Kafka, ZooKeeper, the asynchronous HBase client, etc.).
ZooKeeperModule installs the process-wide ZooKeeper security configuration.
When a Flink component starts, it first loads the authentication modules and then starts the cluster components inside the constructed security context. However, the whole Flink cluster can only use one set of credentials, so if a Flink job reads from Kerberos-enabled Kafka and writes to Kudu, the configured principal and keytab must have access to HDFS, Kafka, and Kudu at the same time. If different credentials are required, the Kerberos setup has to be done separately inside the Flink job.
Taking JobManager startup as an example, let's look at how the security components are loaded and record the call chain of each module.
The security context is installed first, and the cluster components are then started from inside it.
ClusterEntrypoint#startCluster
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
    runCluster(configuration);
    return null;
});

SecurityContext installSecurityContext(Configuration configuration) throws Exception {
    SecurityUtils.install(new SecurityConfiguration(configuration));
    return SecurityUtils.getInstalledContext();
}
By default, SecurityConfiguration provides two security.context.factory.classes for building the SecurityContext:
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory: starts the cluster inside a doAs call on the constructed UserGroupInformation.
org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory: the default, which starts in a context that requires no security authentication.
It also provides three security.module.factory.classes to prepare the configuration used for authentication:
org.apache.flink.runtime.security.modules.HadoopModuleFactory
org.apache.flink.runtime.security.modules.JaasModuleFactory
org.apache.flink.runtime.security.modules.ZookeeperModuleFactory
The main work is done in installModules, which prepares the configuration used for authentication.
SecurityUtils#install
public static void install(SecurityConfiguration config) throws Exception {
// Install the security modules first before installing the security context
installModules(config);
installContext(config);
}
installModules loads each SecurityModuleFactory dynamically via SPI and calls install() on the created module:
static void installModules(SecurityConfiguration config) throws Exception {
    List<SecurityModule> modules = new ArrayList<>();
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory;
        try {
            // load the factory via SPI
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            throw new IllegalArgumentException("Unable to instantiate security module factory " + moduleFactoryClass, ne);
        }
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if the module is not supported in the current environment
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}
HadoopModule#install. Kerberos authentication for HDFS requires a UserGroupInformation login user; this module builds the global loginUser. If other components can authenticate with this loginUser, no separate credentials need to be configured for them.
public void install() throws SecurityInstallException {
// hand the Hadoop-related configuration to UserGroupInformation
UserGroupInformation.setConfiguration(hadoopConfiguration);
UserGroupInformation loginUser;
try {
// security is enabled and a keytab/principal pair has been configured
if (UserGroupInformation.isSecurityEnabled() &&
!StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
// the current login user
loginUser = UserGroupInformation.getLoginUser();
// token cache
// supplement with any available tokens
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
// Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
// used in the context of reading the stored tokens from UGI.
// Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
// loginUser.addCredentials(cred);
try {
Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred =
(Credentials) readTokenStorageFileMethod.invoke(
null,
new File(fileLocation),
hadoopConfiguration);
// if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
// the UGI would prefer the delegation token instead, which eventually expires
// and does not fallback to using Kerberos tickets
Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
Credentials credentials = new Credentials();
final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
//If UGI use keytab for login, do not load HDFS delegation token.
for (Token<? extends TokenIdentifier> token : usrTok) {
if (!token.getKind().equals(hdfsDelegationTokenKind)) {
final Text id = new Text(token.getIdentifier());
credentials.addToken(id, token);
}
}
Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
Credentials.class);
addCredentialsMethod.invoke(loginUser, credentials);
} catch (NoSuchMethodException e) {
LOG.warn("Could not find method implementations in the shaded jar.", e);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
} else {
// login with current user credentials (e.g. ticket cache, OS login)
// note that the stored tokens are read automatically
try {
//Use reflection API to get the login user object
//UserGroupInformation.loginUserFromSubject(null);
Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
loginUserFromSubjectMethod.invoke(null, (Subject) null);
} catch (NoSuchMethodException e) {
LOG.warn("Could not find method implementations in the shaded jar.", e);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
loginUser = UserGroupInformation.getLoginUser();
}
boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
loginUser, securityConfig.useTicketCache());
LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);
} catch (Throwable ex) {
throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
}
}
JaasModule#install. It prepares the properties that a JAAS file would contain and passes them to the ConfigFile; when a framework later authenticates via a JAAS file, it extracts them from javax.security.auth.login.Configuration.
A JAAS file used for Kafka Kerberos authentication looks like this:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
keyTab="/Users/xx/kafka.keytab"
principal="kafka/cdh002@TEST.COM"
useKeyTab=true
useTicketCache=true;
};
The parameters that would be used in the JAAS file are written into javax.security.auth.login.Configuration; each framework reads them from that configuration when it needs them.
public void install() {
priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
// create an empty JAAS file and bind it to the system property java.security.auth.login.config
if (priorConfigFile == null) {
File configFile = generateDefaultConfigFile(workingDir);
System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
LOG.info("Jaas file will be created as {}.", configFile);
}
// read the JAAS configuration file (this creates the ConfigFile)
priorConfig = javax.security.auth.login.Configuration.getConfiguration();
// construct a dynamic JAAS configuration
currentConfig = new DynamicConfiguration(priorConfig);
// wire up the configured JAAS login contexts to use the krb5 entries
AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
if (krb5Entries != null) {
for (String app : securityConfig.getLoginContextNames()) {
currentConfig.addAppConfigurationEntry(app, krb5Entries); // kafkaClient
}
}
// register the dynamic configuration as javax.security.auth.login.Configuration
javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}
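To make this concrete, here is a minimal sketch (not Flink code; the class name is hypothetical) of how a client framework reads the installed entry back from the process-wide JAAS Configuration:
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasLookupDemo {
    public static void main(String[] args) {
        // After JaasModule#install, the process-wide JAAS configuration holds one entry
        // per login context name listed in security.kerberos.login.contexts
        // (e.g. "KafkaClient", "Client").
        Configuration jaasConfig = Configuration.getConfiguration();
        AppConfigurationEntry[] entries = jaasConfig.getAppConfigurationEntry("KafkaClient");
        if (entries == null) {
            System.out.println("No KafkaClient entry installed");
            return;
        }
        for (AppConfigurationEntry entry : entries) {
            // Typically com.sun.security.auth.module.Krb5LoginModule with the
            // keyTab/principal options taken from flink-conf.yaml.
            System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
        }
    }
}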
ZooKeeperModule#install
public void install() throws SecurityInstallException {
priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));
priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
}
priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
System.setProperty(ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
}
}
After all the modules are installed, installContext builds the security context. There will only ever be one installedContext.
static void installContext(SecurityConfiguration config) throws Exception {
// install the security context factory
for (String contextFactoryClass : config.getSecurityContextFactories()) {
try {
// loaded via SPI
SecurityContextFactory contextFactory = SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
// if a Hadoop environment is available, Hadoop authentication is used
if (contextFactory.isCompatibleWith(config)) {
try {
installedContext = contextFactory.createContext(config);
// install the first compatible context and ignore the remaining ones; only one is loaded
break;
} catch (SecurityContextInitializeException e) {
LOG.error("Cannot instantiate security context with: " + contextFactoryClass, e);
}
} else {
LOG.warn("Unable to install incompatible security context factory {}", contextFactoryClass);
}
} catch (NoMatchSecurityFactoryException ne) {
LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
}
}
if (installedContext == null) {
LOG.error("Unable to install a valid security context factory!");
throw new Exception("Unable to install a valid security context factory!");
}
}
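As noted above, the context produced by HadoopSecurityContextFactory runs the given callable inside UserGroupInformation#doAs, so everything started within it inherits the Kerberos identity of the login user. A simplified sketch of that idea (not the exact Flink source) looks like this:
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

import org.apache.hadoop.security.UserGroupInformation;

// Simplified sketch of a Hadoop-backed security context: every secured action is
// executed as the previously constructed login user.
public class SimpleHadoopSecurityContext {

    private final UserGroupInformation ugi;

    public SimpleHadoopSecurityContext(UserGroupInformation ugi) {
        this.ugi = ugi;
    }

    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        // runCluster(configuration) from ClusterEntrypoint ends up inside this doAs call.
        return ugi.doAs((PrivilegedExceptionAction<T>) securedCallable::call);
    }
}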
When Flink reads from Kerberos-enabled Kafka, the following configuration is required. Note that neither java.security.auth.login.config nor sasl.jaas.config is passed explicitly.
Parameters added to the Kafka consumer properties (a consumer-properties sketch follows the two lists):
1. security.protocol = 'SASL_PLAINTEXT' // use the SASL protocol
2. sasl.mechanism = 'GSSAPI' // use Kerberos authentication
3. sasl.kerberos.service.name = 'kafka' // Kerberos service name of the brokers
Parameters in flink-conf.yaml:
1. security.kerberos.login.use-ticket-cache: true
2. security.kerberos.login.keytab: xxxx
3. security.kerberos.login.principal: xxxxx
4. security.kerberos.login.contexts: KafkaClient,Client
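As a minimal sketch (broker address and group id are hypothetical placeholders), the Kafka-side parameters above can be assembled in job code like this; no JAAS-related key is set because Flink's JaasModule has already installed the KafkaClient entry:
import java.util.Properties;

// Sketch: consumer properties that rely on the process-wide JAAS entry installed by
// Flink's JaasModule (driven by the security.kerberos.login.* settings in flink-conf.yaml).
public class KerberosKafkaProps {
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("group.id", "demo-group");            // placeholder
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }
}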
If sasl.jaas.config is configured instead, its format is:
String config = "com.sun.security.auth.module.Krb5LoginModule required\n" +
"\tprincipal=\"kafka/xxxxx@EXAMPLE.COM\"\n" +
"\tkeyTab=\"/Users/xxxx/kafka.keytab\"\n" +
"\tuseKeyTab=true\n" +
"\tuseTicketCache=true;";
The JAAS configuration lookup flow in the Flink Kafka connector:
FlinkKafkaConsumerBase#open
|
this.partitionDiscoverer.open();
|
KafkaPartitionDiscoverer#initializeConnections
|
KafkaConsumer#KafkaConsumer
|
ClientUtils.createChannelBuilder(config);
|
ChannelBuilders#clientChannelBuilder
|
ChannelBuilders#create
|
SaslChannelBuilder (Kafka 0.10)#configure
|
JaasUtils#jaasConfig
public static Configuration jaasConfig(LoginType loginType, Map<String, ?> configs) {
Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
// if sasl.jaas.config is set, build the JAAS config from its value
if (jaasConfigArgs != null) {
if (loginType == LoginType.SERVER)
throw new IllegalArgumentException("JAAS config property not supported for server");
else {
JaasConfig jaasConfig = new JaasConfig(loginType, jaasConfigArgs.value());
AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(LoginType.CLIENT.contextName());
int numModules = clientModules == null ? 0 : clientModules.length;
if (numModules != 1)
throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be one module");
return jaasConfig;
}
} else
return defaultJaasConfig(loginType);
}
private static Configuration defaultJaasConfig(LoginType loginType) {
// java.security.auth.login.config was already set when Flink's JaasModule was installed
String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
if (jaasConfigFile == null) {
LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
}
// fetch the installed javax.security.auth.login.Configuration, which holds the login entries used by JAAS
Configuration jaasConfig = Configuration.getConfiguration();
// e.g. "KafkaClient"
String loginContextName = loginType.contextName();
AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
if (configEntries == null) {
String errorMessage;
errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
throw new IllegalArgumentException(errorMessage);
}
return jaasConfig;
}
Goal: a Flink job runs on Kerberos-enabled cluster A and reads Kafka data from both cluster A and cluster B.
Steps:
Upload the new cluster's krb5.conf file and append its contents to the existing krb5 file.
Configure the sasl.jaas.config parameter separately for each Kafka source.
Important note:
In the newly uploaded krb5.conf, domain_realm must map the hosts to their realm; otherwise the default_realm will be used.
[realms]
PLS.COM = {
kdc = plscdh00:88
admin_server = plscdh00
}
Map plscdh00 through plscdh03 to PLS.COM:
[domain_realm]
.pls.com = PLS.COM
pls.com = PLS.COM
plscdh01 = PLS.COM
plscdh02 = PLS.COM
plscdh03 = PLS.COM
plscdh00 = PLS.COM
Append the domain_realm and realms sections of the newly uploaded krb5.conf to the existing krb5 file.
Pass the JAAS content through sasl.jaas.config. If it were passed via AppConfigurationEntry instead, Kafka's default login context name is KafkaClient, so with multiple Kafka clusters the AppConfigurationEntry retrieved for that name would get mixed up (see the sketch below).
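A minimal sketch of the two-cluster setup, assuming a Kafka client new enough to honor sasl.jaas.config (0.10.2+) and treating all broker addresses, topics, principals, and keytab paths as placeholders:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

// Sketch: one consumer per cluster, each carrying its own sasl.jaas.config, so the
// shared "KafkaClient" JAAS entry is never consulted.
public class TwoClusterKafkaSources {

    private static Properties kerberosProps(String brokers, String jaasConfig) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        props.setProperty("sasl.jaas.config", jaasConfig);
        return props;
    }

    private static String jaas(String principal, String keytab) {
        return "com.sun.security.auth.module.Krb5LoginModule required\n"
                + "\tprincipal=\"" + principal + "\"\n"
                + "\tkeyTab=\"" + keytab + "\"\n"
                + "\tuseKeyTab=true\n"
                + "\tuseTicketCache=true;";
    }

    public static FlinkKafkaConsumer010<String> clusterASource() {
        Properties props = kerberosProps("a-broker:9092",
                jaas("kafka/a-host@A.COM", "/path/to/a.keytab"));
        return new FlinkKafkaConsumer010<>("topic-a", new SimpleStringSchema(), props);
    }

    public static FlinkKafkaConsumer010<String> clusterBSource() {
        // Cluster B's realm must be resolvable, i.e. its realms/domain_realm entries
        // were appended to the effective krb5.conf as described above.
        Properties props = kerberosProps("b-broker:9092",
                jaas("kafka/b-host@PLS.COM", "/path/to/b.keytab"));
        return new FlinkKafkaConsumer010<>("topic-b", new SimpleStringSchema(), props);
    }
}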
Related code.
krb5 file construction code:
public class KrbConfManager {
Logger LOG = LoggerFactory.getLogger(KrbConfManager.class);
private static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
private KrbConfManager(){}
public void appendKrbConf(String krbConf) {
this.appendKrbConf(krbConf, System.getProperty("user.dir"));
}
/**
 * Parse the newly added krb5.conf with the (reflective) helpers of sun.security.krb5.Config
 * and append its realms and domain_realm sections to the krb5 configuration currently in effect.
 * @param krbConf path to the krb5.conf file to append
 * @param directory directory in which the merged temporary krb5 file is written
 */
public void appendKrbConf(String krbConf, String directory) {
String krbConfPath = DtFileUtils.getFileAbsolutePath(krbConf);
LOG.info("krb conf abs path is {}", krbConfPath);
Preconditions.checkArgument(DtFileUtils.fileExistCheck(krbConfPath),"krb file does not exist");
try {
Constructor<Config> constructor = Config.class.getDeclaredConstructor();
constructor.setAccessible(true);
Config configParser = constructor.newInstance();
Method loadConfigFile = configParser.getClass().getDeclaredMethod("loadConfigFile", String.class);
loadConfigFile.setAccessible(true);
List<String> configFileList = (List<String>) loadConfigFile.invoke(configParser, krbConfPath);
Method parseStanzaTable = configParser.getClass().getDeclaredMethod("parseStanzaTable", List.class);
parseStanzaTable.setAccessible(true);
Hashtable<String, Object> appendConfig = (Hashtable<String, Object>) parseStanzaTable.invoke(configParser, configFileList);
Hashtable<String, Object> appendRealms = (Hashtable<String, Object>) appendConfig.get("realms");
Hashtable<String, Object> appendDomainRealm = (Hashtable<String, Object>) appendConfig.get("domain_realm");
Config instance = Config.getInstance();
Field stanzaTable = instance.getClass().getDeclaredField("stanzaTable");
stanzaTable.setAccessible(true);
Hashtable<String, Object> currentTable = (Hashtable<String, Object>) stanzaTable.get(instance);
Hashtable<String, Object> realms = (Hashtable<String, Object>) currentTable.computeIfAbsent("realms", key -> new Hashtable());
realms.putAll(appendRealms);
Hashtable<String, Object> domainRealm = (Hashtable<String, Object>) currentTable.computeIfAbsent("domain_realm", key -> new Hashtable());
domainRealm.putAll(appendDomainRealm);
StringBuffer stringBuffer = new StringBuffer();
String newKerbConfigStr = buildKrbConfigStr(currentTable, stringBuffer);
LOG.info("====buildKerbConf======\n{}", newKerbConfigStr);
String krb5FilePath = DtFileUtils.createTempFile("krb-", ".conf", newKerbConfigStr, directory);
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5FilePath);
Config.refresh();
} catch (Exception e) {
throw new RuntimeException("build krb conf error", e);
}
}
private String buildKrbConfigStr(Hashtable<String, Object> currentTable, StringBuffer stringBuilder) {
Set<String> keySet = currentTable.keySet();
for (String key : keySet) {
stringBuilder = stringBuilder.append("[").append(key).append("]").append("\n");
if (!StringUtils.equalsIgnoreCase(key, "realms")) {
toStringInternal("", currentTable.get(key), stringBuilder);
} else {
dealRealms(currentTable.get(key), stringBuilder);
}
}
return stringBuilder.toString();
}
private void dealRealms(Object realms, StringBuffer stringBuilder) {
if (realms instanceof Hashtable) {
Hashtable realmsTable = (Hashtable) realms;
Iterator tabIterator = realmsTable.keySet().iterator();
while (tabIterator.hasNext()) {
Object entity = tabIterator.next();
stringBuilder = stringBuilder.append(entity).append(" = ").append("{\n");
toStringInternal("", realmsTable.get(entity), stringBuilder);
stringBuilder.append("}\n");
}
}
}
private static void toStringInternal(String prefix, Object obj, StringBuffer sb) {
if (obj instanceof String) {
// A string value, just print it
sb.append(obj).append('\n');
} else if (obj instanceof Hashtable) {
// A table, start a new sub-section...
Hashtable<?, ?> tab = (Hashtable<?, ?>) obj;
for (Object o : tab.keySet()) {
// ...indent, print "key = ", and
sb.append(prefix).append(" ").append(o).append(" = ");
// ...go recursively into value
toStringInternal(prefix + " ", tab.get(o), sb);
}
sb.append(prefix).append("\n");
} else if (obj instanceof Vector) {
// A vector of strings, print them inside [ and ]
Vector<?> v = (Vector<?>) obj;
boolean first = true;
for (Object o : v.toArray()) {
if (!first) {
sb.append(",");
}
sb.append(o);
first = false;
}
sb.append("\n");
}
}
private enum Singleton {
INSTANCE;
private final KrbConfManager instance;
Singleton() {
instance = new KrbConfManager();
}
private KrbConfManager getInstance() {
return instance;
}
}
public static KrbConfManager getInstance() {
return Singleton.INSTANCE.getInstance();
}
}
Building and populating sasl.jaas.config:
public interface SecurityManager {
Logger LOG = LoggerFactory.getLogger(SecurityManager.class);
String SASL_JAAS_CONFIG = "sasl.jaas.config";
default void kerberosSecurity(KafkaSourceTableInfo kafkaSourceTableInfo, Properties props) {
if (StringUtils.equalsIgnoreCase(kafkaSourceTableInfo.getKerberosAuthEnable(), Boolean.TRUE.toString())) {
Optional.ofNullable(kafkaSourceTableInfo.getKrbConfName())
.ifPresent(KrbConfManager.getInstance()::appendKrbConf);
String jaasContent = JaasConfigUtil.JaasConfig.builder()
.setLoginModule("com.sun.security.auth.module.Krb5LoginModule")
.setLoginModuleFlag("required")
.setPrincipal(DtStringUtil.addQuoteForStr(kafkaSourceTableInfo.getPrincipal()))
.setKeyTab(DtStringUtil.addQuoteForStr(DtFileUtils.getFileAbsolutePath(kafkaSourceTableInfo.getKeyTab())))
.setUseKeyTab(kafkaSourceTableInfo.getUseKeyTab())
.setUseTicketCache(kafkaSourceTableInfo.getUseTicketCache())
.build()
.generateJaasConfigStr();
LOG.info(" kafka jaas Content: \n{}", jaasContent);
props.put(SASL_JAAS_CONFIG, jaasContent);
}
}
}
The entry point; it must be called in open():
KafkaConsumer010#open
@Override
public void open(Configuration configuration) throws Exception {
kerberosSecurity(kafkaSourceTableInfo, this.properties);
super.open(configuration);
}