gpt4 book ai didi

scala - 使用 Play 框架设置 Akka 集群

转载 作者:行者123 更新时间:2023-12-02 04:06:59 27 4
gpt4 key购买 nike

我目前正在尝试使用自动发现服务来实现集群 Play + akka 实现。然而,我似乎遇到了游戏中包含的 Guice DI 加载器的问题。他们的文档摘录如下:

https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-with-Akka

While we recommend you use the built in actor system, as it sets up everything such as the correct classloader, lifecycle hooks, etc, there is nothing stopping you from using your own actor system. It is important however to ensure you do the following:

Register a stop hook to shut the actor system down when Play shuts down Pass in the correct classloader from the Play Environment otherwise Akka won’t be able to find your applications classes

Ensure that either you change the location that Play reads it’s akka configuration from using play.akka.config, or that you don’t read your akka configuration from the default akka config, as this will cause problems such as when the systems try to bind to the same remote ports

我已经完成了他们推荐的上述配置,但是我似乎无法绕过游戏仍然绑定(bind)它的内置ActorSystemProvider从BuiltInModule:

class BuiltinModule extends Module {
def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] =

{
def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = {
factories.flatMap(_(env, configuration))
}

Seq(
bind[Environment] to env,
bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)),
bind[Configuration].toProvider[ConfigurationProvider],
bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider],

// Application lifecycle, bound both to the interface, and its implementation, so that Application can access it
// to shut it down.
bind[DefaultApplicationLifecycle].toSelf,
bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]),

bind[Application].to[DefaultApplication],
bind[play.Application].to[play.DefaultApplication],

bind[Router].toProvider[RoutesProvider],
bind[play.routing.Router].to[JavaRouterAdapter],
bind[ActorSystem].toProvider[ActorSystemProvider],
bind[Materializer].toProvider[MaterializerProvider],
bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider],
bind[ExecutionContext].to[ExecutionContextExecutor],
bind[Executor].to[ExecutionContextExecutor],
bind[HttpExecutionContext].toSelf,

bind[CryptoConfig].toProvider[CryptoConfigParser],
bind[CookieSigner].toProvider[CookieSignerProvider],
bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider],
bind[AESCrypter].toProvider[AESCrypterProvider],
bind[play.api.libs.Crypto].toSelf,
bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator]
) ++ dynamicBindings(
HttpErrorHandler.bindingsFromConfiguration,
HttpFilters.bindingsFromConfiguration,
HttpRequestHandler.bindingsFromConfiguration,
ActionCreator.bindingsFromConfiguration
)
}
}

我尝试创建自己的 GuiceApplicationBuilder 来绕过这个问题,但是现在它只是将重复的绑定(bind)异常移至来自BuiltInModule。

这就是我正在尝试的:

AkkaConfigModule:

package module.akka

import com.google.inject.{AbstractModule, Inject, Provider, Singleton}
import com.typesafe.config.Config
import module.akka.AkkaConfigModule.AkkaConfigProvider
import net.codingwell.scalaguice.ScalaModule
import play.api.Application

/**
* Created by dmcquill on 8/15/16.
*/
object AkkaConfigModule {
@Singleton
class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] {
override def get() = {
val classLoader = application.classloader
NodeConfigurator.loadConfig(classLoader)
}
}
}

/**
* Binds the application configuration to the [[Config]] interface.
*
* The config is bound as an eager singleton so that errors in the config are detected
* as early as possible.
*/
class AkkaConfigModule extends AbstractModule with ScalaModule {

override def configure() {
bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton()
}

}

Actor 系统模块:

package module.akka


import actor.cluster.ClusterMonitor
import akka.actor.ActorSystem
import com.google.inject._
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import play.api.inject.ApplicationLifecycle

import scala.collection.JavaConversions._

/**
* Created by dmcquill on 7/27/16.
*/
object ActorSystemModule {
@Singleton
class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] {
override def get() = {
val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp"))

// add the GuiceAkkaExtension to the system, and initialize it with the Guice injector
GuiceAkkaExtension(system).initialize(injector)

system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", "))
system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name))

lifecycle.addStopHook { () =>
system.terminate()
}

system
}
}
}

/**
* A module providing an Akka ActorSystem.
*/
class ActorSystemModule extends AbstractModule with ScalaModule {
import module.akka.ActorSystemModule.ActorSystemProvider

override def configure() {
bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton()
}
}

应用程序加载器:

class CustomApplicationLoader extends GuiceApplicationLoader {

override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
initialBuilder
.overrides(overrides(context): _*)
.bindings(new AkkaConfigModule, new ActorSystemModule)
}

}

我需要完成的主要事情是配置 ActorSystem,以便我可以通过编程方式加载 Akka 集群的种子节点。

上述方法是正确的方法还是有更好的方法来实现此目的?如果这是正确的方法,那么对于游戏/指导的 DI 设置,我是否从根本上不理解什么?

更新

对于该架构,play+akka 位于同一节点。

最佳答案

