- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Python reques接口测试框架实现代码由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
1、框架菜单 。
1.1 common模块 。
1.2 其他 。
2、Excel接口测试案例编写 。
3、读取Excel测试封装(核心封装) 。
excel_utils.py 读取Excel中的数据 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import
os
import
xlrd
#内置模块、第三方模块pip install 自定义模块
class
ExcelUtils():
def
__init__(
self
,file_path,sheet_name):
self
.file_path
=
file_path
self
.sheet_name
=
sheet_name
self
.sheet
=
self
.get_sheet()
# 整个表格对象
def
get_sheet(
self
):
wb
=
xlrd.open_workbook(
self
.file_path)
sheet
=
wb.sheet_by_name(
self
.sheet_name)
return
sheet
def
get_row_count(
self
):
row_count
=
self
.sheet.nrows
return
row_count
def
get_col_count(
self
):
col_count
=
self
.sheet.ncols
return
col_count
def
__get_cell_value(
self
,row_index, col_index):
cell_value
=
self
.sheet.cell_value(row_index,col_index)
return
cell_value
def
get_merged_info(
self
):
merged_info
=
self
.sheet.merged_cells
return
merged_info
def
get_merged_cell_value(
self
,row_index, col_index):
"""既能获取普通单元格的数据又能获取合并单元格数据"""
cell_value
=
None
for
(rlow, rhigh, clow, chigh)
in
self
.get_merged_info():
if
(row_index >
=
rlow
and
row_index < rhigh):
if
(col_index >
=
clow
and
col_index < chigh):
cell_value
=
self
.__get_cell_value(rlow, clow)
break
;
# 防止循环去进行判断出现值覆盖的情况
else
:
cell_value
=
self
.__get_cell_value(row_index, col_index)
else
:
cell_value
=
self
.__get_cell_value(row_index, col_index)
return
cell_value
def
get_sheet_data_by_dict(
self
):
all_data_list
=
[]
first_row
=
self
.sheet.row(
0
)
#获取首行数据
for
row
in
range
(
1
,
self
.get_row_count()):
row_dict
=
{}
for
col
in
range
(
0
,
self
.get_col_count()):
row_dict[first_row[col].value]
=
self
.get_merged_cell_value(row, col)
all_data_list.append(row_dict)
return
all_data_list
if
__name__
=
=
'__main__'
:
current_path
=
os.path.dirname(__file__)
excel_path
=
os.path.join( current_path,
'..'
,
'samples/data/test_case.xlsx'
)
excelUtils
=
ExcelUtils(excel_path,
"Sheet1"
)
for
row
in
excelUtils.get_sheet_data_by_dict():
print
( row )
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import
os
from
common.excel_utils
import
ExcelUtils
from
common.config_utils
import
config
current_path
=
os.path.dirname(__file__)
test_data_path
=
os.path.join( current_path,
'..'
, config.CASE_DATA_PATH )
class
TestdataUtils():
def
__init__(
self
,test_data_path
=
test_data_path):
self
.test_data_path
=
test_data_path
self
.test_data
=
ExcelUtils(test_data_path,
"Sheet1"
).get_sheet_data_by_dict()
self
.test_data_by_mysql
=
SqlUtils().get_mysql_test_case_info()
def
__get_testcase_data_dict(
self
):
testcase_dict
=
{}
for
row_data
in
self
.test_data:
testcase_dict.setdefault( row_data[
'测试用例编号'
],[] ).append( row_data )
return
testcase_dict
def
def_testcase_data_list(
self
):
testcase_list
=
[]
for
k,v
in
self
.__get_testcase_data_dict().items():
one_case_dict
=
{}
one_case_dict[
"case_id"
]
=
k
one_case_dict[
"case_info"
]
=
v
testcase_list.append( one_case_dict )
return
testcase_list
if
__name__
=
=
"__main__"
:
testdataUtils
=
TestdataUtils()
for
i
in
testdataUtils.def_testcase_data_list():
print
( i )
|
testdata_utils.py 读取Excel中的数据后处理成需要的数据 。
4、request封装(核心封装) 。
requests_utils.py 包含post请求,get请求,异常,调用断言 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
import
ast
import
re
import
requests
import
jsonpath
from
requests.exceptions
import
RequestException
from
requests.exceptions
import
ProxyError
from
requests.exceptions
import
ConnectionError
from
common.config_utils
import
config
from
common.check_utils
import
CheckUtils
class
RequestsUtils():
def
__init__(
self
):
self
.hosts
=
config.hosts
self
.headers
=
{
"ContentType"
:
"application/json;charset=utf-8"
}
self
.session
=
requests.session()
self
.temp_variables
=
{}
def
__get(
self
,get_info):
try
:
url
=
self
.hosts
+
get_info[
"请求地址"
]
response
=
self
.session.get( url
=
url,
params
=
ast.literal_eval(get_info[
"请求参数(get)"
])
)
response.encoding
=
response.apparent_encoding
if
get_info[
"取值方式"
]
=
=
"json取值"
:
value
=
jsonpath.jsonpath( response.json(),get_info[
"取值代码"
] )[
0
]
self
.temp_variables[ get_info[
"传值变量"
] ]
=
value
elif
get_info[
"取值方式"
]
=
=
"正则取值"
:
value
=
re.findall(get_info[
"取值代码"
],response.text)[
0
]
self
.temp_variables[get_info[
"传值变量"
]]
=
value
result
=
CheckUtils(response).run_check(get_info[
'期望结果类型'
], get_info[
'期望结果'
])
except
ProxyError as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:代理错误异常'
%
(get_info[
"接口名称"
])}
except
ConnectionError as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:连接超时异常'
%
(get_info[
"接口名称"
])}
except
RequestException as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:Request异常,原因:%s'
%
(get_info[
"接口名称"
], e.__str__())}
except
Exception as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:系统异常,原因:%s'
%
(get_info[
"接口名称"
],e.__str__())}
return
result
def
__post(
self
,post_info):
try
:
url
=
self
.hosts
+
post_info[
"请求地址"
]
response
=
self
.session.post( url
=
url,
headers
=
self
.headers,
params
=
ast.literal_eval(post_info[
"请求参数(get)"
]),
# data = post_infos["提交数据(post)"],
json
=
ast.literal_eval(post_info[
"提交数据(post)"
])
)
response.encoding
=
response.apparent_encoding
if
post_info[
"取值方式"
]
=
=
"json取值"
:
value
=
jsonpath.jsonpath( response.json(),post_info[
"取值代码"
] )[
0
]
self
.temp_variables[ post_info[
"传值变量"
] ]
=
value
elif
post_info[
"取值方式"
]
=
=
"正则取值"
:
value
=
re.findall(post_info[
"取值代码"
],response.text)[
0
]
self
.temp_variables[post_info[
"传值变量"
]]
=
value
#调用CheckUtils()
result
=
CheckUtils(response).run_check(post_info[
'期望结果类型'
],post_info[
'期望结果'
])
except
ProxyError as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:代理错误异常'
%
(post_info[
"接口名称"
])}
except
ConnectionError as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:连接超时异常'
%
(post_info[
"接口名称"
])}
except
RequestException as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:Request异常,原因:%s'
%
(post_info[
"接口名称"
], e.__str__())}
except
Exception as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:系统异常,原因:%s'
%
(post_info[
"接口名称"
],e.__str__())}
return
result
def
request(
self
,step_info):
try
:
request_type
=
step_info[
"请求方式"
]
param_variable_list
=
re.findall(
'\\${\w+}'
, step_info[
"请求参数(get)"
])
if
param_variable_list:
for
param_variable
in
param_variable_list:
step_info[
"请求参数(get)"
]
=
step_info[
"请求参数(get)"
]\
.replace(param_variable,
'"%s"'
%
self
.temp_variables.get(param_variable[
2
:
-
1
]))
if
request_type
=
=
"get"
:
result
=
self
.__get( step_info )
elif
request_type
=
=
"post"
:
data_variable_list
=
re.findall(
'\\${\w+}'
, step_info[
"提交数据(post)"
])
if
data_variable_list:
for
param_variable
in
data_variable_list:
step_info[
"提交数据(post)"
]
=
step_info[
"提交数据(post)"
] \
.replace(param_variable,
'"%s"'
%
self
.temp_variables.get(param_variable[
2
:
-
1
]))
result
=
self
.__post( step_info )
else
:
result
=
{
'code'
:
1
,
'result'
:
'请求方式不支持'
}
except
Exception as e:
result
=
{
'code'
:
4
,
'result'
:
'用例编号[%s]的[%s]步骤出现系统异常,原因:%s'
%
(step_info[
'测试用例编号'
],step_info[
"测试用例步骤"
],e.__str__())}
return
result
def
request_by_step(
self
,step_infos):
self
.temp_variables
=
{}
for
step_info
in
step_infos:
temp_result
=
self
.request( step_info )
# print( temp_result )
if
temp_result[
'code'
]!
=
0
:
break
return
temp_result
if
__name__
=
=
"__main__"
:
case_info
=
[
{
'请求方式'
:
'get'
,
'请求地址'
:
'/cgi-bin/token'
,
'请求参数(get)'
:
'{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}'
,
'提交数据(post)'
: '
', '
取值方式
': '
json取值
', '
传值变量
': '
token
', '
取值代码
': '
$.access_token
', '
期望结果类型
': '
正则匹配
', '
期望结果
': '
{
"access_token"
:
"(.+?)"
,
"expires_in"
:(.
+
?)}'},
{
'请求方式'
:
'post'
,
'请求地址'
:
'/cgi-bin/tags/create'
,
'请求参数(get)'
:
'{"access_token":${token}}'
,
'提交数据(post)'
:
'{"tag" : {"name" : "衡东"}}'
,
'取值方式'
:
'无'
,
'传值变量'
: '
', '
取值代码
': '
', '
期望结果类型
': '
正则匹配
', '
期望结果
': '
{
"tag"
:{
"id"
:(.
+
?),
"name"
:
"衡东"
}}'}
]
RequestsUtils().request_by_step(case_info)
|
5、断言封装(核心封装) 。
check_utils.py 断言封装,与实际结果核对 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
import
re
import
ast
class
CheckUtils():
def
__init__(
self
,check_response
=
None
):
self
.ck_response
=
check_response
self
.ck_rules
=
{
'无'
:
self
.no_check,
'json键是否存在'
:
self
.check_key,
'json键值对'
:
self
.check_keyvalue,
'正则匹配'
:
self
.check_regexp
}
self
.pass_result
=
{
'code'
:
0
,
'response_reason'
:
self
.ck_response.reason,
'response_code'
:
self
.ck_response.status_code,
'response_headers'
:
self
.ck_response.headers,
'response_body'
:
self
.ck_response.text,
'check_result'
:
True
,
'message'
: ''
# 扩招作为日志输出等
}
self
.fail_result
=
{
'code'
:
2
,
'response_reason'
:
self
.ck_response.reason,
'response_code'
:
self
.ck_response.status_code,
'response_headers'
:
self
.ck_response.headers,
'response_body'
:
self
.ck_response.text,
'check_result'
:
False
,
'message'
: ''
# 扩招作为日志输出等
}
def
no_check(
self
):
return
self
.pass_result
def
check_key(
self
,check_data
=
None
):
check_data_list
=
check_data.split(
','
)
#把需要判断的值做切割,取出键值
res_list
=
[]
#存放每次比较的结果
wrong_key
=
[]
#存放比较失败key
for
check_data
in
check_data_list:
#把切割的键值和取出响应结果中的所有的键一个一个对比
if
check_data
in
self
.ck_response.json().keys():
res_list.append(
self
.pass_result )
else
:
res_list.append(
self
.fail_result )
wrong_key.append(check_data)
#把失败的键放进来,便于后续日志输出
# print(res_list)
# print(wrong_key)
if
self
.fail_result
in
res_list:
return
self
.fail_result
else
:
return
self
.pass_result
def
check_keyvalue(
self
,check_data
=
None
):
res_list
=
[]
# 存放每次比较的结果
wrong_items
=
[]
# 存放比较失败 items
for
check_item
in
ast.literal_eval(check_data).items():
#literal_eval()安全性的把字符串转成字典,items()取出键值对
if
check_item
in
self
.ck_response.json().items():
res_list.append(
self
.pass_result )
else
:
res_list.append(
self
.fail_result )
wrong_items.append(check_item)
# print( res_list )
# print( wrong_items )
if
self
.fail_result
in
res_list:
return
self
.fail_result
else
:
return
self
.pass_result
def
check_regexp(
self
,check_data
=
None
):
pattern
=
re.
compile
(check_data)
if
re.findall(pattern
=
pattern,string
=
self
.ck_response.text):
#匹配到了,不为空,为true
return
self
.pass_result
else
:
return
self
.fail_result
def
run_check(
self
,check_type
=
None
,check_data
=
None
):
code
=
self
.ck_response.status_code
if
code
=
=
200
:
if
check_type
in
self
.ck_rules.keys():
result
=
self
.ck_rules[check_type](check_data)
return
result
else
:
self
.fail_result[
'message'
]
=
'不支持%s判断方法'
%
check_type
return
self
.fail_result
else
:
self
.fail_result[
'message'
]
=
'请求的响应状态码非%s'
%
str
(code)
return
self
.fail_result
if
__name__
=
=
"__main__"
:
# 检查键是否存在,{"access_token":"hello","expires_":7200} 设为响应结果,"access_token,expires_in" 为检查对象值
CheckUtils({
"access_token"
:
"hello"
,
"expires_"
:
7200
}).check_key(
"access_token,expires_in"
)
#检查键值对是否存在
CheckUtils({
"access_token"
:
"hello"
,
"expires_i"
:
7200
}).check_keyvalue(
'{"expires_in": 7200}'
)
#正则对比
#TURE
print
(CheckUtils(
'{"access_token":"hello","expires_in":7200}'
).check_regexp(
'"expires_in":(.+?)'
))
#False
print
(CheckUtils(
'{"access_token":"hello","expires":7200}'
).check_regexp(
'"expires_in":(.+?)'
))
|
6、api_testcase下的api_test.py 封装 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import
warnings
import
unittest
import
paramunittest
from
common.testdata_utils
import
TestdataUtils
from
common.requests_utils
import
RequestsUtils
#如果是mysql数据源的话切换成 def_testcase_data_list_by_mysql() exccel数据源:def_testcase_data_list()
case_infos
=
TestdataUtils().def_testcase_data_list_by_mysql()
@paramunittest
.parametrized(
*
case_infos
)
class
APITest(paramunittest.ParametrizedTestCase):
def
setUp(
self
)
-
>
None
:
warnings.simplefilter(
'ignore'
, ResourceWarning)
#不会弹出警告提示
def
setParameters(
self
, case_id, case_info):
self
.case_id
=
case_id
self
.case_info
=
case_info
def
test_api_common_function(
self
):
'''测试描述'''
self
._testMethodName
=
self
.case_info[
0
].get(
"测试用例编号"
)
self
._testMethodDoc
=
self
.case_info[
0
].get(
"测试用例名称"
)
actual_result
=
RequestsUtils().request_by_step(
self
.case_info)
self
.assertTrue( actual_result.get(
'check_result'
),actual_result.get(
'message'
) )
if
__name__
=
=
'__main__'
:
unittest.main()
|
7、common下的log_utils.py 封装 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import
os
import
logging
import
time
from
common.config_utils
import
config
current_path
=
os.path.dirname(__file__)
log_output_path
=
os.path.join( current_path,
'..'
, config.LOG_PATH )
class
LogUtils():
def
__init__(
self
,log_path
=
log_output_path):
self
.log_name
=
os.path.join( log_output_path ,
'ApiTest_%s.log'
%
time.strftime(
'%Y_%m_%d'
) )
self
.logger
=
logging.getLogger(
"ApiTestLog"
)
self
.logger.setLevel( config.LOG_LEVEL )
console_handler
=
logging.StreamHandler()
# 控制台输出
file_handler
=
logging.FileHandler(
self
.log_name,
'a'
,encoding
=
'utf-8'
)
# 文件输出
formatter
=
logging.Formatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
self
.logger.addHandler( console_handler )
self
.logger.addHandler( file_handler )
console_handler.close()
# 防止打印日志重复
file_handler.close()
# 防止打印日志重复
def
get_logger(
self
):
return
self
.logger
logger
=
LogUtils().get_logger()
# 防止打印日志重复
if
__name__
=
=
'__main__'
:
logger.info(
'hello'
)
|
8、common下的config_utils.py的封装 。
配置文件的编写:
对配置文件的读取封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
import
os
import
configparser
current_path
=
os.path.dirname(__file__)
cfgpath
=
os.path.join(current_path,
"../conf/local_config.ini"
)
print
(cfgpath)
class
ConfigUtils:
def
__init__(
self
,config_path
=
cfgpath):
self
.__conf
=
configparser.ConfigParser()
self
.__conf.read(config_path, encoding
=
"utf-8"
)
def
read_ini(
self
,sec,option):
value
=
self
.__conf.get(sec,option)
return
value
@property
def
hosts(
self
):
value
=
self
.read_ini(
'default'
,
'hosts'
)
return
value
@property
def
LOG_PATH(
self
):
value
=
self
.read_ini(
'path'
,
'LOG_PATH'
)
return
value
@property
def
CASE_DATA_PATH(
self
):
value
=
self
.read_ini(
'path'
,
'CASE_DATA_PATH'
)
return
value
@property
def
REPORT_PATH(
self
):
value
=
self
.read_ini(
'path'
,
'REPORT_PATH'
)
return
value
@property
def
LOG_LEVEL(
self
):
value
=
int
(
self
.read_ini(
'log'
,
'LOG_LEVEL'
))
return
value
@property
def
smtp_server(
self
):
smtp_server_value
=
self
.read_ini(
'email'
,
'smtp_server'
)
return
smtp_server_value
@property
def
smtp_sender(
self
):
smtp_sender_value
=
self
.read_ini(
'email'
,
'smtp_sender'
)
return
smtp_sender_value
@property
def
smtp_password(
self
):
smtp_password_value
=
self
.read_ini(
'email'
,
'smtp_password'
)
return
smtp_password_value
@property
def
smtp_receiver(
self
):
smtp_receiver_value
=
self
.read_ini(
'email'
,
'smtp_receiver'
)
return
smtp_receiver_value
@property
def
smtp_cc(
self
):
smtp_cc_value
=
self
.read_ini(
'email'
,
'smtp_cc'
)
return
smtp_cc_value
@property
def
smtp_subject(
self
):
smtp_subject_value
=
self
.read_ini(
'email'
,
'smtp_subject'
)
return
smtp_subject_value
config
=
ConfigUtils()
if
__name__
=
=
'__main__'
:
current_path
=
os.path.dirname(__file__)
cfgpath
=
os.path.join(current_path,
"../conf/local_config.ini"
)
config_u
=
ConfigUtils()
print
(config_u.hosts)
print
(config_u.LOG_LEVEL)
|
9、test_runner下的run_case.py 封装 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
class
RunCase():
def
__init__(
self
):
self
.test_case_path
=
test_case_path
self
.report_path
=
test_report_path
self
.title
=
'P1P2接口自动化测试报告'
self
.description
=
'自动化接口测试框架'
self
.tester
=
'测试开发组'
def
load_test_suite(
self
):
discover
=
unittest.defaultTestLoader.discover(start_dir
=
self
.test_case_path,
pattern
=
'api_test.py'
,
top_level_dir
=
self
.test_case_path)
all_suite
=
unittest.TestSuite()
all_suite.addTest( discover )
return
all_suite
def
run(
self
):
report_dir
=
HTMLTestReportCN.ReportDirectory(
self
.report_path)
report_dir.create_dir(
self
.title)
report_file_path
=
HTMLTestReportCN.GlobalMsg.get_value(
'report_path'
)
fp
=
open
( report_file_path ,
'wb'
)
runner
=
HTMLTestReportCN.HTMLTestRunner(stream
=
fp,
title
=
self
.title,
description
=
self
.description,
tester
=
self
.tester)
runner.run(
self
.load_test_suite() )
fp.close()
return
report_file_path
if
__name__
=
=
'__main__'
:
report_path
=
RunCase().run()
EmailUtils(
open
(report_path,
'rb'
).read(), report_path).send_mail()
|
10、common下的email_utils.py 封装 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import
os
import
smtplib
from
email.mime.text
import
MIMEText
from
email.mime.multipart
import
MIMEMultipart
from
common.config_utils
import
config
class
EmailUtils():
def
__init__(
self
,smtp_body,smtp_attch_path
=
None
):
self
.smtp_server
=
config.smtp_server
self
.smtp_sender
=
config.smtp_sender
self
.smtp_password
=
config.smtp_password
self
.smtp_receiver
=
config.smtp_receiver
self
.smtp_cc
=
config.smtp_cc
self
.smtp_subject
=
config.smtp_subject
self
.smtp_body
=
smtp_body
self
.smtp_attch
=
smtp_attch_path
def
mail_message_body(
self
):
message
=
MIMEMultipart()
message[
'from'
]
=
self
.smtp_sender
message[
'to'
]
=
self
.smtp_receiver
message[
'Cc'
]
=
self
.smtp_cc
message[
'subject'
]
=
self
.smtp_subject
message.attach( MIMEText(
self
.smtp_body,
'html'
,
'utf-8'
) )
if
self
.smtp_attch:
attach_file
=
MIMEText(
open
(
self
.smtp_attch,
'rb'
).read(),
'base64'
,
'utf-8'
)
attach_file[
'Content-Type'
]
=
'application/octet-stream'
attach_file.add_header(
'Content-Disposition'
,
'attachment'
, filename
=
(
'gbk'
, '', os.path.basename(
self
.smtp_attch)))
message.attach(attach_file)
return
message
def
send_mail(
self
):
smtp
=
smtplib.SMTP()
smtp.connect(
self
.smtp_server)
smtp.login(user
=
self
.smtp_sender, password
=
self
.smtp_password)
smtp.sendmail(
self
.smtp_sender,
self
.smtp_receiver.split(
","
)
+
self
.smtp_cc.split(
","
),
self
.mail_message_body().as_string())
if
__name__
=
=
'__main__'
:
html_path
=
os.path.dirname(__file__)
+
'/../test_reports/接口自动化测试报告V1.1/接口自动化测试报告V1.1.html'
EmailUtils(
'<h3 align="center">自动化测试报告</h3>'
,html_path).send_mail()
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:https://www.cnblogs.com/123anqier-blog/p/13376455.html 。
最后此篇关于Python reques接口测试框架实现代码的文章就讲到这里了,如果你想了解更多关于Python reques接口测试框架实现代码的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在尝试在我的代码库中为我正在编写的游戏服务器更多地使用接口(interface),并了解高级概念以及何时应该使用接口(interface)(我认为)。在我的例子中,我使用它们将我的包相互分离,并使
我有一个名为 Widget 的接口(interface),它在我的整个项目中都在使用。但是,它也用作名为 Widget 的组件的 Prop 。 处理此问题的最佳方法是什么?我应该更改我的 Widget
有一个接口(interface)可以是多个接口(interface)之一 interface a {x:string} interface b {y:string} interface c {z:st
我遇到了一种情况,我需要调用第三方服务来获取一些信息。这些服务对于不同的客户可能会有所不同。我的界面中有一个身份验证功能,如下所示。 interface IServiceProvider { bool
在我的例子中,“RequestHandlerProxy”是一个结构,其字段为接口(interface)“IAdapter”,接口(interface)有可能被调用的方法,该方法的输入为结构“Reque
我有一个接口(interface)Interface1,它已由类A实现,并且设置了一些私有(private)变量值,并且我将类A的对象发送到下一个接受输入作为Interface2的类。那么我怎样才能将
假设我有这样的类和接口(interface)结构: interface IService {} interface IEmailService : IService { Task SendAs
有人知道我在哪里可以找到 XML-RPC 接口(interface)的定义(在 OpenERP 7 中)?我想知道创建或获取对象需要哪些参数和对象属性。每个元素的 XML 示例也将非常有帮助。 最佳答
最近,我一直在阅读有关接口(interface)是抽象的错误概念的文章。一篇这样的帖子是http://blog.ploeh.dk/2010/12/02/InterfacesAreNotAbstract
如果我有一个由第三方实现的现有 IInterface 后代,并且我想添加辅助例程,Delphi 是否提供了任何简单的方法来实现此目的,而无需手动重定向每个接口(interface)方法?也就是说,给定
我正在尝试将 Article 数组分配给我的 Mongoose 文档,但 Typescript 似乎不喜欢这样,我不知道为什么它显示此警告/错误,表明它不可分配. 我的 Mongoose 模式和接口(
我有两个接口(interface): public interface IController { void doSomething(IEntity thing); } public inte
是否可以创建一个扩展 Serializable 接口(interface)的接口(interface)? 如果是,那么扩展接口(interface)的行为是否会像 Serilizable 接口(int
我试图在两个存储之间创建一个中间层,它从存储 A 中获取数据,将其转换为相应类型的存储 B,然后存储它。由于我需要转换大约 50-100 种类型,我希望使用 map[string]func 并根据 s
我正在处理一个要求,其中我收到一个 JSON 对象,其中包含一个日期值作为字符串。我的任务是将 Date 对象存储在数据库中。 这种东西: {"start_date": "2019-05-29", "
我们的方法的目标是为我们现有的 DAO 和模型类引入接口(interface)。模型类由各种类型的资源 ID 标识,资源 ID 不仅仅是随机数,还带有语义和行为。因此,我们必须用对象而不是原始类型来表
Collection 接口(interface)有多个方法。 List 接口(interface)扩展了 Collection 接口(interface)。它声明与 Collection 接口(int
我有一个 Java 服务器应用程序,它使用 Jackson 使用反射 API 对 DTO 进行一般序列化。例如对于这个 DTO 接口(interface): package com.acme.libr
如果我在 Kotlin 中有一个接口(interface): interface KotlinInterface { val id: String } 我可以这样实现: class MyCla
我知道Java中所有访问修饰符之间的区别。然而,有人问了我一个非常有趣的问题,我很难找到答案:Java 中的 private 接口(interface)和 public 接口(interface)有什
我是一名优秀的程序员,十分优秀!