- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如果这看起来令人困惑,我提前道歉,因为我在这里倾倒了很多。基本上,我有一个小服务获取一些 Json,解析并将其提取到案例类,然后将其写入数据库。该服务需要按计划运行,Akka 调度程序可以很好地处理这一点。我的数据库不喜欢 Slick 同时尝试请求新的 AutoInc id,所以我内置了一个 Await.result 来阻止这种情况发生。所有这些都工作得很好,但我的问题从这里开始:有 7 个这些服务正在运行,所以我想使用类似的 Await.result 系统来阻止每个服务。每次我尝试将请求的结束时间作为响应发回时(在 else 块的末尾),它会被发送到死信而不是分发者。基本上:为什么 sender ! time
去死信而不是经销商。对于一个简单的问题来说,这是一个很长的问题,但这就是开发的方式......
ClickActor.scala
import java.text.SimpleDateFormat
import java.util.Date
import Message._
import akka.actor.{Actor, ActorLogging, Props}
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import net.liftweb.json._
import spray.client.pipelining._
import spray.http.{BasicHttpCredentials, HttpRequest, HttpResponse, Uri}
import akka.pattern.ask
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
case class ClickData(recipient : String, geolocation : Geolocation, tags : Array[String],
url : String, timestamp : Double, campaigns : Array[String],
`user-variables` : JObject, ip : String,
`client-info` : ClientInfo, message : ClickedMessage, event : String)
case class Geolocation(city : String, region : String, country : String)
case class ClientInfo(`client-name`: String, `client-os`: String, `user-agent`: String,
`device-type`: String, `client-type`: String)
case class ClickedMessage(headers : ClickHeaders)
case class ClickHeaders(`message-id` : String)
class ClickActor extends Actor with ActorLogging{
implicit val formats = DefaultFormats
implicit val timeout = new Timeout(3 minutes)
import context.dispatcher
val con = ConfigFactory.load("connection.conf")
val countries = ConfigFactory.load("country.conf")
val regions = ConfigFactory.load("region.conf")
val df = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss -0000")
var time = System.currentTimeMillis()
var begin = new Date(time - (12 hours).toMillis)
var end = new Date(time)
val pipeline : HttpRequest => Future[HttpResponse] = (
addCredentials(BasicHttpCredentials("api", con.getString("mailgun.key")))
~> sendReceive
)
def get(lastrun : Long): Future[String] = {
if(lastrun != 0) {
begin = new Date(lastrun)
end = new Date(time)
}
val uri = Uri(con.getString("mailgun.uri")) withQuery("begin" -> df.format(begin), "end" -> df.format(end),
"ascending" -> "yes", "limit" -> "100", "pretty" -> "yes", "event" -> "clicked")
val request = Get(uri)
val futureResponse = pipeline(request)
return futureResponse.map(_.entity.asString)
}
def receive = {
case lastrun : Long => {
val start = System.currentTimeMillis()
val responseFuture = get(lastrun)
responseFuture.onSuccess {
case payload: String => val json = parse(payload)
//println(pretty(render(json)))
val elements = (json \\ "items").children
if (elements.length == 0) {
log.info("[ClickActor: " + this.hashCode() + "] did not find new events between " +
begin.toString + " and " + end.toString)
sender ! time
context.stop(self)
}
else {
for (item <- elements) {
val data = item.extract[ClickData]
var tags = ""
if (data.tags.length != 0) {
for (tag <- data.tags)
tags += (tag + ", ")
}
var campaigns = ""
if (data.campaigns.length != 0) {
for (campaign <- data.campaigns)
campaigns += (campaign + ", ")
}
val timestamp = (data.timestamp * 1000).toLong
val msg = new ClickMessage(
data.recipient, data.geolocation.city,
regions.getString(data.geolocation.country + "." + data.geolocation.region),
countries.getString(data.geolocation.country), tags, data.url, timestamp,
campaigns, data.ip, data.`client-info`.`client-name`,
data.`client-info`.`client-os`, data.`client-info`.`user-agent`,
data.`client-info`.`device-type`, data.`client-info`.`client-type`,
data.message.headers.`message-id`, data.event, compactRender(item))
val csqla = context.actorOf(Props[ClickSQLActor])
val future = csqla.ask(msg)
val result = Await.result(future, timeout.duration).asInstanceOf[Int]
if (result == 1) {
log.error("[ClickSQLActor: " + csqla.hashCode() + "] shutting down due to lack of system environment variables")
context.stop(csqla)
}
else if(result == 0) {
log.info("[ClickSQLActor: " + csqla.hashCode() + "] successfully wrote to the DB")
}
}
sender ! time
log.info("[ClickActor: " + this.hashCode() + "] processed |" + elements.length + "| new events in " +
(System.currentTimeMillis() - start) + " ms")
}
}
}
}
}
import akka.actor.{Props, ActorSystem}
import akka.event.Logging
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.Await
class Distributor {
implicit val timeout = new Timeout(10 minutes)
var lastClick : Long = 0
def distribute(system : ActorSystem) = {
val log = Logging(system, getClass)
val clickFuture = (system.actorOf(Props[ClickActor]) ? lastClick)
lastClick = Await.result(clickFuture, timeout.duration).asInstanceOf[Long]
log.info(lastClick.toString)
//repeat process with other events (open, unsub, etc)
}
}
最佳答案
原因是因为 'sender' 的值(这是一种检索值的方法)在离开接收块后不再有效,但在上面的例子中使用的 future 仍然会运行,到那个时候它完成了 Actor 将离开接收块并爆炸;无效的发件人会导致消息进入死信队列。
解决方法是要么不使用 future ,要么在组合 future 时, Actor 和发送者然后在触发 future 之前捕获发送者的值(value)。
val s = sender
val responseFuture = get(lastrun)
responseFuture.onSuccess {
....
s ! time
}
关于scala - Akka 死信与询问模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25402349/
我正在创建一个死信 channel 错误处理程序,如下所示 errorHandler(deadLetterChannel("direct:myDLC").useOriginalMessage().ma
以下是我的 Camel 路线代码 .errorHandler(deadLetterChannel("jmstx:queue:ErrorHandler") .useOriginalMessage()
我正在尝试跨不同的集群系统进行分布式发布-订阅,但无论我尝试什么,它都不起作用。 我想做的就是创建一个简单的示例。 1)我创建一个主题,说“内容”。 2) 假设 jvm A 中的一个节点创建主题、订阅
我是一名优秀的程序员,十分优秀!