我有一个数据框,长度为数千行,其中一列中包含两对 GPS 坐标,我试图用它们来计算这些坐标之间的行驶时间。我有一个函数可以接收这些坐标并返回行驶时间,计算每个条目可能需要 3-8 秒。因此,整个过程可能需要相当长的时间。我希望能够做的是:使用大约 3-5 个线程,迭代列表并计算驱动时间,然后在其他线程完成且不会在列表中创建超过 5 个线程时继续执行下一个条目。过程。独立地,我让一切正常工作 - 我可以运行多个线程,我可以跟踪线程计数并等待,直到允许的最大线程数降至限制以下,直到下一个开始,并且可以迭代数据帧并计算驱动时间。然而,我很难将它们拼凑在一起。这是我所拥有的经过编辑、精简的版本。
import pandas
import threading
import arcgis
class MassFunction:
#This is intended to keep track of the active threads
MassFunction.threadCount = 0
def startThread(functionName,params=None):
#This kicks off a new thread and should count up to keep track of the threads
MassFunction.threadCount +=1
if params is None:
t = threading.Thread(target=functionName)
else:
t = threading.Thread(target=functionName,args=[params])
t.daemon = True
t.start()
class GeoAnalysis:
#This class handles the connection to the ArcGIS services
def __init__(self):
super(GeoAnalysis, self).__init__()
self.my_gis = arcgis.gis.GIS("https://www.arcgis.com", username, pw)
def drivetimeCalc(self, coordsString):
#The coords come in as a string, formatted as 'lat_1,long_1,lat_2,long_2'
#This is the bottleneck of the process, as this calculation/response
#below takes a few seconds to get a response
points = coordsString.split(", ")
route_service_url = self.my_gis.properties.helperServices.route.url
self.route_layer = arcgis.network.RouteLayer(route_service_url, gis=self.my_gis)
point_a_to_point_b = "{0}, {1}; {2}, {3}".format(points[1], points[0], points[3], points[2])
result = self.route_layer.solve(stops=point_a_to_point_b,return_directions=False, return_routes=True,output_lines='esriNAOutputLineNone',return_barriers=False, return_polygon_barriers=False,return_polyline_barriers=False)
travel_time = result['routes']['features'][0]['attributes']['Total_TravelTime']
#This is intended to 'remove' one of the active threads
MassFunction.threadCount -=1
return travel_time
class MainFunction:
#This is to give access to the GeoAnalysis class from this class
GA = GeoAnalysis()
def closureDriveTimeCalc(self,coordsList):
#This is intended to loop in the event that a fifth loop gets started and will prevent additional threads from starting
while MassFunction.threadCount > 4:
pass
MassFunction.startThread(MainFunction.GA.drivetimeCalc,coordsList)
def driveTimeAnalysis(self,location):
#This reads a csv file containing a few thousand entries.
#Each entry/row contains gps coordinates, which need to be
#iterated over to calculate the drivetimes
locationMemberFile = pandas.read_csv(someFileName)
#The built-in apply() method in pandas seems to be the
#fastest way to iterate through the rows
locationMemberFile['DRIVETIME'] = locationMemberFile['COORDS_COL'].apply(self.closureDriveTimeCalc)
当我现在使用 VS Code 运行这个程序时,我可以看到调用堆栈中的线程计数上升到数千,所以我觉得它没有等待线程完成并从 threadCount 中添加/减去值(value)。任何想法/建议/技巧将不胜感激。
编辑:本质上我的问题是如何取回 trip_time 值,以便将其放入数据框中。我目前没有针对closureDriveTimeCalc 函数的返回语句,因此虽然该函数正确运行,但它不会将任何信息发送回apply() 方法。
我不会在应用程序中执行此操作,而是使用 multiprocessing Pool.map :
from multiprocessing import Pool
with Pool(processes=4) as pool:
locationMemberFile['DRIVETIME'] = pool.map(self.closureDriveTimeCalc, locationMemberFile['COORDS_COL']))
我是一名优秀的程序员,十分优秀!