gpt4 book ai didi

apache-spark - Spark结构化流Redis接收器性能不理想

转载 作者:行者123 更新时间:2023-12-03 06:45:30 25 4
gpt4 key购买 nike

我使用了Spark结构化的流式消费kafka消息并将数据保存到Redis。通过扩展ForeachWriter [org.apache.spark.sql.Row],我使用了redis接收器来保存数据。该代码运行良好,但是每秒仅将100多个数据保存到redis。有没有更好的方法来加快此过程?虽然下面的代码将在每个mico批次中连接和断开与Redis服务器的连接,但是有什么方法可以只连接一次并保持连接以最小化连接成本,我认为这是浪费时间的主要原因?
我尝试了广播jedis,但是jedis和jedispool都不可序列化,因此无法正常工作。

我的接收器代码如下:

class StreamDataSink extends ForeachWriter[org.apache.spark.sql.Row]{

var jedis:Jedis = _

override def open(partitionId:Long,version:Long):Boolean={
if(null == jedis){
jedis = FPCRedisUtils.getPool.getResource
}
true
}

override def process(record: Row): Unit = {

if(0 == record(3)){
jedis.select(Constants.REDIS_DATABASE_INDEX)
if(jedis.exists("counter")){
jedis.incr("counter")
}else{
jedis.set("counter",1.toString)
}
}
}

override def close(errorOrNull: Throwable): Unit = {
if(null != jedis){
jedis.close()
jedis.disconnect()
}
}

任何建议将不胜感激。

最佳答案

不要做jedis.disconnect()。实际上,这将关闭套接字,并在下一次强制建立新连接。仅使用jedis.close(),它将把连接返回到池中。

当您对不存在的键调用INCR时,它将自动创建,默认为零,然后递增,从而产生一个值为1的新键。

这将if-else简化为jedis.incr("counter")

有了这个你有:

jedis.select(Constants.REDIS_DATABASE_INDEX)
jedis.incr("counter")

查看是否确实需要 SELECT。这是针对每个连接的,所有连接默认为DB0。如果共享同一个Jedis池的所有工作负载都使用DB 0,则无需调用select。

如果确实需要select和incr,则将它们 pipeline:
Pipeline pipelined = jedis.pipelined()
pipelined.select(Constants.REDIS_DATABASE_INDEX)
pipelined.incr("counter")
pipelined.sync()

这将在一条网络消息中发送两个命令,从而进一步提高性能。

关于apache-spark - Spark结构化流Redis接收器性能不理想,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59710142/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com