- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在玩 Akka Streams 2.4.2,我想知道是否有可能设置一个使用数据库表作为源的流,并且只要有一条记录添加到表中,该记录就会具体化并推送到下游?
我已经实现了@PH88 的解决方案。这是我的表定义:
case class Record(id: Int, value: String)
class Records(tag: Tag) extends Table[Record](tag, "my_stream") {
def id = column[Int]("id")
def value = column[String]("value")
def * = (id, value) <> (Record.tupled, Record.unapply)
}
实现如下:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
try{
val newRecStream = Source.unfold((0, List[Record]())) { n =>
try {
val q = for (r <- TableQuery[Records].filter(row => row.id > n._1)) yield (r)
val r = Source.fromPublisher(db.stream(q.result)).collect {
case rec => println(s"${rec.id}, ${rec.value}"); rec
}.runFold((n._1, List[Record]())) {
case ((id, xs), current) => (current.id, current :: xs)
}
val answer: (Int, List[Record]) = Await.result(r, 5.seconds)
Option(answer, None)
}
catch { case e:Exception => println(e); Option(n, e) }
}
Await.ready(newRecStream.throttle(1, 1.second, 1, ThrottleMode.shaping).runForeach(_ => ()), Duration.Inf)
}
finally {
system.shutdown
db.close
}
但我的问题是,当我尝试调用 flatMapConcat
时,我得到的类型是 Serializable
。
已更新以尝试来自@PH88 的db.run
建议:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val disableAutoCommit = SimpleDBIO(_.connection.setAutoCommit(false))
val queryLimit = 1
try {
val newRecStream = Source.unfoldAsync(0) { n =>
val q = TableQuery[Records].filter(row => row.id > n).take(queryLimit)
db.run(q.result).map { recs =>
Some(recs.last.id, recs)
}
}
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.flatMapConcat { recs =>
Source.fromIterator(() => recs.iterator)
}
.runForeach { rec =>
println(s"${rec.id}, ${rec.value}")
}
Await.ready(newRecStream, Duration.Inf)
}
catch
{
case ex: Throwable => println(ex)
}
finally {
system.shutdown
db.close
}
这行得通(我将查询限制更改为 1,因为目前我的数据库表中只有几个项目)- 除非它打印程序存在的表中的最后一行。这是我的日志输出:
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/xxxxxxx/dev/src/scratch/scala/fpp-in-scala/target/scala-2.11/classes/logback.xml]
17:09:28,062 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:09:28,064 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
17:09:28,079 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
17:09:28,102 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [application] to DEBUG
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
17:09:28,103 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
17:09:28,104 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@4278284b - Registering current configuration as safe fallback point
17:09:28.117 [main] INFO com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
1, WASSSAAAAAAAP!
2, WHAAAAT?!?
3, booyah!
4, what!
5, This rocks!
6, Again!
7, Again!2
8, I love this!
9, Akka Streams rock
10, Tuning jdbc
17:09:39.000 [main] INFO com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.
Process finished with exit code 0
找到丢失的部分 - 需要更换它:
Some(recs.last.id, recs)
用这个:
val lastId = if(recs.isEmpty) n else recs.last.id
Some(lastId, recs)
当结果集为空时,对 recs.last.id 的调用抛出 java.lang.UnsupportedOperationException: empty.last
。
最佳答案
一般来说,SQL 数据库是一种“被动”构造,不会像您描述的那样主动推送更改。您只能通过定期轮询来“模拟”“推送”,例如:
val newRecStream = Source
// Query for table changes
.unfold(initState) { lastState =>
// query for new data since lastState and save the current state into newState...
Some((newState, newRecords))
}
// Throttle to limit the poll frequency
.throttle(...)
// breaks down into individual records...
.flatMapConcat { newRecords =>
Source.unfold(newRecords) { pendingRecords =>
if (records is empty) {
None
} else {
// take one record from pendingRecords and save to newRec. Save the rest into remainingRecords.
Some(remainingRecords, newRec)
}
}
}
更新时间:2016 年 2 月 24 日
基于问题 2/23/2016 更新的伪代码示例:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val queryLimit = 10
try {
val completion = Source
.unfoldAsync(0) { lastRowId =>
val q = TableQuery[Records].filter(row => row.id > lastRowId).take(queryLimit)
db.run(q.result).map { recs =>
Some(recs.last.id, recs)
}
}
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.flatMapConcat { recs =>
Source.fromIterator(() => recs.iterator)
}
.runForeach { rec =>
println(s"${rec.id}, ${rec.value}")
}
// Block forever
Await.ready(completion, Duration.Inf)
} catch {
case ex: Throwable => println(ex)
} finally {
system.shutdown
db.close
}
它将在 unfoldAsync
中重复执行查询 DB,一次最多检索 10 (queryLimit
) 条记录并将记录发送到下游 (-> throttle
-> flatMapConcat
-> runForeach
)。最后的 Await
实际上会永远阻塞。
更新时间:2016 年 2 月 25 日
可执行的“概念验证”代码:
import akka.actor.ActorSystem
import akka.stream.{ThrottleMode, ActorMaterializer}
import akka.stream.scaladsl.Source
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
object Infinite extends App{
implicit val system = ActorSystem("Publisher")
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
case class Record(id: Int, value: String)
try {
val completion = Source
.unfoldAsync(0) { lastRowId =>
Future {
val recs = (lastRowId to lastRowId + 10).map(i => Record(i, s"rec#$i"))
Some(recs.last.id, recs)
}
}
.throttle(1, 1.second, 1, ThrottleMode.Shaping)
.flatMapConcat { recs =>
Source.fromIterator(() => recs.iterator)
}
.runForeach { rec =>
println(rec)
}
Await.ready(completion, Duration.Inf)
} catch {
case ex: Throwable => println(ex)
} finally {
system.shutdown
}
}
关于slick-3.0 - 是否可以使用 Akka Stream 从数据库表创建 "infinite"流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35509602/
我有一台 MySQL 服务器和一台 PostgreSQL 服务器。 需要从多个表中复制或重新插入一组数据 MySQL 流式传输/同步到 PostgreSQL 表。 这种复制可以基于时间(Sync)或事
如果两个表的 id 彼此相等,我尝试从一个表中获取数据。这是我使用的代码: SELECT id_to , email_to , name_to , status_to
我有一个 Excel 工作表。顶行对应于列名称,而连续的行每行代表一个条目。 如何将此 Excel 工作表转换为 SQL 表? 我使用的是 SQL Server 2005。 最佳答案 这取决于您使用哪
我想合并两个 Django 模型并创建一个模型。让我们假设我有第一个表表 A,其中包含一些列和数据。 Table A -------------- col1 col2 col3 col
我有两个表:table1,table2,如下所示 table1: id name 1 tamil 2 english 3 maths 4 science table2: p
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 1 年前。 Improve th
下面两个语句有什么区别? newTable = orginalTable 或 newTable.data(originalTable) 我怀疑 .data() 方法具有性能优势,因为它在标准 AX 中
我有一个表,我没有在其中显式定义主键,它并不是真正需要的功能......但是一位同事建议我添加一个列作为唯一主键以随着数据库的增长提高性能...... 谁能解释一下这是如何提高性能的? 没有使用索引(
如何将表“产品”中的产品记录与其不同表“图像”中的图像相关联? 我正在对产品 ID 使用自动增量。 我觉得不可能进行关联,因为产品 ID 是自动递增的,因此在插入期间不可用! 如何插入新产品,获取产品
我有一个 sql 表,其中包含关键字和出现次数,如下所示(尽管出现次数并不重要): ____________ dog | 3 | ____________ rat | 7 | ____
是否可以使用目标表中的LAST_INSERT_ID更新源表? INSERT INTO `target` SELECT `a`, `b` FROM `source` 目标表有一个自动增量键id,我想将其
我正在重建一个搜索查询,因为它在“我看到的”中变得多余,我想知道什么 (albums_artists, artists) ( ) does in join? is it for boosting pe
以下是我使用 mysqldump 备份数据库的开关: /usr/bin/mysqldump -u **** --password=**** --single-transaction --databas
我试图获取 MySQL 表中的所有行并将它们放入 HTML 表中: Exam ID Status Assigned Examiner
如何查询名为 photos 的表中的所有记录,并知道当前用户使用单个查询将哪些结果照片添加为书签? 这是我的表格: -- -- Table structure for table `photos` -
我的网站都在 InnoDB 表上运行,目前为止运行良好。现在我想知道在我的网站上实时发生了什么,所以我将每个页面浏览量(页面、引荐来源网址、IP、主机名等)存储在 InnoDB 表中。每秒大约有 10
我在想我会为 mysql 准备两个表。一个用于存储登录信息,另一个用于存储送货地址。这是传统方式还是所有内容都存储在一张表中? 对于两个表...有没有办法自动将表 A 的列复制到表 B,以便我可以引用
我不是程序员,我从这个表格中阅读了很多关于如何解决我的问题的内容,但我的搜索效果不好 我有两张 table 表 1:成员 id*| name | surname -------------------
我知道如何在 ASP.NET 中显示真实表,例如 public ActionResult Index() { var s = db.StaffInfoDBSet.ToList(); r
我正在尝试运行以下查询: "insert into visits set source = 'http://google.com' and country = 'en' and ref = '1234
我是一名优秀的程序员,十分优秀!