
python - Python requests - threads/processes vs. IO


I am connecting to a local server (OSRM) via HTTP to submit routes and get back driving times. I notice that I/O is slower than threading because it seems the wait time for a calculation is smaller than the time it takes to send the request and process the JSON output (I think I/O is better when the server needs some time to process your request -> you don't want it to be blocking because you have to wait; that is not my case). Threading suffers from the Global Interpreter Lock, so it appears (and the evidence below suggests) that my fastest option is to use multiprocessing.

The issue with multiprocessing is that it is so fast it exhausts my sockets and I get an error (each request issues a new connection). I can (serially) use a requests.Session() object to keep a connection alive, but I cannot get this to work in parallel (each process would need its own session).
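The kind of thing I was trying (a rough, untested sketch rather than code I have benchmarked; init_worker and req_osrm are just illustrative names, and the example URL is the same viaroute query used elsewhere in this question) was to give each Pool worker its own keep-alive Session via the initializer hook:

import json
from multiprocessing import Pool, cpu_count

import requests

session = None  # one Session per worker process, created by the initializer

def init_worker():
    # Each process builds its own Session, so the underlying TCP connection
    # is kept alive and reused for all requests made inside that process.
    global session
    session = requests.Session()

def req_osrm(url_input):
    req_url, qid = url_input
    try:
        json_geocode = session.get(req_url).json()
        if int(json_geocode['status']) == 200:
            summary = json_geocode['route_summary']
            used_from, used_to = json_geocode['via_points']
            return [qid, 200, summary['total_time'], summary['total_distance'],
                    used_from[0], used_from[1], used_to[0], used_to[1]]
        return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    url_routes = [("http://127.0.0.1:5005/viaroute?loc=44.779708,4.2609877"
                   "&loc=44.648439,4.2811959&alt=false&geometry=false", 0)]
    with Pool(cpu_count(), initializer=init_worker) as pool:
        calc_routes = pool.map(req_osrm, url_routes)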

Currently, the closest working code I have is the following multiprocessing code:

import json
from multiprocessing import Pool, cpu_count

from urllib3 import HTTPConnectionPool

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    req_url, qid = url_input
    try:
        response = conn_pool.request('GET', req_url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

However, I cannot get the HTTPConnectionPool to work properly; new sockets are created each time (I think) and then I get the error:

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))



My goal is to get distance calculations from a locally running OSRM-routing server as fast as possible.

I have a two-part question - basically I am trying to convert some code using multiprocessing.Pool() into better code (proper asynchronous functions, so that execution never breaks off and it runs as fast as possible).

The problem I am having is that everything I try seems slower than the multiprocessing version (I give some examples of what I have tried below).

Some potential approaches are: gevent, grequests, Tornado, requests-futures, asyncio, etc.

A - Multiprocessing.Pool()

I initially started off with something like this:
def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
                ....

pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)

where I connect to my local server (localhost, port: 5005), which is launched on 8 threads and supports parallel execution.

After a bit of searching I realised the error I was getting was because requests was opening a new connection/socket for each request. So this is actually too fast and exhausts the sockets after a while. The way to fix this seems to be to use requests.Session() - however I have not been able to get this working with multiprocessing (where each process has its own session).

Question 1.

On some computers this runs fine, e.g.:


To compare with later: 45% server usage and 1700 requests per second

However, on some it doesn't, and I don't fully understand why:

HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))



My guess is that, since requests locks the socket while it is in use, sometimes the server is too slow to respond to the old request while a new one is generated. The server supports queuing, but requests does not, so instead of the request being added to a queue I get the error?
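If that guess is right, one thing I could presumably do is let urllib3 retry transient connection failures with a back-off instead of raising straight away. A rough, untested sketch (the retry numbers are arbitrary, and depending on the requests version the Retry class may need to be imported from requests.packages.urllib3.util.retry instead):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session():
    # Session whose adapter retries failed connections with exponential
    # back-off, so a burst of "Max retries exceeded" / WinError 10048
    # errors is retried rather than raised immediately.
    retry = Retry(total=5, connect=5, backoff_factor=0.2)
    adapter = HTTPAdapter(max_retries=retry, pool_connections=1, pool_maxsize=1)
    session = requests.Session()
    session.mount('http://', adapter)
    return session

session = make_session()
resp = session.get("http://127.0.0.1:5005/viaroute?loc=44.779708,4.2609877"
                   "&loc=44.648439,4.2811959&alt=false&geometry=false")
print(resp.json().get('status'))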

Question 2.

I found this:

Blocking Or Non-Blocking?

With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.

Two excellent examples are grequests and requests-futures.



B - requests-futures

To solve this issue I needed to rewrite the code to use asynchronous requests, so I tried the below using:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed

(By the way, I start my server with the option to use all threads.)

And the main code:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
        # Process the futures as they become complete
        for future in as_completed(futures):
            r = future.result()
            try:
                row = [futures[future]] + r.data
            except Exception as err:
                print('No route')
                row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
            calc_routes.append(row)

Rewriting my function (ReqOsrm) as:
def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out

However, this code is slower than the multiprocessing code! Before I was getting around 1700 requests per second, now I am getting about 600. I guess this is because I don't have full CPU utilisation, but I'm not sure how to increase it?
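One idea I have not fully explored for pushing CPU usage up is to split url_routes across several processes, each running its own FuturesSession, so the JSON parsing (the CPU-bound part) is spread over the cores while the threads inside each process overlap the network waits. A rough, untested sketch (run_chunk is just an illustrative name; url_routes is the same (url, query-id) list as above):

from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool, cpu_count

from requests_futures.sessions import FuturesSession

def run_chunk(chunk):
    # One FuturesSession per process: the threads inside each process overlap
    # the network waits, while separate processes spread the JSON parsing
    # over several cores.
    results = []
    with FuturesSession(executor=ThreadPoolExecutor(max_workers=100)) as session:
        futures = {session.get(url): qid for url, qid in chunk}
        for future in as_completed(futures):
            qid = futures[future]
            try:
                json_geocode = future.result().json()
                if int(json_geocode['status']) == 200:
                    summary = json_geocode['route_summary']
                    used_from, used_to = json_geocode['via_points']
                    results.append([qid, 200, summary['total_time'],
                                    summary['total_distance'],
                                    used_from[0], used_from[1],
                                    used_to[0], used_to[1]])
                else:
                    results.append([qid, 999, 0, 0, 0, 0, 0, 0])
            except Exception:
                results.append([qid, 999, 0, 0, 0, 0, 0, 0])
    return results

if __name__ == "__main__":
    n = cpu_count()
    chunks = [url_routes[i::n] for i in range(n)]  # url_routes as in the question
    with Pool(n) as pool:
        calc_routes = [row for rows in pool.map(run_chunk, chunks) for row in rows]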


C - Threading

I tried another method (creating threads), but again I wasn't sure how to get this to maximise CPU usage (ideally I want to see my server at 50% usage, no?):
import requests
from queue import Queue
from threading import Thread

def doWork():
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

# Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass

# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

This method is faster than requests_futures, I think, but I don't know how many threads to set to maximise it -
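Since the best thread count seems to be an empirical question, I suppose one could simply sweep a few values and time a fixed batch of URLs. A rough sketch (using concurrent.futures instead of the hand-rolled queue above purely for brevity; the candidate counts are arbitrary and url_routes is the same list as before):

import time
from concurrent.futures import ThreadPoolExecutor

import requests

def fetch(url_input):
    # Minimal fetch that only records the HTTP status; enough for timing.
    url, qid = url_input
    try:
        return qid, requests.get(url).status_code
    except Exception:
        return qid, 999

def throughput(urls, num_threads):
    # Time one full pass over the batch at a given concurrency level.
    start = time.time()
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        list(executor.map(fetch, urls))
    return len(urls) / (time.time() - start)

for n in (50, 100, 250, 500, 1000):
    print("%4d threads -> %d req/s" % (n, throughput(url_routes, n)))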


D - Tornado (not working)

I am now trying Tornado - however I can't quite get it working. If I use curl_httpclient it crashes with exit code -1073741819; if I use simple_httpclient it works but I get timeout errors:

ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout


from functools import partial

from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
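One thing worth noting about the HTTP 599 errors: I believe simple_httpclient's default request_timeout is 20 seconds and, at least on the Tornado version I am using, it appears to include time spent queued behind max_clients, so with thousands of fetches and only max_clients=10 the ones at the back can time out before they are ever sent. A hedged tweak (the 600-second value is arbitrary; this reuses handle_req from the snippet above) would be to raise the per-fetch timeouts:

from tornado import gen
from tornado.httpclient import AsyncHTTPClient

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    # Generous timeouts so requests queued behind max_clients are not
    # abandoned with HTTP 599 before they ever reach the server.
    responses = yield [http_client.fetch(url, connect_timeout=600, request_timeout=600)
                       for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)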

E - asyncio/aiohttp

Decided to try another approach (although it would be great to get Tornado working) using asyncio and aiohttp.
import asyncio
import json

import aiohttp

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but no route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking of semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures
    # save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid, err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

This works, but it is still slower than multiprocessing!
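My suspicion is that part of the gap comes from aiohttp.request() opening a fresh connection per call. Reusing connections through a single ClientSession with a bounded TCPConnector should help; the rough sketch below uses the newer async/await syntax (aiohttp 2.x/3.x), whereas my code above targets an older aiohttp release, so the exact API differs. It reuses handle_req from section E and url_routes from above:

import asyncio

import aiohttp

async def fetch_route(session, semaphore, url, qid):
    # Bound the number of in-flight requests and reuse the session's
    # pooled (keep-alive) connections instead of reconnecting per request.
    async with semaphore:
        async with session.get(url) as response:
            body = await response.read()
    return handle_req(body, qid)  # handle_req as defined in section E

async def run_experiment(urls, concurrency=500):
    semaphore = asyncio.Semaphore(concurrency)
    connector = aiohttp.TCPConnector(limit=concurrency)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_route(session, semaphore, url, qid) for url, qid in urls]
        return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))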


Best Answer

Looking at your multiprocessing code at the top of the question: it appears that HttpConnectionPool() is being called each time ReqOsrm runs, so a new pool is created for every URL. Instead, use the initializer and initargs parameters so that a single pool is created per process.

import json
from multiprocessing import Pool

from urllib3 import HTTPConnectionPool

conn_pool = None

def makePool(host, port):
    global conn_pool
    conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(url_input):
    req_url, qid = url_input
    try:
        response = conn_pool.request('GET', req_url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    # run:
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()

The requests-futures version appears to have an indentation error. The loop for future in as_completed(futures): is indented under the outer loop for i in range(len(url_routes)):. So a request is made in the outer loop and then the inner loop waits for that future to return before the next iteration of the outer loop. This makes the requests run serially rather than in parallel.

I think the code should look like this:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit all the requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid

    # this was indented under the code in section B of the question
    # process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        print(row)
        calc_routes.append(row)

Regarding python - Python requests - threads/processes vs. IO, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/35747235/
