This is a fairly involved problem, so I will try to keep the explanation accessible and leave out unnecessary detail.

Last year I wrote a Python script for work. It gathers basic system data and sends it to an HTTP/S server, which can send commands back if the user chooses. It was a big experiment to see what worked and what didn't, and to test different needs within the company. By now I have a pretty thorough understanding of our requirements, so I have started on version 2.

The goal of the new version is to keep the functionality while reducing system/CPU load and bandwidth. Once this Python script is finished, the rest of the work happens on the HTTP/S server. My question is specifically about the client side, the Python script. I am using Python 2.7.x, mostly on Debian-based systems.

The v1 script gathers system data, reads a config file listing the servers to send the data to, and sends the data to each server using threads. Each server (still within those threads) can return one or more commands, which are then also processed in their own threads. The script runs once a minute via crontab. You can have 5 or more servers each sending 10 commands and the script still executes everything smoothly and effectively, without taking a long time to finish the commands issued by the servers.
In the v2 script, I am trying to make the following necessary changes:

It will run as a system service. So instead of cron running the script once a minute, the script will loop every few seconds.
Each pass through the loop needs to gather the data once, then send it to every web server (defined in a config file).
I need persistent HTTP/S connections to optimize performance and bandwidth.
I don't want to gather the data on every pass of the loop for each HTTP/S server. I only want to gather it once per iteration of the main loop driving the service, and then hand that data to the threads managing the established persistent HTTP/S connections.

And here is my problem. How do I keep the persistent connections each inside their own thread, and get data to those threads while gathering the data only once?
From does httplib reuse TCP connections? I see that persistent connections can be done this way (thanks Corey Goldberg):
con = httplib.HTTPConnection("myweb.com")
while True:
    con.request("GET", "/x.css", headers={"Connection": "keep-alive"})
    result = con.getresponse()
    result.read()
    print result.reason, result.getheaders()
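To see the reuse for yourself, here is a self-contained sketch against a throwaway local server (the server, port, and handler are all made up for the demo; the compat imports cover both 2.7's httplib/BaseHTTPServer and 3's renamed modules). The connection's underlying socket object stays the same across requests as long as keep-alive holds:

```python
import threading

try:  # Python 2 module names
    import httplib as http_client
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
except ImportError:  # Python 3 renamed them
    import http.client as http_client
    from http.server import HTTPServer, BaseHTTPRequestHandler

class Handler(BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1"  # HTTP/1.1 defaults to persistent connections

    def do_GET(self):
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))  # needed so the client knows where the body ends
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # keep the demo quiet

server = HTTPServer(("127.0.0.1", 0), Handler)  # port 0 = pick any free port
t = threading.Thread(target=server.serve_forever)
t.daemon = True
t.start()

con = http_client.HTTPConnection("127.0.0.1", server.server_port)
socket_ids = set()
for _ in range(3):
    con.request("GET", "/", headers={"Connection": "keep-alive"})
    resp = con.getresponse()
    resp.read()  # the body must be drained before the connection can be reused
    socket_ids.add(id(con.sock))  # same socket object => the TCP connection was reused

print(len(socket_ids))  # 1 if the connection persisted across all three requests
server.shutdown()
con.close()
```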
The data gathering needs to happen inside this loop. But I need to be talking to multiple servers at the same time in multiple threads, and I don't want to waste resources gathering the data more than once. Given my relatively limited knowledge of Python, I just don't see how this is possible.

Basically, as I see it now, there needs to be a loop driving the HTTP/S inside its thread. Then I need some kind of loop gathering my data and preparing it to go to the HTTP/S connections. But how do I get the first loop inside the second this way? It's like I need the persistent HTTP/S connection loop inside the data gathering loop, but I also need the data gathering loop inside the HTTP/S loop.

I would like to explore any pure 2.7.x pythonic ways this could be accomplished. Depending on external utilities could be problematic for various reasons. Once finished, this script will be deployed to 150+ Linux systems, and the less that can go wrong, the better.

Thanks for your help and consideration!
Best Answer
I am leaving this here for anyone else who, like me, is looking to expand their understanding of Python. It took me a while to figure out how to approach this problem, but the solution became clear after talking with a coworker who understood this kind of issue.

In short, the answer that worked for me uses Python 2.7.x's native threading and Queue modules.

I have my main program, which manages the various threads and queues I set up. The NetworkWorker class, which extends the threading module, spins up a new Queue for each instance when it is initialized. The queue references/handlers are stored in a global list variable. I simply loop through the list of queues and send data to each thread's queue from my main thread (main.py). Each thread then picks up its data and does what it is supposed to do. Data received back from each HTTP connection is loaded into another queue, which is processed by a single command-execution thread in main.py.
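The pattern in miniature (the server names are placeholders and the HTTP work is stubbed out with a list append, so the sketch stands alone): one producer gathers the data once and fans it out to one queue per worker thread.

```python
import threading

try:  # Python 2 names the module Queue; Python 3 names it queue
    import Queue as queue
except ImportError:
    import queue

def worker(q, results, name):
    # Each worker owns one queue and blocks until the producer hands it data.
    while True:
        item = q.get()
        if item is None:  # sentinel: shut down
            break
        results.append((name, item))  # stand-in for the HTTP POST

queues = []   # mirrors shared_globals.network_active_queues
results = []
threads = []
for name in ("ServerName1", "ServerName2"):
    q = queue.Queue()
    queues.append(q)
    t = threading.Thread(target=worker, args=(q, results, name))
    t.daemon = True
    t.start()
    threads.append(t)

# Gather the data once, then fan it out to every worker queue.
data = {"data": "values"}
for q in queues:
    q.put(data)
for q in queues:
    q.put(None)  # tell each worker to exit
for t in threads:
    t.join()

print(sorted(r[0] for r in results))  # ['ServerName1', 'ServerName2']
```

The key point is that nothing is nested: the producer loop and the per-connection loops run side by side, and the queues carry the single copy of the gathered data between them.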
The following code has been modified/extracted from its original context. I have tested it, and it works perfectly as long as you correctly configure your servers in the self.conf dict, found in main.py > my_service > __init__, and the server responds with valid JSON. Honestly, it could use some cleanup. To make sure the code remains open and accessible, I added a Creative Commons license. Anyone who feels this code resembles their own may contact me for proper attribution.

Apart from main.py, the names of the other 2 files are important. The shared_globals.py and workerThread.py filenames are case sensitive and must be in the same folder as main.py.
Main executable: main.py
#!/usr/bin/python
# encoding=utf8

from time import sleep, time
import subprocess, sys, os  # used to get IP, system calls, etc
import json
# For web support
import httplib
import urllib
import zlib
import base64
# workerThread dependency
import shared_globals
from workerThread import NetworkWorker
import Queue
import threading

'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

class my_service:

    def __init__(self):
        # Manually list off the servers I want to talk to
        self.conf = {}
        self.conf['servers'] = {}
        self.conf['servers']['ServerName1'] = {}
        self.conf['servers']['ServerName1']['protocol'] = "http"
        self.conf['servers']['ServerName1']['url'] = "server.com"
        self.conf['servers']['ServerName1']['port'] = "80"
        self.conf['servers']['ServerName1']['path'] = "/somefile.php"
        # Timeout in seconds. Make sure this is long enough for your largest OR
        # mission-critical HTTP/S transactions to finish, plus the time it takes
        # for data to reach your persistent HTTP/S thread. Data comes in every
        # 2 seconds, so 5-10 seconds should be fine. Anything that takes too
        # long will cause the queue to back up too much.
        self.conf['servers']['ServerName1']['timeout'] = "10"
        self.conf['servers']['ServerName2'] = {}
        self.conf['servers']['ServerName2']['protocol'] = "http"
        self.conf['servers']['ServerName2']['url'] = "otherserver.net"
        self.conf['servers']['ServerName2']['port'] = "80"
        self.conf['servers']['ServerName2']['path'] = "/dataio.php"
        self.conf['servers']['ServerName2']['timeout'] = "5"
        # Start the Threading Manager, which will manage the various threads and their components
        # All cross-thread communication needs to be managed with Queues
        self.threadManager()

    def threadManager(self):
        # A place to reference all threads
        self.threads = []
        print "Loading Shared Globals"
        # This is the 3rd file in this project. I would not need this if
        # the NetworkWorker thread was inside of this same file. But since it
        # is in another file, we use this shared_globals file to make the Queue
        # list and other shared resources available between the main thread
        # and the NetworkWorker threads
        shared_globals.init()
        # Keep track of all the threads / classes we are initializing
        self.workers = {}  # Keep track of all the worker threads
        print "Initializing Network Worker Threads from Config"
        # For each server we want to talk to, we start a worker thread
        # Read servers from self.conf and init threads / workers
        for t in self.conf['servers']:  # Loop through servers in config; t = server name
            #print "T: ", self.conf['servers'][t]
            self.workers[t] = NetworkWorker()  # Save worker handlers to the workers dict
            # Set the server data for each NetworkWorker thread
            self.workers[t].set_server(self.conf['servers'][t]['url'], self.conf['servers'][t]['port'], self.conf['servers'][t]['timeout'], self.conf['servers'][t]['path'])
        print "Initializing Command Processing Queue"
        cmd_q = Queue.Queue()
        shared_globals.cmd_active_queue = cmd_q
        print "Starting Command Processing thread"
        # Start the command processing thread
        t_cmd = threading.Thread(target=self.command_que_thread_manager)
        t_cmd.daemon = True
        self.threads.append(t_cmd)
        t_cmd.start()
        print "Starting Data Gathering thread"
        # Start the data gathering thread
        t = threading.Thread(target=self.data_collector_thread)
        t.daemon = True
        self.threads.append(t)
        t.start()
        print "Starting Worker threads"
        for w in self.workers:       # Loop through all worker handlers
            self.workers[w].start()  # Start the jobs
        # We have our NetworkWorker threads running, and they init their own queues which we
        # send data to using the def below titled self.send_data_to_networkWorkers
        print "Service Started\n\n\n"
        # This keeps the main thread listening so you can perform actions like killing the application with CTRL+C
        while threading.active_count() > 0:
            try:
                sleep(0.1)
            except (KeyboardInterrupt, SystemExit):  # Exits the main thread without complaint!
                print "\n"
                os._exit(0)
        os._exit(0)

    def data_collector_thread(self):
        '''
        Gather all the data we want to send to each server
        Send data to the queues for each NetworkWorker thread we init'd above
        '''
        # Loop indefinitely
        while True:
            # Gather your data and load it into the data dict
            data = {"data": "values"}
            print "\n\nData to be sent to all NetworkWorker threads: ", data, "\n\n"
            # Prep the data for HTTP/S
            # If you need to do something else with the data besides sending it to the threads, do it here
            data = self.prep_data_for_HTTP(data)    # Do any pre-HTTP/S processing here
            self.send_data_to_networkWorkers(data)  # Send the data out to all the threads' queues
            sleep(2)  # Wait a little bit, then iterate through the loop again. This is your main loop timer.

    def prep_data_for_HTTP(self, data):
        '''
        Convert the data from a Python dict to a JSON string
        Compress the JSON string
        Load the compressed string into another dict, as the HTTP/S object (in the NetworkWorker thread) expects a dict
        URL-encode the data for HTTP/S POST transit
        Return the manipulated data object, now ready for HTTP/S
        '''
        data = json.dumps(data, encoding='utf8')  # Convert to JSON, then continue preparing for HTTP/S
        data = zlib.compress(data, 8)
        # In PHP, get the data from the $_POST['data'] key
        data = {"data": data}
        data = urllib.urlencode(data)
        return data
    # END DEF

    def command_que_thread_manager(self):
        '''
        Runs as a thread
        Send data to this thread via its queue, init'd above in threadManager
        Grabs data, and then does something to process it
        '''
        while True:
            data = shared_globals.cmd_active_queue.get()
            print "Processing Command: ", data
    # END DEF

    def send_data_to_networkWorkers(self, data):
        '''
        Send data to all the NetworkWorker threads
        '''
        for q in shared_globals.network_active_queues:
            q.put(data)
    # END DEF

    def clean_exit(self):
        '''
        Run when exiting the program for a clean exit
        I don't think I actually call this in my example,
        but upon main thread exit it would be a good idea to do so
        '''
        for w in self.workers:      # Loop through all worker handlers
            self.workers[w].stop()  # Stop the jobs
    # END DEF

# END CLASS

if __name__ == "__main__":
    my_service = my_service()
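The prep_data_for_HTTP chain above (dict → JSON → zlib) can be checked in isolation. Here is a minimal round trip with a made-up sample payload; the urlencode step is left out since undoing it is the receiving server's concern. This sketch runs under both Python 2.7 and 3:

```python
import json
import zlib

data = {"hostname": "web01", "load_avg": 0.42}  # hypothetical sample of gathered system data

# Forward direction, as in prep_data_for_HTTP: dict -> JSON -> compressed bytes
wire = zlib.compress(json.dumps(data).encode("utf8"), 8)

# What the server (PHP or otherwise) has to do to get the dict back
restored = json.loads(zlib.decompress(wire).decode("utf8"))

print(restored == data)  # True: the compression step is lossless
print(len(wire))         # the compressed size actually sent on the wire
```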
Shared globals file: shared_globals.py
#!/usr/bin/python
# encoding=utf8

'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

def init():
    global network_active_queues
    global cmd_active_queue
    # Keep track of the data going to the NetworkWorker threads
    print "Initializing Network Active Queues"
    network_active_queues = []
    # Keep track of the commands
    print "Initializing Command Active Queue"
    cmd_active_queue = None  # Set by main.py once it creates the command queue
The NetworkWorker class: workerThread.py
#!/usr/bin/python
# encoding=utf8

'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

import Queue
import threading
import httplib
import urllib
import json
# shared_globals holds the queue list shared with main.py,
# plus the queue that carries HTTP/S responses back
import shared_globals

class NetworkWorker(threading.Thread):

    def __init__(self):
        '''
        Extend the threading module
        Start a new Queue for this instance of this class
        Run the thread as a daemon
        shared_globals is an external file for my globals between the main script and this class.
        Append this Queue to the list of Queues in shared_globals.network_active_queues
        Loop through shared_globals.network_active_queues to send data to all Queues that were started with this class
        '''
        threading.Thread.__init__(self)
        self.daemon = True
        self.q = Queue.Queue()
        shared_globals.network_active_queues.append(self.q)

    def run(self):
        '''
        Establish a persistent HTTP connection
        Pull data from the Queue
        When data comes in, send it to the server
        I send the response from the HTTP server to another queue / thread
        You can do what you want with the responses from the HTTP server
        '''
        # Set your headers. "Connection": "keep-alive" is for persistence.
        headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain", "Connection": "keep-alive"}
        # Init the persistent HTTP connection
        http_request = httplib.HTTPConnection(self.url, int(self.port), timeout=int(self.timeout))
        # Init response_data
        response_data = str()
        # Start the loop
        while True:
            # The code waits here for the queue to have data. If there is no data, it just sleeps until you send it data via its Queue.
            data = self.q.get()
            # .... When it gets data, we proceed with the data variable.
            try:
                http_request.request("POST", self.path, data, headers)
                response = http_request.getresponse()
                response_data = response.read()
                # This is the response from the HTTP/S server
                print "Response: ", response_data
            except Exception, e:
                # In the event something goes wrong, we simply try to re-establish the HTTP connection
                print e, "Re-establishing HTTP/S Connection"
                http_request = httplib.HTTPConnection(self.url, int(self.port), timeout=int(self.timeout))
            # If the HTTP transaction was successful, we will have our HTTP response data in the response_data variable
            if response_data:
                # try/except will fail on a bad JSON object
                try:
                    # Validate the JSON & convert from JSON to a native Python dict
                    json_data = json.loads(response_data)
                    # Send the response from the server to the command thread manager
                    shared_globals.cmd_active_queue.put(json_data)
                except ValueError, e:
                    print "Bad Server Response: Discarding Invalid JSON"
                    # Repackage the invalid JSON, or some identifier thereof, and send it to the command processing thread
                    # Load into THIS NetworkWorker's queue a new data object to tell the server that there was malformed JSON and to resend the data.
                    #http_request.request("POST", self.path, data, headers)
                    #response = http_request.getresponse()
                    #response_data = response.read()
        # Placed here for good measure: if we ever exit the while loop, we close the HTTP/S connection
        http_request.close()
    # END DEF

    def set_server(self, url, port, timeout, path):
        '''
        Use this to set the server for this class / thread instance
        Variables that are passed in are stored as class instance variables (self)
        '''
        self.url = url
        self.port = port
        self.timeout = timeout
        self.path = path
    # END DEF

    def stop(self):
        '''
        Stop this queue
        Stop this thread
        Clean up anything else as needed - tell other threads / queues to shut down
        '''
        shared_globals.network_active_queues.remove(self.q)
        #self.q.put("shutdown")  # Do we need to tell the threads to shut down? Perhaps when reloading the config
        self.join()
    # END DEF

# END CLASS
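One caveat with stop() above: it removes the queue and joins the thread, but run() may still be blocked forever inside self.q.get(), so the join would hang. A common fix, sketched here with a stub worker rather than the full NetworkWorker, is to push a sentinel through the queue and have run() treat it as a shutdown request:

```python
import threading

try:  # Python 2 names the module Queue; Python 3 names it queue
    import Queue as queue
except ImportError:
    import queue

_SHUTDOWN = object()  # unique sentinel; no real payload can be this object

class StubWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.q = queue.Queue()
        self.handled = []

    def run(self):
        while True:
            item = self.q.get()
            if item is _SHUTDOWN:  # exit the blocking loop cleanly
                break
            self.handled.append(item)  # stand-in for the HTTP POST

    def stop(self):
        self.q.put(_SHUTDOWN)  # unblocks q.get() even when the worker is idle
        self.join()

w = StubWorker()
w.start()
w.q.put({"data": "values"})
w.stop()  # returns promptly instead of hanging
print(w.handled)  # [{'data': 'values'}]
```

The same idea slots into NetworkWorker.run() as an `if data is _SHUTDOWN: break` check before the HTTP request, after which the loop's closing http_request.close() actually gets a chance to run.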
Regarding linux - Py 2.7 Architecture: How to persistent HTTP/S with multiple servers and not gather data to send multiple times?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37667441/