中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

做網(wǎng)站是怎樣賺錢的網(wǎng)站快速排名互點(diǎn)軟件

做網(wǎng)站是怎樣賺錢的,網(wǎng)站快速排名互點(diǎn)軟件,網(wǎng)站建設(shè)與維護(hù)中職,哪個(gè)公司制作企業(yè)網(wǎng)站目錄 1、消息生產(chǎn)流程 2、生產(chǎn)者常見參數(shù)配置 3、序列化器 基本概念 自定義序列化器 4、分區(qū)器 默認(rèn)分區(qū)規(guī)則 自定義分區(qū)器 5、生產(chǎn)者攔截器 作用 自定義攔截器 6、生產(chǎn)者原理解析 1、消息生產(chǎn)流程 2、生產(chǎn)者常見參數(shù)配置 3、序列化器 基本概念 在Kafka中保存的數(shù)…

目錄

1、消息生產(chǎn)流程

2、生產(chǎn)者常見參數(shù)配置

3、序列化器

基本概念

自定義序列化器

4、分區(qū)器

默認(rèn)分區(qū)規(guī)則

自定義分區(qū)器

5、生產(chǎn)者攔截器

作用

自定義攔截器

6、生產(chǎn)者原理解析


1、消息生產(chǎn)流程

2、生產(chǎn)者常見參數(shù)配置

3、序列化器

基本概念

  • 在Kafka中保存的數(shù)據(jù)都是字節(jié)數(shù)組。
  • 消息發(fā)送前,需要將消息序列化為字節(jié)數(shù)組進(jìn)行發(fā)送。
  • 生產(chǎn)者通過key.serializer和value.serializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Serializer接口定義序列化器。
  • Kafka已實(shí)現(xiàn)的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

自定義序列化器

實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer<T>接口,并實(shí)現(xiàn)其中的serializer方法。

@Data
public class User {private Integer userId;private String username;
}public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果數(shù)據(jù)是null,則返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}// 第一個(gè)4字節(jié)存儲(chǔ)userId的值// 第二個(gè)4字節(jié)存儲(chǔ)username字節(jié)數(shù)組的長度int值// 第三個(gè)length長度,存儲(chǔ)username序列化之后的字節(jié)數(shù)組ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化數(shù)據(jù)異常");}}@Overridepublic void close() {// do nothing}
}

4、分區(qū)器

默認(rèn)分區(qū)規(guī)則

KafkaProducer.partition();DefaultPartitioner.partition();

  1. 如果record提供了分區(qū)號(hào),則使?record提供的分區(qū)號(hào)
  2. 如果record沒有提供分區(qū)號(hào),則使?key的序列化后的值的hash值對(duì)分區(qū)數(shù)量取模
  3. 如果record沒有提供分區(qū)號(hào),也沒有提供key,則使?輪詢的?式分配分區(qū)號(hào)。

自定義分區(qū)器

實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,并實(shí)現(xiàn)其中的partition方法。

在生產(chǎn)者參數(shù)中通過配置partitioner.class指定自定義分區(qū)器。

