- I. The server-side entry point for consumer fetch requests
- II. Iterate over the topic partitions requested in the fetch and read data for each of them
- 1. A suitable replica is chosen for reading the local log (since 2.4, read/write separation across a topic partition's replicas is supported)
- III. Determine whether the fetch request comes from a topic partition follower or from a consumer client
- 1. The read lock on leaderIsrUpdateLock must be acquired before reading
- 2. readFromLocalLog reads the local log data
- IV. Reading log data means reading segment files (setting zero copy aside)
- 1. Get the basic metadata of the local log (high watermark, offsets, etc.)
- 2. Iterate over the segments until data is read from one
- V. Create the file log stream object FileRecords
- 1. Create a FileLogInputStream at the given position
- 2. Wrap the file stream in a RecordBatchIterator
- 3. DefaultRecordBatch implements iterator() and materializes the data in memory
I. The server-side entry point for consumer fetch requests
The Kafka broker's API for handling consumer fetch requests is in the KafkaApis.scala class, in the handleFetchRequest method:
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  // ... omitted ...
  request.header.apiKey match {
    // Consumer fetch requests are dispatched to this handler
    case ApiKeys.FETCH => handleFetchRequest(request)
    // ... omitted ...
  }
}
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  // Get the API version (versionId) and client ID (clientId) from the request header
  val versionId = request.header.apiVersion
  val clientId = request.header.clientId
  // Get the fetch request body
  val fetchRequest = request.body[FetchRequest]
  // For request versions >= 13, resolve topic IDs to topic names via metadataCache.topicIdsToNames();
  // otherwise use an empty mapping
  val topicNames =
    if (fetchRequest.version() >= 13)
      metadataCache.topicIdsToNames()
    else
      Collections.emptyMap[Uuid, String]()
  // Using the topic-name mapping, get the fetch data (fetchData) and the topics to forget (forgottenTopics)
  val fetchData = fetchRequest.fetchData(topicNames)
  val forgottenTopics = fetchRequest.forgottenTopics(topicNames)
  // Create a fetch context that manages the processing of this fetch request: it carries the request version,
  // metadata, whether it comes from a follower, the fetch data, the forgotten topics and the topic-name mapping
  val fetchContext = fetchManager.newContext(
    fetchRequest.version,
    fetchRequest.metadata,
    fetchRequest.isFromFollower,
    fetchData,
    forgottenTopics,
    topicNames)

  // Two mutable buffers: erroneous collects partitions that failed validation,
  // interesting collects the partitions whose data actually needs to be read
  val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]()
  val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]()
  if (fetchRequest.isFromFollower) {
    // The fetch comes from a follower replica, so authorization must be checked.
    // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
    if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
      // Walk each partition and route it to erroneous or interesting
      fetchContext.foreachPartition { (topicIdPartition, data) =>
        if (topicIdPartition.topic == null)
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
        else if (!metadataCache.contains(topicIdPartition.topicPartition))
          erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          interesting += topicIdPartition -> data
      }
    } else {
      // Authorization failed: mark every partition as erroneous
      fetchContext.foreachPartition { (topicIdPartition, _) =>
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
      }
    }
  } else {
    // The fetch comes from an ordinary Kafka consumer.
    // Regular Kafka consumers need READ permission on each partition they are fetching.
    val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]
    fetchContext.foreachPartition { (topicIdPartition, partitionData) =>
      if (topicIdPartition.topic == null)
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
      else
        partitionDatas += topicIdPartition -> partitionData
    }
    // Check READ permission on each partition and route it to erroneous or interesting accordingly
    val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topicPartition.topic)
    partitionDatas.foreach { case (topicIdPartition, data) =>
      if (!authorizedTopics.contains(topicIdPartition.topic))
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicIdPartition.topicPartition))
        erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        interesting += topicIdPartition -> data
    }
  }

  // ... omitted ...

  // If none of the requested topics passed validation (or none exist), invoke processResponseCallback directly
  if (interesting.isEmpty) {
    processResponseCallback(Seq.empty)
  } else {
    // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
    // no bytes were recorded in the recent quota window
    // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
    // For a follower fetch, maxQuotaWindowBytes is Int.MaxValue; otherwise it is derived from the bytes this client
    // recently fetched, and is combined with the request's and the broker config's fetchMaxBytes below
    val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
      Int.MaxValue
    else
      quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt

    // Derive the effective maximum (fetchMaxBytes) and minimum (fetchMinBytes) bytes for this fetch
    // from the request, the broker config and the quota limit
    val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
    val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)

    val clientMetadata: Optional[ClientMetadata] = if (versionId >= 11) {
      // Fetch API version 11 added preferred replica logic
      Optional.of(new DefaultClientMetadata(
        fetchRequest.rackId,
        clientId,
        request.context.clientAddress,
        request.context.principal,
        request.context.listenerName.value))
    } else {
      Optional.empty()
    }

    // Build a FetchParams object holding all the request parameters
    val params = new FetchParams(
      versionId,
      fetchRequest.replicaId,
      fetchRequest.replicaEpoch,
      fetchRequest.maxWait,
      fetchMinBytes,
      fetchMaxBytes,
      FetchIsolation.of(fetchRequest),
      clientMetadata)

    // call the replica manager to fetch messages from the local replica,
    // with processResponseCallback as the response callback
    replicaManager.fetchMessages(
      params = params,
      fetchInfos = interesting,
      quota = replicationQuota(fetchRequest),
      responseCallback = processResponseCallback,
    )
  }
}
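Those fetchMaxBytes / fetchMinBytes / maxWait values mirror the standard consumer fetch settings. As a rough client-side illustration (not part of the broker source; the broker address and group name are made up), this is where the numbers carried in the FETCH request come from:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // Sent as FetchRequest.minBytes: the broker waits until at least this many bytes are available
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        // Sent as FetchRequest.maxBytes: capped again on the broker by config.fetchMaxBytes and the quota window
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
        // Sent as FetchRequest.maxWait: upper bound on how long a DelayedFetch may sit in the purgatory
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // consumer.poll(...) is what ultimately produces the FETCH requests handled by handleFetchRequest
        }
    }
}
```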
The log data is ultimately obtained through replicaManager.fetchMessages:
/**
 * Fetch messages from a replica, and wait until enough data can be fetched and return;
 * the callback function will be triggered either when timeout or required fetch info is satisfied.
 * Consumers may fetch from any replica, but followers can only fetch from the leader.
 */
def fetchMessages(params: FetchParams,
                  fetchInfos: Seq[(TopicIdPartition, PartitionData)],
                  quota: ReplicaQuota,
                  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit): Unit = {
  // check if this fetch request can be satisfied right away
  // Read from the local log via readFromLocalLog and keep the results in logReadResults
  val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
  var bytesReadable: Long = 0
  var errorReadingData = false
  var hasDivergingEpoch = false
  var hasPreferredReadReplica = false
  val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
  // Update bookkeeping from the read results: bytesReadable (bytes available to return),
  // errorReadingData (a read error occurred), hasDivergingEpoch (a diverging epoch was found)
  // and hasPreferredReadReplica (a preferred read replica exists)
  logReadResults.foreach { case (topicIdPartition, logReadResult) =>
    brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    if (logReadResult.divergingEpoch.nonEmpty)
      hasDivergingEpoch = true
    if (logReadResult.preferredReadReplica.nonEmpty)
      hasPreferredReadReplica = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    logReadResultMap.put(topicIdPartition, logReadResult)
  }

  // respond immediately if 1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
  //                        5) we found a diverging epoch
  //                        6) has a preferred read replica
  if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
    hasDivergingEpoch || hasPreferredReadReplica) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
      tp -> result.toFetchPartitionData(isReassignmentFetch)
    }
    responseCallback(fetchPartitionData)
  } else {
    // Otherwise build a DelayedFetch and put it into the delayedFetchPurgatory, so the request
    // completes later once its conditions are met.
    // construct the fetch results from the read results
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
    fetchInfos.foreach { case (topicIdPartition, partitionData) =>
      logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
      })
    }
    val delayedFetch = new DelayedFetch(
      params = params,
      fetchPartitionStatus = fetchPartitionStatus,
      replicaManager = this,
      quota = quota,
      responseCallback = responseCallback)

    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }

    // try to complete the request immediately, otherwise put it into the purgatory;
    // this is because while the delayed fetch operation is being created, new requests
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
The log data itself is queried through readFromLocalLog.
II. Iterate over the topic partitions requested in the fetch and read data for each of them
/**
 * Read from multiple topic partitions at the given offset up to maxSize bytes
 */
def readFromLocalLog(params: FetchParams,
                     readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
                     quota: ReplicaQuota,
                     readFromPurgatory: Boolean): Seq[(TopicIdPartition, LogReadResult)] = {
  val traceEnabled = isTraceEnabled

  def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    // Pull the fetch offset, the partition's max bytes and the follower's log start offset out of fetchInfo
    val offset = fetchInfo.fetchOffset
    val partitionFetchSize = fetchInfo.maxBytes
    val followerLogStartOffset = fetchInfo.logStartOffset
    // The effective max bytes (adjustedMaxBytes) is the smaller of fetchInfo.maxBytes and limitBytes
    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
    try {
      if (traceEnabled)
        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
          s"remaining response limit $limitBytes" +
          (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      // Get the Partition object for this topic partition
      val partition = getPartitionOrException(tp.topicPartition)
      // Current timestamp
      val fetchTimeMs = time.milliseconds

      // Make sure the topic ID in the fetch request/session matches the topic ID of the log,
      // otherwise throw InconsistentTopicIdException
      val topicId = if (tp.topicId == Uuid.ZERO_UUID) None else Some(tp.topicId)
      if (!hasConsistentTopicId(topicId, partition.topicId))
        throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.")

      // If we are the leader, determine the preferred read-replica
      // Choose a suitable replica for the subsequent fetch
      val preferredReadReplica = params.clientMetadata.asScala.flatMap(
        metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))

      if (preferredReadReplica.isDefined) {
        // A preferred read replica elsewhere was chosen: skip the local read and build a LogReadResult
        // that points the client at that (non-leader) replica instead
        replicaSelectorOpt.foreach { selector =>
          debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
            s"${preferredReadReplica.get} for ${params.clientMetadata}")
        }
        // If a preferred read-replica is set, skip the read
        val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
        LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = offsetSnapshot.highWatermark.messageOffset,
          leaderLogStartOffset = offsetSnapshot.logStartOffset,
          leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = -1L,
          lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      } else {
        // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
        // Build a LogReadResult from the data actually read from the partition
        val readInfo: LogReadInfo = partition.fetchRecords(
          fetchParams = params,
          fetchPartitionData = fetchInfo,
          fetchTimeMs = fetchTimeMs,
          maxBytes = adjustedMaxBytes,
          minOneMessage = minOneMessage,
          updateFetchState = !readFromPurgatory)

        val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
          // If the partition is being throttled, simply return an empty set.
          new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
          // progress in such cases and don't need to report a `RecordTooLargeException`
          new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else {
          readInfo.fetchedData
        }

        // Return the resulting LogReadResult
        LogReadResult(info = fetchDataInfo,
          divergingEpoch = readInfo.divergingEpoch.asScala,
          highWatermark = readInfo.highWatermark,
          leaderLogStartOffset = readInfo.logStartOffset,
          leaderLogEndOffset = readInfo.logEndOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = fetchTimeMs,
          lastStableOffset = Some(readInfo.lastStableOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      }
    } catch {
      // ... omitted ...
    }
  }

  var limitBytes = params.maxBytes
  val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
  var minOneMessage = !params.hardMaxBytesLimit
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
    // Size in bytes of the batch that was just read
    val recordBatchSize = readResult.info.records.sizeInBytes
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (recordBatchSize > 0)
      minOneMessage = false
    limitBytes = math.max(0, limitBytes - recordBatchSize)
    // Append (tp -> readResult) to the result
    result += (tp -> readResult)
  }
  result
}
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)

The requested topic partitions are traversed one by one, and the inner function read is executed for each of them; inside read, the data is queried through partition.fetchRecords.
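The loop at the bottom of readFromLocalLog keeps a running byte budget across all partitions. Here is a minimal stand-alone sketch of that idea, using a made-up PartitionRead type instead of Kafka's LogReadResult:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FetchBudgetSketch {
    // Hypothetical stand-in for one partition's read result: just the bytes returned.
    record PartitionRead(String topicPartition, int bytesRead) {}

    // Mirrors the traversal: every partition reads at most the remaining budget, and the
    // per-partition size limit may only be ignored until the first non-empty read.
    static List<PartitionRead> readAll(Map<String, Integer> availableBytesPerPartition, int maxBytes) {
        List<PartitionRead> results = new ArrayList<>();
        int limitBytes = maxBytes;
        boolean minOneMessage = true;   // corresponds to !params.hardMaxBytesLimit
        for (Map.Entry<String, Integer> entry : availableBytesPerPartition.entrySet()) {
            int read = Math.min(entry.getValue(), limitBytes); // never exceed the remaining budget
            if (read > 0)
                minOneMessage = false;                         // stop ignoring size limits after the first non-empty read
            limitBytes = Math.max(0, limitBytes - read);
            results.add(new PartitionRead(entry.getKey(), read));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Integer> available = new LinkedHashMap<>();
        available.put("orders-0", 40_000);
        available.put("orders-1", 40_000);
        available.put("orders-2", 40_000);
        // With a 64 KB budget the third partition gets nothing in this round
        readAll(available, 64_000).forEach(r ->
                System.out.println(r.topicPartition() + " -> " + r.bytesRead()));
    }
}
```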
1. A suitable replica is chosen for reading the local log (since 2.4, read/write separation across a topic partition's replicas is supported)
Inside the read inner function of the readFromLocalLog method shown above:
val preferredReadReplica = params.clientMetadata.asScala.flatMap(
  metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))
def findPreferredReadReplica(partition: Partition,
                             clientMetadata: ClientMetadata,
                             replicaId: Int,
                             fetchOffset: Long,
                             currentTimeMs: Long): Option[Int] = {
  // partition.leaderIdIfLocal returns an Option[Int]: the leader replica's ID if the local broker
  // is the leader of this partition, otherwise None.
  partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
    // Don't look up preferred for follower fetches via normal replication
    // The block below only runs when a local leader replica ID exists; otherwise None is returned directly.
    if (FetchRequest.isValidBrokerId(replicaId))
      None
    else {
      replicaSelectorOpt.flatMap { replicaSelector =>
        // Get the endpoints of the partition's replicas from the metadata cache
        val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
          new ListenerName(clientMetadata.listenerName))
        // A mutable Set[ReplicaView] collecting the replicas that qualify
        val replicaInfoSet = mutable.Set[ReplicaView]()

        // For each remote replica of the partition:
        //   take its state snapshot (replica.stateSnapshot);
        //   if its brokerId is in the ISR and its log range contains fetchOffset, add it to replicaInfoSet.
        partition.remoteReplicas.foreach { replica =>
          val replicaState = replica.stateSnapshot
          if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
            replicaState.logEndOffset >= fetchOffset &&
            replicaState.logStartOffset <= fetchOffset) {

            replicaInfoSet.add(new DefaultReplicaView(
              replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
              replicaState.logEndOffset,
              currentTimeMs - replicaState.lastCaughtUpTimeMs
            ))
          }
        }

        // Build a DefaultReplicaView describing the leader replica and add it to replicaInfoSet as well
        val leaderReplica = new DefaultReplicaView(
          replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
          partition.localLogOrException.logEndOffset,
          0L)
        replicaInfoSet.add(leaderReplica)

        // A DefaultPartitionView describing the partition: the replica set plus the leader replica
        val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)

        // Let replicaSelector.select pick a replica according to its strategy,
        // then collect the chosen replica's ID.
        replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect {
          // Even though the replica selector can return the leader, we don't want to send it out with the
          // FetchResponse, so we exclude it here
          case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
        }
      }
    }
  }
}
The key line is replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect. By default the leader replica is the preferred choice, but since version 2.4 a fetch can also be served from a non-leader replica of the topic partition, i.e. data can be read from a follower.

In the code:

- The selection is applied through the guard case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id

In the configuration (a sample setup follows this list):

- On the broker side, the parameter replica.selector.class must be set. Its default, LeaderSelector, means consumers fetch messages from the leader replica; change it to RackAwareReplicaSelector so that consumers read from a replica located in the rack given by their rack id. The broker.rack parameter must also be configured to state which machine room (rack) the broker is in.
- On the consumer side, the parameter client.rack must be set; the consumer will fetch message data from whichever broker whose broker.rack matches this value.
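A rough sketch of that rack-aware setup, assuming the selector class name org.apache.kafka.common.replica.RackAwareReplicaSelector and made-up rack names (the broker settings appear as comments because they live in server.properties, not in client code):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RackAwareFetchConfig {
    public static void main(String[] args) {
        // Broker side (server.properties):
        //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        //   broker.rack=dc-east        <- which rack / data center this broker lives in (made-up name)

        // Consumer side: client.rack should match the broker.rack of the brokers we want to read from
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dc-east-readers");
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "dc-east"); // sent to the broker as FetchRequest.rackId
        // ... deserializers etc. omitted ...
    }
}
```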
Why was read/write separation not supported before 2.4, and why is it supported now?

Why it was not supported earlier: for Kafka, horizontal scaling of topic partitions already handles message throughput, and adding brokers lowers the load on the system, so a read/write separation feature offered little return for the effort.

Why it is supported now: one scenario does not fit that model, namely fetching across machine rooms or data centers. When one data center has to sync data from another and reads may only go to the leader replica, the traffic has to cross data centers, and that bandwidth is expensive; reading from a local follower replica becomes the smarter choice.

So Kafka introduced this feature not to lower broker load or to spread out message processing, but to save network bandwidth.
III. Determine whether the fetch request comes from a topic partition follower or from a consumer client
For how followers issue these requests, see the earlier article on the Kafka 3.5 source code for how a topic partition's follower creates a fetcher thread to pull data from the leader.
def fetchRecords(fetchParams: FetchParams,
                 fetchPartitionData: FetchRequest.PartitionData,
                 fetchTimeMs: Long,
                 maxBytes: Int,
                 minOneMessage: Boolean,
                 updateFetchState: Boolean): LogReadInfo = {
  def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
    readRecords(
      log,
      fetchPartitionData.lastFetchedEpoch,
      fetchPartitionData.fetchOffset,
      fetchPartitionData.currentLeaderEpoch,
      maxBytes,
      fetchParams.isolation,
      minOneMessage
    )
  }

  // Does this fetch come from a follower?
  if (fetchParams.isFromFollower) {
    // Check that the request is from a valid replica before doing the read
    val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
      val localLog = localLogWithEpochOrThrow(
        fetchPartitionData.currentLeaderEpoch,
        fetchParams.fetchOnlyLeader
      )
      val replica = followerReplicaOrThrow(
        fetchParams.replicaId,
        fetchPartitionData
      )
      val logReadInfo = readFromLocalLog(localLog)
      (replica, logReadInfo)
    }

    if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
      updateFollowerFetchState(
        replica,
        followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
        followerStartOffset = fetchPartitionData.logStartOffset,
        followerFetchTimeMs = fetchTimeMs,
        leaderEndOffset = logReadInfo.logEndOffset,
        fetchParams.replicaEpoch
      )
    }

    logReadInfo
  } else {
    // The fetch comes from a consumer client
    inReadLock(`leaderIsrUpdateLock`) {
      val localLog = localLogWithEpochOrThrow(
        fetchPartitionData.currentLeaderEpoch,
        fetchParams.fetchOnlyLeader
      )
      readFromLocalLog(localLog)
    }
  }
}
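The fetchParams.isFromFollower branch above comes down to the replica id carried in the FetchRequest: a follower sends its own non-negative broker id, while a consumer client sends a negative sentinel. A tiny illustrative check of that convention (my own helper, not the Kafka source):

```java
public class FetchOriginSketch {
    // Followers send their real broker id (>= 0); consumers send a negative sentinel such as -1.
    static boolean isFromFollower(int replicaId) {
        return replicaId >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isFromFollower(2));   // true  -> a follower broker with id 2
        System.out.println(isFromFollower(-1));  // false -> an ordinary consumer client
    }
}
```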
1. The read lock on leaderIsrUpdateLock must be acquired before reading
In the method above:
// The follower path
val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) { ... }

// The consumer-client path
inReadLock(`leaderIsrUpdateLock`) { ... }
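inReadLock is a small Kafka helper that runs a block of code while holding the read side of a ReadWriteLock, so many fetches can proceed concurrently while leader/ISR updates take the write lock. A minimal sketch of the same pattern with a plain ReentrantReadWriteLock (the names are mine, not Kafka's):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

public class ReadLockSketch {
    private final ReentrantReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();

    // Equivalent of inReadLock(lock) { ... }: run the body under the read lock and always release it.
    <T> T inReadLock(Supplier<T> body) {
        leaderIsrUpdateLock.readLock().lock();
        try {
            return body.get();
        } finally {
            leaderIsrUpdateLock.readLock().unlock();
        }
    }

    String readRecords() {
        // Many threads may be in here at once; an ISR/leader update would take the write lock and briefly block them.
        return inReadLock(() -> "records read under the read lock");
    }
}
```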
2. readFromLocalLog reads the local log data
def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
  readRecords(
    log,
    fetchPartitionData.lastFetchedEpoch,
    fetchPartitionData.fetchOffset,
    fetchPartitionData.currentLeaderEpoch,
    maxBytes,
    fetchParams.isolation,
    minOneMessage
  )
}
IV. Reading log data means reading segment files (setting zero copy aside)
1. Get the basic metadata of the local log (high watermark, offsets, etc.)
private def readRecords(localLog: UnifiedLog,
                        lastFetchedEpoch: Optional[Integer],
                        fetchOffset: Long,
                        currentLeaderEpoch: Optional[Integer],
                        maxBytes: Int,
                        fetchIsolation: FetchIsolation,
                        minOneMessage: Boolean): LogReadInfo = {
  // The local log's high watermark (initialHighWatermark)
  val initialHighWatermark = localLog.highWatermark
  // Log start offset (initialLogStartOffset)
  val initialLogStartOffset = localLog.logStartOffset
  // Log end offset (initialLogEndOffset)
  val initialLogEndOffset = localLog.logEndOffset
  // Last stable offset (initialLastStableOffset)
  val initialLastStableOffset = localLog.lastStableOffset

  // ... omitted ...

  // Read the data at the requested offset from the local log
  val fetchedData = localLog.read(
    fetchOffset,
    maxBytes,
    fetchIsolation,
    minOneMessage
  )

  // Return a LogReadInfo containing the fetched data
  new LogReadInfo(
    fetchedData,
    Optional.empty(),
    initialHighWatermark,
    initialLogStartOffset,
    initialLogEndOffset,
    initialLastStableOffset)
}
def read(startOffset: Long,
         maxLength: Int,
         isolation: FetchIsolation,
         minOneMessage: Boolean): FetchDataInfo = {
  checkLogStartOffset(startOffset)
  val maxOffsetMetadata = isolation match {
    case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata
    case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata
    case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata
  }
  localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED)
}
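The FetchIsolation chosen here maps back to the consumer's isolation.level setting: read_uncommitted reads up to the high watermark, read_committed only up to the last stable offset, and only follower fetches read up to the log end. A small consumer-side illustration (the group name is made up):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class IsolationLevelExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "txn-readers");
        // "read_uncommitted" (the default) -> broker reads with FetchIsolation.HIGH_WATERMARK
        // "read_committed"                 -> broker reads with FetchIsolation.TXN_COMMITTED (last stable offset)
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // FetchIsolation.LOG_END is only used for follower (replica) fetches and cannot be requested by a consumer
    }
}
```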
2. Iterate over the segments until data is read from one
/**
 * @param startOffset        the start offset
 * @param maxLength          the maximum number of bytes to read
 * @param minOneMessage      whether at least one message should be returned
 * @param maxOffsetMetadata  the metadata of the maximum offset allowed to be read
 * @param includeAbortedTxns whether aborted transactions should be included
 * @return a FetchDataInfo object
 */
def read(startOffset: Long,
         maxLength: Int,
         minOneMessage: Boolean,
         maxOffsetMetadata: LogOffsetMetadata,
         includeAbortedTxns: Boolean): FetchDataInfo = {
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
    trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
      s"total length ${segments.sizeInBytes} bytes")

    // Get the next offset metadata (endOffsetMetadata) and the corresponding offset (endOffset)
    val endOffsetMetadata = nextOffsetMetadata
    val endOffset = endOffsetMetadata.messageOffset
    // Get the (ordered) segment at or below the start offset, i.e. the segment the read should start from
    var segmentOpt = segments.floorSegment(startOffset)

    // return error on attempt to read beyond the log end offset
    // If the start offset is beyond the end offset, or no segment is found, throw OffsetOutOfRangeException
    if (startOffset > endOffset || segmentOpt.isEmpty)
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
        s"but we only have log segments upto $endOffset.")

    // If the start offset equals the max offset metadata's offset, return an empty FetchDataInfo
    if (startOffset == maxOffsetMetadata.messageOffset)
      emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
    else if (startOffset > maxOffsetMetadata.messageOffset)
      // If the start offset is beyond the max offset metadata's offset, return an empty FetchDataInfo,
      // converting the start offset into offset metadata first
      emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), includeAbortedTxns)
    else {
      // Read from the segments whose base offset is below the target offset
      var fetchDataInfo: FetchDataInfo = null
      // Keep looping while nothing has been read yet and a segment covering the start offset still exists
      while (fetchDataInfo == null && segmentOpt.isDefined) {
        val segment = segmentOpt.get
        val baseOffset = segment.baseOffset

        val maxPosition =
          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
          else segment.size

        fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        if (fetchDataInfo != null) {
          // If aborted transactions should be included, add them to fetchDataInfo
          if (includeAbortedTxns)
            fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
        }
        // If nothing was read from this segment, move on to the next segment above baseOffset
        else segmentOpt = segments.higherSegment(baseOffset)
      }

      // Data was read successfully: return the FetchDataInfo
      if (fetchDataInfo != null) fetchDataInfo
      else {
        // We went past the end of the last segment without reading any data: return an empty FetchDataInfo
        // carrying the next offset metadata and empty memory records (MemoryRecords.EMPTY)
        new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
      }
    }
  }
}
First the candidate segment is located with var segmentOpt = segments.floorSegment(startOffset), and then fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage) reads the data from that segment.
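segments.floorSegment is essentially a floor lookup on a map of segments ordered by base offset. A minimal sketch of the same lookup using a TreeMap and a made-up Segment type:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class FloorSegmentSketch {
    // Hypothetical segment descriptor: base offset plus size, nothing more
    record Segment(long baseOffset, int sizeInBytes) {}

    private final TreeMap<Long, Segment> segments = new TreeMap<>();

    void add(Segment s) {
        segments.put(s.baseOffset(), s);
    }

    // floorSegment(startOffset): the segment with the largest base offset <= startOffset
    Optional<Segment> floorSegment(long startOffset) {
        return Optional.ofNullable(segments.floorEntry(startOffset)).map(Map.Entry::getValue);
    }

    // higherSegment(baseOffset): the next segment strictly above the given base offset
    Optional<Segment> higherSegment(long baseOffset) {
        return Optional.ofNullable(segments.higherEntry(baseOffset)).map(Map.Entry::getValue);
    }

    public static void main(String[] args) {
        FloorSegmentSketch log = new FloorSegmentSketch();
        log.add(new Segment(0, 1 << 20));
        log.add(new Segment(1000, 1 << 20));
        log.add(new Segment(2000, 1 << 20));
        // Offset 1500 lives in the segment whose base offset is 1000
        System.out.println(log.floorSegment(1500).map(Segment::baseOffset).orElse(-1L)); // 1000
    }
}
```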
V. Create the file log stream object FileRecords
def read(startOffset: Long,
         maxSize: Int,
         maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

  val startOffsetAndSize = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  // Build a LogOffsetMetadata from the start offset, the segment's base offset and the start position
  val startPosition = startOffsetAndSize.position
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  // Clamp the read size to the smaller of (maxPosition - startPosition) and adjustedMaxSize
  val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

  // Build a FetchDataInfo holding the offset metadata and a log slice of fetchSize bytes starting at
  // startPosition; log.slice(startPosition, fetchSize) is the actual log data
  new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
    adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
}
The file data is obtained through log.slice:
public FileRecords slice(int position, int size) throws IOException {
    int availableBytes = availableBytes(position, size);
    int startPosition = this.start + position;
    return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
}
This creates a new file-backed records object; below is the FileRecords constructor.
FileRecords(File file,
            FileChannel channel,
            int start,
            int end,
            boolean isSlice) throws IOException {
    this.file = file;
    this.channel = channel;
    this.start = start;
    this.end = end;
    this.isSlice = isSlice;
    this.size = new AtomicInteger();

    if (isSlice) {
        // don't check the file size if this is just a slice view: set size directly to end - start
        size.set(end - start);
    } else {
        // Not a slice: the file size must be checked. If it exceeds Integer.MAX_VALUE, throw a KafkaException.
        if (channel.size() > Integer.MAX_VALUE)
            throw new KafkaException("The size of segment " + file + " (" + channel.size() +
                    ") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE);

        // Otherwise limit is the smaller of the file size and end, and size becomes limit - start.
        int limit = Math.min((int) channel.size(), end);
        size.set(limit - start);

        // if this is not a slice, update the file pointer to the end of the file
        // set the file position to the last byte in the file
        channel.position(limit);
    }

    batches = batchesFrom(start);
}
1. Create a FileLogInputStream at the given position
/**
 * Get an iterator over the record batches in the file, starting at a specific position. This is similar to
 * {@link #batches()} except that callers specify a particular position to start reading the batches from. This
 * method must be used with caution: the start position passed in must be a known start of a batch.
 * @param start The position to start record iteration from; must be a known position for start of a batch
 * @return An iterator over batches starting from {@code start}
 */
// Returns a batch iterator directly over this FileRecords instance
public Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
    return () -> batchIterator(start);
}

private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
    final int end;
    if (isSlice)
        end = this.end;
    else
        end = this.sizeInBytes();
    // Create a FileLogInputStream over this FileRecords between start and end
    FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
    // Wrap the stream in a RecordBatchIterator and return it
    return new RecordBatchIterator<>(inputStream);
}
}
The FileLogInputStream class implements nextBatch(), which reads the next record batch from the underlying input stream.
public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {

    /**
     * Create a new log input stream over the FileChannel
     * @param records Underlying FileRecords instance
     * @param start Position in the file channel to start from
     * @param end Position in the file channel not to read past
     */
    FileLogInputStream(FileRecords records,
                       int start,
                       int end) {
        this.fileRecords = records;
        this.position = start;
        this.end = end;
    }

    @Override
    public FileChannelRecordBatch nextBatch() throws IOException {
        // Get the underlying file channel
        FileChannel channel = fileRecords.channel();
        // If the end of the readable range has been reached (not even a batch header left), return null
        if (position >= end - HEADER_SIZE_UP_TO_MAGIC)
            return null;

        // Read the batch header from the channel into logHeaderBuffer
        logHeaderBuffer.rewind();
        Utils.readFullyOrFail(channel, logHeaderBuffer, position, "log header");

        // Parse the offset and the batch size out of the header
        logHeaderBuffer.rewind();
        long offset = logHeaderBuffer.getLong(OFFSET_OFFSET);
        int size = logHeaderBuffer.getInt(SIZE_OFFSET);

        // V0 has the smallest overhead, stricter checking is done later
        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
                    "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));

        // If this batch would run past the end of the readable range, return null
        if (position > end - LOG_OVERHEAD - size)
            return null;

        // Read the magic byte from the header and create the record batch object accordingly
        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
        final FileChannelRecordBatch batch;

        if (magic < RecordBatch.MAGIC_VALUE_V2)
            // Older message format: create a legacy record batch
            batch = new LegacyFileChannelRecordBatch(offset, magic, fileRecords, position, size);
        else
            // Current message format: create a default record batch
            batch = new DefaultFileChannelRecordBatch(offset, magic, fileRecords, position, size);

        // Advance the position so the next call reads the following batch
        position += batch.sizeInBytes();
        return batch;
    }
}
2. Wrap the file stream in a RecordBatchIterator
The batchIterator method shown above wraps the file stream into a RecordBatchIterator object.
class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {

    private final LogInputStream<T> logInputStream;

    RecordBatchIterator(LogInputStream<T> logInputStream) {
        this.logInputStream = logInputStream;
    }

    @Override
    protected T makeNext() {
        try {
            T batch = logInputStream.nextBatch();
            if (batch == null)
                return allDone();
            return batch;
        } catch (EOFException e) {
            throw new CorruptRecordException("Unexpected EOF while attempting to read the next batch", e);
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }
}
The AbstractIterator abstract class:
public abstract class AbstractIterator<T> implements Iterator<T> {

    private enum State {
        READY, NOT_READY, DONE, FAILED
    }

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE:
                return false;
            case READY:
                return true;
            default:
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Removal not supported");
    }

    public T peek() {
        if (!hasNext())
            throw new NoSuchElementException();
        return next;
    }

    protected T allDone() {
        state = State.DONE;
        return null;
    }

    protected abstract T makeNext();

    private Boolean maybeComputeNext() {
        state = State.FAILED;
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else {
            state = State.READY;
            return true;
        }
    }
}
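To see the makeNext()/allDone() contract in isolation, here is a toy subclass of my own (assuming the org.apache.kafka.common.utils.AbstractIterator class above) that counts down from a number, the same way RecordBatchIterator walks batches until nextBatch() returns null:

```java
import org.apache.kafka.common.utils.AbstractIterator;

public class CountdownIterator extends AbstractIterator<Integer> {
    private int remaining;

    public CountdownIterator(int from) {
        this.remaining = from;
    }

    @Override
    protected Integer makeNext() {
        if (remaining <= 0)
            return allDone();   // marks the iterator DONE, so hasNext() now returns false
        return remaining--;     // otherwise this value becomes the READY element
    }

    public static void main(String[] args) {
        CountdownIterator it = new CountdownIterator(3);
        while (it.hasNext())
            System.out.println(it.next()); // 3, 2, 1
    }
}
```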
hasNext() ends up calling RecordBatchIterator's makeNext(), which in turn calls nextBatch() on the FileLogInputStream shown earlier in Section V.
DefaultFileChannelRecordBatch is the default implementation:
static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {

    DefaultFileChannelRecordBatch(long offset,
                                  byte magic,
                                  FileRecords fileRecords,
                                  int position,
                                  int batchSize) {
        super(offset, magic, fileRecords, position, batchSize);
    }

    @Override
    protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
        return new DefaultRecordBatch(buffer);
    }

    @Override
    public long baseOffset() {
        return offset;
    }

    // ... omitted ...
}
3. DefaultRecordBatch implements iterator() and materializes the data in memory
Next, let's look at where the toMemoryRecordBatch method of DefaultFileChannelRecordBatch gets called: it produces a DefaultRecordBatch, and that batch's iterator() method is then used to obtain an Iterator<Record>.
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {

    @Override
    public Iterator<Record> iterator() {
        if (count() == 0)
            return Collections.emptyIterator();

        if (!isCompressed())
            return uncompressedIterator();

        // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
        // so we decompress the full record set here. Use cases which call for a lower memory footprint
        // can use `streamingIterator` at the cost of additional complexity
        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
            List<Record> records = new ArrayList<>(count());
            while (iterator.hasNext())
                records.add(iterator.next());
            return records.iterator();
        }
    }
}
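Since a materialized RecordBatch is just an Iterable<Record>, inspecting its contents is ordinary iteration. A small helper of my own (the methods on Record are assumed from the kafka-clients record API) that summarizes one batch:

```java
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public class BatchInspector {
    // Counts the records and payload bytes of a single batch; iterator() decompresses into memory if needed.
    static String describe(RecordBatch batch) {
        int records = 0;
        long valueBytes = 0;
        for (Record record : batch) {
            records++;
            if (record.hasValue())
                valueBytes += record.valueSize();
        }
        return "baseOffset=" + batch.baseOffset() + ", records=" + records + ", valueBytes=" + valueBytes;
    }
}
```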
DefaultFileChannelRecordBatch is a subclass of FileChannelRecordBatch. FileChannelRecordBatch represents a batch stored behind a FileChannel: when iterating over the log there is no need to read everything into memory up front; the data is only read when it is actually needed. Let's look straight at the most important method, iterator().
public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
    protected final long offset;
    protected final byte magic;
    protected final FileRecords fileRecords;
    protected final int position;
    protected final int batchSize;

    private RecordBatch fullBatch;
    private RecordBatch batchHeader;

    FileChannelRecordBatch(long offset,
                           byte magic,
                           FileRecords fileRecords,
                           int position,
                           int batchSize) {
        this.offset = offset;
        this.magic = magic;
        this.fileRecords = fileRecords;
        this.position = position;
        this.batchSize = batchSize;
    }

    // ... omitted ...

    @Override
    public Iterator<Record> iterator() {
        return loadFullBatch().iterator();
    }

    // ... omitted ...
}
protected RecordBatch loadFullBatch() {
    if (fullBatch == null) {
        batchHeader = null;
        fullBatch = loadBatchWithSize(sizeInBytes(), "full record batch");
    }
    return fullBatch;
}
Finally, the toMemoryRecordBatch method of DefaultFileChannelRecordBatch is called to build the batch data in memory.
private RecordBatch loadBatchWithSize(int size, String description) {
    FileChannel channel = fileRecords.channel();
    try {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        Utils.readFullyOrFail(channel, buffer, position, description);
        buffer.rewind();
        // Materialize the batch in memory
        return toMemoryRecordBatch(buffer);
    } catch (IOException e) {
        throw new KafkaException("Failed to load record batch at position " + position + " from " + fileRecords, e);
    }
}
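To tie the last few sections together, here is a small end-to-end sketch built on the kafka-clients internal record classes (these are internal APIs, so signatures may vary across versions; the file name and record contents are made up). It appends a batch to a file and then walks it through the same lazy path described above: FileRecords.batches() drives FileLogInputStream and RecordBatchIterator, and iterating a batch's records is what triggers toMemoryRecordBatch:

```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;

public class FileRecordsReadDemo {
    public static void main(String[] args) throws Exception {
        File segmentFile = File.createTempFile("demo-segment", ".log");
        try (FileRecords fileRecords = FileRecords.open(segmentFile)) {
            // Append one in-memory batch of two records, roughly as the leader would when handling a produce
            fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord("k1".getBytes(StandardCharsets.UTF_8), "v1".getBytes(StandardCharsets.UTF_8)),
                    new SimpleRecord("k2".getBytes(StandardCharsets.UTF_8), "v2".getBytes(StandardCharsets.UTF_8))));
            fileRecords.flush();

            // batches() walks the file lazily; iterating a batch's records loads it into memory
            for (RecordBatch batch : fileRecords.batches()) {
                System.out.println("batch baseOffset=" + batch.baseOffset() + " sizeInBytes=" + batch.sizeInBytes());
                for (Record record : batch) {
                    System.out.println("  offset=" + record.offset() +
                            " value=" + StandardCharsets.UTF_8.decode(record.value()));
                }
            }
        } finally {
            segmentFile.delete();
        }
    }
}
```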