gpt4 book ai didi

scala - 如果我有身份验证详细信息,如何使用 spark 编程 API 写入 HDFS?

转载 作者:可可西里 更新时间:2023-11-01 14:30:17 25 4
gpt4 key购买 nike

我需要写入外部 HDFS 集群,其身份验证详细信息可用于简单例份验证和 kerberos 身份验证。为了简单起见,假设我们正在处理简单的身份验证。

这是我的:

  • 外部 HDFS 集群连接详细信息(主机、端口)
  • 身份验证详细信息(简单例份验证的用户)
  • HDFS需要写入文件的位置(hdfs://host:port/loc)
  • 此外,其他细节,如格式等。

请注意 SPARK 用户与为 HDFS 身份验证指定的用户不同。

现在,使用 spark 编程 API,这就是我想要做的:

val hadoopConf =  new Configuration()
hadoopConf.set("fs.defaultFS", fileSystemPath)
hadoopConf.set("hadoop.job.ugi", userName)
val jConf = new JobConf(hadoopConf)
jConf.setUser(user)
jConf.set("user.name", user)
jConf.setOutputKeyClass(classOf[NullWritable])
jConf.setOutputValueClass(classOf[Text])
jConf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])

outputDStream.foreachRDD(r => {
val rdd = r.mapPartitions { iter =>
val text = new Text()
iter.map { x =>
text.set(x.toString)
println(x.toString)
(NullWritable.get(), text)
}
}

val rddCount = rdd.count()
if(rddCount > 0) {
rdd.saveAsHadoopFile(config.outputPath, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], jConf)
}
})

在这里,我假设如果我们通过正确的详细信息传递 JobConf,它应该用于身份验证并且应该使用 JobConf 中指定的用户来完成写入。

但是,无论 JobConf (“hdfs”作为用户) 中存在的身份验证详细信息如何,作为 spark 用户 (“root”) 仍会发生写入。以下是我得到的异常:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=root, access=WRITE, inode="/spark-deploy/out/_temporary/0":hdfs:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665)
at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3900)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:978)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy40.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:558)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy41.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3000)
... 45 more

如果有任何建议,请告诉我。

最佳答案

这可能更像是评论而不是答案,但由于太长,我把它放在这里。我没有尝试过这个,因为我没有环境来测试它。请尝试让我知道这是否有效(如果无效,我将删除此答案)。

稍微看一下代码,它看起来像 DFSClient creates a proxy使用 createProxyWithClientProtocol使用 UserGroupInformation.getCurrentUser()(我没有追踪 createHAProxy 分支,但我怀疑那里有相同的逻辑)。然后将此信息发送到服务器进行身份验证。

这意味着你需要改变什么UserGroupInformation.getCurrentUser()在您的特定调用的上下文中返回。这就是UserGroupInformation.doAs应该这样做,您只需要获得一个正确的 UserGroupInformation 实例。而在简单认证的情况下UserGroupInformation.createRemoteUser可能真的有用。

所以我建议尝试这样的事情:

...
val rddCount = rdd.count()
if(rddCount > 0) {
val remoteUgi = UserGroupInformation.createRemoteUser("hdfsUserName")
remoteUgi.doAs(() => { rdd.saveAsHadoopFile(config.outputPath, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], jConf) })
}

关于scala - 如果我有身份验证详细信息,如何使用 spark 编程 API 写入 HDFS?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50083275/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com