Flink源碼閱讀:JobManager的HA機(jī)制
JobManager 在 Flink 集群中發(fā)揮著重要的作用,包括任務(wù)調(diào)度和資源管理等工作。如果 JobManager 宕機(jī),那么整個(gè)集群的任務(wù)都將失敗。為了解決 JobManager 的單點(diǎn)問題,F(xiàn)link 也設(shè)計(jì)了 HA 機(jī)制來保障整個(gè)集群的穩(wěn)定性。
基本概念
在 JobManager 啟動(dòng)時(shí),調(diào)用?HighAvailabilityServicesUtils.createHighAvailabilityServices?來創(chuàng)建 HA 服務(wù),HA 依賴的服務(wù)都被封裝在 HighAvailabilityServices 中。當(dāng)前 Flink 內(nèi)部支持兩種高可用模式,分別是 ZooKeeper 和 KUBERNETES。
case?ZOOKEEPER:
return?createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
case?KUBERNETES:
return?createCustomHAServices(
"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
? ? ? ? ? ? configuration,
? ? ? ? ? ? executor);
HighAvailabilityServices 中提供的關(guān)鍵組件包括:
-
LeaderRetrievalService:服務(wù)發(fā)現(xiàn),用于獲取當(dāng)前 leader 的地址。目前用到服務(wù)發(fā)現(xiàn)的組件有 ResourceManager、Dispatcher、JobManager、ClusterRestEndpoint。
-
LeaderElection:選舉服務(wù),從多個(gè)候選者中選出一個(gè)作為 leader。用到選舉服務(wù)的同樣是 ResourceManager、Dispatcher、JobManager、ClusterRestEndpoint 這四個(gè)。
-
CheckpointRecoveryFactory:Checkpoint 恢復(fù)組件的工廠類,提供了創(chuàng)建 CompletedCheckpointStore 和 CheckpointIDCounter 的方法。CompletedCheckpointStore 是用于存儲(chǔ)已完成的 checkpoint 的元信息,CheckpointIDCounter 是用于生成 checkpoint ID。
-
ExecutionPlanStore:用于存儲(chǔ)執(zhí)行計(jì)劃。
-
JobResultStore:用于存儲(chǔ)作業(yè)結(jié)果,這里有兩種狀態(tài),一種是 dirty,表示作業(yè)沒有被完全清理,另一種是 clean,表示作業(yè)清理工作已經(jīng)執(zhí)行完成了。
-
BlobStore:存儲(chǔ)作業(yè)運(yùn)行期間的一些二進(jìn)制文件。
選舉服務(wù)
Flink 的選舉是依靠 LeaderElection 和 LeaderContender 配合完成的。LeaderElection 是 LeaderElectionService 的代理接口,提供了注冊(cè)候選者、確認(rèn) leader 和 判斷候選者是否是 leader 三個(gè)接口。LeaderContender 則是用來表示候選者對(duì)象。當(dāng)一個(gè) LeaderContender 當(dāng)選 leader 后,LeaderElectionService 會(huì)為其生成一個(gè) leaderSessionId,LeaderContender 會(huì)調(diào)用 confirmLeadershipAsync 發(fā)布自己的地址。選舉服務(wù)的具體實(shí)現(xiàn)在 LeaderElectionDriver 接口中。
服務(wù)發(fā)現(xiàn)
服務(wù)發(fā)現(xiàn)的作用是獲取各組件的 leader 地址。服務(wù)發(fā)現(xiàn)依賴 LeaderRetrievalService 和 LeaderRetrievalListener。LeaderRetrievalService 可以啟動(dòng)一個(gè)監(jiān)聽,當(dāng)有新的 leader 當(dāng)選時(shí),會(huì)調(diào)用 LeaderRetrievalListener 的 notifyLeaderAddress 方法。
信息保存
當(dāng) leader 發(fā)生切換時(shí),新的 leader 需要獲取到舊 leader 存儲(chǔ)的信息,這就需要舊 leader 把這些信息存在一個(gè)公共的存儲(chǔ)上。它可以是 ZooKeeper 或 Kubernetes 的存儲(chǔ),也可以是分布式文件系統(tǒng)的存儲(chǔ)。
基于 ZooKeeper 的 HA
選舉服務(wù)
前面我們提到了選舉服務(wù)主要依賴 LeaderElection 和 LeaderContender 配合完成。我們就以 JobManager 為例,看一下機(jī)遇 ZooKeeper 的選舉流程的具體實(shí)現(xiàn)。

