- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试持续消费来自 kafka 的事件。同一应用程序还使用此消耗的数据来执行一些分析并以 n 秒的间隔更新数据库(假设 n = 60 秒)。
在同一个应用中,如果process1 = Kafka Consumer,process2=数据分析和数据库更新逻辑。
process1 is to be run continuously
process2 is to be executed once every n=60 seconds
process2
涉及计算和数据库更新,因此执行需要 5-10 秒。我不希望 process1
在 process2
执行期间停止。因此,我正在使用多处理模块
(如果我使用Threading
,process1,process2
将是thread1,thread2
> python 中的模块,但由于我读过有关 GIL 的内容,并且 Threading
模块无法利用多核架构,所以我决定使用 multiprocessing
模块。 )在这种情况下实现并发。 (如果我对上面提到的 GIL
或 Threading
模块限制的理解不正确,我深表歉意,并请随时纠正我)。
我的应用程序在两个进程之间有一个相当简单的交互,其中 process1
只是用 60 秒内收到的所有消息填充队列,并在 60 秒结束时传输所有消息到process2
。
我在这个传输逻辑上遇到了问题。如何将队列的内容从 process1
传输到 process2
(我猜这将是主进程或另一个进程?这是我的另一个问题,我应该实例化 2除了主进程之外的进程?)在 60 秒结束时,然后清除队列内容,以便它在另一次迭代中重新开始。
到目前为止,我有以下内容:
import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue
def kafka_init():
client=KafkaClient('kafka1.wpit.nile.works')
consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
return consumer
def consumeMessages(q):
print "thread started"
while not q.empty():
try:
print q.get(True,1)
Queue.Empty:
break
print "thread ended"
if __name__=="__main__":
starttime=time.time()
timeout=starttime+ 10 #timeout of read in seconds
consumer=kafka_init()
q=Queue()
p=Process(target=consumeMessages,args=q)
while(True):
q.put(consumer.get_message())
if time.time()>timeout:
#transfer logic from process1 to main process here.
print "Start time",starttime
print "End time",time.time()
p.start()
p.join()
break
任何帮助将不胜感激。
最佳答案
您正在处理的问题不是卡夫卡特有的,所以我将使用通用的“消息”,它们只是整数。
在我看来,主要问题是一方面你想要处理消息一产生,另一方面只想更新每 60 秒更新一次数据库。
如果您使用q.get()
,默认情况下此方法调用将阻塞,直到队列中有可用消息。这可能需要超过 60 秒的时间,这会使数据库更新延迟太长时间。所以我们不能使用阻塞的q.get
。我们需要使用带有超时的 q.get
,以便调用是非阻塞的:
import time
import multiprocessing as mp
import random
import Queue
def process_messages(q):
messages = []
start = time.time()
while True:
try:
message = q.get(timeout=1)
except Queue.Empty:
pass
else:
messages.append(message)
print('Doing data analysis on {}'.format(message))
end = time.time()
if end-start > 60:
print('Updating database: {}'.format(messages))
start = end
messages = []
def get_messages(q):
while True:
time.sleep(random.uniform(0,5))
message = random.randrange(100)
q.put(message)
if __name__ == "__main__":
q = mp.Queue()
proc1 = mp.Process(target=get_messages, args=[q])
proc1.start()
proc2 = mp.Process(target=process_messages, args=[q])
proc2.start()
proc1.join()
proc2.join()
产生如下输出:
Doing data analysis on 38
Doing data analysis on 8
Doing data analysis on 8
Doing data analysis on 66
Doing data analysis on 37
Updating database: [38, 8, 8, 66, 37]
Doing data analysis on 27
Doing data analysis on 47
Doing data analysis on 57
Updating database: [27, 47, 57]
Doing data analysis on 85
Doing data analysis on 90
Doing data analysis on 86
Doing data analysis on 22
Updating database: [85, 90, 86, 22]
Doing data analysis on 8
Doing data analysis on 92
Doing data analysis on 59
Doing data analysis on 40
Updating database: [8, 92, 59, 40]
关于python - 使用多处理连续使用 Kafka 并按特定时间间隔更新队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32300401/
我试图根据表格看起来像这样的状态代码来查找表格中的空白。 状态表: StateID (PK) | Code -------------------- 1 | AK 2
我有一个配对字符串列表。我想找到两个字母之间的长度/间隔。到目前为止,我可以使用找到有序字母的间隔 alpha =["AM", "KQ", "ZN", "XM", "UK"] leng
我有一个配对字符串列表。我想找到两个字母之间的长度/间隔。到目前为止,我可以使用找到有序字母的间隔 alpha =["AM", "KQ", "ZN", "XM", "UK"] leng
我正在努力弄清楚如何将时间选择器的下拉间隔设置为 15 分钟间隔。默认为 30 分钟 atm。让它工作的正确调用/符号是什么?我已经尝试了很多将它们放入 '' 的变体,但没有任何进展。谢谢! $
假设我有 table teach_subject(teacher_id, subject_id, min_grade_of_school, max_grade_of_school, color_in_
我有下面的图像,我试图以 3 秒的间隔一张一张地显示它们,但我无法让它工作。它继续停留在 0 并且不显示图像,帮助会很好: JavaScript: window.animate = functio
我认为这个问题类似于加权间隔调度问题,但略有不同。 假设您有一个具有开始时间和结束时间的类次 s,该类次从 s.start 开始有 n 个空位到s.end。时隙是从 s.start 到 s.end 的
我试图将一个 GeometryReader 作为按钮推到屏幕底部,但 Spacer 在这里不起作用...... 这个想法是让应用程序响应所有屏幕尺寸。 VStack { GeometryRea
我问了一个相关问题 here但意识到我在计算这个复杂的度量时花费了太多时间(目标是与随机化测试一起使用,所以速度是一个问题)。所以我决定放弃权重,只使用两个度量之间的最小距离。所以这里我有 2 个向量
我最近成立 healthcheck s 在我的 docker-compose配置。 它做得很好,我喜欢它。下面是一个典型的例子: services: app: healthcheck:
我正在 Cocoa 中使用如下设置的 NSTimer 运行 mainLoop: mainLoopTimer = [NSTimer scheduledTimerWithTimeInter
目前正在开发家庭自动化应用程序,其中有事件 API 可以在事件被触发时为我提供事件。但我想持续运行 API,以便跟踪在整个应用程序中触发的事件。还有一个主页,我在其中显示曾经发生的事件。它是一个简单的
我有一个查询应该是这样的要求: { "size": 0, "_source": [ "dateCreated" ], "query": { "bool": {
我有一个 UNIX 格式的时间字符串。我需要将该字符串四舍五入到最接近的 30 分钟间隔。 例如:我的时间是上午 9:20,而不是应该四舍五入到上午 9:30。 如果分钟数大于 30,例如上午 9:4
我有网络调用,我想定期调用它。我只想将运算符 Interval 与 flatMap 一起使用,但在间隔线程上。你能解释一下这种情况吗?我知道Interval只使用一个线程,任务是按顺序处理的。 我有
我在我的 iOS 应用程序中使用了 NSTimer,但由于 SetNeedsDisplay,我没有得到我想要的结果。 我做了一些研究并找到了 CADisplayLink,它为我提供了我想要的动画结果。
我需要通过给出值数组来生成 map 上图例的值。Java 库中是否有函数可以从值数组和计数值生成范围或区间?像这样的东西: Integer[] getIntervals(Number[] values
我的函数中有以下代码,我试图从数据库中获取参数MAX_FAILED_ATTEMPT,并且基于此,如果检查失败,我将发送警报。当前代码将尝试从 MAX_FIELD_ATTEMPT 获取值并立即依次进行检
我在这里要做的是像 Windows XP 上的那样放下一个轨迹栏来更改分辨率:( http://puu.sh/7Li5h.png ) 我想设置特定的间隔/增量值,如上图所示。目前,实际栏下方的线条已经
是否可以停止当前作为 setInterval 运行的函数? 这是我的代码: 这是我调用的函数 function pull_light_status (lights_array) { $.get
我是一名优秀的程序员,十分优秀!