- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我一直在玩 Akka Persistence,并编写了以下程序来测试我的理解。问题是每次运行这个程序时我都会得到不同的结果。正确答案是 49995000,但我并不总是明白这一点。我已经清除了每次运行之间的日志目录,但它没有任何区别。任何人都可以看到发生了什么问题吗?该程序简单地对从 1 到 n 的所有数字求和(在下面的代码中,n 是 9999)。
正确答案是:(n * (n+1))/2。对于 n=9999,即 49995000。
编辑:与 JDK 7 相比,JDK 8 似乎更一致地工作。我应该只使用 JDK 8 吗?
package io.github.ourkid.akka.aggregator.guaranteed
import akka.actor.Actor
import akka.actor.ActorPath
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.persistence.AtLeastOnceDelivery
import akka.persistence.PersistentActor
case class ExternalRequest(updateAmount : Int)
case class CountCommand(deliveryId : Long, updateAmount : Int)
case class Confirm(deliveryId : Long)
sealed trait Evt
case class CountEvent(updateAmount : Int) extends Evt
case class ConfirmEvent(deliveryId : Long) extends Evt
class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery {
override def persistenceId = "persistent-actor-ref-1"
override def receiveCommand : Receive = {
case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState)
case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState)
}
override def receiveRecover : Receive = {
case evt : Evt => updateState(evt)
}
def updateState(evt:Evt) = evt match {
case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount))
case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId)
}
}
class FactorialActor extends Actor {
var count = 0
def receive = {
case CountCommand(deliveryId : Long, updateAmount:Int) => {
count = count + updateAmount
sender() ! Confirm(deliveryId)
}
case "print" => println(count)
}
}
object GuaranteedDeliveryTest extends App {
val system = ActorSystem()
val factorial = system.actorOf(Props[FactorialActor])
val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path))
import system.dispatcher
system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" }
for (i <- 1 to 9999)
delActor ! ExternalRequest(i)
}
name := "akka_aggregator"
organization := "io.github.ourkid"
version := "0.0.1-SNAPSHOT"
scalaVersion := "2.11.4"
scalacOptions ++= Seq("-unchecked", "-deprecation")
resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
)
val Akka = "2.3.7"
val Spray = "1.3.2"
libraryDependencies ++= Seq(
// Core Akka
"com.typesafe.akka" %% "akka-actor" % Akka,
"com.typesafe.akka" %% "akka-cluster" % Akka,
"com.typesafe.akka" %% "akka-persistence-experimental" % Akka,
"org.iq80.leveldb" % "leveldb" % "0.7",
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8",
// For future REST API
"io.spray" %% "spray-httpx" % Spray,
"io.spray" %% "spray-can" % Spray,
"io.spray" %% "spray-routing" % Spray,
"org.typelevel" %% "scodec-core" % "1.3.0",
// CSV reader
"net.sf.opencsv" % "opencsv" % "2.3",
// Logging
"com.typesafe.akka" %% "akka-slf4j" % Akka,
"ch.qos.logback" % "logback-classic" % "1.0.13",
// Testing
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"com.typesafe.akka" %% "akka-testkit" % Akka % "test",
"io.spray" %% "spray-testkit" % Spray % "test",
"org.scalacheck" %% "scalacheck" % "1.11.6" % "test"
)
fork := true
mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor")
##########################################
# Akka Persistence Reference Config File #
##########################################
akka {
# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
# to STDOUT)
loggers = ["akka.event.slf4j.Slf4jLogger"]
# Log level used by the configured loggers (see "loggers") as soon
# as they have been started; before that, see "stdout-loglevel"
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "DEBUG"
# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = "INFO"
# Filter of log events that is used by the LoggingAdapter before
# publishing log events to the eventStream.
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
# Protobuf serialization for persistent messages
actor {
serializers {
akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer"
akka-persistence-message = "akka.persistence.serialization.MessageSerializer"
}
serialization-bindings {
"akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
"akka.persistence.serialization.Message" = akka-persistence-message
}
}
persistence {
journal {
# Maximum size of a persistent message batch written to the journal.
max-message-batch-size = 200
# Maximum size of a deletion batch written to the journal.
max-deletion-batch-size = 10000
# Path to the journal plugin to be used
plugin = "akka.persistence.journal.leveldb"
# In-memory journal plugin.
inmem {
# Class name of the plugin.
class = "akka.persistence.journal.inmem.InmemJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
}
# LevelDB journal plugin.
leveldb {
# Class name of the plugin.
class = "akka.persistence.journal.leveldb.LeveldbJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
# Storage location of LevelDB files.
dir = "journal"
# Use fsync on write
fsync = on
# Verify checksum on read.
checksum = off
# Native LevelDB (via JNI) or LevelDB Java port
native = on
# native = off
}
# Shared LevelDB journal plugin (for testing only).
leveldb-shared {
# Class name of the plugin.
class = "akka.persistence.journal.leveldb.SharedLeveldbJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
# timeout for async journal operations
timeout = 10s
store {
# Dispatcher for shared store actor.
store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Storage location of LevelDB files.
dir = "journal"
# Use fsync on write
fsync = on
# Verify checksum on read.
checksum = off
# Native LevelDB (via JNI) or LevelDB Java port
native = on
}
}
}
snapshot-store {
# Path to the snapshot store plugin to be used
plugin = "akka.persistence.snapshot-store.local"
# Local filesystem snapshot store plugin.
local {
# Class name of the plugin.
class = "akka.persistence.snapshot.local.LocalSnapshotStore"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for streaming snapshot IO.
stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher"
# Storage location of snapshot files.
dir = "snapshots"
}
}
view {
# Automated incremental view update.
auto-update = on
# Interval between incremental updates
auto-update-interval = 5s
# Maximum number of messages to replay per incremental view update. Set to
# -1 for no upper limit.
auto-update-replay-max = -1
}
at-least-once-delivery {
# Interval between redelivery attempts
redeliver-interval = 5s
# Maximum number of unconfirmed messages that will be sent in one redelivery burst
redelivery-burst-limit = 10000
# After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`
# message will be sent to the actor.
warn-after-number-of-unconfirmed-attempts = 5
# Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is
# allowed to hold in memory.
max-unconfirmed-messages = 100000
}
dispatchers {
default-plugin-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
default-replay-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
default-stream-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
}
}
}
18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started
0
18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
3974790
24064453
18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
49995000
49995000
49995000
49995000
17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started
0
17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
3727815
22167811
17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
49995000
51084018
51084018
52316760
52316760
52316760
52316760
52316760
17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started
0
17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
2982903
17710176
49347145
17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
51704199
51704199
55107844
55107844
55107844
55107844
最佳答案
您正在使用 AtLeastOnceDelivery
语义。正如它所说的here :
Note At-least-once delivery implies that original message send order is not always preserved and the destination may receive duplicate messages. That means that the semantics do not match those of a normal ActorRef send operation:
it is not at-most-once delivery message order for the same sender–receiver pair is not preserved due to possible resends after a crash and restart of the destination messages are still delivered—to the new actor incarnation These semantics is similar to what an ActorPath represents (see Actor Lifecycle), therefore you need to supply a path and not a reference when delivering messages. The messages are sent to the path with an actor selection.
FactorialActor
中的重复数字或者不要使用这个语义。
关于scala - Akka 持久性与确认交付会产生不一致的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27592304/
我正在尝试使用 Scala 2.12.3、sbt 0.13.6 以及 build.sbt 文件中的以下库依赖项构建一个项目: libraryDependencies ++= Seq( "com.t
我根本没有任何 Akka 经验。我想知道 Akka 消息传递如何在 JVM 内以及 JVM 之间工作。 JVM 中的消息是一些类似 POJO 的对象吗? JVM之间的通信是否需要任何类型的JMS(如服
Akka的EventBus是否可以与远程 Actor 一起使用? 据我所知,它本身不支持此功能。任何人都可以确认吗? 看起来有可能对提供相似功能的Actor进行编码。例如。在订阅远程服务器上的Even
我在单个 JVM 上使用 Akka 开发了我的应用程序。现在我想在多台机器上分配工作负载。我开始阅读文档并感到困惑。 有两种方法可以通过集群和远程处理使 Akka 应用程序分发。 我不明白两者之间的区
我想使用 Akka HTTP 构建一个连接到现有接收器(带有 Kafka react 流)的 REST 服务,但我不知道如何将 HTTP 流链接到 Akka 流接收器... 我应该选择使用 Flows
我在某处听说默认情况下是一个 Actor 系统,这意味着它的 ExecutorService/Dispatcher 正在创建一个非 Deamon 线程池来运行 Actor。如果确实如此,那将解释我所经
在我的应用程序中,我有一个角色需要在等待某些操作完成时存储消息,同时它需要支持高优先级消息(控制消息)。 stash trait 需要一个 Dequeue邮箱类型,我找不到控制感知出队邮箱是否有意义。
Akka.NET 和原始 Akka 可以使用 Remoting 进行通信吗? 换句话说,Akka 可以用于连接系统中的 JVM 和 CLR 吗? 最佳答案 这个问题在 akka.net Github
Akka 新手。创建一个扩展 SupervisorStrategy 的新 Scala 类为我提供了以下模板: class MySupervisorStrategy extends Supervisor
我正在尝试为包含并行处理流的 Akka 流定义一个图(我正在使用 Akka.NET,但这应该无关紧要)。想象一个订单的数据源,每个订单由一个订单 ID 和一个产品列表(订单商品)组成。工作流程如下:
我有一个 akka actor(worker)接收请求并回复它。请求处理可能需要 3-60 分钟。来电者(也是 Actor )目前正在使用 !!!并等待 future.get,但是如果需要,可以更改
我应该如何在 Akka 持久化 (Eventsourcing/CQRS) 中构建我的 Actor? 分层 平行 我的电子商务应用程序中有这些域对象 用户 - 用户可以创建帐户 商店 - 用户可以创建商
我正在尝试构建和运行一个 akka 流(在 Java DSL 中),以 2 个 actor 作为源,然后是一个合并结点,然后是 1 个接收器: Source src1 = Source.act
我正在尝试监督 Akka Actor ,更具体地说是 Cluster Singleton创建使用 ClusterSingletonManager .我试图更好地控制异常、日志和 Actor 的生命周期
我试图了解何时何地使用不同的 built-in Akka mailboxes以及何时适合自己滚动。但是,该页面上的任何地方都没有解释“ 有界邮箱 ”实际上是什么,或者它的行为方式与无界邮箱有何不同。此
在Akka中等待多个actor的结果的正确方法是什么? Principles of Reactive Programming Coursera 类(class)有一个带有复制键值存储的练习。无需深入研
我正在为一个项目评估 Akka,我正在尝试弄清楚我是否可以通过将参与者状态保存在高可用数据存储中来使用 Akka-Persistence 实现服务的高可用性。 (我不打算使用 Akka-Cluster
我阅读了 Akka 文档并找到了这个 As mentioned before, if a node is unreachable then gossip convergence is not poss
我正在使用 akka 流,并且我有一段我需要有条件地跳过的图表,因为流程无法处理某些值。具体来说,我有一个接受字符串并发出 http 请求的流,但是当字符串为空时,服务器无法处理这种情况。但我只需要返
我们正在考虑使用 Akka 进行客户端服务器通信,并尝试对数据传输进行基准测试。目前我们正在尝试发送一百万条消息,其中每条消息都是一个具有 8 个字符串字段的案例类。 目前,我们正在努力获得可接受的性
我是一名优秀的程序员,十分优秀!