吉林seo刷關鍵詞排名優(yōu)化進一步優(yōu)化
文章目錄
- 整體設計
- processMail
- 1.Checkpoint Tigger
- 2.ProcessingTime Timer Trigger
- processInput
- 兼容SourceStreamTask
整體設計
Mailbox線程模型通過引入阻塞隊列配合一個Mailbox線程的方式,可以輕松修改StreamTask內(nèi)部狀態(tài)的修改。Checkpoint、ProcessingTime Timer的相關操作(Runnable任務),會以Mail的形式保存到Mailbox內(nèi)的阻塞隊列中。StreamTask在invoke階段的runMailboxLoop時期,就會輪詢Mailbox來處理隊列中保存的Mail,Mail處理完畢后才會對DataStream上的數(shù)據(jù)元素執(zhí)行處理邏輯。
MailboxProcessor的能力就是負責拉取、處理Mail,以及執(zhí)行MailboxDefaultAction(默認動作,即processInput()方法中對DataStream上的普通消息的處理邏輯,包括:處理Event、barrier、Watermark等)。
/*** 開始輪詢Mailbox內(nèi)的Mail,Checkpoint和ProcessingTime Timer的觸發(fā)事件會以Runnable的形式(作為Mail)添加到Mailbox的隊列中,等待“Mailbox線程”去處理*/
public void runMailboxLoop() throws Exception {// 獲取最新的TaskMailbox:主要用于存儲提交的Mail,并提供獲取接口。// TaskMailbox有2個隊列:// 1.queue:阻塞隊列,通過ReentrantLock控制隊列中的讀寫操作// 2.batch:非阻塞隊列,調(diào)用createBatch()方法會將queue中的Mail轉(zhuǎn)存到batch中,這樣讀操作就能通過tryTakeFromBatch()方法從batch隊列中批量獲取Mail,且只能被Mailbox線程消費final TaskMailbox localMailbox = mailbox;// 檢查當前線程是否為Mailbox線程,即StreamTask運行時所在的主線程Preconditions.checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");// 確認Mailbox的狀態(tài):必須為OPENassert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";// 創(chuàng)建MailboxController實例:可以控制Mailbox的循環(huán)、臨時暫停和恢復MailboxDefaultAction(默認動作)final MailboxController defaultActionContext = new MailboxController(this);/*** 核心:事件循環(huán)* processMail()方法會檢測Mailbox中是否還有Mail需要處理,新Mail會(在ReentrantLock的保護下)被添加到queue隊列并轉(zhuǎn)存到batch隊列中。* MailboxProcessor處理完batch隊列中的全部Mail后(執(zhí)行作為Mail的Runnable#run()方法),才會進入到while循環(huán)內(nèi),執(zhí)行MailboxDefaultAction的默認動作,* 即調(diào)用StreamTask#processInput()方法,對讀取到的數(shù)據(jù)(Event、Barrier、Watermark等)進行處理*/while (processMail(localMailbox)) {mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed}
}
可以看出,對Mail和MailboxDefaultAction的處理,是由唯一的Mailbox線程負責的。
processMail
在while輪詢時,首先會processMail
/*** 處理Mailbox中的Mail:Checkpoint、ProcessingTime Timer的觸發(fā)事件,會以Runnable的形式作為Mail保存在Mailbox的queue隊列中,* 并在ReentrantLock的保護下,將queue隊列中的新Mail轉(zhuǎn)移到batch隊列中。MailboxProcessor會根據(jù)queue隊列、batch隊列內(nèi)的Mail情況,* 決定處理Mail or processInput。只有當TaskMailbox內(nèi)的所有Mail全都處理完畢后,MailboxProcessor才會去processInput()*/
private boolean processMail(TaskMailbox mailbox) throws Exception {/*** 新Mail寫入queue隊列,TaskMailbox會將queue隊列中的新Mail轉(zhuǎn)移到batch隊列中,MailboxProcessor會根據(jù)queue隊列、batch隊列內(nèi)的Mail情況,* 判斷執(zhí)行Mail的run() or processInput()。只有當TaskMailbox內(nèi)的所有Mail全部處理完成后,MailboxProcessor才會去processInput()。*/if (!mailbox.createBatch()) {return true;}Optional<Mail> maybeMail;/*** 能走到這,說明queue隊列中的Mail已被全部轉(zhuǎn)移至batch隊列?,F(xiàn)在要從batch隊列中獲取到Mail并執(zhí)行(它作為Runnable的run()方法),* 直到batch隊列中的所有Mail全都處理完畢*/while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {maybeMail.get().run();}/**如果默認操作處于Unavailable狀態(tài),那就先阻塞住,直到它重新回歸available狀態(tài)*/while (isDefaultActionUnavailable() && isMailboxLoopRunning()) {mailbox.take(MIN_PRIORITY).run();}// 返回Mailbox是否還在Loopreturn isMailboxLoopRunning();
}
很核心的一個點就是Mailbox要去createBatch,TaskMailboxImpl提供了具體的實現(xiàn)邏輯。Mailbox引入了2個隊列,新Mail被add到Mailbox內(nèi)的queue隊列中(此過程受ReentrantLock保護)。同時為了減少讀取queue隊列時的同步開銷,Mailbox還構建了一個batch隊列專門用來后續(xù)消費(避免加鎖操作)。
/*** 對Deque<Mail>隊列的讀寫,通過ReentrantLock加以保護*/
private final ReentrantLock lock = new ReentrantLock();/*** Internal queue of mails.* 使用Deque(內(nèi)部隊列)保存所有的Mail*/
@GuardedBy("lock")
private final Deque<Mail> queue = new ArrayDeque<>();
/*** 為了減少讀取queue隊列所造成的同步開銷,TaskMailbox會創(chuàng)建一個batch隊列,queue隊列中的Mail會被轉(zhuǎn)移到batch隊列中,* 有效避免了后續(xù)消費時的加鎖操作*/
private final Deque<Mail> batch = new ArrayDeque<>();@Override
public boolean createBatch() {checkIsMailboxThread();/*** 如果queue隊列中沒有新Mail,那就要看batch隊列是否為空。* 1.如果batch也是空的(Mailbox里已經(jīng)沒有任何Mail了,需要去processInput()了),那processMail()也會return true,* MailboxProcessor就會進入到while循環(huán)內(nèi)部,執(zhí)行processInput()來處理DataStream上的數(shù)據(jù);* 2.如果batch不空,說明MailboxProcessor還需要繼續(xù)processMail(),即取出Mail執(zhí)行它(作為Runnable)的run()方法;* 由此可見,Mailbox中的batch隊列中的Mail最終一定會被Mailbox線程消耗殆盡(輪詢、處理),然后才會去processInput()*/if (!hasNewMail) { // 只要queue隊列里還有Mail,hasNewMail就為truereturn !batch.isEmpty();}/**能走到這說明queue隊列中仍有新Mail,接下來需要將它的新Mail向batch隊列轉(zhuǎn)移,該過程受ReentrantLock保護*/final ReentrantLock lock = this.lock;// 獲取鎖lock.lock();try {Mail mail;/**每次循環(huán)都將queue隊列中的First Mail,轉(zhuǎn)移到batch隊列中,直至queue隊列被消耗殆盡。此時一定return true*/while ((mail = queue.pollFirst()) != null) {batch.addLast(mail);}// 此時queue隊列內(nèi)的所有Mail都被轉(zhuǎn)移到batch隊列中了,queue中沒有新Mail了hasNewMail = false;// 此時根據(jù)batch隊列是否為空,MailboxProcessor會判斷執(zhí)行Mail的run() or processInput()return !batch.isEmpty();} finally {// 最終釋放鎖lock.unlock();}
}
如果Mailbox內(nèi)的queue隊列中仍有新Mail,那就在ReentrantLock的加持下將queue內(nèi)的Mail全都轉(zhuǎn)移到batch隊列中;如果Mailbox內(nèi)的queue隊列中沒有新Mail,那就看batch隊列的情況了。決斷權交給外層的MailboxProcessor,總的來看:
- 如果batch隊列中有Mail,MailboxProcessor會從Mailbox內(nèi)的batch隊列中逐個pollFirst,然后執(zhí)行(它作為Runnable#run()方法),直到batch隊列中的所有Mail全都被“消耗殆盡”為止
- 如果batch隊列中沒有Mail,MailboxProcessor此時就沒有Mail可處理了,那就直接processInput
1.Checkpoint Tigger
對Checkpoint的觸發(fā),是通過MailboxExecutor向Mailbox提交Mail的
/*** 觸發(fā)執(zhí)行StreamTask中的Checkpoint操作:異步的通過MailboxExecutor,將“執(zhí)行Checkpoint”的請求封裝成Mail后,* 提交到TaskMailbox中,最終由MailboxProcessor來處理*/
@Override
public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,boolean advanceToEndOfEventTime) {// 通過MailboxExecutor,將“觸發(fā)執(zhí)行Checkpoint”的具體邏輯封裝成Mail,提交到Mailbox中,后期會被MailboxProcessor執(zhí)行return mailboxProcessor.getMainMailboxExecutor().submit(// 觸發(fā)Checkpoint的具體邏輯() -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),"checkpoint %s with %s",checkpointMetaData,checkpointOptions);
}
triggerCheckpoint操作會被封裝成Mail,添加到Mailbox中等待被處理。
@Override
public void execute(@Nonnull final RunnableWithException command,final String descriptionFormat,final Object... descriptionArgs) {try {mailbox.put(new Mail(command, priority, actionExecutor, descriptionFormat, descriptionArgs));} catch (IllegalStateException mbex) {throw new RejectedExecutionException(mbex);}
}
當然,Checkpoint的完成操作,也是同樣的套路。
2.ProcessingTime Timer Trigger
/*** 借助Mailbox線程模型,由MailboxExecutor負責將"ProcessingTime Timer觸發(fā)的消息"封裝成Mail提交到TaskMailbox中,后續(xù)由MailboxProcessor處理*/
public ProcessingTimeService getProcessingTimeService(int operatorIndex) {Preconditions.checkState(timerService != null, "The timer service has not been initialized.");MailboxExecutor mailboxExecutor = mailboxProcessor.getMailboxExecutor(operatorIndex);// 通過MailboxExecutor將Mail提交到Mailbox中等待處理return new ProcessingTimeServiceImpl(timerService, callback -> deferCallbackToMailbox(mailboxExecutor, callback));
}private ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {return timestamp -> {mailboxExecutor.execute(() -> invokeProcessingTimeCallback(callback, timestamp),"Timer callback for %s @ %d",callback,timestamp);};
}
processInput
StreamInputProcessor會對輸入的數(shù)據(jù)進行處理、輸出,包含:StreamTaskInput + OperatorChain + DataOutput。每次processInput都相當于是在處理一個有界流(外層MailboxProcessor在不斷地的輪詢),處理完DataStream上的StreamRecord后,會返回InputStatus的枚舉值,根據(jù)InputStatus值來決定下一步該“何去何從”。
/*** StreamTask的執(zhí)行邏輯:處理輸入的數(shù)據(jù),返回InputStatus狀態(tài),并根據(jù)InputStatus決定是否需要結(jié)束當前Task。* 該方法會通過MailboxProcessor調(diào)度、執(zhí)行(作為MailboxProcessor的默認動作),底層調(diào)用StreamInputProcessor#processInput()方法*/
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {/*** 核心:借助StreamInputProcessor完成數(shù)據(jù)的讀取,并交給算子處理,處理完畢后會返回InputStatus。* 每次觸發(fā),相當于處理一個有界流,在外層Mailbox拉取Mail才是while循環(huán)無限拉取*/InputStatus status = inputProcessor.processInput();/*** case 1:上游如果還有數(shù)據(jù) && RecordWriter是可用的,立即返回。意為:繼續(xù)處理!*/if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {return;}/*** case 2:當狀態(tài)為END_OF_INPUT,說明本批次的有界流數(shù)據(jù)已經(jīng)處理完畢,* 通過MailboxCollector來告訴Mailbox*/if (status == InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}/*** case 3:當前有界流中沒有數(shù)據(jù),但未來可能會有。此時處理線程會被掛起:直到有新的可用數(shù)據(jù)到來 && RecordWriter可用* 此時會先臨時暫停對MailboxDefaultAction的處理,等啥時候又有新數(shù)據(jù)了,再重新恢復MailboxDefaultAction的處理。*/CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);// 通過MailboxCollector讓Mailbox線程暫時停止對MailboxDefaultAction的處理MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();// 等啥時候又有了input、output,RecordWriter也變得可用了以后,再重新繼續(xù)執(zhí)行默認操作jointFuture.thenRun(suspendedDefaultAction::resume);
}
MailboxController是MailboxDefaultAction和Mailbox之間交互的橋梁,在StreamTask處理DataStream元素的過程中,會利用MailboxController將處理狀態(tài)及時通知給Mailbox。如果這批有界流處理完畢,就會通過MailboxController通知Mailbox(本質(zhì)就是向Mailbox發(fā)送一個Mail),進行下一輪的處理。
private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {mailbox.putFirst(new Mail(mail,Integer.MAX_VALUE /*not used with putFirst*/,descriptionFormat,descriptionArgs));
}
兼容SourceStreamTask
作為DataStream Source是專門用來生產(chǎn)無界流數(shù)據(jù)的,并不能穿插兼顧Mailbox內(nèi)Mail的檢測。如果僅有一個線程生產(chǎn)無界流數(shù)據(jù)的話,那將永遠無法檢測Mailbox內(nèi)的Mail。作為StreamTask的子類,SourceStreamTask會額外啟動另一個獨立的LegacySourceFunctionThread線程來執(zhí)行SourceFunction中的循環(huán)(生產(chǎn)無界流),Mailbox線程(主線程)依然負責處理Mail和默認操作。
/*** 專門為Source源生產(chǎn)數(shù)據(jù)的線程*/
private class LegacySourceFunctionThread extends Thread {private final CompletableFuture<Void> completionFuture;LegacySourceFunctionThread() {this.completionFuture = new CompletableFuture<>();}@Overridepublic void run() {try {// CheckpointLock保證線程安全headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}}public void setTaskDescription(final String taskDescription) {setName("Legacy Source Thread - " + taskDescription);}CompletableFuture<Void> getCompletionFuture() {return isFailing() && !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;}
}
負責為Source生產(chǎn)無界流數(shù)據(jù)的LegacySourceFunctionThread線程啟動后,不管是啟動成功 or 出現(xiàn)異常,都會封裝對應的Mail并發(fā)送給Mailbox,而Mailbox線程的processMail一直在等待處理Mail。
/*** SourceStreamTask中,一個Thread負責專門生產(chǎn)無界流,另一個MailBox Thread處理Checkpoint、ProcessingTime Timer等事件Mail*/
@Override
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {/*** 通過MailboxDefaultAction.Controller告訴Mailbox:讓MailboxThread先暫停處理MailboxDefaultAction。* TaskMailbox收到該消息后,就會在processMail()中一直等待并處理Mail(在MailboxThread中會一直處理Mail)*/controller.suspendDefaultAction();/**啟動LegacySourceFunctionThread線程:專門生產(chǎn)Source無界流數(shù)據(jù)的,和MailboxThread線程一起運行*/sourceThread.setTaskDescription(getName());sourceThread.start();/**LegacySourceFunctionThread線程啟動后,會通知Mailbox,Mailbox會在processMail()中一直等待并處理mail(不會返回,即Mailbox線程會一直處理mail)*/sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {/**LegacySourceFunctionThread線程啟動過程中發(fā)生的任何異常、以及啟動成功,都會以Mail的形式發(fā)送給Mailbox*/if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));} else if (!isFinished && sourceThreadThrowable != null) {mailboxProcessor.reportThrowable(sourceThreadThrowable);} else {mailboxProcessor.allActionsCompleted();}});
}
Mailbox主線程和LegacySourceFunctionThread線程線程都在運行,通過CheckpointLock鎖來保證線程安全。