gpt4 book ai didi

python - 在Python中使用多处理来改善Cassandra写入指令不起作用

转载 作者:太空宇宙 更新时间:2023-11-03 17:40:21 25 4
gpt4 key购买 nike

我试图使用 python 中的多处理来提高 Cassandra 数据库写入性能,如给定的 here但这个过程所花费的时间增加了很多。我想知道我的代码是否有任何错误。发布我的 python 代码片段。我使用两种不同的工作方法将数据插入到两个表中。这是第一个 worker

    def worker(daymonthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, p):

cluster = Cluster(['127.0.0.1'])
metadata = cluster.metadata
session = cluster.connect()

session.execute("USE db;")
print current_process().name

session.execute("INSERT INTO db.day (daymonthyear, ts, c_country, c_lat, c_lon, e_sma, e_dma, e_etype, ip_version, ip_ihl, ip_tos_dscp, ip_totallen, ip_idnum, ip_fragoff, ip_ttl, ip_proto, ip_hdrchksm, ip_sip, ip_dip, ip_opts, s_sp, s_dp, s_vtag, s_chksm) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);",(str(daymonthyear), int(ts1), str(country), str(lat), str(lon), str(sma), str(dma), str(etype), str(version), str(ihl), str(tos_dscp), int(totallen), int(idnum), str(fragoff), int(ttl), int(proto), str(hdrchksm), str(sip), str(dip), str(opts), int(s_sp), int(s_dp), int(s_vtag), str(s_chksm)))

session.cluster.shutdown()
session.shutdown()

第二个 worker :

    def worker1(monthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, p):

cluster = Cluster(['127.0.0.1'])
metadata = cluster.metadata
session = cluster.connect()
session.execute("USE db;")
print current_process().name
session.execute("INSERT INTO db.month (monthyear, ts, c_country, c_lat, c_lon, e_sma, e_dma, e_etype, ip_version, ip_ihl, ip_tos_dscp, ip_totallen, ip_idnum, ip_fragoff, ip_ttl, ip_proto, ip_hdrchksm, ip_sip, ip_dip, ip_opts, u_sp, u_dp, u_len, u_chksm) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);",(str(monthyear), int(ts1), str(country), str(lat), str(lon), str(sma), str(dma), str(etype), str(version), str(ihl), str(tos_dscp), int(totallen), int(idnum), str(fragoff), int(ttl), int(proto), str(hdrchksm), str(sip), str(dip), str(opts), int(u_sp), int(u_dp), int(u_len), str(u_chksm)))

session.cluster.shutdown()
session.shutdown()

调用方法:

def dump():

for ts1,buf in pcap:
if ip.p == dpkt.ip.IP_PROTO_TCP:
res = pool.apply_async(worker, args=(daymonthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n,))
res.wait()
res = pool.apply_async(worker1, args=(monthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n,))
res.wait()
if type(ip.data) == UDP :
res = pool.apply_async(worker, args=(daymonthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n,))
res.wait()
res = pool.apply_async(worker1, args=(monthyear, ts1, country, lat, lon, sma, dma, etype, version, ihl, tos_dscp, totallen, idnum, fragoff, ttl, proto, hdrchksm, sip, dip, opts, t_sp, t_dp, t_sqnum, t_acknum, t_dataoff, t_flags, t_winsz, t_chksm, t_urgptr, t_opts, process_n,))
res.wait()

所有使用的变量都已声明,代码中没有错误。唯一的问题是,它比转储方法中顺序执行插入语句花费的时间要多得多。谁能告诉我我是否以正确的方式使用多处理?

最佳答案

与 Cassandra 的连接非常广泛。如果从每个进程进行连接,您将花费更多资源。当进程数量增加时,这一点尤其明显。建立 N 个连接(对于一个大的 N)原则上就像你自己唱歌一样。

关于python - 在Python中使用多处理来改善Cassandra写入指令不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30650709/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com