gpt4 book ai didi

scala - Akka 持久性与确认交付会产生不一致的结果

转载 作者:行者123 更新时间:2023-12-04 11:05:07 24 4
gpt4 key购买 nike

我一直在玩 Akka Persistence,并编写了以下程序来测试我的理解。问题是每次运行这个程序时我都会得到不同的结果。正确答案是 49995000,但我并不总是明白这一点。我已经清除了每次运行之间的日志目录,但它没有任何区别。任何人都可以看到发生了什么问题吗?该程序简单地对从 1 到 n 的所有数字求和(在下面的代码中,n 是 9999)。

正确答案是:(n * (n+1))/2。对于 n=9999,即 49995000。

编辑:与 JDK 7 相比,JDK 8 似乎更一致地工作。我应该只使用 JDK 8 吗?

package io.github.ourkid.akka.aggregator.guaranteed

import akka.actor.Actor
import akka.actor.ActorPath
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.persistence.AtLeastOnceDelivery
import akka.persistence.PersistentActor

case class ExternalRequest(updateAmount : Int)
case class CountCommand(deliveryId : Long, updateAmount : Int)
case class Confirm(deliveryId : Long)

sealed trait Evt
case class CountEvent(updateAmount : Int) extends Evt
case class ConfirmEvent(deliveryId : Long) extends Evt

class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery {

override def persistenceId = "persistent-actor-ref-1"

override def receiveCommand : Receive = {
case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState)
case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState)
}

override def receiveRecover : Receive = {
case evt : Evt => updateState(evt)
}

def updateState(evt:Evt) = evt match {
case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount))
case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId)
}
}

class FactorialActor extends Actor {
var count = 0
def receive = {
case CountCommand(deliveryId : Long, updateAmount:Int) => {
count = count + updateAmount
sender() ! Confirm(deliveryId)
}
case "print" => println(count)
}
}

object GuaranteedDeliveryTest extends App {
val system = ActorSystem()

val factorial = system.actorOf(Props[FactorialActor])

val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path))

import system.dispatcher

system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" }

for (i <- 1 to 9999)
delActor ! ExternalRequest(i)



}

SBT文件
name := "akka_aggregator"

organization := "io.github.ourkid"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.4"

scalacOptions ++= Seq("-unchecked", "-deprecation")

resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
)

val Akka = "2.3.7"
val Spray = "1.3.2"

libraryDependencies ++= Seq(
// Core Akka
"com.typesafe.akka" %% "akka-actor" % Akka,
"com.typesafe.akka" %% "akka-cluster" % Akka,
"com.typesafe.akka" %% "akka-persistence-experimental" % Akka,
"org.iq80.leveldb" % "leveldb" % "0.7",
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8",

// For future REST API
"io.spray" %% "spray-httpx" % Spray,
"io.spray" %% "spray-can" % Spray,
"io.spray" %% "spray-routing" % Spray,
"org.typelevel" %% "scodec-core" % "1.3.0",

// CSV reader
"net.sf.opencsv" % "opencsv" % "2.3",

// Logging
"com.typesafe.akka" %% "akka-slf4j" % Akka,
"ch.qos.logback" % "logback-classic" % "1.0.13",

// Testing
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"com.typesafe.akka" %% "akka-testkit" % Akka % "test",
"io.spray" %% "spray-testkit" % Spray % "test",
"org.scalacheck" %% "scalacheck" % "1.11.6" % "test"
)
fork := true
mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor")

应用程序.conf文件
##########################################
# Akka Persistence Reference Config File #
##########################################