圖中 JobMasterServiceLeadershipRunner 是 LeaderContender 的實(shí)現(xiàn)類。在啟動(dòng)服務(wù)時(shí),會(huì)向 LeaderElection 注冊(cè)自己的信息,實(shí)際執(zhí)行者是 DefaultLeaderElectionService。它先創(chuàng)建了 LeaderElectionDriver,然后將 LeaderContender 保存在 leaderContenderRegistry 中。選舉的核心邏輯封裝在 LeaderElectionDriver 中。
在創(chuàng)建 LeaderElectionDriver 時(shí),會(huì)創(chuàng)建 LeaderLatch 對(duì)象和 TreeCache 對(duì)象, LeaderLatch 封裝了與 ZooKeeper 關(guān)聯(lián)的回調(diào),會(huì)接收一個(gè) LeaderElectionDriver 作為監(jiān)聽。TreeCache 主要用于監(jiān)聽 ZooKeeper 中 leader 節(jié)點(diǎn)的變更。
publicZooKeeperLeaderElectionDriver(
? ? ? ? CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)
throws?Exception?{
? ? ...
this.leaderLatch =?new?LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
this.treeCache =
? ? ? ? ? ? ZooKeeperUtils.createTreeCache(
? ? ? ? ? ? ? ? ? ? curatorFramework,
"/",
new?ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());
? ? treeCache
? ? ? ? ? ? .getListenable()
? ? ? ? ? ? .addListener(
? ? ? ? ? ? ? ? ? ? (client, event) -> {
switch?(event.getType()) {
case?NODE_ADDED:
case?NODE_UPDATED:
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Preconditions.checkNotNull(
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? event.getData(),
"The ZooKeeper event data must not be null.");
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? handleChangedLeaderInformation(event.getData());
break;
case?NODE_REMOVED:
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Preconditions.checkNotNull(
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? event.getData(),
"The ZooKeeper event data must not be null.");
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? handleRemovedLeaderInformation(event.getData().getPath());
break;
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? });
? ? leaderLatch.addListener(this);
? ? ...
? ? leaderLatch.start();
? ? treeCache.start();
}
我們進(jìn)入到 LeaderLatch 的 start 方法。它的內(nèi)部是在 ZooKeeper 上創(chuàng)建 latch-xxx 節(jié)點(diǎn)。xxx 是當(dāng)前 LeaderLatch 的 ID,它由 ZooKeeper 生成,ID 最小的當(dāng)選 Leader。
privatevoidcheckLeadership(List<String> children)throws?Exception?{
if?(this.debugCheckLeaderShipLatch !=?null) {
this.debugCheckLeaderShipLatch.await();
? ? }
? ? String localOurPath = (String)this.ourPath.get();
? ? List<String> sortedChildren = LockInternals.getSortedChildren("latch-", sorter, children);
int?ourIndex = localOurPath !=?null?? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
this.log.debug("checkLeadership with id: {}, ourPath: {}, children: {}",?new?Object[]{this.id, localOurPath, sortedChildren});
if?(ourIndex <?0) {
this.log.error("Can't find our node. Resetting. Index: "?+ ourIndex);
this.reset();
? ? }?elseif?(ourIndex ==?0) {
this.lastPathIsLeader.set(localOurPath);
this.setLeadership(true);
? ? }?else?{
this.setLeadership(false);
? ? ? ? String watchPath = (String)sortedChildren.get(ourIndex -?1);
? ? ? ? Watcher watcher =?new?Watcher() {
publicvoidprocess(WatchedEvent event){
if?(LeaderLatch.this.state.get() == LeaderLatch.State.STARTED && event.getType() == EventType.NodeDeleted) {
try?{
? ? ? ? ? ? ? ? ? ? ? ? LeaderLatch.this.getChildren();
? ? ? ? ? ? ? ? ? ? }?catch?(Exception ex) {
? ? ? ? ? ? ? ? ? ? ? ? ThreadUtils.checkInterrupted(ex);
? ? ? ? ? ? ? ? ? ? ? ? LeaderLatch.this.log.error("An error occurred checking the leadership.", ex);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? };
? ? ? ? BackgroundCallback callback =?new?BackgroundCallback() {
publicvoidprocessResult(CuratorFramework client, CuratorEvent event)throws?Exception?{
if?(event.getResultCode() == Code.NONODE.intValue()) {
? ? ? ? ? ? ? ? ? ? LeaderLatch.this.getChildren();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? };
? ? ? ? ((ErrorListenerPathable)((BackgroundPathable)this.client.getData().usingWatcher(watcher)).inBackground(callback)).forPath(ZKPaths.makePath(this.latchPath, watchPath));
? ? }
}
當(dāng)選 Leader 后,會(huì)回調(diào) LeaderElectionDriver 的 isLeader 方法,如果未當(dāng)選,則繼續(xù)監(jiān)聽 latch 節(jié)點(diǎn)的變更。isLeader 會(huì)繼續(xù)回調(diào) LeaderElection 的 onGrantLeadership 方法,接著調(diào)用 LeaderContender 的 grantLeadership。這時(shí)會(huì)啟動(dòng) JobMaster 服務(wù),然后調(diào)用 LeaderElection 的 confirmLeadershipAsync 來確認(rèn)當(dāng)選成功。確認(rèn)的過程是由 LeaderElectionDriver 來執(zhí)行的。主要作用是把當(dāng)前 leader 的信息寫回到 ZooKeeper 的 connection_info 節(jié)點(diǎn)。
publicvoidpublishLeaderInformation(String componentId, LeaderInformation leaderInformation){
? ? Preconditions.checkState(running.get());
if?(!leaderLatch.hasLeadership()) {
return;
? ? }
final?String connectionInformationPath =
? ? ? ? ? ? ZooKeeperUtils.generateConnectionInformationPath(componentId);
? ? LOG.debug(
"Write leader information {} for component '{}' to {}.",
? ? ? ? ? ? leaderInformation,
? ? ? ? ? ? componentId,
? ? ? ? ? ? ZooKeeperUtils.generateZookeeperPath(
? ? ? ? ? ? ? ? ? ? curatorFramework.getNamespace(), connectionInformationPath));
try?{
? ? ? ? ZooKeeperUtils.writeLeaderInformationToZooKeeper(
? ? ? ? ? ? ? ? leaderInformation,
? ? ? ? ? ? ? ? curatorFramework,
? ? ? ? ? ? ? ? leaderLatch::hasLeadership,
? ? ? ? ? ? ? ? connectionInformationPath);
? ? }?catch?(Exception e) {
? ? ? ? leaderElectionListener.onError(e);
? ? }
}
服務(wù)發(fā)現(xiàn)
梳理完選舉服務(wù)的源碼后,我們?cè)賮砜匆幌路?wù)發(fā)現(xiàn)的過程。我們以 TaskManager 獲取 JobManager 的 leader 為例。

當(dāng)我們往 TaskManager 添加任務(wù)時(shí),會(huì)調(diào)用 JobLeaderService 的 addJob 方法。這里會(huì)先獲取 LeaderRetrieval,然后調(diào)用 start 方法注冊(cè) LeaderRetrievalListener 監(jiān)聽,并創(chuàng)建 LeaderRetrievalDriver。在 LeaderRetrievalDriver 中主要是向 ZooKeeper 注冊(cè) connection_info 節(jié)點(diǎn)的變更。
如果發(fā)生變更,ZooKeeper 會(huì)回調(diào)?LeaderRetrievalDriver.retrieveLeaderInformationFromZooKeeper?方法。我們從 ZooKeeper 獲取到 leader 的地址和 sessionId 后,就回調(diào)?LeaderRetrievalService.notifyLeaderAddress?方法。最終調(diào)用到 JobLeaderService 的 notifyLeaderAddress 方法,這個(gè)方法中就是斷開與舊 leader 的連接,增加與新 leader 的連接。
信息保存
最后我們?cè)賮砜葱畔⒈4嫦嚓P(guān)的源碼。在 JobManager 完成一次 Checkpoint 時(shí),會(huì)執(zhí)行?CheckpointCoordinator.completePendingCheckpoint?方法,跟隨調(diào)用鏈路可以找到?ZooKeeperStateHandleStore.addAndLock?方法,這里會(huì)把狀態(tài)寫入到文件系統(tǒng)中,然后把文件路徑保存在 ZooKeeper 中。
public?RetrievableStateHandle<T>?addAndLock(String pathInZooKeeper, T state)
throws?PossibleInconsistentStateException, Exception?{
? ? checkNotNull(pathInZooKeeper,?"Path in ZooKeeper");
? ? checkNotNull(state,?"State");
final?String path = normalizePath(pathInZooKeeper);
final?Optional<Stat> maybeStat = getStat(path);
if?(maybeStat.isPresent()) {
if?(isNotMarkedForDeletion(maybeStat.get())) {
thrownew?AlreadyExistException(
? ? ? ? ? ? ? ? ? ? String.format("ZooKeeper node %s already exists.", path));
? ? ? ? }
? ? ? ? Preconditions.checkState(
? ? ? ? ? ? ? ? releaseAndTryRemove(path),
"The state is marked for deletion and, therefore, should be deletable.");
? ? }
final?RetrievableStateHandle<T> storeHandle = storage.store(state);
finalbyte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
try?{
? ? ? ? writeStoreHandleTransactionally(path, serializedStoreHandle);
return?storeHandle;
? ? }?catch?(KeeperException.NodeExistsException e) {
// Transactions are not idempotent in the curator version we're currently using, so it
// is actually possible that we've re-tried a transaction that has already succeeded.
// We've ensured that the node hasn't been present prior executing the transaction, so
// we can assume that this is a result of the retry mechanism.
return?storeHandle;
? ? }?catch?(Exception e) {
if?(indicatesPossiblyInconsistentState(e)) {
thrownew?PossibleInconsistentStateException(e);
? ? ? ? }
// In case of any other failure, discard the state and rethrow the exception.
? ? ? ? storeHandle.discardState();
throw?e;
? ? }
}
至此,基于 ZooKeeper 的 HA 邏輯我們就梳理完了。從 1.12 版本開始,F(xiàn)link 還支持了 Kubernetes 高可用,下面我們?cè)賮硪幌滤侨绾螌?shí)現(xiàn)的。
基于 Kubernetes 的 HA
選舉服務(wù)
通過前面的學(xué)習(xí),我們已經(jīng)了解到,選舉的主要邏輯是在 LeaderElectionDriver 中,因此,我們直接來看 KubernetesLeaderElectionDriver 的邏輯即可。創(chuàng)建 KubernetesLeaderElectionDriver 時(shí),創(chuàng)建并啟動(dòng)了 KubernetesLeaderElector。這個(gè)類似于 ZooKeeper 邏輯中 LeaderLatch,會(huì)跟 Kubernetes 底層的選舉邏輯交互,同時(shí)注冊(cè)監(jiān)聽。
publicKubernetesLeaderElector(
? ? ? ? NamespacedKubernetesClient kubernetesClient,
? ? ? ? KubernetesLeaderElectionConfiguration leaderConfig,
? ? ? ? LeaderCallbackHandler leaderCallbackHandler,
? ? ? ? ExecutorService executorService){
this.kubernetesClient = kubernetesClient;
this.leaderElectionConfig =
new?LeaderElectionConfigBuilder()
? ? ? ? ? ? ? ? ? ? .withName(leaderConfig.getConfigMapName())
? ? ? ? ? ? ? ? ? ? .withLeaseDuration(leaderConfig.getLeaseDuration())
? ? ? ? ? ? ? ? ? ? .withLock(
new?ConfigMapLock(
new?ObjectMetaBuilder()
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .withNamespace(kubernetesClient.getNamespace())
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .withName(leaderConfig.getConfigMapName())
// Labels will be used to clean up the ha related
// ConfigMaps.
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .withLabels(
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? KubernetesUtils.getConfigMapLabels(
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? leaderConfig.getClusterId()))
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .build(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? leaderConfig.getLockIdentity()))
? ? ? ? ? ? ? ? ? ? .withRenewDeadline(leaderConfig.getRenewDeadline())
? ? ? ? ? ? ? ? ? ? .withRetryPeriod(leaderConfig.getRetryPeriod())
? ? ? ? ? ? ? ? ? ? .withReleaseOnCancel(true)
? ? ? ? ? ? ? ? ? ? .withLeaderCallbacks(
new?LeaderCallbacks(
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? leaderCallbackHandler::isLeader,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? leaderCallbackHandler::notLeader,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? newLeader ->
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? LOG.info(
"New leader elected {} for {}.",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? newLeader,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? leaderConfig.getConfigMapName())))
? ? ? ? ? ? ? ? ? ? .build();
this.executorService = executorService;
? ? LOG.info(
"Create KubernetesLeaderElector on lock {}.",
? ? ? ? ? ? leaderElectionConfig.getLock().describe());
}
選舉成功后,會(huì)回調(diào)?LeaderElectionListener.onGrantLeadership?方法。后續(xù)的調(diào)用鏈路還是會(huì)調(diào)用到?KubernetesLeaderElectionDriver.publishLeaderInformation?方法。這個(gè)方法是把 leader 信息寫到 Kubernetes 的 configMap 中。
publicvoidpublishLeaderInformation(String componentId, LeaderInformation leaderInformation){
? ? Preconditions.checkState(running.get());
try?{
? ? ? ? kubeClient
? ? ? ? ? ? ? ? .checkAndUpdateConfigMap(
? ? ? ? ? ? ? ? ? ? ? ? configMapName,
? ? ? ? ? ? ? ? ? ? ? ? updateConfigMapWithLeaderInformation(componentId, leaderInformation))
? ? ? ? ? ? ? ? .get();
? ? }?catch?(InterruptedException | ExecutionException e) {
? ? ? ? leaderElectionListener.onError(e);
? ? }
? ? LOG.debug(
"Successfully wrote leader information {} for leader {} into the config map {}.",
? ? ? ? ? ? leaderInformation,
? ? ? ? ? ? componentId,
? ? ? ? ? ? configMapName);
}
服務(wù)發(fā)現(xiàn)
服務(wù)發(fā)現(xiàn)的邏輯在 KubernetesLeaderRetrievalDriver 類中,在創(chuàng)建時(shí),會(huì)將內(nèi)部類 ConfigMapCallbackHandlerImpl 注冊(cè)為監(jiān)聽回調(diào)類。
當(dāng) configMap 有新增或變更后,會(huì)回調(diào)?LeaderRetrievalService.notifyLeaderAddress?方法。
privateclassConfigMapCallbackHandlerImpl
implementsFlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>?{
@Override
publicvoidonAdded(List<KubernetesConfigMap> configMaps){
// The ConfigMap is created by KubernetesLeaderElectionDriver with
// empty data. We don't really need to process anything unless the retriever was started
// after the leader election has already succeeded.
final?KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
final?LeaderInformation leaderInformation = leaderInformationExtractor.apply(configMap);
if?(!leaderInformation.isEmpty()) {
? ? ? ? ? ? leaderRetrievalEventHandler.notifyLeaderAddress(leaderInformation);
? ? ? ? }
? ? }
@Override
publicvoidonModified(List<KubernetesConfigMap> configMaps){
final?KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
? ? ? ? leaderRetrievalEventHandler.notifyLeaderAddress(
? ? ? ? ? ? ? ? leaderInformationExtractor.apply(configMap));
? ? }
? ? ...
}
信息保存
信息保存的邏輯和 ZooKeeper 也非常類似。即先把 state 保存在文件系統(tǒng),然后把存儲(chǔ)路徑寫到 Kubernetes 寫到 configMap 中。具體可以看?KubernetesStateHandleStore.addAndLock?方法。
總結(jié)
本文我們一起梳理了 Flink 中 JobManager 的 HA 機(jī)制相關(guān)源碼。目前 Flink 支持 ZooKeeper 和 Kubernetes 兩種實(shí)現(xiàn)。在梳理過程中,我們以 JobManager 為例,其他幾個(gè)用到高可用的服務(wù)的選舉邏輯也是一樣的。
夜雨聆風(fēng)
