python - How to connect PySpark to Elasticsearch over SSL with certificate verification set to False?

Reposted. Author: 行者123. Updated: 2023-12-01 08:22:07

Previously, I successfully connected to an Elasticsearch cluster directly from Python using the following code:

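import ssl
from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context

# Disable hostname checking and certificate verification (insecure)
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

es = Elasticsearch(
    ES_HOST,
    http_auth=(ES_USERNAME, ES_PASSWORD),
    scheme="https",
    port=ES_PORT,
    use_ssl=True,
    verify_certs=False,
    ssl_context=ssl_context,
    ca_certs=False
)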

Now I am trying to connect to the same cluster using the PySpark-to-Elasticsearch connector. Spark is version 2.4.0 with Hadoop 2.7, and I am using elasticsearch-hadoop-6.1.1 to connect the two.

I connect PySpark to ES with the following configuration:

es_live_conf = {
    "es.nodes": ES_HOST,
    "es.port": ES_PORT,
    "es.resource": "testindex/testdoc",
    "es.net.http.auth.user": ES_USERNAME,
    "es.net.http.auth.pass": ES_PASSWORD,
    "es.net.ssl": "true",
    "es.nodes.resolve.hostname": "false",
    "es.net.ssl.cert.allow.self.signed": "true"
}

I then open the connection with this code:

from pyspark import SparkContext

sc = SparkContext(appName="PythonSparkStreaming")
sc.setLogLevel("WARN")

# Read documents from es.resource as an RDD of (key, document) pairs
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_live_conf)

With TRACE logging enabled for the REST calls, I get the following error:

19/02/07 07:00:31 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:31 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:31 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:31 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:31 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:31 TRACE CommonsHttpTransport:Tx [HEAD]@[xx.xx.xx.xx:xxxxx][testindex]?[null] w/ payload [null]
19/02/07 07:00:31 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:31 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:35 TRACE CommonsHttpTransport: Rx @[xx.xx.xx.xx] [404-Not Found] [null]
19/02/07 07:00:35 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:35 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:35 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:35 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:35 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:35 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:35 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][]?[null] w/ payload [null]
19/02/07 07:00:35 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:35 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:39 TRACE CommonsHttpTransport: Rx @[yyy.yy.y.y] [200-OK] [{
"name" : "iad1esapp2vz742",
"cluster_name" : "68fc89bc2c36e7188782e4f226ed3948",
"cluster_uuid" : "7xk6py25R_mRgLgLTYeavg",
"version" : {
"number" : "5.6.5",
"build_hash" : "6a37571",
"build_date" : "2017-12-04T07:50:10.466Z",
"build_snapshot" : false,
"lucene_version" : "6.6.1"
},
"tagline" : "You Know, for Search"
}
]
19/02/07 07:00:39 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:39 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:39 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:39 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:39 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:39 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:39 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][_nodes/http]?[null] w/ payload [null]
19/02/07 07:00:39 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:39 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:43 TRACE CommonsHttpTransport: Rx @[yyy.yy.y.y] [200-OK] [{"_nodes":{"total":14,"successful":14,"failed":0},"cluster_name":"68fc89bc2c36e7188782e4f226ed3948","nodes":{"VJl4scTeQbuPmnXcsb5MOA":{"name":"iad1esdn20vz166","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn20"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"MEq2hOGkSdeBMldoBiv2AQ":{"name":"iad1esdn23vz187","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn23"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"AHDBAMeQTg2LMyhvnxYxeQ":{"name":"iad1esmst4vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst4"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"nJjtbzRuTP2Bkp9E0EOLBw":{"name":"iad1esapp0vz755","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp0"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"5PrI09xWQRuNHPPUYCKOcQ":{"name":"iad1esdn31vz12","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn31"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"f1l6borzQt6-d_QEy9hY9Q":{"name":"iad1esapp1vz782","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp1"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"bO6AoGXFSgGzrISe-pCW8g":{"name":"iad1esdn21vz99","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn21"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"z7UA-JHlQ7SCNyUYfbwR1A":{"name":"iad1esmst5vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst5"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"53RAotd0QbiE28swz3fyOg":{"name":"iad1esapp2vz742","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp2"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"5Lje4zCgSryonTouZMBqyA":{"name":"iad1esdn22vz156","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn22"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"YHFxFv5SQzCt9OesKD_a-g":{"name":"iad1esapp3vz748","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp3"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"yZOAHjgoSz-Qi_eNO7HHxg":{"name":"iad1esmst3vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst3"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"kg9NYEJiRy60InOxIh796A":{"name":"iad1esdn1vz163","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn1"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"PvQ0PobMTaSNnzw0l03z4A":{"name":"iad1esdn13vz80","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn13"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}}}}]
19/02/07 07:00:43 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:43 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:43 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:43 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:43 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:43 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:43 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][_nodes/http]?[null] w/ payload [null]
19/02/07 07:00:43 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:43 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
[I 07:01:55.454 NotebookApp] Saving file at /work/sth-baseline.ipynb
[W 07:01:55.455 NotebookApp] Notebook work/sth-baseline.ipynb is not trusted
19/02/07 07:05:48 TRACE NetworkClient: Caught exception while performing request [xx.xx.xx.xx:xxxxx][_nodes/http] - falling back to the next node in line...
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:666)
at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:471)
at sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:153)
at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSocket(SSLSocketFactory.java:129)
at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:478)
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:112)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:466)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:430)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:155)
at org.elasticsearch.hadoop.rest.RestClient.getHttpNodes(RestClient.java:112)
at org.elasticsearch.hadoop.rest.RestClient.getHttpDataNodes(RestClient.java:129)
at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:157)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:223)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:405)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:386)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1343)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:239)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:302)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
19/02/07 07:05:48 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:05:48 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:05:48 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:05:48 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:05:48 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:05:48 ERROR NetworkClient: Node [xx.xx.xx.xx:xxxxx] failed (Connection refused (Connection refused)); selected next node [xx.xx.xx.xx:xxxxx]

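I should note that the same code, with a configuration pointing at a local ES cluster, runs successfully, so the error seems to be purely related to the SSL connection to the remote cluster. I can also reach the cluster from the client environment with curl --insecure --user ES_USER:ES_PASS -XGET 'https://ES_HOST:ES_PORT/', so it looks like the client should be able to connect.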
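The curl check only proves that the declared host answers. A rough diagnostic sketch, assuming the `requests` library is installed: it fetches `_nodes/http` from the declared host (the same request that succeeds in the log above) and then attempts a plain TCP connection to each node's `publish_address`, which is roughly what the connector does next. The helper names `publish_addresses`, `probe` and `check_cluster` are hypothetical, not part of any library.

```python
import socket


def publish_addresses(nodes_http):
    """Return (name, host, port) for every node in a `_nodes/http` response."""
    result = []
    for node in nodes_http.get("nodes", {}).values():
        host, _, port = node["http"]["publish_address"].rpartition(":")
        # Some versions report "hostname/ip:port"; keep the IP and drop IPv6 brackets.
        host = host.split("/")[-1].strip("[]")
        result.append((node["name"], host, int(port)))
    return result


def probe(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_cluster(es_url, user, password):
    """Print whether each published node address is reachable from this machine."""
    import requests  # assumption: requests is installed

    # verify=False mirrors `curl --insecure`
    resp = requests.get(es_url.rstrip("/") + "/_nodes/http",
                        auth=(user, password), verify=False, timeout=10)
    resp.raise_for_status()
    for name, host, port in publish_addresses(resp.json()):
        status = "reachable" if probe(host, port) else "UNREACHABLE"
        print(f"{name:20} {host}:{port} {status}")
```

Usage, with the question's placeholders: `check_cluster("https://ES_HOST:ES_PORT", ES_USER, ES_PASS)`. If the declared host answers but the published addresses print as UNREACHABLE, the failure is in node discovery rather than in the SSL settings.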

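How do I set up the Spark ES connection with the correct SSL details? I tried removing the es.net.ssl.cert.allow.self.signed and es.nodes.resolve.hostname properties from the configuration, but I still get the same error.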

Best answer

OK, figured it out.

The answer is explained in detail here: https://www.elastic.co/guide/en/elasticsearch/hadoop/master/cloud.html

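In short, as you can see in the posted log, the client is able to connect to the cluster and fetch the list of nodes (the 200-OK responses to GET / and GET _nodes/http). Once it has that list, it tries to connect to those nodes directly, but they are all on a private network and none of them is reachable, hence the "Connection refused" errors. Essentially, you need to turn off the connector's direct connections to the cluster nodes, although it's worth noting that this may affect performance. The setting that does this is "es.nodes.wan.only": "true". I also set "es.nodes.discovery": "false" while figuring this out, so the discovery setting may not be necessary, but I'll leave it here in case it's useful.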
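A sketch of the question's configuration with the answer's fix applied. According to the elasticsearch-hadoop configuration reference, `es.nodes.wan.only` makes the connector disable discovery and send every request through the hosts listed in `es.nodes`, so `es.nodes.discovery` is kept here only to mirror the answer. `build_es_conf` and `read_es_rdd` are hypothetical helpers, and converting the port with `str()` is a defensive assumption (Hadoop configuration values are strings), since the question does not show how ES_PORT is defined.

```python
def build_es_conf(host, port, user, password, resource="testindex/testdoc"):
    """Return an elasticsearch-hadoop config for a cluster reachable only via `host`."""
    return {
        "es.nodes": host,
        "es.port": str(port),  # assumption: pass all values as strings
        "es.resource": resource,
        "es.net.http.auth.user": user,
        "es.net.http.auth.pass": password,
        "es.net.ssl": "true",
        "es.net.ssl.cert.allow.self.signed": "true",
        "es.nodes.resolve.hostname": "false",
        # The fix: talk only to es.nodes instead of the addresses returned
        # by _nodes/http (those sit on a private network).
        "es.nodes.wan.only": "true",
        # Redundant once wan.only is on; kept to mirror the answer.
        "es.nodes.discovery": "false",
    }


def read_es_rdd(sc, conf):
    """Create the same RDD as in the question, using the patched config."""
    return sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=conf,
    )
```

With an existing SparkContext `sc`, usage would be `es_rdd = read_es_rdd(sc, build_es_conf(ES_HOST, ES_PORT, ES_USERNAME, ES_PASSWORD))`. Because every read now goes through the single declared endpoint instead of the data nodes, expect lower throughput, as the answer warns.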

Regarding python - How to connect PySpark to Elasticsearch over SSL with certificate verification set to False?, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54568355/
