
Usage and code examples of the org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore class

Reposted. Author: 知者. Updated: 2024-03-24 06:01:05

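This article collects some Java code examples for the org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore class and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow and Maven, were extracted from selected projects, and should serve as a useful reference. Details of the WALProcedureStore class are as follows:
Package path: org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore
Class name: WALProcedureStore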

Introduction to WALProcedureStore

WAL implementation of the ProcedureStore.

When starting, the upper layer will first call #start(int), then #recoverLease(), then #load(ProcedureLoader).

In #recoverLease(), we acquire the lease by closing all the existing WAL files (by calling recoverFileLease) and creating a new WAL writer. We also collect the list of all the old WAL files.

FIXME: note that the current lease recovery implementation is problematic: it cannot deal with the race when two masters both want to acquire the lease...

In the #load(ProcedureLoader) method, we load all the active procedures. See the comments on that method for more details.

The actual logging works a bit like our FileSystem-based WAL implementation on the RS (RegionServer) side. There is a #slots array, which behaves much like a ring buffer: the insert, update and delete methods put their data into #slots and wait. A background sync thread (see the #syncLoop() method) takes the data from #slots, writes it to the FileSystem, and notifies the callers that the write has finished.
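The slot-and-sync-thread pattern described above can be illustrated with a minimal, self-contained sketch. This is not HBase code: the class `SlotBufferSketch`, its `String` records and the in-memory `persisted` list (standing in for the WAL file) are all assumptions made for illustration, and the real store batches, times out and handles failures in ways that are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified model of the slot buffer; this is not HBase code. */
final class SlotBufferSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition waitCond = lock.newCondition(); // "there is data to sync"
  private final Condition syncCond = lock.newCondition(); // "a sync has finished"
  private final String[] slots;
  private final List<String> persisted = new ArrayList<>(); // stands in for the WAL file
  private final Thread syncThread;
  private int slotIndex = 0;
  private long syncId = 0;
  private boolean running = true;

  SlotBufferSketch(int numSlots) {
    slots = new String[numSlots];
    syncThread = new Thread(this::syncLoop, "sync-sketch");
    syncThread.start();
  }

  /** Puts a record into a slot, then blocks until the sync thread has written it. */
  void push(String record) {
    lock.lock();
    try {
      while (running && slotIndex == slots.length) {
        syncCond.awaitUninterruptibly(); // all slots taken: wait for a drain
      }
      if (!running) {
        throw new IllegalStateException("store is stopped");
      }
      slots[slotIndex++] = record;
      final long mySyncId = syncId;
      waitCond.signal(); // wake the sync thread
      while (syncId == mySyncId) {
        syncCond.awaitUninterruptibly(); // wait until our batch has been written
      }
    } finally {
      lock.unlock();
    }
  }

  /** Background loop: drains the slots into the "file" and wakes the waiting callers. */
  private void syncLoop() {
    lock.lock();
    try {
      while (running || slotIndex > 0) {
        if (slotIndex == 0) {
          waitCond.awaitUninterruptibly();
          continue;
        }
        for (int i = 0; i < slotIndex; i++) {
          persisted.add(slots[i]);
          slots[i] = null;
        }
        slotIndex = 0;
        syncId++;
        syncCond.signalAll();
      }
    } finally {
      lock.unlock();
    }
  }

  /** Stops the sync thread after it has drained any pending slots. */
  void stop() {
    lock.lock();
    try {
      running = false;
      waitCond.signal();
      syncCond.signalAll();
    } finally {
      lock.unlock();
    }
    try {
      syncThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Returns a copy of everything written so far. */
  List<String> persisted() {
    lock.lock();
    try {
      return new ArrayList<>(persisted);
    } finally {
      lock.unlock();
    }
  }
}
```

In this sketch a caller of `push` returns only after its record has been appended to `persisted`, which mirrors the "put into the slots and wait" behaviour; several concurrent callers are written out together in one batch.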

TODO: try using disruptor to increase performance and simplify the logic?

The #storeTracker keeps track of the modified procedures in the newest WAL file, which is also the one currently being written. Its deleted bits cover all procedures, not only the ones in the newest WAL file. When rolling a log, we first store the tracker in the trailer of the current WAL file and then reset its modified bits, so that it can start tracking the modified procedures for the new WAL file.

The #holdingCleanupTracker is used to test whether it is safe to delete the oldest WAL file. We use it when logs have been rolled and there is more than one WAL file. It is first initialized to the oldest file's tracker (which is stored in the trailer) using ProcedureStoreTracker#resetTo(ProcedureStoreTracker,boolean), and then merged with the tracker of every newer WAL file using ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker). If all the procedures modified in the oldest WAL file turn out to be modified or deleted in newer WAL files, we can delete it. This works because every call to ProcedureStore#insert(Procedure[]) or ProcedureStore#update(Procedure) persists the full state of a Procedure, so all earlier WAL records for that procedure can be deleted.
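The cleanup rule above can be modelled with plain sets. The sketch below is an assumption-laden illustration, not the HBase implementation: `WalCleanupSketch`, `holding` and `canDeleteOldest` are hypothetical names, and the real ProcedureStoreTracker works on bitmaps rather than `Set<Long>`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical set-based model of the cleanup check; the real tracker uses bitmaps. */
final class WalCleanupSketch {
  /**
   * @param modifiedPerWal procedure ids modified in each WAL file, oldest first
   * @param deleted ids of all deleted procedures
   * @return the ids that still pin the oldest WAL file
   */
  static Set<Long> holding(List<Set<Long>> modifiedPerWal, Set<Long> deleted) {
    if (modifiedPerWal.size() < 2) {
      throw new IllegalArgumentException("need at least two WAL files");
    }
    // Start from the oldest file's tracker.
    Set<Long> holding = new HashSet<>(modifiedPerWal.get(0));
    // Deleted procedures no longer need any record.
    holding.removeAll(deleted);
    // A newer record carries the full state, so it supersedes the old one.
    for (Set<Long> newer : modifiedPerWal.subList(1, modifiedPerWal.size())) {
      holding.removeAll(newer);
    }
    return holding;
  }

  static boolean canDeleteOldest(List<Set<Long>> modifiedPerWal, Set<Long> deleted) {
    return holding(modifiedPerWal, deleted).isEmpty();
  }

  public static void main(String[] args) {
    Set<Long> wal1 = new HashSet<>(Arrays.asList(1L, 2L)); // procedures 1 and 2 inserted
    Set<Long> wal2 = new HashSet<>();                      // log rolled, nothing written yet
    Set<Long> deleted = new HashSet<>();
    List<Set<Long>> wals = Arrays.asList(wal1, wal2);

    System.out.println(canDeleteOldest(wals, deleted)); // false: 1 and 2 only live in wal1

    wal2.add(1L); // update(proc1) lands in the newest file
    wal2.add(2L); // update(proc2) lands in the newest file
    System.out.println(canDeleteOldest(wals, deleted)); // true: wal1 is fully superseded
  }
}
```

The `main` method walks through the first half of the roll-and-remove scenario: two inserts, a roll, then two updates that make the first file removable.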

Code examples

Code example source: origin: apache/hbase

/**
  * Parses a directory of WALs building up ProcedureState.
  * For testing parse and profiling.
  * @param args Include pointer to directory of WAL files for a store instance to parse & load.
  */
 public static void main(String [] args) throws IOException {
  Configuration conf = HBaseConfiguration.create();
  if (args == null || args.length != 1) {
   System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR.");
   System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR");
   System.exit(-1);
  }
  WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null,
   new WALProcedureStore.LeaseRecovery() {
    @Override
    public void recoverFileLease(FileSystem fs, Path path) throws IOException {
     // no-op
    }
   });
  try {
   store.start(16);
   ProcedureExecutor<?> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store);
   pe.init(1, true);
  } finally {
   store.stop(true);
  }
 }

Code example source: origin: apache/hbase

@Test
public void testRollAndRemove() throws IOException {
 // Insert something in the log
 Procedure<?> proc1 = new TestSequentialProcedure();
 procStore.insert(proc1, null);
 Procedure<?> proc2 = new TestSequentialProcedure();
 procStore.insert(proc2, null);
 // roll the log, now we have 2
 procStore.rollWriterForTesting();
 assertEquals(2, procStore.getActiveLogs().size());
 // everything will be up to date in the second log
 // so we can remove the first one
 procStore.update(proc1);
 procStore.update(proc2);
 assertEquals(1, procStore.getActiveLogs().size());
 // roll the log, now we have 2
 procStore.rollWriterForTesting();
 assertEquals(2, procStore.getActiveLogs().size());
 // remove everything active
 // so we can remove all the logs
 procStore.delete(proc1.getProcId());
 procStore.delete(proc2.getProcId());
 assertEquals(1, procStore.getActiveLogs().size());
}

Code example source: origin: apache/hbase

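// Excerpt: lines elided in the source are marked with "// ...".
Path newLogFile = null;
long startPos = -1;
newLogFile = getLogFilePath(logId);
try {
 newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
 // ... (rest of the try block and its handlers elided in the source)
closeCurrentLogStream(false);
// ... (the opening "if" of the following branch is elided in the source)
 buildHoldingCleanupTracker();
} else if (logs.size() > walCountWarnThreshold) {
 LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" +
   // ... (rest of the message and its arguments elided in the source)
 sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
}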

Code example source: origin: apache/hbase

private long syncSlots() throws Throwable {
 int retry = 0;
 int logRolled = 0;
 long totalSynced = 0;
 do {
  try {
   totalSynced = syncSlots(stream, slots, 0, slotIndex);
   break;
  } catch (Throwable e) {
   LOG.warn("unable to sync slots, retry=" + retry);
   if (++retry >= maxRetriesBeforeRoll) {
    if (logRolled >= maxSyncFailureRoll && isRunning()) {
     LOG.error("Sync slots after log roll failed, abort.", e);
     throw e;
    }
    if (!rollWriterWithRetries()) {
     throw e;
    }
    logRolled++;
    retry = 0;
   }
  }
 } while (isRunning());
 return totalSynced;
}
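The control flow of the method above (retry the sync a few times, roll the log when the retries are used up, and give up once too many rolls have not helped) can be isolated in a small standalone sketch. `SyncRetrySketch` and its functional-interface parameters are hypothetical; unlike the original, the sketch has no `isRunning()` check and only handles unchecked exceptions.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical model of the "retry, then roll, then abort" loop; not HBase code. */
final class SyncRetrySketch {
  /**
   * @param sync attempts one sync and returns the number of bytes written
   * @param rollLog switches to a new log file; returns false if that failed
   * @param maxRetriesBeforeRoll failed attempts tolerated before rolling the log
   * @param maxSyncFailureRoll rolls tolerated before the failure is propagated
   */
  static long syncWithRoll(LongSupplier sync, BooleanSupplier rollLog,
      int maxRetriesBeforeRoll, int maxSyncFailureRoll) {
    int retry = 0;
    int logRolled = 0;
    while (true) {
      try {
        return sync.getAsLong();
      } catch (RuntimeException e) {
        if (++retry >= maxRetriesBeforeRoll) {
          if (logRolled >= maxSyncFailureRoll) {
            throw e; // rolling did not help: give up
          }
          if (!rollLog.getAsBoolean()) {
            throw e; // could not even roll the log
          }
          logRolled++;
          retry = 0; // start a fresh round of retries on the new log
        }
      }
    }
  }
}
```

With `maxRetriesBeforeRoll = 3` and `maxSyncFailureRoll = 1`, a sync that always fails is attempted six times in total (three on the old log, three on the new one) before the exception reaches the caller.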

Code example source: origin: apache/hbase

private boolean rollWriterWithRetries() {
 for (int i = 0; i < rollRetries && isRunning(); ++i) {
  if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
  try {
   if (rollWriter()) {
    return true;
   }
  } catch (IOException e) {
   LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
  }
 }
 LOG.error(HBaseMarkers.FATAL, "Unable to roll the log");
 return false;
}
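The wait before attempt `i` in the method above is `waitBeforeRoll * i`, so the pauses grow linearly (none before the first attempt). A minimal sketch of that schedule follows; `RollBackoffSketch` is a hypothetical name, and the `sleeper` parameter is an assumption added so that the pauses can be observed without really sleeping.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

/** Hypothetical model of a roll loop with linearly growing pauses; not HBase code. */
final class RollBackoffSketch {
  /**
   * @param rollOnce one roll attempt; returns true on success
   * @param rollRetries maximum number of attempts
   * @param waitBeforeRollMs base pause in milliseconds, multiplied by the attempt index
   * @param sleeper receives each pause; pass a real sleep or a recorder for testing
   */
  static boolean rollWithRetries(BooleanSupplier rollOnce, int rollRetries,
      long waitBeforeRollMs, LongConsumer sleeper) {
    for (int i = 0; i < rollRetries; ++i) {
      if (i > 0) {
        sleeper.accept(waitBeforeRollMs * i); // 1x, 2x, 3x ... the base pause
      }
      try {
        if (rollOnce.getAsBoolean()) {
          return true;
        }
      } catch (RuntimeException e) {
        System.err.println("Unable to roll the log, attempt=" + (i + 1) + ": " + e);
      }
    }
    return false;
  }
}
```

Injecting the sleeper keeps the sketch deterministic; a production caller would pass something that actually blocks the thread.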

Code example source: origin: apache/hbase

private void periodicRoll() throws IOException {
 if (storeTracker.isEmpty()) {
  LOG.trace("no active procedures");
  tryRollWriter();
  removeAllLogs(flushLogId - 1, "no active procedures");
 } else {
  if (storeTracker.isAllModified()) {
   LOG.trace("all the active procedures are in the latest log");
   removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
  }
  // if the log size has exceeded the roll threshold
  // or the periodic roll timeout is expired, try to roll the wal.
  if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
   tryRollWriter();
  }
  removeInactiveLogs();
 }
}
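The branching in the method above can be read as a small decision table. The sketch below restates it as a pure function that returns the steps in the order they would run; `PeriodicRollSketch`, `plan` and the step names are hypothetical labels chosen for illustration, not HBase identifiers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical restatement of the periodic-roll decision as a pure function. */
final class PeriodicRollSketch {
  /**
   * @param noActiveProcs true if the tracker reports no active procedures
   * @param allModifiedInLatest true if every active procedure is in the latest log
   * @param syncedBytes bytes written so far
   * @param rollThreshold size above which the log should be rolled
   * @param millisToNextRoll time left until the periodic roll is due
   */
  static List<String> plan(boolean noActiveProcs, boolean allModifiedInLatest,
      long syncedBytes, long rollThreshold, long millisToNextRoll) {
    List<String> steps = new ArrayList<>();
    if (noActiveProcs) {
      // Nothing is running: roll and drop every older log.
      steps.add("roll");
      steps.add("removeAllOlderLogs");
      return steps;
    }
    if (allModifiedInLatest) {
      steps.add("removeAllOlderLogs");
    }
    if (syncedBytes > rollThreshold || millisToNextRoll <= 0) {
      steps.add("roll");
    }
    steps.add("removeInactiveLogs");
    return steps;
  }
}
```

For example, with active procedures that are not all in the latest log, a small log and time left on the timer, the only step is `removeInactiveLogs`.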

Code example source: origin: apache/hbase

// Excerpt: lines elided in the source are marked with "// ...".
for (int i = 0; i < procs.length; ++i) {
 procs[i] = new TestProcedure(i + 1, 0);
 procStore.insert(procs[i], null);
}
procStore.rollWriterForTesting();
for (int i = 0; i < procs.length; ++i) {
 procStore.update(procs[i]);
 procStore.rollWriterForTesting();
}
procStore.stop(false);
// ...
procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
  new WALProcedureStore.LeaseRecovery() {
 private int count = 0;
 // ... (the recoverFileLease override is elided in the source)
});
// ...
procStore.start(PROCEDURE_STORE_SLOTS);
procStore.recoverLease();
procStore.load(loader);
assertEquals(procs.length, loader.getMaxProcId());
assertEquals(1, loader.getRunnableCount());

Code example source: origin: apache/hbase

// Excerpt: lines elided in the source are marked with "// ..."; the original indentation is kept.
lock.lock();
try {
 while (isRunning()) {
  try {
     // ...
     periodicRoll();
     // ...
     float rollTsSec = getMillisFromLastRoll() / 1000.0f;
     LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
          StringUtils.humanSize(totalSynced.get()),
          // ... (remaining arguments elided in the source)
    // ...
    waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
    if (slotIndex == 0) {
     // ... (body elided in the source)
   // ...
   final float rollSec = getMillisFromLastRoll() / 1000.0f;
   final float syncedPerSec = totalSyncedToStore / rollSec;
   if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
    // ... (body elided in the source)
   // ...
   long slotSize = syncSlots();
   logs.getLast().addToSize(slotSize);
   totalSyncedToStore = totalSynced.addAndGet(slotSize);
   // ... (the header of a catch clause that binds e is elided in the source)
   Thread.currentThread().interrupt();
   syncException.compareAndSet(null, e);
   sendAbortProcessSignal();
   throw e;
  } catch (Throwable t) {
   syncException.compareAndSet(null, t);
   sendAbortProcessSignal();
   // ... (rest of the method elided in the source)

Code example source: origin: apache/hbase

Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
  ((WALProcedureStore)procStore).getWALDir(),
  null,
  new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
procStore2.start(1);
procStore2.recoverLease();
  procStore2.insert(proc2, null);
  procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later

Code example source: origin: apache/hbase

Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
  ((WALProcedureStore)masterStore).getWALDir(),
  null,
  new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
 @Override
 public void postSync() {}
 // ... (remaining listener methods elided in the source)
});
backupStore3.start(1);
backupStore3.recoverLease();
backupStore3.getStoreTracker().setDeleted(1, false);
try {
 backupStore3.delete(1);
 fail("expected RuntimeException 'sync aborted'");
} catch (RuntimeException e) {
 // ... (handler body elided in the source)
}

Code example source: origin: apache/hbase

private boolean rollWriter() throws IOException {
 if (!isRunning()) {
  return false;
 }
 // Create new state-log
 if (!rollWriter(flushLogId + 1)) {
  LOG.warn("someone else has already created log {}", flushLogId);
  return false;
 }
 // We have the lease on the log,
 // but we should check if someone else has created new files
 if (getMaxLogId(getLogFiles()) > flushLogId) {
  LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
  logs.getLast().removeFile(this.walArchiveDir);
  return false;
 }
 // We have the lease on the log
 return true;
}

Code example source: origin: apache/hbase

procStore.insert(proc0, null);
Procedure<?> proc1 = new TestSequentialProcedure();
procStore.insert(proc1, null);
Procedure<?> proc2 = new TestSequentialProcedure();
procStore.insert(proc2, null);
procStore.rollWriterForTesting();
procStore.delete(proc1.getProcId());
procStore.rollWriterForTesting();
procStore.update(proc2);
procStore.rollWriterForTesting();
procStore.delete(proc2.getProcId());
procStore.stop(false);
FileStatus[] logs = fs.listStatus(logDir);
assertEquals(4, logs.length);
procStore.delete(proc0.getProcId());
procStore.periodicRollForTesting();
assertEquals(1, fs.listStatus(logDir).length);
storeRestart(loader);

Code example source: origin: apache/hbase

@Test
public void testWalCleanerUpdates() throws Exception {
 TestSequentialProcedure p1 = new TestSequentialProcedure();
 TestSequentialProcedure p2 = new TestSequentialProcedure();
 procStore.insert(p1, null);
 procStore.insert(p2, null);
 procStore.rollWriterForTesting();
 ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
 procStore.update(p1);
 procStore.rollWriterForTesting();
 procStore.update(p2);
 procStore.rollWriterForTesting();
 procStore.removeInactiveLogsForTesting();
 assertFalse(procStore.getActiveLogs().contains(firstLog));
}

Code example source: origin: apache/hbase

private void createProcedureExecutor() throws IOException {
 MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
 procedureStore =
  new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this));
 procedureStore.registerListener(new ProcedureStoreListener() {
  @Override
  public void abortProcess() {
   abort("The Procedure Store lost the lease", null);
  }
 });
 MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
 procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
 configurationManager.registerObserver(procEnv);
 int cpus = Runtime.getRuntime().availableProcessors();
 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
  (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
 final boolean abortOnCorruption =
  conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
   MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
 procedureStore.start(numThreads);
 // Just initialize it but do not start the workers, we will start the workers later by calling
 // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
 // details.
 procedureExecutor.init(numThreads, abortOnCorruption);
 procEnv.getRemoteDispatcher().start();
}

Code example source: origin: apache/hbase

public void setUpProcedureStore() throws IOException {
 Path testDir = UTIL.getDataTestDir();
 FileSystem fs = testDir.getFileSystem(conf);
 Path logDir = new Path(testDir, "proc-logs");
 System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
 fs.delete(logDir, true);
 store = ProcedureTestingUtility.createWalStore(conf, logDir);
 store.start(1);
 store.recoverLease();
 store.load(new LoadCounter());
}

Code example source: origin: apache/hbase

private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException {
 System.out.println("Restarting procedure store to read back the WALs");
 store.stop(false);
 store.start(1);
 store.recoverLease();
 long startTime = System.currentTimeMillis();
 store.load(loader);
 long timeTaken = System.currentTimeMillis() - startTime;
 System.out.println("******************************************");
 System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec");
 System.out.println("******************************************");
 System.out.println("Raw format for scripts");
   System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
       + "total_time_ms=%s]",
   NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), serializedState.length,
   UPDATES_PER_PROC_OPTION.getOpt(), updatesPerProc, DELETE_PROCS_FRACTION_OPTION.getOpt(),
   deleteProcsFraction, NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
}

Code example source: origin: apache/hbase

private void setupProcedureStore() throws IOException {
 Path testDir = UTIL.getDataTestDir();
 FileSystem fs = testDir.getFileSystem(conf);
 Path logDir = new Path(testDir, "proc-logs");
 System.out.println("Logs directory : " + logDir.toString());
 fs.delete(logDir, true);
 if ("nosync".equals(syncType)) {
  store = new NoSyncWalProcedureStore(conf, logDir);
 } else {
  store = ProcedureTestingUtility.createWalStore(conf, logDir);
 }
 store.start(numThreads);
 store.recoverLease();
 store.load(new ProcedureTestingUtility.LoadCounter());
 System.out.println("Starting new log : "
   + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}

Code example source: origin: apache/hbase

WALProcedureStore walStore = master.getWalProcedureStore();
ArrayList<WALProcedureStore.SyncMetrics> syncMetricsBuff = walStore.getSyncMetrics();
long millisToNextRoll = walStore.getMillisToNextPeriodicRoll();
long millisFromLastRoll = walStore.getMillisFromLastRoll();
ArrayList<ProcedureWALFile> procedureWALFiles = walStore.getActiveLogs();
Set<ProcedureWALFile> corruptedWALFiles = walStore.getCorruptedLogs();
List<Procedure<MasterProcedureEnv>> procedures = procExecutor.getProcedures();
Collections.sort(procedures, new Comparator<Procedure>() {
 // ... (the compare method is elided in the source)
});

Code example source: origin: apache/hbase

// ... (the enclosing anonymous listener declaration is elided in the source)
 @Override
 public void abortProcess() {
  LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store");
  store.stop(true);
 }
};

Code example source: origin: apache/hbase

procStore.insert(proc1, null);
procIds.add(child2[0].getProcId());
procIds.add(child2[1].getProcId());
procStore.insert(proc2, child2);
procStore.update(proc1);
procStore.update(child2[1]);
procStore.delete(child2[1].getProcId());
procIds.remove(child2[1].getProcId());
procStore.stop(false);
FileStatus[] logs = fs.listStatus(logDir);
assertEquals(3, logs.length);
