中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

020網(wǎng)站建設(shè)專業(yè)網(wǎng)站建設(shè)公司

020網(wǎng)站建設(shè),專業(yè)網(wǎng)站建設(shè)公司,義烏小商品批發(fā)市場網(wǎng)上進(jìn)貨app,注冊公司最好用老年人文章目錄背景環(huán)境工具選型實操M(fèi)M1MM2以MM2集群運(yùn)行以Standalone模式運(yùn)行驗證附錄MM2配置表其他背景 一個測試環(huán)境的kafka集群,Topic有360,Partition有2000,部署在虛擬機(jī)上,由于多方面原因,要求遷移至k8s容器內(nèi)&#x…

文章目錄

  • 背景
  • 環(huán)境
  • 工具選型
  • 實操
    • MM1
    • MM2
      • 以MM2集群運(yùn)行
      • 以Standalone模式運(yùn)行
  • 驗證
  • 附錄
    • MM2配置表
    • 其他

背景

一個測試環(huán)境的kafka集群,Topic有360+,Partition有2000+,部署在虛擬機(jī)上,由于多方面原因,要求遷移至k8s容器內(nèi)(全量遷移),正好可以拿來練一下手。本文主要記錄對MM1和MM2的實際操作過程,以及使用過程中遇到的問題及解決方案。

環(huán)境

source集群:kafka-2.6.0、2個broker、虛擬機(jī)

target集群:kafka-2.6.0、3個broker、k8s

工具:MM1(kafka-mirror-maker.sh)、MM2(connect-mirror-maker.sh)

需求:Topic名稱不能改變、數(shù)據(jù)完整

條件:target集群需要開啟自動創(chuàng)建Topic:auto.create.topics.enable=true

工具選型

本質(zhì)上MM1是Kafka的消費(fèi)者和生產(chǎn)者結(jié)合體,可以有效地將數(shù)據(jù)從源群集移動到目標(biāo)群集,但沒有提供太多其他功能。

并且在MM1多年的使用過程中發(fā)現(xiàn)了以下局限性:

  1. 靜態(tài)的黑名單和白名單
  2. Topic信息不能同步,所有Topic同步到目標(biāo)端都只有一個Partition
  3. 必須通過手動配置來解決active-active場景下的循環(huán)同步問題(MM2為解決這個問題,也做了體驗很不好的改動)
  4. rebalance導(dǎo)致的性能問題
  5. 缺乏監(jiān)控手段
  6. 無法保證Exactly Once
  7. 無法提供容災(zāi)恢復(fù)
  8. 無法同步Topic列表,只能同步有數(shù)據(jù)的Topic

MM2是基于kafka connect框架開發(fā)的。與其它的kafka connecet一樣MM2有source connector和sink connetor組成,可以支持同步以下數(shù)據(jù):

  1. 完整的Topic列表
  2. Topic配置
  3. ACL信息(如果有)
  4. consumer group和offset(kafka2.7.0之后版本才行)
  5. 其他功能:
    • 支持循環(huán)同步檢測
    • 多集群自定義同步(同一個任務(wù)中,可以多集群同步:A->B、B->C、B->D)
    • 提供可監(jiān)控Metrics
    • 可通過配置保證Exactly Once

實操

秉著實操前先演練的原則,我自己搭建了一個和目標(biāo)集群相同配置的集群,用于驗證不同工具的操作結(jié)果。有足夠把握之后,再對目標(biāo)集群實際操作。

MM1

