- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
在我的代码中,我有两个假设任务:一个从生成器获取 url 并使用 Twisted 的 Cooperator 批量下载它们,另一个获取下载的源并异步解析它。我试图将所有获取和解析任务封装到一个 Deferred 对象中,该对象在所有页面都已下载且所有源都已解析时回调。
我想出了以下解决方案:
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage
BATCH_SIZE = 5
def main_task():
result = defer.Deferred()
state = {'count': 0, 'done': False}
def on_parse_finish(r):
state['count'] -= 1
if state['done'] and state['count'] == 0:
result.callback(True)
def process(source):
deferred = parse(source)
state['count'] += 1
deferred.addCallback(on_parse_finish)
def fetch_urls():
for url in get_urls():
deferred = getPage(url)
deferred.addCallback(process)
yield deferred
def on_finish(r):
state['done'] = True
deferreds = []
coop = task.Cooperator()
urls = fetch_urls()
for _ in xrange(BATCH_SIZE):
deferreds.append(coop.coiterate(urls))
main_tasks = defer.DeferredList(deferreds)
main_tasks.addCallback(on_finish)
return defer.DeferredList([main_tasks, result])
# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)
代码有效,但我觉得我要么遗漏了一些明显的东西,要么不知道一个简单的 Twisted 模式,这会使它变得更简单。有没有更好的方法来返回一个在所有提取和解析完成后回调的 Deferred?
最佳答案
如目前所写,在我看来,这段代码的并行下载数量有限,但并行解析作业的数量不受限制。那是故意的吗?我将假设“否”,因为如果您的网络恰好很快而您的解析器恰好很慢,因为 URL 的数量接近无穷大,那么您的内存使用量也是如此:)。
所以这里有一个并行性有限但通过下载顺序执行解析的东西:
from twisted.internet import defer, task
from twisted.web.client import getPage
BATCH_SIZE = 5
def main_task(reactor):
def fetch_urls():
for url in get_urls():
yield getPage(url).addCallback(parse)
coop = task.Cooperator()
urls = fetch_urls()
return (defer.DeferredList([coop.coiterate(urls)
for _ in xrange(BATCH_SIZE)])
.addCallback(task_finished))
task.react(main_task)
之所以可行,是因为 parse
(显然)返回了一个 Deferred
,将其作为回调添加到 getPage
返回的结果中Deferred
在 parse
完成其业务之前不会调用由 coiterate
添加的回调。
既然你问的是惯用的 Twisted 代码,我也冒昧地对其进行了一些现代化改造(使用 task.react
而不是手动运行 react 器,内联表达式使事情更简洁等等)。
如果您确实希望并行解析多于并行提取,那么这样的方法可能会更好:
from twisted.internet import defer, task
from twisted.web.client import getPage
PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10
def main_task(reactor):
parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)
def parseWhenReady(r):
def parallelParse(_):
parse(r).addBoth(
lambda result: parseSemaphore.release().addCallback(
lambda _: result
)
)
return parseSemaphore.acquire().addCallback(parallelParse)
def fetch_urls():
for url in get_urls():
yield getPage(url).addCallback(parseWhenReady)
coop = task.Cooperator()
urls = fetch_urls()
return (defer.DeferredList([coop.coiterate(urls)
for _ in xrange(PARALLEL_FETCHES)])
.addCallback(lambda done:
defer.DeferredList(
[parseSemaphore.acquire()
for _ in xrange(PARALLEL_PARSES)]
))
.addCallback(task_finished))
task.react(main_task)
您可以看到 parseWhenReady
返回从 acquire
返回的 Deferred
,因此只要并行解析可以,并行提取就会继续 begin,因此即使解析器过载,您也不会继续不加选择地获取。但是,parallelParse
谨慎地避免返回 parse
或 release
返回的 Deferred
,因为提取应该能够继续那些正在进行中。
(请注意,由于您的初始示例不可运行,因此我根本没有测试过其中任何一个。希望即使存在错误,意图也很清楚。)
关于python - Twisted:WAITING子任务完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20336476/
我正在尝试发送 RFC5424 中定义的结构化数据从 Twisted 到系统日志。我在 Ubuntu 上,系统日志实现是 rsyslog。 我正在使用“twistd --syslog”运行扭曲的应用程
摘要:无法自动向 Twisted 的检修孔发出命令。寻求解决方案/建议。 Twisted 有一个很棒的功能,称为 manhole。它允许用户通过 ssh 连接到当前正在运行的 Twisted 服务器并
我正在尝试发送 RFC5424 中定义的结构化数据从 Twisted 到系统日志。我在 Ubuntu 上,系统日志实现是 rsyslog。 我正在使用“twistd --syslog”运行扭曲的应用程
我正在尝试使用扭曲的 spawnProcess 启动一个进程,并使用 psutil 每 5 秒记录一次资源使用情况。首先,我尝试使用以下代码获取生成的进程的可执行文件名称: #!/usr/bin/py
Azure 网站能否托管 Twisted 应用程序?例如像这样的东西: from twisted.internet import reactor from twisted.web import ser
有没有办法限制 Twisted http 客户端的下载速率?如果没有,在 Twisted 中实现此类客户端的最简单方法是什么? 最佳答案 Twisted 中的流控制最常使用 IProducer.pau
我的基于 Twisted 的客户端循环发送 UDP 数据包。因此,我正在使用 DatagramProtocol 类。这是来源: #!/usr/bin/python # -*- coding: utf-
raven 有很多集成,包括 python 日志记录。一方面,twisted 不使用python 的日志记录。而另一方面,在twisted 中,raven 并没有直接的整合。 那么在基于扭曲的设置中使
有人告诉我,Twisted基于库Ampoule是创建在不同计算机上执行的进程池的好方法。然而,没有相关文档,Ampoule 的示例也没有说明这一点。 我对类似于 stdlib multiprocess
Twisted 最近推出了一个新的日志记录模块:twisted.logger。我已经通读了文档 [1],但我一直无法找到设置实际日志级别的位置。关于日志观察者[2] 的部分建议,如果您显式配置自己的
我想问一个关于如何关闭扭曲连接的问题 RPC . 我知道有人问过类似的问题,但似乎没有回答我的问题。 我正在做一些基本的连接,如下图所示: cfactory = pb.PBClientFactory(
我正在使用 Twisted 编写代码,但在为我的扭曲互联网延迟变量想出一个合理的变量名时遇到了麻烦。这是我的候选人: d :太通用,太短,违反了 pylint 规则 C0103。 def :与内置函数
我正在为 Web 服务器的静态部分使用 Twisted Web static.File 资源。 对于开发,我希望能够添加新文件或修改当前静态文件,而无需重新启动 Twisted 网络服务器。 我在 g
我想使用 Twisted 重建现有应用程序的通信部分。这个应用程序确实从客户端向服务器发送数据,只是这样,服务器不发送任何东西。 我如何使用 Twisted 的事件驱动概念来实现这一目标?我目前使用
我开始考虑通过扩展当前的 Twisted FTP 来实现显式 FTP。 大部分代码都很简单,实现 AUTH、PBSZ、PROT 很容易,我得到了一个有效的安全控制 channel 。 我的问题是数据通
我想实现 WebRTC 对等连接。我不知道如何设置 ICE 服务器或应该使用什么工具。任何人都可以给我建议吗?非常感谢。 最佳答案 实际上你不需要编写自己的 STUN/TURN 服务器 有很多现成的解
正如标题所示,ProcessProtocol 类上的这两个函数有什么区别?关于什么时候应该使用一种而不是另一种的文档有点稀疏? 我最好寻找一些可以证明这一点的用例示例。 最佳答案 我猜文档在这一点上有
使用下面的代码,我似乎可以相当轻松地使用 multiprocessing.reduction 在子进程中重建套接字.. import socket,os import multiprocessing
我正在尝试在 python 上使用 twisted 编写服务器。这是我文件的头部: from twisted.internet.protocol import Factory, Protocol fr
我使用 Twisted 制作了一个简单的 http 服务器,它发送 Content-Type: multipart/x-mixed-replace header 。我正在使用它来测试我想设置为接受长期
我是一名优秀的程序员,十分优秀!