gpt4 book ai didi

scala - 使用Akka Streams插入Cassandra

转载 作者:行者123 更新时间:2023-12-04 13:22:27 25 4
gpt4 key购买 nike

我正在学习Akka Streams,作为一项练习,我想将日志插入Cassandra。问题是我无法使流将日志插入数据库中。

我天真地尝试了以下方法:

object Application extends AkkaApp with LogApacheDao {

// The log file is read line by line
val source: Source[String, Unit] = Source.fromIterator(() => scala.io.Source.fromFile(filename).getLines())

// Each line is converted to an ApacheLog object
val flow: Flow[String, ApacheLog, Unit] = Flow[String]
.map(rawLine => {
rawLine.split(",") // implicit conversion Array[String] -> ApacheLog
})

// Log objects are inserted to Cassandra
val sink: Sink[ApacheLog, Future[Unit]] = Sink.foreach[ApacheLog] { log => saveLog(log) }

source.via(flow).to(sink).run()

}


saveLog()像这样在LogApacheDao中定义(我省略了列值以获得更清晰的代码):

val session = cluster.connect()

session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};")

session.execute(s"DROP TABLE IF EXISTS $keyspace.$table;")

session.execute(s"CREATE TABLE $keyspace.$table (...)")

val preparedStatement = session.prepare(s"INSERT INTO $keyspace.$table (...) VALUES (...);")

def saveLog(logEntry: ApacheLog) = {
val stmt = preparedStatement.bind(...)

session.executeAsync(stmt)
}


在接收器中输入时,从Array [String]到ApacheLog的转换没有问题(已通过println验证)。同样,键空间和表都已创建,但是当执行saveLog时,似乎有些东西阻塞了,没有插入。

我没有收到任何错误,但是Cassandra驱动程序核心(3.0.0)一直给我:

Connection[/172.17.0.2:9042-1, inFlight=0, closed=false] was inactive for 30 seconds, sending heartbeat
Connection[/172.17.0.2:9042-2, inFlight=0, closed=false] heartbeat query succeeded


我应该补充一点,我使用的是dockerized Cassandra。

最佳答案

关于scala - 使用Akka Streams插入Cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37173206/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com