最后我尝试做一些比必要的更复杂的事情。我没有执行上述流程,而是简单地以编程方式扩展了初始配置,以便可以以编程方式检索必要的网络信息。

最终结果基本上由几个类组成:

NodeConfigurator:此类包含相关实用程序方法,用于从 application.conf 检索属性,然后以编程方式创建配置以与 kubernetes 发现服务结合使用。

object NodeConfigurator {

/**
* This method given a class loader will return the configuration object for an ActorSystem
* in a clustered environment
*
* @param classLoader the configured classloader of the application
* @return Config
*/
def loadConfig(classLoader: ClassLoader) = {
val config = ConfigFactory.load(classLoader)

val clusterName = config.getString(CLUSTER_NAME_PROP)
val seedPort = config.getString(SEED_PORT_CONF_PROP)

val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") {
getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS)
} else {
config.getString(HOST_CONF_PROP)
}

ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host))
.withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host))
.withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host))
.withFallback(config)
.resolve()
}

/**
* Get the local ip address which defaults to localhost if not
* found on the eth0 adapter
*
* @return Option[String]
*/
def getLocalHostAddress: Option[String] = {
import java.net.NetworkInterface

import scala.collection.JavaConversions._

NetworkInterface.getNetworkInterfaces
.find(_.getName equals "eth0")
.flatMap { interface =>
interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress)
}
}

/**
* Retrieves a set of seed nodes that are currently running in our cluster
*
* @param config akka configuration object
* @return Array[String]
*/
def getSeedNodes(config: Config) = {
if(config.hasPath(SEED_NODES_CONF_PROP)) {
config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim)
} else {
Array.empty[String]
}
}

/**
* formats the seed node addresses in the proper format
*
* @param clusterName name of akka cluster
* @param seedNodeAddresses listing of current seed nodes
* @param seedNodePort configured seed node port
* @param defaultSeedNodeAddress default seed node address
* @return
*/
def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = {
if(seedNodeAddresses.isEmpty) {
s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
} else {
seedNodeAddresses.map { address =>
s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
}.mkString("\n")
}
}

val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name"
val HOST_CONF_PROP = "fitnessAkka.host"
val PORT_CONF_PROP = "fitnessAkka.port"
val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes"
val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port"

private val DEFAULT_HOST_ADDRESS = "127.0.0.1"
}

CustomApplicationLoader:只需使用 play 的可重写应用程序加载器即可从 NodeConfigurator 获取生成的配置,然后用它扩展初始配置。

class CustomApplicationLoader extends GuiceApplicationLoader {

override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
val classLoader = context.environment.classLoader
val configuration = Configuration(NodeConfigurator.loadConfig(classLoader))

initialBuilder
.in(context.environment)
.loadConfig(context.initialConfiguration ++ configuration)
.overrides(overrides(context): _*)
}

}

AkkaActorModule:提供依赖项可注入(inject) actor 引用,与 API 一起使用来显示集群成员。

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport {
def configure = {
bindActor[ClusterMonitor]("cluster-monitor")
}
}

ClusterMonitor:这是一个参与者,它只是监听集群事件并另外接收消息以生成当前集群状态。

class ClusterMonitor @Inject() extends Actor with ActorLogging {
import actor.cluster.ClusterMonitor.GetClusterState

val cluster = Cluster(context.system)
private var nodes = Set.empty[Address]

override def preStart(): Unit = {
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
}

override def postStop(): Unit = cluster.unsubscribe(self)

override def receive = {
case MemberUp(member) => {
nodes += member.address
log.info(s"Cluster member up: ${member.address}")
}
case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}")
case MemberRemoved(member, previousStatus) => {
nodes -= member.address
log.info(s"Cluster member removed: ${member.address}")
}
case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}")
case GetClusterState => sender() ! nodes
case _: MemberEvent =>
}

}

object ClusterMonitor {
case class GetClusterState()
}

应用程序:只是一个测试 Controller ,用于输出已加入集群的节点列表

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller {

implicit val addressWrites = new Writes[Address] {
def writes(address: Address) = Json.obj(
"host" -> address.host,
"port" -> address.port,
"protocol" -> address.protocol,
"system" -> address.system
)
}

implicit val timeout = Timeout(5, TimeUnit.SECONDS)

def listClusterNodes = Action.async {
(clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses =>
Ok(Json.toJson(addresses))
}
}

}

上述 Controller 的结果产生类似于以下的输出:

$ http GET 192.168.99.100:30760/cluster/nodes

HTTP/1.1 200 OK
Content-Length: 235
Content-Type: application/json
Date: Thu, 18 Aug 2016 02:50:30 GMT

[
{
"host": "172.17.0.3",
"port": 2551,
"protocol": "akka.tcp",
"system": "fitnessApp"
},
{
"host": "172.17.0.4",
"port": 2551,
"protocol": "akka.tcp",
"system": "fitnessApp"
},
{
"host": "172.17.0.5",
"port": 2551,
"protocol": "akka.tcp",
"system": "fitnessApp"
}
]

关于scala - 使用 Play 框架设置 Akka 集群,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38995437/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com