
python - Unit testing a Lambda script


I wrote a Lambda script in Python with boto3 to manage the lifecycle of Amazon Machine Images. The script works well, but my nightmare began when I realized I had to write unit tests for it. I am not a developer; I am used to writing scripts as a system administrator.

I have already created unit tests for the functions that return a value, as shown below, and they work fine.

import boto3


def get_interface_wrapper(region, service, interface_type):
    interface_types = ['client', 'resource']
    interface = None

    if (type(region) == str) and (type(service) == str) and (type(interface_type) == str) and (interface_type in interface_types):
        interface = ("boto3." + interface_type +
                     "(" + "service_name=service," + "region_name=region)")

    return interface


def get_interface(region, service, interface_type):
    return eval(get_interface_wrapper(region, service, interface_type))

# Unit tests
def test_get_interface_client(self):

    service = 'ec2'
    interface_expression = 'boto3.client(service_name=service,region_name=region)'
    client_interface = get_interface_wrapper(
        self.region, service, 'client')
    self.assertEqual(client_interface, interface_expression)


def test_get_interface_resource(self):

    service = 'ec2'
    interface_expression = 'boto3.resource(service_name=service,region_name=region)'
    resource_interface = get_interface_wrapper(
        self.region, service, 'resource')
    self.assertEqual(resource_interface, interface_expression)

However, I am struggling with the functions below, which have no return statement and depend on AWS endpoints. How can I mock the endpoints, or how should I change my code so that the unit tests do not depend on AWS endpoints?

import json
import time
from datetime import datetime


def update_states(actions, ec2_client, logs_client, log_group, log_stream, dryrun_enabled=True):
    for action in actions:

        action.update({'phase': 'planning',
                       'PlanningTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'})
        put_log_events(logs_client, log_group, log_stream, [action])

        # The tag packer_ami_state_tagging_date is not set
        if (action['is_timestamp_present'] == True):

            if (action['action'] == 'update'):
                # The tag packer_ami_state_tagging_date is set, so update the state and tagging date
                try:
                    ec2_client.Image(action['ImageId']).create_tags(
                        DryRun=dryrun_enabled,
                        Tags=[{'Key': 'packer_ami_state', 'Value': action['new_packer_ami_state']},
                              {'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])

                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI state and tagging date was updated'}, ]

                except Exception as e:
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI state and tagging date was not updated', 'Error': e.args[0], }]

                finally:
                    put_log_events(logs_client, log_group,
                                   log_stream, operation_result)

            if (action['action'] == 'delete'):
                image = ec2_client.Image(action['ImageId'])
                snapshots = []
                for blockDevMapping in image.block_device_mappings:
                    if 'Ebs' in blockDevMapping:
                        snapshots.append(blockDevMapping['Ebs']['SnapshotId'])

                try:
                    image.deregister(DryRun=dryrun_enabled)
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI was deregistered'}, ]

                except Exception as e:
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI was not deregistered', 'Error': e.args[0], }]

                finally:
                    put_log_events(logs_client, log_group,
                                   log_stream, operation_result)

                counter = 1
                for snapshotID in snapshots:
                    snapshot = ec2_client.Snapshot(snapshotID)

                    try:
                        snapshot.delete(DryRun=dryrun_enabled)
                        operation_result = [
                            {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'SnapShot deleted', 'SnapShotID': snapshotID}, ]

                    except Exception as e:
                        operation_result = [
                            {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'SnapShot not deleted', 'Error': e.args[0], 'SnapShotID': snapshotID}, ]

                    finally:
                        put_log_events(logs_client, log_group,
                                       log_stream, operation_result)

                    counter += 1

            if (action['action'] == 'none'):
                action.update(
                    {'OperationDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'OperationResult': 'No action'})

                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'No action'}, ]

                put_log_events(logs_client, log_group,
                               log_stream, operation_result)

        else:
            try:
                ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[
                    {'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])

                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'Tag created'}, ]

            except Exception as e:
                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'Tag not created', 'Error': e.args[0], }]

            finally:
                put_log_events(logs_client, log_group,
                               log_stream, operation_result)


def put_log_events(client, log_group_name, log_stream_name, log_events):
    log_stream = client.describe_log_streams(
        logGroupName=log_group_name,
        logStreamNamePrefix=log_stream_name
    )

    if (bool(log_stream['logStreams'])) and ('uploadSequenceToken' in log_stream['logStreams'][0]):
        response = {
            'nextSequenceToken': log_stream['logStreams'][0]['uploadSequenceToken']}
    else:
        response = {}

    for log_event in log_events:
        if bool(response):
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(round(time.time() * 1000)),
                        'message': json.dumps(log_event)
                    },
                ],
                sequenceToken=response['nextSequenceToken']
            )
        else:
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(round(time.time() * 1000)),
                        'message': json.dumps(log_event)
                    },
                ],
            )

Best answer

I suggest you use patching from the built-in unittest.mock library. I use it to mock all boto3 calls, so I never hit a real AWS service. There are many options, but here is a simple example of mocking a client.

Suppose you have code in a module named "my_code" that imports boto3 and calls the "ssm" boto3 client's get_parameters_by_path function. You could mock it with code like this:

from unittest.mock import patch, MagicMock

...

@patch('my_app.my_code.boto3')
def test_secrets_load_ssm(self, mock_boto):
    mock_client = MagicMock()
    mock_boto.client.return_value = mock_client
    mock_client.get_parameters_by_path.return_value = helper_function()

    my_param = my_code.my_function_being_tested_that_fetches_a_parameter('/TEST_APP/CI/secure_string_test')

    self.assertEqual(my_param, 'secure string value')


def helper_function():
    return {'Parameters': [{'Name': '/TEST_APP/CI/secure_string_test',
                            'Type': 'SecureString',
                            'Value': 'secure string value',
                            'Version': 1,
                            'LastModifiedDate': datetime.datetime(2019, 8, 8, 14, 44, 26, 878000, tzinfo=datetime.timezone.utc),
                            'ARN': 'arn:aws:ssm:us-east-1:999478573200:parameter/TEST_APP/CI/secure_string_test'}],
            'ResponseMetadata': {'RequestId': 'b9f016a4-485d-80d2-a504-015d081d8603',
                                 'HTTPStatusCode': 200,
                                 'HTTPHeaders': {'x-amzn-requestid': 'b9f016a4-475d-40d2-a504-015d981d8603',
                                                 'content-type': 'application/x-amz-json-1.1',
                                                 'content-length': '666',
                                                 'date': 'Fri, 30 Aug 2019 16:57:17 GMT'},
                                 'RetryAttempts': 0}
            }

I put the mocked return value in a separate helper function because it is not the focus of this example; it is simply whatever JSON you need the boto3 mock to return. If you are not familiar with unittest mocking and patching, you will have to experiment with it a bit, but I have done this myself and can attest that it solves these kinds of boto3 unit-testing problems quite elegantly.
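
The same idea carries over to the describe_log_streams response that the put_log_events function in your question reads. A minimal sketch of such a helper, with the stream name and sequence token made up purely for illustration, could look like this:

def helper_describe_log_streams():
    # Hypothetical response; put_log_events only inspects
    # logStreams[0]['uploadSequenceToken'], so that key is all that matters here.
    return {
        'logStreams': [
            {
                'logStreamName': 'ami-lifecycle-stream',       # made-up name
                'uploadSequenceToken': '49605302938407585'     # made-up token
            }
        ]
    }

In a test of put_log_events itself you would then set mock_logs.describe_log_streams.return_value = helper_describe_log_streams() on a MagicMock logs client before calling the function.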

The @patch decorator lets you replace the real boto3 library with the mock you create. The decorator names the imported object you want to patch and requires a corresponding parameter in the test function's signature (mock_boto in this example). The next lines set up the object to be returned when the code under test calls boto3.client(), and the line after that sets what should be returned when the code calls the client object's get_parameters_by_path function. The mocks created by patching have functions such as assert_called_once to verify that a function was called as expected, so you can mock a function even if it returns nothing.
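
Applied to the update_states function from your question, which returns nothing, the same pattern might look roughly like the sketch below. It assumes update_states and put_log_events live in a module importable as my_app.my_code (an illustrative name), both AWS interfaces are plain MagicMock objects so no endpoint is contacted, and the AMI id and state value are made up; the test asserts on the calls instead of on a return value.

import unittest
from unittest.mock import MagicMock, patch

from my_app import my_code  # hypothetical module holding update_states


class TestUpdateStates(unittest.TestCase):

    @patch('my_app.my_code.put_log_events')
    def test_update_action_tags_the_image(self, mock_put_log_events):
        mock_ec2 = MagicMock()    # stands in for the EC2 resource/client
        mock_logs = MagicMock()   # stands in for the CloudWatch Logs client

        actions = [{
            'action': 'update',
            'is_timestamp_present': True,
            'ImageId': 'ami-12345678',             # made-up AMI id
            'new_packer_ami_state': 'deprecated',  # made-up state value
        }]

        my_code.update_states(actions, mock_ec2, mock_logs,
                              'my-log-group', 'my-log-stream',
                              dryrun_enabled=True)

        # No return value, so verify the interactions instead.
        mock_ec2.Image.assert_called_once_with('ami-12345678')
        mock_ec2.Image.return_value.create_tags.assert_called_once()
        self.assertEqual(mock_put_log_events.call_count, 2)  # planning log + execution log


if __name__ == '__main__':
    unittest.main()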

About python - Unit testing a Lambda script, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58652322/
