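SOFAJRaft-RheaKV is an embedded, distributed, highly available, and strongly consistent KV storage library built on SOFAJRaft and RocksDB. @Author: Akai-yuan @Updated: 2023/2/3.
A SOFAJRaft-RheaKV cluster consists of three core components: PD, Store, and Region. The library is suited to lightweight state/metadata storage, cluster synchronization, and distributed lock services.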
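PD is the global central control node. It is responsible for scheduling across the whole cluster, generating Region IDs, and maintaining the RegionRouteTable routing table. One PD server can manage multiple clusters, which are isolated from one another by clusterId. The PD server has to be deployed separately. Many scenarios do not need this kind of self-management, so RheaKV also supports running without PD: set the fake option of PlacementDriverOptions to true. PD usually schedules a Region through the response to that Region's heartbeat. Once the Region has carried out the instruction, PD receives the Region's changed information in the next heartbeat and updates its routing and state tables.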
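A Store is a physical storage node in the cluster and contains one or more Regions. Usually one Node is responsible for one Store, so a Store can be viewed as a container of Regions that holds multiple data shards. Each Store proactively reports a StoreHeartbeatRequest to PD, which handles it in handleStoreHeartbeat. The heartbeat carries basic information about the Store, such as how many Regions it holds and which Regions have their Leader on it.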
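A Region is the smallest unit of KV data and can be thought of as a data partition or shard. Each Region covers a left-closed, right-open interval [startKey, endKey) and corresponds to an actual range of data in a Store. Regions can split automatically and have their replicas relocated automatically based on metrics such as request traffic, load, and data size. Each Region has multiple replicas stored on different Stores; together they form a Raft Group, and Raft log replication keeps the data in sync across all nodes of the group. The Leader of a Region proactively reports a RegionHeartbeatRequest to PD, which handles it in handleRegionHeartbeat, and PD detects whether a Region has changed through the Region's Epoch. To make the roles of PD, Store, and Region clearer, the official diagram is included here:
[Figure: official RheaKV architecture diagram showing PD, Store, and Region]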
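The half-open interval [startKey, endKey) is easy to get wrong at the boundaries, so here is a minimal sketch of the membership test. KeyRange is a hypothetical helper written for this post, not a RheaKV class, and treating a null endKey as "unbounded" is an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper (not part of RheaKV): tests whether a key lies in [startKey, endKey).
final class KeyRange {
    private final byte[] startKey; // inclusive; null is treated as the empty key
    private final byte[] endKey;   // exclusive; null is assumed to mean "unbounded"

    KeyRange(byte[] startKey, byte[] endKey) {
        this.startKey = startKey == null ? new byte[0] : startKey;
        this.endKey = endKey;
    }

    boolean contains(byte[] key) {
        // Unsigned lexicographic comparison of the raw bytes (Java 9+).
        boolean atOrAfterStart = Arrays.compareUnsigned(key, startKey) >= 0;
        boolean beforeEnd = endKey == null || Arrays.compareUnsigned(key, endKey) < 0;
        return atOrAfterStart && beforeEnd;
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        KeyRange range = new KeyRange(bytes("b"), bytes("m"));
        System.out.println(range.contains(bytes("b"))); // prints true: the start is inclusive
        System.out.println(range.contains(bytes("m"))); // prints false: the end is exclusive
    }
}
```

Because the end is exclusive, the endKey of one Region can serve as the startKey of the next without any key belonging to both.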
We start from the RheaKV part of the JRaft-Example module, beginning with the main method of com.alipay.sofa.jraft.example.rheakv.Server1.
public static void main(final String[] args) {
final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
.withFake(true) // use a fake pd
.config();
final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() //
// StoreEngine supports two storage implementations: MemoryDB and RocksDB
.withStorageType(StorageType.RocksDB)
.withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config())
.withRaftDataPath(Configs.RAFT_DATA_PATH)
.withServerAddress(new Endpoint("127.0.0.1", 8181))
.config();
final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
.withClusterName(Configs.CLUSTER_NAME) //
.withUseParallelCompress(true) //
.withInitialServerList(Configs.ALL_NODE_ADDRESSES)
.withStoreEngineOptions(storeOpts) //
.withPlacementDriverOptions(pdOpts) //
.config();
System.out.println(opts);
final Node node = new Node(opts);
node.start();
Runtime.getRuntime().addShutdownHook(new Thread(node::stop));
System.out.println("server1 start OK");
}
As for the Node implementation, it simply holds a RheaKVStoreOptions and a RheaKVStore.
public class Node {
private final RheaKVStoreOptions options;
private RheaKVStore rheaKVStore;
public Node(RheaKVStoreOptions options) {
this.options = options;
}
public void start() {
this.rheaKVStore = new DefaultRheaKVStore();
this.rheaKVStore.init(this.options);
}
public void stop() {
this.rheaKVStore.shutdown();
}
public RheaKVStore getRheaKVStore() {
return rheaKVStore;
}
}
As you can see, start() creates a DefaultRheaKVStore and calls its init method to initialize it.
public synchronized boolean init(final RheaKVStoreOptions opts) {
// Return early if the store has already been started
if (this.started) {
LOG.info("[DefaultRheaKVStore] already started.");
return true;
}
DescriberManager.getInstance().addDescriber(RouteTable.getInstance());
this.opts = opts;
// Initialize PD according to PlacementDriverOptions
final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions();
final String clusterName = opts.getClusterName();
Requires.requireNonNull(pdOpts, "opts.placementDriverOptions");
Requires.requireNonNull(clusterName, "opts.clusterName");
if (Strings.isBlank(pdOpts.getInitialServerList())) {
// If blank, inherit the parent's value
pdOpts.setInitialServerList(opts.getInitialServerList());
}
// PD is disabled here, so instantiate a FakePlacementDriverClient
if (pdOpts.isFake()) {
this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName);
// If PD is enabled, instantiate a RemotePlacementDriverClient instead
} else {
this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName);
}
// Initialize the FakePlacementDriverClient / RemotePlacementDriverClient
if (!this.pdClient.init(pdOpts)) {
LOG.error("Fail to init [PlacementDriverClient].");
return false;
}
// Initialize the compression strategy
ZipStrategyManager.init(opts);
// Initialize the storage engine
final StoreEngineOptions stOpts = opts.getStoreEngineOptions();
if (stOpts != null) {
stOpts.setInitialServerList(opts.getInitialServerList());
this.storeEngine = new StoreEngine(this.pdClient, this.stateListenerContainer);
if (!this.storeEngine.init(stOpts)) {
LOG.error("Fail to init [StoreEngine].");
return false;
}
}
// Get this node's IP and port
final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint();
final RpcOptions rpcOpts = opts.getRpcOptions();
Requires.requireNonNull(rpcOpts, "opts.rpcOptions");
// Create an RpcService and override getLeader so that it asks the local RegionEngine first
this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {
@Override
public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
final Endpoint leader = getLeaderByRegionEngine(regionId);
if (leader != null) {
return leader;
}
return super.getLeader(regionId, forceRefresh, timeoutMillis);
}
};
if (!this.rheaKVRpcService.init(rpcOpts)) {
LOG.error("Fail to init [RheaKVRpcService].");
return false;
}
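// Number of failover retries (2 by default)
this.failoverRetries = opts.getFailoverRetries();
// Future timeout in milliseconds (5000 by default)
this.futureTimeoutMillis = opts.getFutureTimeoutMillis();
// Whether to read only from the leader (true by default)
this.onlyLeaderRead = opts.isOnlyLeaderRead();
// Initialize kvDispatcher (useParallelKVExecutor is true by default)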
if (opts.isUseParallelKVExecutor()) {
final int numWorkers = Utils.cpus();
// Multiply by 16
final int bufSize = numWorkers << 4;
final String name = "parallel-kv-executor";
final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED
? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true);
// Initialize the dispatcher
this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT, threadFactory);
}
this.batchingOpts = opts.getBatchingOptions();
// Batching is allowed by default
if (this.batchingOpts.isAllowBatching()) {
this.getBatching = new GetBatching(KeyEvent::new, "get_batching",
new GetBatchingHandler("get", false));
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
new GetBatchingHandler("get_only_safe", true));
this.putBatching = new PutBatching(KVEvent::new, "put_batching",
new PutBatchingHandler("put"));
}
LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts);
return this.started = true;
}
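The getLeader override in the init method above follows a "local first, remote as fallback" pattern: answer from what this process already knows, and only go through the parent implementation when there is no local answer. The sketch below shows just that pattern. RemoteLeaderLookup, LocalFirstDemo, and the string addresses are hypothetical stand-ins for this illustration, not RheaKV types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a service that resolves leaders remotely.
class RemoteLeaderLookup {
    int remoteCalls = 0; // counts how often the (pretend) remote path is taken

    String getLeader(long regionId) {
        remoteCalls++;
        return "remote-leader-of-" + regionId;
    }
}

final class LocalFirstDemo {
    // Wraps the remote lookup in an anonymous subclass that consults a local table first.
    static RemoteLeaderLookup withLocalCache(Map<Long, String> localLeaders) {
        return new RemoteLeaderLookup() {
            @Override
            String getLeader(long regionId) {
                String local = localLeaders.get(regionId);
                // Fall back to the parent implementation only when nothing is known locally.
                return local != null ? local : super.getLeader(regionId);
            }
        };
    }

    public static void main(String[] args) {
        Map<Long, String> local = new HashMap<>();
        local.put(1L, "127.0.0.1:8181");
        RemoteLeaderLookup lookup = withLocalCache(local);
        System.out.println(lookup.getLeader(1L)); // prints 127.0.0.1:8181
        System.out.println(lookup.getLeader(2L)); // prints remote-leader-of-2
        System.out.println(lookup.remoteCalls);   // prints 1
    }
}
```

The benefit is the same as in the real code: a node that hosts the Region can answer a leader query without any network round trip.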
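Next comes StoreEngine#init. Some of its steps mirror what DefaultRheaKVStore's init method does, so they are not repeated here; the rest are annotated with comments in the code block below.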
public synchronized boolean init(final StoreEngineOptions opts) {
if (this.started) {
LOG.info("[StoreEngine] already started.");
return true;
}
DescriberManager.getInstance().addDescriber(this);
this.storeOpts = Requires.requireNonNull(opts, "opts");
Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress");
// Get the IP and port
final int port = serverAddress.getPort();
final String ip = serverAddress.getIp();
// If no IP was given (or it is the wildcard address), use the local machine's host name as the serverAddress IP
if (ip == null || Utils.IP_ANY.equals(ip)) {
serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port);
opts.setServerAddress(serverAddress);
}
// Get the metrics report period
final long metricsReportPeriod = opts.getMetricsReportPeriod();
// Initialize the RegionEngineOptions
List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
// If no RegionEngineOptions were configured, create a default one
if (rOptsList == null || rOptsList.isEmpty()) {
// -1 region
final RegionEngineOptions rOpts = new RegionEngineOptions();
rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
rOptsList = Lists.newArrayList();
rOptsList.add(rOpts);
opts.setRegionEngineOptionsList(rOptsList);
}
// Get the cluster name
final String clusterName = this.pdClient.getClusterName();
// Walk through rOptsList and fill in each RegionEngineOptions
for (final RegionEngineOptions rOpts : rOptsList) {
// The RaftGroupId is clusterName + "-" + regionId
rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
rOpts.setServerAddress(serverAddress);
if (Strings.isBlank(rOpts.getInitialServerList())) {
// if blank, extends parent's value
rOpts.setInitialServerList(opts.getInitialServerList());
}
if (rOpts.getNodeOptions() == null) {
// copy common node options
rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
.getCommonNodeOptions().copy());
}
// If the region has no metrics report period of its own, inherit the store's
if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
// extends store opts
rOpts.setMetricsReportPeriod(metricsReportPeriod);
}
}
// Initialize the Store and the Regions inside it
final Store store = this.pdClient.getStoreMetadata(opts);
if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) {
LOG.error("Empty store metadata: {}.", store);
return false;
}
this.storeId = store.getId();
this.partRocksDBOptions = SystemPropertyUtil.getBoolean(PART_ROCKSDB_OPTIONS_KEY, false);
// Initialize the executors
if (this.readIndexExecutor == null) {
this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
}
if (this.raftStateTrigger == null) {
this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads());
}
if (this.snapshotExecutor == null) {
this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads(),
opts.getSnapshotMaxThreads());
}
// init rpc executors
final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor();
// Initialize the RPC executors that run the RpcServer's processors
if (!useSharedRpcExecutor) {
if (this.cliRpcExecutor == null) {
this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads());
}
if (this.raftRpcExecutor == null) {
this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads());
}
if (this.kvRpcExecutor == null) {
this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads());
}
}
// Start the metric reporters
startMetricReporters(metricsReportPeriod);
// Initialize the rpcServer that other nodes and clients call
this.rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverAddress, this.raftRpcExecutor,
this.cliRpcExecutor);
// Register the KV request processors on the server
StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this);
if (!this.rpcServer.init(null)) {
LOG.error("Fail to init [RpcServer].");
return false;
}
// init db store
// choose the DB implementation according to the storage type
if (!initRawKVStore(opts)) {
return false;
}
if (this.rawKVStore instanceof Describer) {
DescriberManager.getInstance().addDescriber((Describer) this.rawKVStore);
}
// init all region engine
// create and initialize a RegionEngine for each region
if (!initAllRegionEngine(opts, store)) {
LOG.error("Fail to init all [RegionEngine].");
return false;
}
// heartbeat sender
// if the cluster is managed by a real PD, initialize the heartbeat sender
if (this.pdClient instanceof RemotePlacementDriverClient) {
HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions();
if (heartbeatOpts == null) {
heartbeatOpts = new HeartbeatOptions();
}
this.heartbeatSender = new HeartbeatSender(this);
if (!this.heartbeatSender.init(heartbeatOpts)) {
LOG.error("Fail to init [HeartbeatSender].");
return false;
}
}
this.startTime = System.currentTimeMillis();
LOG.info("[StoreEngine] start successfully: {}.", this);
return this.started = true;
}
The StoreEngine initialization above includes a step that prepares the RegionEngine options. Let's pull that part out and look at it on its own.
// init region options
List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
if (rOptsList == null || rOptsList.isEmpty()) {
// -1 region
final RegionEngineOptions rOpts = new RegionEngineOptions();
rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
rOptsList = Lists.newArrayList();
rOptsList.add(rOpts);
opts.setRegionEngineOptionsList(rOptsList);
}
final String clusterName = this.pdClient.getClusterName();
for (final RegionEngineOptions rOpts : rOptsList) {
rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
rOpts.setServerAddress(serverAddress);
if (Strings.isBlank(rOpts.getInitialServerList())) {
// if blank, extends parent's value
rOpts.setInitialServerList(opts.getInitialServerList());
}
if (rOpts.getNodeOptions() == null) {
// copy common node options
rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
.getCommonNodeOptions().copy());
}
if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
// extends store opts
rOpts.setMetricsReportPeriod(metricsReportPeriod);
}
}
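The loop above applies two simple rules to each Region's options: derive the Raft group id from the cluster name and the Region id, and inherit a value from the store-level options when the Region does not set its own. A minimal sketch of those rules follows. RegionOptsDemo and its method names are hypothetical; the clusterName + "-" + regionId format is taken from the comment in the StoreEngine code above rather than checked against JRaftHelper.

```java
// Hypothetical helpers that restate the rules used when preparing RegionEngineOptions.
final class RegionOptsDemo {
    // Group id format as described in the post: clusterName + "-" + regionId.
    static String raftGroupId(String clusterName, long regionId) {
        return clusterName + "-" + regionId;
    }

    // A blank child value inherits the parent's value.
    static String inheritIfBlank(String child, String parent) {
        return (child == null || child.trim().isEmpty()) ? parent : child;
    }

    // A non-positive child period inherits a positive parent period.
    static long inheritPeriod(long child, long parent) {
        return (child <= 0 && parent > 0) ? parent : child;
    }

    public static void main(String[] args) {
        System.out.println(raftGroupId("rhea_example", -1));             // prints rhea_example--1
        System.out.println(inheritIfBlank("", "127.0.0.1:8181"));        // prints 127.0.0.1:8181
        System.out.println(inheritPeriod(0, 30));                        // prints 30
    }
}
```

Note what happens with the default Region id of -1: the group id ends in "--1", because the separator and the minus sign sit next to each other.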
The Store metadata is then obtained by calling pdClient's getStoreMetadata method:
final Store store = this.pdClient.getStoreMetadata(opts);
When the call goes to FakePlacementDriverClient#getStoreMetadata:
public Store getStoreMetadata(final StoreEngineOptions opts) {
final Store store = new Store();
final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size());
store.setId(-1);
store.setEndpoint(opts.getServerAddress());
for (final RegionEngineOptions rOpts : rOptsList) {
regionList.add(getLocalRegionMetadata(rOpts));
}
store.setRegions(regionList);
return store;
}
Now let's look at the AbstractPlacementDriverClient#getLocalRegionMetadata method:
protected Region getLocalRegionMetadata(final RegionEngineOptions opts) {
final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId");
Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= "
+ Region.MIN_ID_WITH_MANUAL_CONF);
Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < "
+ Region.MAX_ID_WITH_MANUAL_CONF);
final byte[] startKey = opts.getStartKeyBytes();
final byte[] endKey = opts.getEndKeyBytes();
final String initialServerList = opts.getInitialServerList();
final Region region = new Region();
final Configuration conf = new Configuration();
// region
region.setId(regionId);
region.setStartKey(startKey);
region.setEndKey(endKey);
region.setRegionEpoch(new RegionEpoch(-1, -1));
// peers
Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank");
conf.parse(initialServerList);
region.setPeers(JRaftHelper.toPeerList(conf.listPeers()));
this.regionRouteTable.addOrUpdateRegion(region);
return region;
}
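RegionEpoch carries two version numbers: (1) confVer, the configuration version, which is incremented whenever a peer is added or removed; (2) version, the Region version, which is incremented whenever the Region splits or merges.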
public RegionEpoch(long confVer, long version) {
this.confVer = confVer;
this.version = version;
}
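To show how two such counters can be used to tell whether cached Region information is out of date, here is a minimal sketch. Epoch is a hypothetical stand-in for RegionEpoch, and the ordering it uses (compare version first, then confVer) is an assumption of this sketch, not a statement about how PD actually compares epochs.

```java
// Hypothetical stand-in for RegionEpoch, used only to illustrate staleness checks.
final class Epoch implements Comparable<Epoch> {
    final long confVer; // incremented when a peer is added or removed
    final long version; // incremented when the region splits or merges

    Epoch(long confVer, long version) {
        this.confVer = confVer;
        this.version = version;
    }

    // Assumed ordering: version decides first, confVer breaks ties.
    @Override
    public int compareTo(Epoch other) {
        int byVersion = Long.compare(this.version, other.version);
        return byVersion != 0 ? byVersion : Long.compare(this.confVer, other.confVer);
    }

    // True when the reported epoch is newer than this (cached) one.
    boolean isStaleComparedTo(Epoch reported) {
        return compareTo(reported) < 0;
    }

    public static void main(String[] args) {
        Epoch cached = new Epoch(1, 1);
        System.out.println(cached.isStaleComparedTo(new Epoch(2, 1))); // prints true: a peer changed
        System.out.println(cached.isStaleComparedTo(new Epoch(1, 2))); // prints true: the region split or merged
        System.out.println(cached.isStaleComparedTo(new Epoch(1, 1))); // prints false: nothing changed
    }
}
```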
Next, RegionRouteTable#addOrUpdateRegion:
public void addOrUpdateRegion(final Region region) {
Requires.requireNonNull(region, "region");
Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
final long regionId = region.getId();
final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey());
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.writeLock();
try {
this.regionTable.put(regionId, region.copy());
this.rangeTable.put(startKey, regionId);
} finally {
stampedLock.unlockWrite(stamp);
}
}
Here are a few of RegionRouteTable's fields:
private static final Comparator<byte[]> keyBytesComparator = BytesUtil.getDefaultByteArrayComparator();
private final StampedLock stampedLock = new StampedLock();
private final NavigableMap<byte[], Long> rangeTable = new TreeMap<>(keyBytesComparator);
private final Map<Long, Region> regionTable = Maps.newHashMap();
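Keeping rangeTable as a NavigableMap keyed by startKey is what makes routing cheap: the Region for a key is the entry with the greatest startKey that is less than or equal to that key. The sketch below shows this lookup with a StampedLock guarding the map. MiniRouteTable is a hypothetical, stripped-down class; it assumes an unsigned lexicographic byte comparator and does not check the Region's endKey.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.StampedLock;

// Hypothetical, simplified route table: startKey -> regionId.
final class MiniRouteTable {
    // Assumption: keys are ordered by unsigned lexicographic byte comparison (Java 9+).
    private static final Comparator<byte[]> KEY_ORDER = Arrays::compareUnsigned;

    private final StampedLock lock = new StampedLock();
    private final NavigableMap<byte[], Long> rangeTable = new TreeMap<>(KEY_ORDER);

    void addOrUpdateRegion(byte[] startKey, long regionId) {
        final long stamp = lock.writeLock();
        try {
            // A null start key is stored as the empty key, which sorts before everything else.
            rangeTable.put(startKey == null ? new byte[0] : startKey, regionId);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Returns the id of the region whose startKey is the greatest one <= key, or null if none.
    Long findRegionId(byte[] key) {
        final long stamp = lock.readLock();
        try {
            Map.Entry<byte[], Long> entry = rangeTable.floorEntry(key);
            return entry == null ? null : entry.getValue();
        } finally {
            lock.unlockRead(stamp);
        }
    }

    public static void main(String[] args) {
        MiniRouteTable table = new MiniRouteTable();
        table.addOrUpdateRegion(null, 1L);                                  // region 1 starts at the empty key
        table.addOrUpdateRegion("m".getBytes(StandardCharsets.UTF_8), 2L); // region 2 starts at "m"
        System.out.println(table.findRegionId("a".getBytes(StandardCharsets.UTF_8))); // prints 1
        System.out.println(table.findRegionId("m".getBytes(StandardCharsets.UTF_8))); // prints 2
        System.out.println(table.findRegionId("z".getBytes(StandardCharsets.UTF_8))); // prints 2
    }
}
```

A TreeMap needs an explicit comparator here because byte[] has no natural ordering and compares by identity in a plain HashMap.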
Next, StoreEngine#initAllRegionEngine:
private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) {
Requires.requireNonNull(opts, "opts");
Requires.requireNonNull(store, "store");
// Get the base Raft data directory
String baseRaftDataPath = opts.getRaftDataPath();
if (Strings.isNotBlank(baseRaftDataPath)) {
try {
FileUtils.forceMkdir(new File(baseRaftDataPath));
} catch (final Throwable t) {
LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath);
return false;
}
} else {
baseRaftDataPath = "";
}
final Endpoint serverAddress = opts.getServerAddress();
// Get the RegionEngineOptions list and the Region list
final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
final List<Region> regionList = store.getRegions();
Requires.requireTrue(rOptsList.size() == regionList.size());
for (int i = 0; i < rOptsList.size(); i++) {
final RegionEngineOptions rOpts = rOptsList.get(i);
if (!inConfiguration(rOpts.getServerAddress().toString(), rOpts.getInitialServerList())) {
continue;
}
final Region region = regionList.get(i);
// If the region has no Raft data path configured, derive one under the base directory
if (Strings.isBlank(rOpts.getRaftDataPath())) {
final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();
rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString());
}
Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
// Create and initialize a RegionEngine for this Region
final RegionEngine engine = new RegionEngine(region, this);
if (engine.init(rOpts)) {
// Each RegionKVService serves exactly one Region and only handles requests within that Region's range
final RegionKVService regionKVService = new DefaultRegionKVService(engine);
registerRegionKVService(regionKVService);
// Put the engine into regionEngineTable (a ConcurrentMap<Long, RegionEngine>)
this.regionEngineTable.put(region.getId(), engine);
} else {
LOG.error("Fail to init [RegionEngine: {}].", region);
return false;
}
}
return true;
}
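The default per-Region Raft directory is built from the Region id and the server port, so several Regions, or several nodes on one machine, do not share a directory. Here is a minimal sketch of that naming rule using the same Paths.get call. RaftPathDemo and regionRaftDataPath are hypothetical names, and "raft_data" is just an example base directory.

```java
import java.nio.file.Paths;

// Hypothetical helper that mirrors how the default per-region Raft data path is derived.
final class RaftPathDemo {
    static String regionRaftDataPath(String baseRaftDataPath, long regionId, int port) {
        // A blank base falls back to "", which makes the result relative to the working directory.
        final String base = (baseRaftDataPath == null || baseRaftDataPath.trim().isEmpty()) ? "" : baseRaftDataPath;
        final String childPath = "raft_data_region_" + regionId + "_" + port;
        return Paths.get(base, childPath).toString();
    }

    public static void main(String[] args) {
        // With an empty base only the child directory name remains.
        System.out.println(regionRaftDataPath("", 1, 8181)); // prints raft_data_region_1_8181
        // With a base directory the child is appended using the platform's separator.
        System.out.println(regionRaftDataPath("raft_data", 1, 8181));
    }
}
```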
RegionEngine#init:
public synchronized boolean init(final RegionEngineOptions opts) {
if (this.started) {
LOG.info("[RegionEngine: {}] already started.", this.region);
return true;
}
this.regionOpts = Requires.requireNonNull(opts, "opts");
// Instantiate the state machine
this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);
// node options
NodeOptions nodeOpts = opts.getNodeOptions();
if (nodeOpts == null) {
nodeOpts = new NodeOptions();
}
// Enable metrics if the metrics report period is greater than zero
final long metricsReportPeriod = opts.getMetricsReportPeriod();
if (metricsReportPeriod > 0) {
// metricsReportPeriod > 0 means enable metrics
nodeOpts.setEnableMetrics(true);
}
final Configuration initialConf = new Configuration();
if (!initialConf.parse(opts.getInitialServerList())) {
LOG.error("Fail to parse initial configuration {}.", opts.getInitialServerList());
return false;
}
// Set the initial cluster configuration
nodeOpts.setInitialConf(initialConf);
nodeOpts.setFsm(this.fsm);
// Prepare the Raft data directory and the log, meta, and snapshot paths under it
final String raftDataPath = opts.getRaftDataPath();
try {
FileUtils.forceMkdir(new File(raftDataPath));
} catch (final Throwable t) {
LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath);
return false;
}
if (Strings.isBlank(nodeOpts.getLogUri())) {
final Path logUri = Paths.get(raftDataPath, "log");
nodeOpts.setLogUri(logUri.toString());
}
if (Strings.isBlank(nodeOpts.getRaftMetaUri())) {
final Path meteUri = Paths.get(raftDataPath, "meta");
nodeOpts.setRaftMetaUri(meteUri.toString());
}
if (Strings.isBlank(nodeOpts.getSnapshotUri())) {
final Path snapshotUri = Paths.get(raftDataPath, "snapshot");
nodeOpts.setSnapshotUri(snapshotUri.toString());
}
LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.", this.region,
nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri());
final Endpoint serverAddress = opts.getServerAddress();
final PeerId serverId = new PeerId(serverAddress, 0);
final RpcServer rpcServer = this.storeEngine.getRpcServer();
this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true);
this.node = this.raftGroupService.start(false);
// The node was started above; now record the group's initial configuration in the global RouteTable
RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf());
if (this.node != null) {
final RawKVStore rawKVStore = this.storeEngine.getRawKVStore();
final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor();
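// RaftRawKVStore is RheaKV's RawKVStore implementation built on the Raft replicated state machine (KVStoreStateMachine)
// It is RheaKV's entry point into Raft: the Raft flow starts here
this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor);
// Wrap the store to intercept requests and record metrics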
this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore);
// metrics config
if (this.regionMetricsReporter == null && metricsReportPeriod > 0) {
final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry();
if (metricRegistry != null) {
final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler();
// start raft node metrics reporter
this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) //
.prefixedWith("region_" + this.region.getId()) //
.withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) //
.outputTo(LOG) //
.scheduleOn(scheduler) //
.shutdownExecutorOnStop(scheduler != null) //
.build();
this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS);
}
}
this.started = true;
LOG.info("[RegionEngine] start successfully: {}.", this);
}
return this.started;
}
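The end of RegionEngine#init stacks stores on top of each other: MetricsRawKVStore wraps RaftRawKVStore, which in turn wraps the underlying rawKVStore. That is the decorator pattern. The sketch below shows the idea with a metrics layer that counts calls and then delegates. SimpleKV, MapKV, and CountingKV are hypothetical types for illustration and are far simpler than the real interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal KV interface shared by every layer.
interface SimpleKV {
    void put(String key, String value);

    String get(String key);
}

// Innermost layer: actually stores the data (a stand-in for the raw store).
final class MapKV implements SimpleKV {
    private final Map<String, String> data = new HashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public String get(String key) {
        return data.get(key);
    }
}

// Outer layer: records metrics, then delegates to whatever it wraps.
final class CountingKV implements SimpleKV {
    private final SimpleKV delegate;
    final AtomicLong puts = new AtomicLong();
    final AtomicLong gets = new AtomicLong();

    CountingKV(SimpleKV delegate) {
        this.delegate = delegate;
    }

    @Override
    public void put(String key, String value) {
        puts.incrementAndGet();
        delegate.put(key, value);
    }

    @Override
    public String get(String key) {
        gets.incrementAndGet();
        return delegate.get(key);
    }
}

final class DecoratorDemo {
    public static void main(String[] args) {
        CountingKV kv = new CountingKV(new MapKV());
        kv.put("hello", "rheakv");
        System.out.println(kv.get("hello"));                  // prints rheakv
        System.out.println(kv.puts.get() + " " + kv.gets.get()); // prints 1 1
    }
}
```

Since every layer implements the same interface, callers do not need to know whether they are talking to the raw store or to a wrapped one.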