
python - Pyspark: Error when connecting to Snowflake using a private key


I'm building an ETL process that reads query results from Snowflake. Most online examples show how to establish the connection with a regular string password, but my company sets up authentication through a private key instead. Unfortunately, when I try to pass the private key in as a parameter, it returns the following error:

Traceback (most recent call last):
File "/Users/rihun/PycharmProjects/snowflake_gcp_etl/loader.py", line 61, in <module>
.option("query", query) \
File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py", line 79, in deco
raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'Input PEM private key is invalid'

Code example:

import findspark
findspark.init()

import pyspark
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages net.snowflake:snowflake-jdbc:3.6.24,net.snowflake:spark-snowflake_2.11:2.4.12-spark_2.3 pyspark-shell'

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
from snowflake_connector import get_keeper_token, get_snowflake_credentials

spark = SparkSession.builder.master('local').appName('Snowflake Loader').config('spark.driver.memory', '5G').getOrCreate()

spark.builder.config('spark.executor.memory', '16G')
spark.builder.config('spark.executor.cores', '4')

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

sf_creds = get_snowflake_credentials(keeper_token=get_keeper_token())

sfOptions = {
    "sfURL": sf_creds['sfURL'],
    "sfAccount": sf_creds['sfAccount'],
    "sfUser": sf_creds['sfUser'],
    "pem_private_key": sf_creds['sfPrivateKey'],
    "sfDatabase": sf_creds['sfDatabase'],
    "sfSchema": sf_creds['sfSchema'],
    "sfWarehouse": sf_creds['sfWarehouse'],
}

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", query) \
    .load()

df.count()

How I retrieve the credentials:

import hvac
import snowflake.connector
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization


def get_snowflake_credentials(keeper_token: str,
                              keeper_url='<keeper_url>',
                              keeper_namespace='cloudDB',
                              keeper_secret_path='<path_to_key>',
                              sf_account='<sf_account>',
                              sf_svc_user='<user>',
                              sf_wh='<warehouse>',
                              sf_role='<role>',
                              sf_db='<db>',
                              sf_schema='<schema>'):
    # Connect to Keeper to collect secrets
    client = hvac.Client(
        url=keeper_url,
        namespace=keeper_namespace,
        token=keeper_token
    )

    # Secrets are stored within the key entitled 'data'
    keeper_secrets = client.read(keeper_secret_path)['data']
    passphrase = keeper_secrets['SNOWSQL_PRIVATE_KEY_PASSPHRASE']
    private_key = keeper_secrets['private_key']

    # PEM key must be byte encoded
    key = bytes(private_key, 'utf-8')

    p_key = serialization.load_pem_private_key(
        key,
        password=passphrase.encode(),
        backend=default_backend()
    )

    pkb = p_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    sf_client = snowflake.connector.connect(
        user=sf_svc_user,
        account=sf_account,
        warehouse=sf_wh,
        role=sf_role,
        database=sf_db,
        schema=sf_schema,
        private_key=pkb
    )

    return {
        "sfURL": "<url>",
        "sfAccount": sf_account,
        "sfUser": sf_svc_user,
        "sfPrivateKey": pkb,
        "sfDatabase": sf_db,
        "sfSchema": sf_schema,
        "sfWarehouse": sf_wh
    }

Best Answer

Can you try with this code? The difference from your version is that the private key is re-serialized as unencrypted PKCS#8 PEM and the BEGIN/END header, footer, and newlines are stripped, so only the base64-encoded key body is passed as pem_private_key.

---------------------------------------------------------------------------------

#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
import os
import re
import logging
from logging import getLogger
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend

v_log = '<path>/spark.log'

spark = SparkSession \
    .builder \
    .config("spark.jars", "<path>/snowflake-jdbc-3.8.0.jar,<path>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "<path>/snowflake-jdbc-3.8.0.jar,<path>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

# Enable query pushdown to Snowflake
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

logging.basicConfig(
    filename=v_log,
    level=logging.DEBUG)
logger = getLogger(__name__)



# Load the passphrase-protected private key (PKCS#8 PEM file)
with open("<path-to>/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

# Re-serialize the key as unencrypted PKCS#8 PEM, then strip the
# BEGIN/END lines and newlines so that only the base64 body remains
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")


sfOptions = {
    "sfURL": "<URL>",
    "sfAccount": "sfcsupport",
    "sfUser": "",
    "sfDatabase": "",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "",
    "sfRole": "",
    "pem_private_key": pkb
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "Select * from <TableName>") \
    .load()


df.show()

----------------------------------------------------------------------------
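For reference, the same conversion can be folded back into the question's get_snowflake_credentials helper instead of reading the key from a file. This is a minimal sketch, not a drop-in from either snippet above: the helper name private_key_for_spark is made up for illustration, and it assumes the key and passphrase come out of Keeper exactly as in the question. It returns the unencrypted key as a plain base64 string (PEM header, footer, and newlines removed), which is the form passed to pem_private_key above.

import re

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization


def private_key_for_spark(private_key_pem: str, passphrase: str) -> str:
    # Hypothetical helper: decrypt the PKCS#8 PEM key retrieved from Keeper
    p_key = serialization.load_pem_private_key(
        private_key_pem.encode("utf-8"),
        password=passphrase.encode(),
        backend=default_backend()
    )

    # Re-serialize without encryption, then keep only the base64 body
    pem = p_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    ).decode("utf-8")
    return re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pem).replace("\n", "")


# In get_snowflake_credentials, the returned dict would then use:
#   "sfPrivateKey": private_key_for_spark(private_key, passphrase)
# and the Spark options keep "pem_private_key": sf_creds['sfPrivateKey'] as before.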

Regarding python - Pyspark: Error when connecting to Snowflake using a private key, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59367071/