執(zhí)行 --help 查看參數(shù)選項:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
--abort.on.send.failure <String: Stop    Configure the mirror maker to exit onthe entire mirror maker when a send      a failed send. (default: true)failure occurs>
--consumer.config <String: config file>  Embedded consumer config for consumingfrom the source cluster.
--consumer.rebalance.listener <String:   The consumer rebalance listener to useA custom rebalance listener of type      for mirror maker consumer.ConsumerRebalanceListener>
--help                                   Print usage information.
--message.handler <String: A custom      Message handler which will processmessage handler of type                  every record in-between consumer andMirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom messageArguments passed to message handler      handler for mirror maker.constructor.>
--new.consumer                           DEPRECATED Use new consumer in mirrormaker (this is the default so thisoption will be removed in a futureversion).
--num.streams <Integer: Number of        Number of consumption streams.threads>                                 (default: 1)
--offset.commit.interval.ms <Integer:    Offset commit interval in ms.offset commit interval in                (default: 60000)millisecond>
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String:       Arguments used by custom rebalanceArguments passed to custom rebalance     listener for mirror maker consumer.listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.(String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#         

核心參數(shù)就兩個:消費(fèi)者和生產(chǎn)者的配置文件:

consumer.properties:(消費(fèi)source集群)

bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";

producer.properties:(發(fā)送消息至目標(biāo)集群)

bootstrap.servers= target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3

執(zhí)行腳本:

./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"

MM1比較簡單,只要兩個配置文件沒問題,sasl配置正確,基本就OK了,適合簡單的數(shù)據(jù)同步,比如指定topic進(jìn)行同步。

MM2

有四種運(yùn)行MM2的方法:

  • As a dedicated MirrorMaker cluster.(作為專用的MirrorMaker群集)
  • As a Connector in a distributed Connect cluster.(作為分布式Connect群集中的連接器)
  • As a standalone Connect worker.(作為獨(dú)立的Connect工作者)
  • In legacy mode using existing MirrorMaker scripts.(在舊模式下,使用現(xiàn)有的MirrorMaker腳本。)

本文介紹第一種和第三種:作為專用的MirrorMaker群集、作為獨(dú)立的Connect工作者,第二種需要搭建connect集群,操作比較復(fù)雜。

以MM2集群運(yùn)行

這種模式是最簡單的,只需要提供一個配置文件即可,配置文件定制化程度比較高,根據(jù)業(yè)務(wù)需求配置即可

老樣子,執(zhí)行 --help 看看使用說明:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.propertiesMirrorMaker 2.0 driverpositional arguments:mm2.properties         MM2 configuration file.optional arguments:-h, --help             show this help message and exit--clusters CLUSTER [CLUSTER ...]Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#  

可以看到,參數(shù)簡單了許多,核心參數(shù)就一個配置文件。

mm2.properties:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2# 定義集群別名
clusters = event-center, event-center-new# 設(shè)置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 設(shè)置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 開啟event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允許同步topic的正則
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*# MM2內(nèi)部同步機(jī)制使用的topic,replication數(shù)量設(shè)置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1# 自定義參數(shù)
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 連接器是否發(fā)送心跳
emit.heartbeats.enabled=true
# 心跳間隔
emit.heartbeats.interval.seconds=5
# 是否發(fā)送檢查點(diǎn)
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新間隔
refresh.topics.interval.seconds=60
# 是否刷新消費(fèi)者組id
refresh.groups.enabled=true
# 刷新間隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 遠(yuǎn)端創(chuàng)建新topic的replication數(shù)量設(shè)置
replication.factor=3

需要注意的是:replication.policy.class 默認(rèn)為:DefaultReplicationPolicy,這個策略會把同步至目標(biāo)集群的topic都加上一個源集群別名的前綴,比如源集群別名為A,topic為:bi-log,該topic同步到目標(biāo)集群后會變成:A.bi-log,為啥這么做呢,就是為了避免雙向同步的場景出現(xiàn)死循環(huán)。

官方也給出了解釋:

這是 MirrorMaker 2.0 中的默認(rèn)行為,以避免在復(fù)雜的鏡像拓?fù)渲兄貙憯?shù)據(jù)。 需要在復(fù)制流設(shè)計和主題管理方面小心自定義此項,以避免數(shù)據(jù)丟失。 可以通過對“replication.policy.class”使用自定義復(fù)制策略類來完成此操作。

針對如何自定義策略及使用方法,見我的另一篇文章:

為了保證腳本后臺運(yùn)行,寫一個腳本包裝一下:

run-mm2.sh:

#!/bin/bashexec ./connect-mirror-maker.sh MM2.properties >log/mm2.log 2>&1 &

之后執(zhí)行腳本即可。

以Standalone模式運(yùn)行

這種模式會麻煩點(diǎn),需要提供一個kafka,作為worker節(jié)點(diǎn)來同步數(shù)據(jù),使用的腳本為:connect-standalone.sh

–help看看如何使用:

./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]# 

需要兩個配置文件,一個是作為worker的kafka集群信息(worker.properties),另一個是同步數(shù)據(jù)的配置(connector.properties)

worker.properties:

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAINkey.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverteroffset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties:

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1# source
# 這個配置會使同步之后的Topic都加上一個前綴,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 連接器是否發(fā)送心跳
emit.heartbeats.enabled=true
# 心跳間隔
emit.heartbeats.interval.seconds=5
# 是否發(fā)送檢查點(diǎn)
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新間隔
refresh.topics.interval.seconds=30
# 是否刷新消費(fèi)者組id
refresh.groups.enabled=true
# 刷新間隔
refresh.groups.interval.seconds=30
# 連接器消費(fèi)者預(yù)讀隊列大小
# readahead.queue.capacity=500
# 使用自定義策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

執(zhí)行:

./connect-standalone.sh worker.properties connector.properties

這種方式做一個簡單的介紹,我最后采用的是上一種方式,比較簡單直接

驗證

驗證:

  • 消息數(shù)量 OK

    使用kafka-tool工具連接上兩個集群進(jìn)行比對

  • Topic數(shù)量 OK

    • source:
    ./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt 
    
    • sink
    ./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt 
    
    • command.properties示例:
    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
    
  • 新消息是否同步 OK

  • 新Topic是否同步 OK

  • Consumer是否同步 NO

./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt 

? 如果需要同步consumer,需要使用官方提供的工具:RemoteClusterUtils

  • consumer offset是否同步 NO

  • ACL是否同步 OK
    通過kafka-acls.sh或者客戶端工具kafka-tool可以查看

附錄

MM2配置表

