
python - Best practice for sharing code via inheritance when method parameters differ?


I have an AWS Redshift wrapper class that automates similar kinds of loads from S3 for me. I recently adapted it to work with Spark jobs, which do not need a manifest and require a slightly different COPY statement. Apart from this one method, all the other code is transferable and reusable. Because the method parameters differ, PyCharm warns me about the signature mismatch, and I'd like to know whether there is a "best practice" way to do this.

class RedshiftLoader(PrettyStr):
    def __init__(self,
                 s3_credentials=config3.S3_INFO,
                 redshift_db_credentials=config3.REDSHIFT_POSTGRES_INFO_PROD,
                 table_name=None,
                 schema_name=None,
                 dev_db_credentials=config3.REDSHIFT_POSTGRES_INFO,
                 safe_load=False,
                 truncate=False):
        ...

    def copy_to_db(self, database_credentials, copy_from, manifest=False):
        """
        Copies data from a file on S3 to a Redshift table. Data must be
        properly formatted and in the right order, etc...

        :param database_credentials: A dictionary containing the host, port,
            database name, username, and password. Keys must match example:

            REDSHIFT_POSTGRES_INFO = {
                'host': REDSHIFT_HOST,
                'port': REDSHIFT_PORT,
                'database': REDSHIFT_DATABASE_DEV,
                'user': REDSHIFT_USER,
                'password': REDSHIFT_PASS
            }
        :param copy_from: The location of the file on the S3 server.
        :param manifest: True if a manifest file is to be used in the copy
            step, False otherwise.

        :return: None
        """
        if not self.table_name:
            raise AttributeError('A table must be specified.')
        s3_access = self.s3_credentials['aws_access_key_id']
        s3_secret = self.s3_credentials['aws_secret_access_key']
        manifest = 'MANIFEST' if manifest else ''
        logger.info('Accessing {table}'.format(table=self.table_name))
        try:
            with ppg2.connect(**database_credentials) as conn:
                cur = conn.cursor()

                if self.truncate:
                    RedshiftLoader.truncate_table(self.table_name, cur)

                load = '''
                COPY {table}
                FROM '{copy_from}'
                CREDENTIALS 'aws_access_key_id={pub};aws_secret_access_key={priv}'
                DELIMITER '|'
                GZIP
                TRIMBLANKS
                TRUNCATECOLUMNS
                ACCEPTINVCHARS
                TIMEFORMAT 'auto'
                DATEFORMAT 'auto'
                {manifest}
                '''.format(
                    table=self.table_name,
                    copy_from=copy_from,
                    pub=s3_access,
                    priv=s3_secret,
                    manifest=manifest
                )
                logger.info('Copying to {table}'.format(
                    table=self.table_name))
                cur.execute(load)
                conn.commit()
                logger.info('Copy complete.')
        except ppg2.Error as e:
            logger.critical('Error occurred during load: {error}'.format(
                error=e
            ))
            raise

And then the subclass:

class SparkRedshiftLoader(RedshiftLoader):
    def copy_to_db(self, database_credentials, copy_from):
        """
        Copies data from a file on S3 to a Redshift table. Data must be
        properly formatted and in the right order, etc...

        :param database_credentials: A dictionary containing the host, port,
            database name, username, and password. Keys must match example:

            REDSHIFT_POSTGRES_INFO = {
                'host': REDSHIFT_HOST,
                'port': REDSHIFT_PORT,
                'database': REDSHIFT_DATABASE_DEV,
                'user': REDSHIFT_USER,
                'password': REDSHIFT_PASS
            }
        :param copy_from: The location of the file on the S3 server. Assumes
            that it is being passed an 's3n' version of the path (common in Spark
            and Hadoop) and will automatically convert to the proper format.

        :return: None
        """
        if not self.table_name:
            raise AttributeError('A table must be specified.')
        s3_access = self.s3_credentials['aws_access_key_id']
        s3_secret = self.s3_credentials['aws_secret_access_key']
        copy_from = copy_from.replace('s3n', 's3')
        logging.info('Accessing {table}'.format(table=self.table_name))
        try:
            with ppg2.connect(**database_credentials) as conn:
                cur = conn.cursor()

                if self.truncate:
                    SparkRedshiftLoader.truncate_table(self.table_name, cur)

                load = '''
                COPY {table}
                FROM '{copy_from}'
                CREDENTIALS 'aws_access_key_id={pub};aws_secret_access_key={priv}'
                DELIMITER '|'
                GZIP
                TRIMBLANKS
                TRUNCATECOLUMNS
                ACCEPTINVCHARS
                TIMEFORMAT 'auto'
                DATEFORMAT 'auto'
                CSV
                NULL 'null'
                '''.format(
                    table=self.table_name,
                    copy_from=copy_from,
                    pub=s3_access,
                    priv=s3_secret,
                )
                logging.info('Copying to {table}'.format(
                    table=self.table_name))
                cur.execute(load)
                conn.commit()
                logging.info('Copy complete.')
        except ppg2.Error as e:
            logging.info('Error occurred during load: {error}'.format(
                error=e
            ))
            raise

As you can see, the subclass drops manifest as a parameter, has a replace statement that the parent class does not, and uses a slightly different COPY command.

Best Answer

Define RedshiftLoader._copy_to_db as:

def _copy_to_db(self, database_credentials, copy_from, manifest):
    """
    Copies data from a file on S3 to a Redshift table. Data must be
    properly formatted and in the right order, etc...

    :param database_credentials: A dictionary containing the host, port,
        database name, username, and password. Keys must match example:

        REDSHIFT_POSTGRES_INFO = {
            'host': REDSHIFT_HOST,
            'port': REDSHIFT_PORT,
            'database': REDSHIFT_DATABASE_DEV,
            'user': REDSHIFT_USER,
            'password': REDSHIFT_PASS
        }
    :param copy_from: The location of the file on the S3 server.
    :param manifest: True if a manifest file is to be used in the copy
        step, False otherwise.

    :return: None
    """
    if not self.table_name:
        raise AttributeError('A table must be specified.')
    s3_access = self.s3_credentials['aws_access_key_id']
    s3_secret = self.s3_credentials['aws_secret_access_key']
    logger.info('Accessing {table}'.format(table=self.table_name))
    try:
        with ppg2.connect(**database_credentials) as conn:
            cur = conn.cursor()

            if self.truncate:
                RedshiftLoader.truncate_table(self.table_name, cur)

            load = '''
            COPY {table}
            FROM '{copy_from}'
            CREDENTIALS 'aws_access_key_id={pub};aws_secret_access_key={priv}'
            DELIMITER '|'
            GZIP
            TRIMBLANKS
            TRUNCATECOLUMNS
            ACCEPTINVCHARS
            TIMEFORMAT 'auto'
            DATEFORMAT 'auto'
            {manifest}
            '''.format(
                table=self.table_name,
                copy_from=copy_from,
                pub=s3_access,
                priv=s3_secret,
                manifest=manifest
            )
            logger.info('Copying to {table}'.format(
                table=self.table_name))
            cur.execute(load)
            conn.commit()
            logger.info('Copy complete.')
    except ppg2.Error as e:
        logger.critical('Error occurred during load: {error}'.format(
            error=e
        ))
        raise

The only difference between this and RedshiftLoader.copy_to_db is that manifest has no default value and is not modified before being used. Now define copy_to_db in each class:

class RedshiftLoader(PrettyStr):

    def copy_to_db(self, database_credentials, copy_from, manifest=False):
        manifest = 'MANIFEST' if manifest else ''
        self._copy_to_db(database_credentials, copy_from, manifest)


class SparkRedshiftLoader(RedshiftLoader):

    def copy_to_db(self, database_credentials, copy_from):
        copy_from = copy_from.replace('s3n', 's3')
        self._copy_to_db(database_credentials, copy_from, "CSV NULL 'null'")

The private method abstracts out all the common code (which is nearly everything); the public methods give each class a place to adjust the values of copy_from and manifest in whatever way suits it.
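
From the caller's side the two loaders then look almost identical. A minimal usage sketch (the table name, S3 paths, and credential choices here are placeholders, not from the original post):

# Hypothetical usage; table name, paths, and credentials are placeholders.
loader = RedshiftLoader(table_name='my_table', truncate=True)
loader.copy_to_db(config3.REDSHIFT_POSTGRES_INFO, 's3://my-bucket/unload/', manifest=True)

spark_loader = SparkRedshiftLoader(table_name='my_table')
spark_loader.copy_to_db(config3.REDSHIFT_POSTGRES_INFO, 's3n://my-bucket/part-')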

Note that manifest may no longer be the best parameter name, since its use varies between the two classes. But also note that in both cases it is just some class-specific SQL appended to the end of the shared query.
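
If the name bothers you, one option (my suggestion, not part of the answer) is to rename the private method's parameter to something like extra_sql, which makes that role explicit:

class RedshiftLoader(PrettyStr):

    def _copy_to_db(self, database_credentials, copy_from, extra_sql):
        ...  # same body as above, with the {manifest} placeholder renamed to {extra_sql}

    def copy_to_db(self, database_credentials, copy_from, manifest=False):
        self._copy_to_db(database_credentials, copy_from,
                         'MANIFEST' if manifest else '')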

The same refactoring could be done in a single class, using your idea of passing a spark boolean to copy_to_db.
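
A hedged sketch of that single-class variant, reusing the _copy_to_db helper from above (the spark flag name comes from the question's idea; the rest is my assumption):

# Hypothetical single-class variant: a spark flag selects the class-specific
# tweaks instead of a subclass; reuses the _copy_to_db helper from the answer.
class RedshiftLoader(PrettyStr):

    def copy_to_db(self, database_credentials, copy_from,
                   manifest=False, spark=False):
        if spark:
            copy_from = copy_from.replace('s3n', 's3')
            extra = "CSV NULL 'null'"
        else:
            extra = 'MANIFEST' if manifest else ''
        self._copy_to_db(database_credentials, copy_from, extra)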

Regarding "python - Best practice for sharing code via inheritance when method parameters differ?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/38334374/