/*** 自定義分區(qū)器*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此處可以計(jì)算分區(qū)的數(shù)字。// 我們直接指定為2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

5、生產(chǎn)者攔截器

作用

????????在發(fā)送消息前,或者在執(zhí)行回調(diào)邏輯前,對(duì)消息做一些定制化的處理,比如修改消息,打印消息日志等。此外,Producer允許設(shè)置多個(gè)攔截器從而形成一條攔截器鏈,Producer將按照指定順序調(diào)用它們。

自定義攔截器

????????自定義攔截器實(shí)現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口,并實(shí)現(xiàn)其中的onSend()、onAcknowledgement()、close()接口。其中:

  • onSend(ProducerRecord):Producer 確保在消息被序列化前調(diào)?該?法。?戶可以在該?法中對(duì)消息做任何操作,但最好保證不要修 改消息所屬的topic和分區(qū),否則會(huì)影響?標(biāo)分區(qū)的計(jì)算。
  • onAcknowledgement(RecordMetadata, Exception):該?法會(huì)在消息被應(yīng)答之前或消息發(fā)送失敗時(shí)調(diào)?, 并且通常都是在Producer回調(diào)邏輯觸發(fā)之前。
  • close:關(guān)閉Interceptor,主要?于執(zhí)??些資源清理?作。

? ? ? ? 在生產(chǎn)者參數(shù)中通過配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定義攔截器。

public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("攔截器---go");// 此處根據(jù)業(yè)務(wù)需要對(duì)相關(guān)的數(shù)據(jù)作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息頭headers.add("interceptor", "interceptor".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("攔截器---back");if (exception != null) {// 如果發(fā)生異常,記錄在日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

6、生產(chǎn)者原理解析

以上內(nèi)容為個(gè)人學(xué)習(xí)理解,如有問題,歡迎在評(píng)論區(qū)指出。

部分內(nèi)容截取自網(wǎng)絡(luò),如有侵權(quán),聯(lián)系作者刪除。

http://m.risenshineclean.com/news/45566.html

相關(guān)文章:

  • 一般做網(wǎng)站圖是多大的像素廣點(diǎn)通廣告投放平臺(tái)
  • 站免費(fèi)下載安裝seo優(yōu)化策略
  • 做鏡像網(wǎng)站搜索引擎優(yōu)化百度百科
  • 營銷型企業(yè)網(wǎng)站建設(shè)策劃關(guān)鍵詞排名怎么快速上去
  • 怎么建設(shè)阿里巴巴國際網(wǎng)站首頁企業(yè)培訓(xùn)機(jī)構(gòu)排名
  • 域名注冊(cè)和網(wǎng)站建設(shè)深圳關(guān)鍵詞優(yōu)化報(bào)價(jià)
  • 做網(wǎng)站和淘寶美工 最低電腦百度手機(jī)點(diǎn)擊排名工具
  • sql數(shù)據(jù)庫環(huán)境網(wǎng)站搭建教程百度總部公司地址在哪里
  • 怎么制作軟件app教程網(wǎng)絡(luò)優(yōu)化工程師需要學(xué)什么
  • 利用ionic做的網(wǎng)站最新網(wǎng)絡(luò)營銷方式
  • 做視頻網(wǎng)站seo查詢 工具
  • 廣州網(wǎng)站建設(shè)首選快優(yōu)淘寶權(quán)重查詢
  • 免費(fèi)正版高清圖片素材庫蕭山市seo關(guān)鍵詞排名
  • 美容行業(yè)培訓(xùn)網(wǎng)站建設(shè)seo搜索優(yōu)化是什么呢
  • 自動(dòng)生成海報(bào)的網(wǎng)站星鏈友店
  • 如果做淘寶網(wǎng)站百度指數(shù)官網(wǎng)入口登錄
  • 網(wǎng)絡(luò)網(wǎng)站建設(shè)電話哪些網(wǎng)站可以免費(fèi)發(fā)廣告
  • 室內(nèi)裝修公司需要資質(zhì)嗎優(yōu)化網(wǎng)站怎么真實(shí)點(diǎn)擊
  • 網(wǎng)站欄目結(jié)構(gòu)設(shè)計(jì)seo技術(shù)優(yōu)化技巧
  • 如何在微信公眾號(hào)里建設(shè)微網(wǎng)站制作app平臺(tái)需要多少錢
  • 中國十大軟件上市公司排名seo從零開始到精通200講解
  • 做英語趣味教具的網(wǎng)站時(shí)事新聞最新2022
  • 網(wǎng)站開發(fā)運(yùn)營優(yōu)化百度漲
  • 做一個(gè)電子商務(wù)網(wǎng)站麗水網(wǎng)站seo
  • 工商企業(yè)查詢網(wǎng)楓樹seo網(wǎng)
  • 丹徒網(wǎng)站建設(shè)要多少錢百度seo刷排名網(wǎng)址
  • 做一個(gè)簡單的公司網(wǎng)站要多少錢虎撲體育網(wǎng)體育
  • 煙臺(tái)網(wǎng)站的優(yōu)化seo網(wǎng)絡(luò)運(yùn)營
  • 網(wǎng)站建設(shè)延期合同書百度競(jìng)價(jià)是什么工作
  • 縉云做網(wǎng)站每天新聞早知道