propertydefault valuedescription
namerequiredname of the connector, e.g. “us-west->us-east”
topicsempty stringregex of topics to replicate, e.g. “topic1|topic2|topic3”. Comma-separated lists are also supported.
topics.blacklist“..internal, ..replica, __consumer_offsets” or similartopics to exclude from replication
groupsempty stringregex of groups to replicate, e.g. “.*”
groups.blacklistempty stringgroups to exclude from replication
source.cluster.aliasrequiredname of the cluster being replicated
target.cluster.aliasrequiredname of the downstream Kafka cluster
source.cluster.bootstrap.serversrequiredupstream cluster to replicate
target.cluster.bootstrap.serversrequireddownstream cluster
sync.topic.configs.enabledtruewhether or not to monitor source cluster for configuration changes
sync.topic.acls.enabledtruewhether to monitor source cluster ACLs for changes
emit.heartbeats.enabledtrueconnector should periodically emit heartbeats
emit.heartbeats.interval.seconds5 (seconds)frequency of heartbeats
emit.checkpoints.enabledtrueconnector should periodically emit consumer offset information
emit.checkpoints.interval.seconds5 (seconds)frequency of checkpoints
refresh.topics.enabledtrueconnector should periodically check for new topics
refresh.topics.interval.seconds5 (seconds)frequency to check source cluster for new topics
refresh.groups.enabledtrueconnector should periodically check for new consumer groups
refresh.groups.interval.seconds5 (seconds)frequency to check source cluster for new consumer groups
readahead.queue.capacity500 (records)number of records to let consumer get ahead of producer
replication.policy.classorg.apache.kafka.connect.mirror.DefaultReplicationPolicyuse LegacyReplicationPolicy to mimic legacy MirrorMaker
heartbeats.topic.retention.ms1 dayused when creating heartbeat topics for the first time
checkpoints.topic.retention.ms1 dayused when creating checkpoint topics for the first time
offset.syncs.topic.retention.msmax longused when creating offset sync topic for the first time
replication.factor2used when creating remote topics

其他

參考:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%253A+MirrorMaker+2.0

https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new

https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf

https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide

http://m.risenshineclean.com/news/39588.html

相關(guān)文章:

  • 重慶網(wǎng)站建設(shè)排名武漢seo首頁
  • 網(wǎng)站負(fù)責(zé)人辦理幕布或站點(diǎn)拍照重要新聞今天8條新聞
  • 用html制作網(wǎng)站代碼百家號關(guān)鍵詞排名優(yōu)化
  • android安裝教程seo診斷書
  • 499全包網(wǎng)站建設(shè)東莞做網(wǎng)頁建站公司
  • 企業(yè)免費(fèi)網(wǎng)站優(yōu)化方案百度瀏覽器手機(jī)版
  • 做倫理電影網(wǎng)站百度推廣關(guān)鍵詞質(zhì)量度
  • 杭州網(wǎng)站建設(shè)哪家好seo深圳培訓(xùn)班
  • 北京道路建設(shè)在什么網(wǎng)站查詢網(wǎng)站推廣的軟件
  • 機(jī)械網(wǎng)站建設(shè)哪家好怎么樣在百度上推廣自己的產(chǎn)品
  • 做網(wǎng)站怎么收集資料太原免費(fèi)網(wǎng)站建站模板
  • 網(wǎng)站正常打開速度慢semi
  • 單頁網(wǎng)站對攻擊的好處如何做好互聯(lián)網(wǎng)營銷
  • 警惕成人網(wǎng)站免費(fèi)看手機(jī)引流推廣接單
  • 做網(wǎng)站所用的技術(shù)推廣普通話的宣傳語
  • 國內(nèi)網(wǎng)站開發(fā)短視頻精準(zhǔn)獲客系統(tǒng)
  • 品牌專業(yè)建設(shè)網(wǎng)站常見的搜索引擎
  • 在哪能學(xué)到網(wǎng)站建設(shè)專業(yè)seo推廣是做什么
  • 做植物提取物好的推廣網(wǎng)站seo自動優(yōu)化軟件下載
  • 校園網(wǎng)站規(guī)劃與建設(shè)工具大全
  • 網(wǎng)站建設(shè)app開發(fā)合同范本百度普通下載
  • 新疆建設(shè)廳官方網(wǎng)站文件鏈接推廣
  • 無錫 網(wǎng)站建設(shè)黃頁88網(wǎng)官網(wǎng)
  • 同個主體新增網(wǎng)站備案外鏈吧怎么使用
  • 做網(wǎng)站看網(wǎng)頁效果手機(jī)網(wǎng)站排名優(yōu)化
  • 跨境電商千萬別做亞馬遜seo排名優(yōu)化工具推薦
  • 地產(chǎn)建站規(guī)劃可以投放廣告的網(wǎng)站
  • 網(wǎng)站建設(shè)服務(wù)價格表seo顧問公司
  • 網(wǎng)站icp備案新規(guī)推廣哪個平臺好
  • c 做網(wǎng)站教程百度seo教程視頻