akka {

# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
# to STDOUT)
loggers = ["akka.event.slf4j.Slf4jLogger"]

# Log level used by the configured loggers (see "loggers") as soon
# as they have been started; before that, see "stdout-loglevel"
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "DEBUG"

# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = "INFO"

# Filter of log events that is used by the LoggingAdapter before
# publishing log events to the eventStream.
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

# Protobuf serialization for persistent messages
actor {

serializers {

akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer"
akka-persistence-message = "akka.persistence.serialization.MessageSerializer"
}

serialization-bindings {

"akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
"akka.persistence.serialization.Message" = akka-persistence-message
}
}

persistence {

journal {

# Maximum size of a persistent message batch written to the journal.
max-message-batch-size = 200

# Maximum size of a deletion batch written to the journal.
max-deletion-batch-size = 10000

# Path to the journal plugin to be used
plugin = "akka.persistence.journal.leveldb"

# In-memory journal plugin.
inmem {

# Class name of the plugin.
class = "akka.persistence.journal.inmem.InmemJournal"

# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
}

# LevelDB journal plugin.
leveldb {

# Class name of the plugin.
class = "akka.persistence.journal.leveldb.LeveldbJournal"

# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"

# Storage location of LevelDB files.
dir = "journal"

# Use fsync on write
fsync = on

# Verify checksum on read.
checksum = off

# Native LevelDB (via JNI) or LevelDB Java port
native = on
# native = off
}

# Shared LevelDB journal plugin (for testing only).
leveldb-shared {

# Class name of the plugin.
class = "akka.persistence.journal.leveldb.SharedLeveldbJournal"

# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"

# timeout for async journal operations
timeout = 10s

store {

# Dispatcher for shared store actor.
store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

# Storage location of LevelDB files.
dir = "journal"

# Use fsync on write
fsync = on

# Verify checksum on read.
checksum = off

# Native LevelDB (via JNI) or LevelDB Java port
native = on
}
}
}

snapshot-store {

# Path to the snapshot store plugin to be used
plugin = "akka.persistence.snapshot-store.local"

# Local filesystem snapshot store plugin.
local {

# Class name of the plugin.
class = "akka.persistence.snapshot.local.LocalSnapshotStore"

# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

# Dispatcher for streaming snapshot IO.
stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher"

# Storage location of snapshot files.
dir = "snapshots"
}
}

view {

# Automated incremental view update.
auto-update = on

# Interval between incremental updates
auto-update-interval = 5s

# Maximum number of messages to replay per incremental view update. Set to
# -1 for no upper limit.
auto-update-replay-max = -1
}

at-least-once-delivery {
# Interval between redelivery attempts
redeliver-interval = 5s

# Maximum number of unconfirmed messages that will be sent in one redelivery burst
redelivery-burst-limit = 10000

# After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`
# message will be sent to the actor.
warn-after-number-of-unconfirmed-attempts = 5

# Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is
# allowed to hold in memory.
max-unconfirmed-messages = 100000
}

dispatchers {
default-plugin-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
default-replay-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
default-stream-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
}
}
}

正确的输出:
18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started
0
18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
3974790
24064453
18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
49995000
49995000
49995000
49995000

运行不正确:
17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started
0
17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
3727815
22167811
17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
49995000
51084018
51084018
52316760
52316760
52316760
52316760
52316760

另一个不正确的运行:
17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started
0
17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl]
17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent]
2982903
17710176
49347145
17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent]
51704199
51704199
55107844
55107844
55107844
55107844

最佳答案

您正在使用 AtLeastOnceDelivery语义。正如它所说的here :

Note At-least-once delivery implies that original message send order is not always preserved and the destination may receive duplicate messages. That means that the semantics do not match those of a normal ActorRef send operation:

it is not at-most-once delivery message order for the same sender–receiver pair is not preserved due to possible resends after a crash and restart of the destination messages are still delivered—to the new actor incarnation These semantics is similar to what an ActorPath represents (see Actor Lifecycle), therefore you need to supply a path and not a reference when delivering messages. The messages are sent to the path with an actor selection.



所以有些号码可能会收到不止一次。您可以忽略 FactorialActor 中的重复数字或者不要使用这个语义。

关于scala - Akka 持久性与确认交付会产生不一致的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27592304/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com