- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我使用 python 和 boto3 编写了一个 lambda 脚本来管理 Amazon Machine Images 的生命周期。该脚本运行良好,但当我意识到我必须为其编写单元测试时,我的噩梦开始了。我不是开发人员,我习惯以系统管理员身份编写脚本。
我已经为具有返回状态的函数创建了单元测试,如下所示,我工作正常。
def get_interface_wrapper(region, service, interface_type):
interface_types = ['client', 'resource']
interface = None
if (type(region) == str) and (type(service) == str) and (type(interface_type) == str) and (interface_type in interface_types):
interface = ("boto3." + interface_type +
"(" + "service_name=service," + "region_name=region)")
return interface
def get_interface(region, service, interface_type):
return eval(get_interface_wrapper(region, service, interface_type))
#Unit tests
def test_get_interface_client(self):
service = 'ec2'
interface_expression = 'boto3.client(service_name=service,region_name=region)'
client_interface = get_interface_wrapper(
self.region, service, 'client')
self.assertEqual(client_interface, interface_expression)
def test_get_interface_resource(self):
service = 'ec2'
interface_expression = 'boto3.resource(service_name=service,region_name=region)'
resource_interface = get_interface_wrapper(
self.region, service, 'resource')
self.assertEqual(resource_interface, interface_expression)
但是,对于以下没有 return 语句且依赖 AWS 端点的函数,我很难理解它。我如何模拟端点或如何更改我的代码以创建不依赖 AWS 端点的单元测试。
def update_states(actions, ec2_client, logs_client, log_group, log_stream, dryrun_enabled=True):
for action in actions:
action.update({'phase': 'planning', 'PlanningTime': datetime.utcnow(
).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'})
put_log_events(logs_client, log_group, log_stream, [action])
# The tag packer_ami_state_tagging_date is not set
if (action['is_timestamp_present'] == True):
if (action['action'] == 'update'):
# The tag packer_ami_state_tagging_date is set, so update the state and tagging date
try:
ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[{'Key': 'packer_ami_state', 'Value': action['new_packer_ami_state']},
{'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI state and tagging date was updated'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI state and tagging date was not updated', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
if (action['action'] == 'delete'):
image = ec2_client.Image(action['ImageId'])
snapshots = []
for blockDevMapping in image.block_device_mappings:
if 'Ebs' in blockDevMapping:
snapshots.append(blockDevMapping['Ebs']['SnapshotId'])
try:
image.deregister(DryRun=dryrun_enabled)
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI was deregistered'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI was not deregistered', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
counter = 1
for snapshotID in snapshots:
snapshot = ec2_client.Snapshot(snapshotID)
try:
snapshot.delete(DryRun=dryrun_enabled)
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'SnapShot deleted', 'SnapShotID': snapshotID}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'SnapShot not deleted', 'Error': e.args[0], 'SnapShotID': snapshotID}, ]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
counter += 1
if (action['action'] == 'none'):
action.update(
{'OperationDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'OperationResult': 'No action'})
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'No action'}, ]
put_log_events(logs_client, log_group,
log_stream, operation_result)
else:
try:
ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[
{'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'Tag created'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'Tag not created', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
def put_log_events(client, log_group_name, log_stream_name, log_events):
log_stream = client.describe_log_streams(
logGroupName=log_group_name,
logStreamNamePrefix=log_stream_name
)
if (bool(log_stream['logStreams'])) and ('uploadSequenceToken' in log_stream['logStreams'][0]):
response = {
'nextSequenceToken': log_stream['logStreams'][0]['uploadSequenceToken']}
else:
response = {}
for log_event in log_events:
if bool(response):
response = client.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{
'timestamp': int(round(time.time() * 1000)),
'message': json.dumps(log_event)
},
],
sequenceToken=response['nextSequenceToken']
)
else:
response = client.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{
'timestamp': int(round(time.time() * 1000)),
'message': json.dumps(log_event)
},
],
)
最佳答案
我建议您在内置 unittest.mock library 中使用补丁.我用它来模拟所有 boto3 调用,所以我从来没有打过真正的 AWS 服务。有很多选项,但这里有一个模拟客户端的简单示例。
假设您在名为“my_code”的模块中有代码导入 boto3 并调用“ssm”boto3 客户端以获取 get_parameters_by_path 函数。您可以使用如下代码模拟它:
from unittest.mock import patch, MagicMock
...
@patch('my_app.my_code.boto3')
def test_secrets_load_ssm(self, mock_boto):
mock_client = MagicMock()
mock_boto.client.return_value = mock_client
mock_client.get_parameters_by_path.return_value = helper_function()
my_param = my_code.my_function_being_tested_that_fetches_a_parameter('/TEST_APP/CI/secure_string_test')
self.assertEqual(my_param, 'secure string value')
def helper_function():
return {'Parameters': [{'Name': '/TEST_APP/CI/secure_string_test',
'Type': 'SecureString',
'Value': 'secure string value',
'Version': 1,
'LastModifiedDate': datetime.datetime(2019, 8, 8, 14, 44, 26, 878000, tzinfo=datetime.timezone.utc),
'ARN': 'arn:aws:ssm:us-east-1:999478573200:parameter/TEST_APP/CI/secure_string_test'}], ResponseMetadata': {'RequestId': 'b9f016a4-485d-80d2-a504-015d081d8603',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': 'b9f016a4-475d-40d2-a504-015d981d8603',
'content-type': 'application/x-amz-json-1.1',
'content-length': '666',
'date': 'Fri, 30 Aug 2019 16:57:17 GMT'},
'RetryAttempts': 0}
}
我将模拟的返回值放在一个单独的辅助函数中,因为这不是本示例的重点,并且是您需要 boto3 模拟作为返回的任何 JSON。如果您不熟悉 unittest 模拟和修补,您将不得不稍微尝试一下使用它,但我自己已经这样做了,我可以证明它将更优雅地解决这些类型的 boto3 单元测试问题。
@patch
注释允许您用您创建的模型调用替换真实的 boto3 库。注释声明了您要修补的导入函数,并且需要函数签名中的相应变量(本例中为 mock_boto
)。接下来的几行设置了当我正在测试的函数中的代码调用 boto3.client() 时要返回的对象,然后下面的行设置了当代码调用客户端对象的 get_parameters_by_path 函数时应该返回的内容.修补程序在您的模拟中具有类似 assert_called_once
的函数,以验证该函数是否按预期调用,因此即使该函数未返回任何内容,您也可以模拟它。
关于python - 单元测试 lambda 脚本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58652322/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!