- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Python reques接口测试框架实现代码由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
1、框架菜单 。
1.1 common模块 。
1.2 其他 。
2、Excel接口测试案例编写 。
3、读取Excel测试封装(核心封装) 。
excel_utils.py 读取Excel中的数据 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import
os
import
xlrd
#内置模块、第三方模块pip install 自定义模块
class
ExcelUtils():
def
__init__(
self
,file_path,sheet_name):
self
.file_path
=
file_path
self
.sheet_name
=
sheet_name
self
.sheet
=
self
.get_sheet()
# 整个表格对象
def
get_sheet(
self
):
wb
=
xlrd.open_workbook(
self
.file_path)
sheet
=
wb.sheet_by_name(
self
.sheet_name)
return
sheet
def
get_row_count(
self
):
row_count
=
self
.sheet.nrows
return
row_count
def
get_col_count(
self
):
col_count
=
self
.sheet.ncols
return
col_count
def
__get_cell_value(
self
,row_index, col_index):
cell_value
=
self
.sheet.cell_value(row_index,col_index)
return
cell_value
def
get_merged_info(
self
):
merged_info
=
self
.sheet.merged_cells
return
merged_info
def
get_merged_cell_value(
self
,row_index, col_index):
"""既能获取普通单元格的数据又能获取合并单元格数据"""
cell_value
=
None
for
(rlow, rhigh, clow, chigh)
in
self
.get_merged_info():
if
(row_index >
=
rlow
and
row_index < rhigh):
if
(col_index >
=
clow
and
col_index < chigh):
cell_value
=
self
.__get_cell_value(rlow, clow)
break
;
# 防止循环去进行判断出现值覆盖的情况
else
:
cell_value
=
self
.__get_cell_value(row_index, col_index)
else
:
cell_value
=
self
.__get_cell_value(row_index, col_index)
return
cell_value
def
get_sheet_data_by_dict(
self
):
all_data_list
=
[]
first_row
=
self
.sheet.row(
0
)
#获取首行数据
for
row
in
range
(
1
,
self
.get_row_count()):
row_dict
=
{}
for
col
in
range
(
0
,
self
.get_col_count()):
row_dict[first_row[col].value]
=
self
.get_merged_cell_value(row, col)
all_data_list.append(row_dict)
return
all_data_list
if
__name__
=
=
'__main__'
:
current_path
=
os.path.dirname(__file__)
excel_path
=
os.path.join( current_path,
'..'
,
'samples/data/test_case.xlsx'
)
excelUtils
=
ExcelUtils(excel_path,
"Sheet1"
)
for
row
in
excelUtils.get_sheet_data_by_dict():
print
( row )
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import
os
from
common.excel_utils
import
ExcelUtils
from
common.config_utils
import
config
current_path
=
os.path.dirname(__file__)
test_data_path
=
os.path.join( current_path,
'..'
, config.CASE_DATA_PATH )
class
TestdataUtils():
def
__init__(
self
,test_data_path
=
test_data_path):
self
.test_data_path
=
test_data_path
self
.test_data
=
ExcelUtils(test_data_path,
"Sheet1"
).get_sheet_data_by_dict()
self
.test_data_by_mysql
=
SqlUtils().get_mysql_test_case_info()
def
__get_testcase_data_dict(
self
):
testcase_dict
=
{}
for
row_data
in
self
.test_data:
testcase_dict.setdefault( row_data[
'测试用例编号'
],[] ).append( row_data )
return
testcase_dict
def
def_testcase_data_list(
self
):
testcase_list
=
[]
for
k,v
in
self
.__get_testcase_data_dict().items():
one_case_dict
=
{}
one_case_dict[
"case_id"
]
=
k
one_case_dict[
"case_info"
]
=
v
testcase_list.append( one_case_dict )
return
testcase_list
if
__name__
=
=
"__main__"
:
testdataUtils
=
TestdataUtils()
for
i
in
testdataUtils.def_testcase_data_list():
print
( i )
|
testdata_utils.py 读取Excel中的数据后处理成需要的数据 。
4、request封装(核心封装) 。
requests_utils.py 包含post请求,get请求,异常,调用断言 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
import
ast
import
re
import
requests
import
jsonpath
from
requests.exceptions
import
RequestException
from
requests.exceptions
import
ProxyError
from
requests.exceptions
import
ConnectionError
from
common.config_utils
import
config
from
common.check_utils
import
CheckUtils
class
RequestsUtils():
def
__init__(
self
):
self
.hosts
=
config.hosts
self
.headers
=
{
"ContentType"
:
"application/json;charset=utf-8"
}
self
.session
=
requests.session()
self
.temp_variables
=
{}
def
__get(
self
,get_info):
try
:
url
=
self
.hosts
+
get_info[
"请求地址"
]
response
=
self
.session.get( url
=
url,
params
=
ast.literal_eval(get_info[
"请求参数(get)"
])
)
response.encoding
=
response.apparent_encoding
if
get_info[
"取值方式"
]
=
=
"json取值"
:
value
=
jsonpath.jsonpath( response.json(),get_info[
"取值代码"
] )[
0
]
self
.temp_variables[ get_info[
"传值变量"
] ]
=
value
elif
get_info[
"取值方式"
]
=
=
"正则取值"
:
value
=
re.findall(get_info[
"取值代码"
],response.text)[
0
]
self
.temp_variables[get_info[
"传值变量"
]]
=
value
result
=
CheckUtils(response).run_check(get_info[
'期望结果类型'
], get_info[
'期望结果'
])
except
ProxyError as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:代理错误异常'
%
(get_info[
"接口名称"
])}
except
ConnectionError as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:连接超时异常'
%
(get_info[
"接口名称"
])}
except
RequestException as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:Request异常,原因:%s'
%
(get_info[
"接口名称"
], e.__str__())}
except
Exception as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:系统异常,原因:%s'
%
(get_info[
"接口名称"
],e.__str__())}
return
result
def
__post(
self
,post_info):
try
:
url
=
self
.hosts
+
post_info[
"请求地址"
]
response
=
self
.session.post( url
=
url,
headers
=
self
.headers,
params
=
ast.literal_eval(post_info[
"请求参数(get)"
]),
# data = post_infos["提交数据(post)"],
json
=
ast.literal_eval(post_info[
"提交数据(post)"
])
)
response.encoding
=
response.apparent_encoding
if
post_info[
"取值方式"
]
=
=
"json取值"
:
value
=
jsonpath.jsonpath( response.json(),post_info[
"取值代码"
] )[
0
]
self
.temp_variables[ post_info[
"传值变量"
] ]
=
value
elif
post_info[
"取值方式"
]
=
=
"正则取值"
:
value
=
re.findall(post_info[
"取值代码"
],response.text)[
0
]
self
.temp_variables[post_info[
"传值变量"
]]
=
value
#调用CheckUtils()
result
=
CheckUtils(response).run_check(post_info[
'期望结果类型'
],post_info[
'期望结果'
])
except
ProxyError as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:代理错误异常'
%
(post_info[
"接口名称"
])}
except
ConnectionError as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:连接超时异常'
%
(post_info[
"接口名称"
])}
except
RequestException as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:Request异常,原因:%s'
%
(post_info[
"接口名称"
], e.__str__())}
except
Exception as e:
result
=
{
'code'
:
4
,
'result'
:
'[%s]请求:系统异常,原因:%s'
%
(post_info[
"接口名称"
],e.__str__())}
return
result
def
request(
self
,step_info):
try
:
request_type
=
step_info[
"请求方式"
]
param_variable_list
=
re.findall(
'\\${\w+}'
, step_info[
"请求参数(get)"
])
if
param_variable_list:
for
param_variable
in
param_variable_list:
step_info[
"请求参数(get)"
]
=
step_info[
"请求参数(get)"
]\
.replace(param_variable,
'"%s"'
%
self
.temp_variables.get(param_variable[
2
:
-
1
]))
if
request_type
=
=
"get"
:
result
=
self
.__get( step_info )
elif
request_type
=
=
"post"
:
data_variable_list
=
re.findall(
'\\${\w+}'
, step_info[
"提交数据(post)"
])
if
data_variable_list:
for
param_variable
in
data_variable_list:
step_info[
"提交数据(post)"
]
=
step_info[
"提交数据(post)"
] \
.replace(param_variable,
'"%s"'
%
self
.temp_variables.get(param_variable[
2
:
-
1
]))
result
=
self
.__post( step_info )
else
:
result
=
{
'code'
:
1
,
'result'
:
'请求方式不支持'
}
except
Exception as e:
result
=
{
'code'
:
4
,
'result'
:
'用例编号[%s]的[%s]步骤出现系统异常,原因:%s'
%
(step_info[
'测试用例编号'
],step_info[
"测试用例步骤"
],e.__str__())}
return
result
def
request_by_step(
self
,step_infos):
self
.temp_variables
=
{}
for
step_info
in
step_infos:
temp_result
=
self
.request( step_info )
# print( temp_result )
if
temp_result[
'code'
]!
=
0
:
break
return
temp_result
if
__name__
=
=
"__main__"
:
case_info
=
[
{
'请求方式'
:
'get'
,
'请求地址'
:
'/cgi-bin/token'
,
'请求参数(get)'
:
'{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}'
,
'提交数据(post)'
: '
', '
取值方式
': '
json取值
', '
传值变量
': '
token
', '
取值代码
': '
$.access_token
', '
期望结果类型
': '
正则匹配
', '
期望结果
': '
{
"access_token"
:
"(.+?)"
,
"expires_in"
:(.
+
?)}'},
{
'请求方式'
:
'post'
,
'请求地址'
:
'/cgi-bin/tags/create'
,
'请求参数(get)'
:
'{"access_token":${token}}'
,
'提交数据(post)'
:
'{"tag" : {"name" : "衡东"}}'
,
'取值方式'
:
'无'
,
'传值变量'
: '
', '
取值代码
': '
', '
期望结果类型
': '
正则匹配
', '
期望结果
': '
{
"tag"
:{
"id"
:(.
+
?),
"name"
:
"衡东"
}}'}
]
RequestsUtils().request_by_step(case_info)
|
5、断言封装(核心封装) 。
check_utils.py 断言封装,与实际结果核对 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
import
re
import
ast
class
CheckUtils():
def
__init__(
self
,check_response
=
None
):
self
.ck_response
=
check_response
self
.ck_rules
=
{
'无'
:
self
.no_check,
'json键是否存在'
:
self
.check_key,
'json键值对'
:
self
.check_keyvalue,
'正则匹配'
:
self
.check_regexp
}
self
.pass_result
=
{
'code'
:
0
,
'response_reason'
:
self
.ck_response.reason,
'response_code'
:
self
.ck_response.status_code,
'response_headers'
:
self
.ck_response.headers,
'response_body'
:
self
.ck_response.text,
'check_result'
:
True
,
'message'
: ''
# 扩招作为日志输出等
}
self
.fail_result
=
{
'code'
:
2
,
'response_reason'
:
self
.ck_response.reason,
'response_code'
:
self
.ck_response.status_code,
'response_headers'
:
self
.ck_response.headers,
'response_body'
:
self
.ck_response.text,
'check_result'
:
False
,
'message'
: ''
# 扩招作为日志输出等
}
def
no_check(
self
):
return
self
.pass_result
def
check_key(
self
,check_data
=
None
):
check_data_list
=
check_data.split(
','
)
#把需要判断的值做切割,取出键值
res_list
=
[]
#存放每次比较的结果
wrong_key
=
[]
#存放比较失败key
for
check_data
in
check_data_list:
#把切割的键值和取出响应结果中的所有的键一个一个对比
if
check_data
in
self
.ck_response.json().keys():
res_list.append(
self
.pass_result )
else
:
res_list.append(
self
.fail_result )
wrong_key.append(check_data)
#把失败的键放进来,便于后续日志输出
# print(res_list)
# print(wrong_key)
if
self
.fail_result
in
res_list:
return
self
.fail_result
else
:
return
self
.pass_result
def
check_keyvalue(
self
,check_data
=
None
):
res_list
=
[]
# 存放每次比较的结果
wrong_items
=
[]
# 存放比较失败 items
for
check_item
in
ast.literal_eval(check_data).items():
#literal_eval()安全性的把字符串转成字典,items()取出键值对
if
check_item
in
self
.ck_response.json().items():
res_list.append(
self
.pass_result )
else
:
res_list.append(
self
.fail_result )
wrong_items.append(check_item)
# print( res_list )
# print( wrong_items )
if
self
.fail_result
in
res_list:
return
self
.fail_result
else
:
return
self
.pass_result
def
check_regexp(
self
,check_data
=
None
):
pattern
=
re.
compile
(check_data)
if
re.findall(pattern
=
pattern,string
=
self
.ck_response.text):
#匹配到了,不为空,为true
return
self
.pass_result
else
:
return
self
.fail_result
def
run_check(
self
,check_type
=
None
,check_data
=
None
):
code
=
self
.ck_response.status_code
if
code
=
=
200
:
if
check_type
in
self
.ck_rules.keys():
result
=
self
.ck_rules[check_type](check_data)
return
result
else
:
self
.fail_result[
'message'
]
=
'不支持%s判断方法'
%
check_type
return
self
.fail_result
else
:
self
.fail_result[
'message'
]
=
'请求的响应状态码非%s'
%
str
(code)
return
self
.fail_result
if
__name__
=
=
"__main__"
:
# 检查键是否存在,{"access_token":"hello","expires_":7200} 设为响应结果,"access_token,expires_in" 为检查对象值
CheckUtils({
"access_token"
:
"hello"
,
"expires_"
:
7200
}).check_key(
"access_token,expires_in"
)
#检查键值对是否存在
CheckUtils({
"access_token"
:
"hello"
,
"expires_i"
:
7200
}).check_keyvalue(
'{"expires_in": 7200}'
)
#正则对比
#TURE
print
(CheckUtils(
'{"access_token":"hello","expires_in":7200}'
).check_regexp(
'"expires_in":(.+?)'
))
#False
print
(CheckUtils(
'{"access_token":"hello","expires":7200}'
).check_regexp(
'"expires_in":(.+?)'
))
|
6、api_testcase下的api_test.py 封装 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import
warnings
import
unittest
import
paramunittest
from
common.testdata_utils
import
TestdataUtils
from
common.requests_utils
import
RequestsUtils
#如果是mysql数据源的话切换成 def_testcase_data_list_by_mysql() exccel数据源:def_testcase_data_list()
case_infos
=
TestdataUtils().def_testcase_data_list_by_mysql()
@paramunittest
.parametrized(
*
case_infos
)
class
APITest(paramunittest.ParametrizedTestCase):
def
setUp(
self
)
-
>
None
:
warnings.simplefilter(
'ignore'
, ResourceWarning)
#不会弹出警告提示
def
setParameters(
self
, case_id, case_info):
self
.case_id
=
case_id
self
.case_info
=
case_info
def
test_api_common_function(
self
):
'''测试描述'''
self
._testMethodName
=
self
.case_info[
0
].get(
"测试用例编号"
)
self
._testMethodDoc
=
self
.case_info[
0
].get(
"测试用例名称"
)
actual_result
=
RequestsUtils().request_by_step(
self
.case_info)
self
.assertTrue( actual_result.get(
'check_result'
),actual_result.get(
'message'
) )
if
__name__
=
=
'__main__'
:
unittest.main()
|
7、common下的log_utils.py 封装 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import
os
import
logging
import
time
from
common.config_utils
import
config
current_path
=
os.path.dirname(__file__)
log_output_path
=
os.path.join( current_path,
'..'
, config.LOG_PATH )
class
LogUtils():
def
__init__(
self
,log_path
=
log_output_path):
self
.log_name
=
os.path.join( log_output_path ,
'ApiTest_%s.log'
%
time.strftime(
'%Y_%m_%d'
) )
self
.logger
=
logging.getLogger(
"ApiTestLog"
)
self
.logger.setLevel( config.LOG_LEVEL )
console_handler
=
logging.StreamHandler()
# 控制台输出
file_handler
=
logging.FileHandler(
self
.log_name,
'a'
,encoding
=
'utf-8'
)
# 文件输出
formatter
=
logging.Formatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
self
.logger.addHandler( console_handler )
self
.logger.addHandler( file_handler )
console_handler.close()
# 防止打印日志重复
file_handler.close()
# 防止打印日志重复
def
get_logger(
self
):
return
self
.logger
logger
=
LogUtils().get_logger()
# 防止打印日志重复
if
__name__
=
=
'__main__'
:
logger.info(
'hello'
)
|
8、common下的config_utils.py的封装 。
配置文件的编写:
对配置文件的读取封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
import
os
import
configparser
current_path
=
os.path.dirname(__file__)
cfgpath
=
os.path.join(current_path,
"../conf/local_config.ini"
)
print
(cfgpath)
class
ConfigUtils:
def
__init__(
self
,config_path
=
cfgpath):
self
.__conf
=
configparser.ConfigParser()
self
.__conf.read(config_path, encoding
=
"utf-8"
)
def
read_ini(
self
,sec,option):
value
=
self
.__conf.get(sec,option)
return
value
@property
def
hosts(
self
):
value
=
self
.read_ini(
'default'
,
'hosts'
)
return
value
@property
def
LOG_PATH(
self
):
value
=
self
.read_ini(
'path'
,
'LOG_PATH'
)
return
value
@property
def
CASE_DATA_PATH(
self
):
value
=
self
.read_ini(
'path'
,
'CASE_DATA_PATH'
)
return
value
@property
def
REPORT_PATH(
self
):
value
=
self
.read_ini(
'path'
,
'REPORT_PATH'
)
return
value
@property
def
LOG_LEVEL(
self
):
value
=
int
(
self
.read_ini(
'log'
,
'LOG_LEVEL'
))
return
value
@property
def
smtp_server(
self
):
smtp_server_value
=
self
.read_ini(
'email'
,
'smtp_server'
)
return
smtp_server_value
@property
def
smtp_sender(
self
):
smtp_sender_value
=
self
.read_ini(
'email'
,
'smtp_sender'
)
return
smtp_sender_value
@property
def
smtp_password(
self
):
smtp_password_value
=
self
.read_ini(
'email'
,
'smtp_password'
)
return
smtp_password_value
@property
def
smtp_receiver(
self
):
smtp_receiver_value
=
self
.read_ini(
'email'
,
'smtp_receiver'
)
return
smtp_receiver_value
@property
def
smtp_cc(
self
):
smtp_cc_value
=
self
.read_ini(
'email'
,
'smtp_cc'
)
return
smtp_cc_value
@property
def
smtp_subject(
self
):
smtp_subject_value
=
self
.read_ini(
'email'
,
'smtp_subject'
)
return
smtp_subject_value
config
=
ConfigUtils()
if
__name__
=
=
'__main__'
:
current_path
=
os.path.dirname(__file__)
cfgpath
=
os.path.join(current_path,
"../conf/local_config.ini"
)
config_u
=
ConfigUtils()
print
(config_u.hosts)
print
(config_u.LOG_LEVEL)
|
9、test_runner下的run_case.py 封装 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
class
RunCase():
def
__init__(
self
):
self
.test_case_path
=
test_case_path
self
.report_path
=
test_report_path
self
.title
=
'P1P2接口自动化测试报告'
self
.description
=
'自动化接口测试框架'
self
.tester
=
'测试开发组'
def
load_test_suite(
self
):
discover
=
unittest.defaultTestLoader.discover(start_dir
=
self
.test_case_path,
pattern
=
'api_test.py'
,
top_level_dir
=
self
.test_case_path)
all_suite
=
unittest.TestSuite()
all_suite.addTest( discover )
return
all_suite
def
run(
self
):
report_dir
=
HTMLTestReportCN.ReportDirectory(
self
.report_path)
report_dir.create_dir(
self
.title)
report_file_path
=
HTMLTestReportCN.GlobalMsg.get_value(
'report_path'
)
fp
=
open
( report_file_path ,
'wb'
)
runner
=
HTMLTestReportCN.HTMLTestRunner(stream
=
fp,
title
=
self
.title,
description
=
self
.description,
tester
=
self
.tester)
runner.run(
self
.load_test_suite() )
fp.close()
return
report_file_path
if
__name__
=
=
'__main__'
:
report_path
=
RunCase().run()
EmailUtils(
open
(report_path,
'rb'
).read(), report_path).send_mail()
|
10、common下的email_utils.py 封装 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import
os
import
smtplib
from
email.mime.text
import
MIMEText
from
email.mime.multipart
import
MIMEMultipart
from
common.config_utils
import
config
class
EmailUtils():
def
__init__(
self
,smtp_body,smtp_attch_path
=
None
):
self
.smtp_server
=
config.smtp_server
self
.smtp_sender
=
config.smtp_sender
self
.smtp_password
=
config.smtp_password
self
.smtp_receiver
=
config.smtp_receiver
self
.smtp_cc
=
config.smtp_cc
self
.smtp_subject
=
config.smtp_subject
self
.smtp_body
=
smtp_body
self
.smtp_attch
=
smtp_attch_path
def
mail_message_body(
self
):
message
=
MIMEMultipart()
message[
'from'
]
=
self
.smtp_sender
message[
'to'
]
=
self
.smtp_receiver
message[
'Cc'
]
=
self
.smtp_cc
message[
'subject'
]
=
self
.smtp_subject
message.attach( MIMEText(
self
.smtp_body,
'html'
,
'utf-8'
) )
if
self
.smtp_attch:
attach_file
=
MIMEText(
open
(
self
.smtp_attch,
'rb'
).read(),
'base64'
,
'utf-8'
)
attach_file[
'Content-Type'
]
=
'application/octet-stream'
attach_file.add_header(
'Content-Disposition'
,
'attachment'
, filename
=
(
'gbk'
, '', os.path.basename(
self
.smtp_attch)))
message.attach(attach_file)
return
message
def
send_mail(
self
):
smtp
=
smtplib.SMTP()
smtp.connect(
self
.smtp_server)
smtp.login(user
=
self
.smtp_sender, password
=
self
.smtp_password)
smtp.sendmail(
self
.smtp_sender,
self
.smtp_receiver.split(
","
)
+
self
.smtp_cc.split(
","
),
self
.mail_message_body().as_string())
if
__name__
=
=
'__main__'
:
html_path
=
os.path.dirname(__file__)
+
'/../test_reports/接口自动化测试报告V1.1/接口自动化测试报告V1.1.html'
EmailUtils(
'<h3 align="center">自动化测试报告</h3>'
,html_path).send_mail()
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:https://www.cnblogs.com/123anqier-blog/p/13376455.html 。
最后此篇关于Python reques接口测试框架实现代码的文章就讲到这里了,如果你想了解更多关于Python reques接口测试框架实现代码的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!