優(yōu)秀網(wǎng)站案例泰安百度公司代理商
Kafka 服務(wù)器(Broker) 的配置
server.properties
# broker.id: 每個(gè) Kafka Broker 的唯一標(biāo)識(shí)符。broker.id 必須在整個(gè) Kafka 集群中唯一。
broker.id=0# 配置 Kafka Broker 監(jiān)聽客戶端請(qǐng)求的地址和端口。這個(gè)配置決定了 Kafka 服務(wù)將接受來(lái)自生產(chǎn)者、消費(fèi)者以及其他客戶端的連接。
listeners=PLAINTEXT://192.168.65.60:9092# Kafka 消息日志文件的存儲(chǔ)目錄
log.dir=/usr/local/data/kafka‐logs# Kafka 連接到 Zookeeper 的地址
zookeeper.connect=192.168.65.60:2181
每個(gè) Kafka 集群中的節(jié)點(diǎn)(Broker)都需要有一個(gè) server.properties 配置文件,并且每個(gè)節(jié)點(diǎn)的配置可以有所不同。
生產(chǎn)者
生產(chǎn)者配置
Properties props = new Properties();// Kafka服務(wù)器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把發(fā)送的key和value從字符串序列化為字節(jié)數(shù)組
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); /* * 1. 發(fā)出消息持久化機(jī)制參數(shù)* acks=0: 表示producer不需要等待任何broker確認(rèn)收到消息的回復(fù),就可以繼續(xù)發(fā)送下一條消息。性能最高,但是最容易丟消息* acks=1: 至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log,但是不需要等待所有follower是否成功寫入,就可以繼續(xù)發(fā)送下一條消息* 如果follower沒有成功備份數(shù)據(jù),而此時(shí)leader又掛掉,則消息會(huì)丟失* acks=‐1或all: 需要等待 min.insync.replicas(默認(rèn)為1,推薦配置大于等于2) 這個(gè)參數(shù)配置的副本個(gè)數(shù)都成功寫入日志。這是最強(qiáng)的數(shù)據(jù)保證。一般金融級(jí)別才會(huì)使用這種配置*/
props.put(ProducerConfig.ACKS_CONFIG, "1");// 2. 重試相關(guān)
//2.1 發(fā)送失敗重試次數(shù),重試能保證消息發(fā)送的可靠性,但是也可能造成消息重復(fù)發(fā)送,需要接收者做好消息接收的冪等性處理
props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2.2 重試間隔設(shè)置,默認(rèn)重試間隔100ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);/ * 3. 本地緩沖區(qū)和延遲發(fā)送相關(guān)* 在設(shè)置本地緩沖區(qū)/延遲發(fā)送后,消息會(huì)先發(fā)送到本地緩沖區(qū),當(dāng)達(dá)到批量發(fā)送消息的大 * 小時(shí),本地線程會(huì)從緩沖區(qū)取數(shù)據(jù)(一個(gè)batch),批量發(fā)送到broker。同時(shí),需要設(shè)置 * batch最大的延遲發(fā)送時(shí)間,如果一條消息在本地緩沖區(qū)中等待的時(shí)間達(dá)到設(shè)置的時(shí)間后 * batch沒滿,那么也必須把消息發(fā)送出去* /// 3.1 設(shè)置本地緩沖區(qū)大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// 3.2 設(shè)置batch大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/* * 3.3 batch最大的延遲發(fā)送時(shí)間* 默認(rèn)值是0:意思就是消息必須立即被發(fā)送,但這樣會(huì)影響性能* 一般設(shè)置10毫秒左右,就是說(shuō)這個(gè)消息發(fā)送完后會(huì)進(jìn)入本地的一個(gè)batch,如果10毫秒內(nèi),這個(gè)batch滿了16kb就會(huì)隨batch一起被發(fā)送出去* 如果10毫秒內(nèi),batch沒滿,那么也必須把消息發(fā)送出去,不能讓消息的發(fā)送延遲時(shí)間太長(zhǎng)* * 消息 -> 本地緩沖區(qū)(32M)-> batch(16k)-> 發(fā)送(10ms batch不滿也發(fā)送)*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
生產(chǎn)者發(fā)送消息
// 創(chuàng)建 Kafka 生產(chǎn)者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 發(fā)送消息
String topic = "test-topic"; // 主題名稱
String key = "order1"; // 消息的 key
String value = "Order details: 123"; // 消息的內(nèi)容// 創(chuàng)建消息記錄
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);try {// 發(fā)送消息,這里的lambda函數(shù)就是onCompletion()方法producer.send(record, (metadata, exception) -> {if (exception != null) {System.out.println("Error sending message: " + exception.getMessage());} else {System.out.println("Message sent successfully to topic " + metadata.topic() +" partition " + metadata.partition() + " with offset " + metadata.offset());}});} catch (Exception e) {e.printStackTrace();
} finally {// 關(guān)閉生產(chǎn)者producer.close();
}
// 指定發(fā)送分區(qū)
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, key_json, value_json);// 也可以指定發(fā)送分區(qū)
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key_json, value_json);// 等待消息發(fā)送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();// 異步回調(diào)方式發(fā)送消息
producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {// 處理異常}
});
// 關(guān)閉
producer.close();
此外,為了保證生產(chǎn)者的消息發(fā)送成功,可以通過(guò)添加回調(diào)函數(shù)的方式,在send成功后打印日志。
詳細(xì)內(nèi)容參考:Kafka如何保證消息不丟失
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生產(chǎn)者成功發(fā)送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生產(chǎn)者發(fā)送消失敗,原因:{}", ex.getMessage()));
消費(fèi)者
消費(fèi)者配置
Properties properties = new Properties();// Kafka服務(wù)器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把發(fā)送的key和value從字符串序列化為字節(jié)數(shù)組
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消費(fèi)分組名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);// 是否自動(dòng)提交offset,默認(rèn)就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自動(dòng)提交offset的間隔時(shí)間
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/* * 當(dāng)消費(fèi)主題的是一個(gè)新的消費(fèi)組,或者指定offset的消費(fèi)方式,offset不存在,那么應(yīng)該如何消費(fèi)* latest(默認(rèn)) :只消費(fèi)自己?jiǎn)?dòng)之后發(fā)送到主題的消息* earliest:第一次從頭開始消費(fèi),以后按照消費(fèi)offset記錄繼續(xù)消費(fèi),這個(gè)需要區(qū)別于 consumer.seekToBeginning(每次都從頭開始消費(fèi))*/
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// consumer給broker發(fā)送心跳的間隔時(shí)間,broker接收到心跳如果此時(shí)有rebalance發(fā)生會(huì)通過(guò)心跳響應(yīng)將rebalance方案下發(fā)給consumer,這個(gè)時(shí)間可以稍微短一點(diǎn)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);// 服務(wù)端broker多久感知不到一個(gè)consumer心跳就認(rèn)為他故障了,會(huì)將其踢出消費(fèi)組,對(duì)應(yīng)的Partition也會(huì)被重新分配給其他consumer,默認(rèn)是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);// 一次poll最大拉取消息的條數(shù),如果消費(fèi)者處理速度很快,可以設(shè)置大點(diǎn),如果處理速度一般,可以設(shè)置小點(diǎn)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果兩次poll操作間隔超過(guò)了這個(gè)時(shí)間,broker就會(huì)認(rèn)為這個(gè)consumer處理能力太弱,會(huì)將其踢出消費(fèi)組,將分區(qū)分配給別的consumer消費(fèi)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
消費(fèi)者消費(fèi)消息
// 創(chuàng)建 Kafka 消費(fèi)者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 訂閱主題
consumer.subscribe(Collections.singletonList("test-topic"));// 消費(fèi)指定分區(qū),這段代碼指定了消費(fèi)者從TOPIC_NAME的第一個(gè)分區(qū)(分區(qū)0)開始消費(fèi)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));/* 回溯消費(fèi)(從頭消費(fèi) - seekToBeginning)* seekToBeginning()方法使消費(fèi)者回溯到該分區(qū)的最初位置,意味著從頭開始消費(fèi)該分 區(qū)的所有消息。* 這對(duì)于重新消費(fèi)主題中的消息或重新同步時(shí)非常有用。* /
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 指定offset消費(fèi),即消費(fèi)者將跳過(guò)之前的消息,從該offset開始消費(fèi)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);/* 從指定時(shí)間點(diǎn)開始消費(fèi) - 1小時(shí)前
* partitionsFor()方法獲取指定主題(TOPIC_NAME)的所有分區(qū)信息。
* fetchDataTime 是一個(gè)時(shí)間戳,表示1小時(shí)前的時(shí)間,new Date().getTime() - 1000 * 60 * 60 用來(lái)計(jì)算這個(gè)時(shí)間戳。
* map 用于存儲(chǔ)每個(gè)分區(qū)與其對(duì)應(yīng)的時(shí)間戳(fetchDataTime)。這個(gè)時(shí)間戳將用于從Kafka中拉取時(shí)間戳較早的消息。
*/
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime = new Date().getTime() ‐ 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
// 消費(fèi)消息
try {while (true) {consumer.poll(1000).forEach(record -> {// 可以修改為具體業(yè)務(wù)邏輯System.out.println("Consumed record with key: " + record.key() +", value: " + record.value() + ", from partition: " + record.partition());});}
} catch (Exception e) {e.printStackTrace();
} finally {// 關(guān)閉消費(fèi)者consumer.close();
}
消費(fèi)者提交offset
手動(dòng)提交offset的意義
- 控制消費(fèi)進(jìn)度
手動(dòng)提交offset能夠讓消費(fèi)者在每個(gè)消息或消息批次消費(fèi)后,明確地告訴Kafka“我已經(jīng)消費(fèi)到這個(gè)offset了”。這對(duì)于控制消息消費(fèi)的精確性非常重要,尤其在需要精確控制消費(fèi)位置的場(chǎng)景中。
- 避免消息丟失或重復(fù)消費(fèi)
如果自動(dòng)提交offset,可能會(huì)發(fā)生消費(fèi)者在處理中出現(xiàn)異常(如程序崩潰),導(dǎo)致已消費(fèi)的消息的offset提交失敗,導(dǎo)致消息丟失或重復(fù)消費(fèi)。手動(dòng)提交則可以在處理完消息并確保成功時(shí)再提交offset,避免這種問(wèn)題。比如在金融交易、日志收集系統(tǒng)等場(chǎng)景中,需要確保消息的處理不會(huì)丟失,并且不會(huì)重復(fù)處理。
- 靈活的錯(cuò)誤處理與恢復(fù)
通過(guò)手動(dòng)提交offset,消費(fèi)者可以在消費(fèi)過(guò)程中靈活地處理錯(cuò)誤。如果在消費(fèi)某條消息時(shí)發(fā)生異常,消費(fèi)者可以選擇不提交offset,這樣在消費(fèi)者重啟或恢復(fù)時(shí)會(huì)重新消費(fèi)該消息。它使得消費(fèi)者在出錯(cuò)時(shí)能更好地控制重試策略。
代碼實(shí)現(xiàn)
同步提交
consumer.commitSync();
當(dāng)調(diào)用該方法時(shí),消費(fèi)者會(huì)將當(dāng)前消費(fèi)的偏移量提交到Kafka集群,并且當(dāng)前線程會(huì)阻塞,直到該提交操作完成。
優(yōu)勢(shì):
- 阻塞:會(huì)等待offset提交成功,不會(huì)繼續(xù)執(zhí)行后續(xù)代碼,直到提交完成。
- 可靠性:如果提交失敗,commitSync()會(huì)拋出異常,可以捕獲并進(jìn)行處理,確保提交正確。
缺點(diǎn):會(huì)導(dǎo)致性能問(wèn)題,因?yàn)樗鼤?huì)阻塞當(dāng)前線程,直到提交完成。
異步提交
consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {// 處理異常}
});
回調(diào)函數(shù):異步提交會(huì)接受一個(gè)OffsetCommitCallback回調(diào)接口作為參數(shù),該接口的onComplete()方法會(huì)在提交操作完成時(shí)被調(diào)用。這個(gè)方法會(huì)接收到兩個(gè)參數(shù):
offsets:包含提交的偏移量信息(TopicPartition和OffsetAndMetadata)。
ex:如果提交發(fā)生錯(cuò)誤,該參數(shù)會(huì)包含異常信息。
優(yōu)勢(shì):
- 非阻塞:不會(huì)等待提交完成,允許程序繼續(xù)執(zhí)行其他操作。
- 提高吞吐量:減少等待時(shí)間,尤其是在批量消費(fèi)和提交的情況下,可以提高整體的吞吐量和性能。
缺點(diǎn):可能會(huì)出現(xiàn)提交失敗的情況,回調(diào)函數(shù)中的異常處理需要做好,以確保異常得到及時(shí)處理。
Spring boot集成
1. 添加依賴
在pom.xml 中添加 Kafka 的相關(guān)依賴
<dependencies><!-- Spring Boot Starter for Apache Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter Web (optional if you need a web app) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter for Actuator (optional for monitoring) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!-- Spring Boot Starter Test (optional for testing) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
2. 配置文件
application.yml
spring:kafka:# Kafka broker 地址bootstrap‐servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer:retries: 3batch‐size: 16384buffer‐memory: 33554432acks: 1key‐serializer: org.apache.kafka.common.serialization.StringSerializervalue‐serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group‐id: default‐groupenable‐auto‐commit: falseauto‐offset‐reset: earliestkey‐deserializer: xxx.StringDeserializervalue‐deserializer: xxx.StringDeserializerlistener:ack‐mode: manual_immediate
注意:
ack‐mode
RECORD:當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
BATCH:當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器處理之后提交
TIME:當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器處理之后,距離上次提交時(shí)間大于TIME時(shí)提交
COUNT:當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交
TIME | COUNT:有一個(gè)條件滿足時(shí)提交
MANUAL:當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器處理之后, 手動(dòng)調(diào)用Acknowledgment.acknowledge()后提交
MANUAL_IMMEDIATE:手動(dòng)調(diào)用Acknowledgment.acknowledge()后立即提交,一般使用這種(一次提交一條消息)
3. 啟動(dòng)類
package com.example.kafka;import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;@SpringBootApplication
public class KafkaApplication implements CommandLineRunner {@Autowiredprivate KafkaProducer kafkaProducer;public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 發(fā)送消息kafkaProducer.sendMessage("test-topic", "Hello, Kafka!");}
}
4. 生產(chǎn)者類
package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Message sent: " + message);}
}
5. 消費(fèi)者類
package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group")public void consume(String message) {System.out.println("Consumed message: " + message);}@KafkaListener(topics = "test-topic",groupId = "test-group")public void consume1(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();ack.acknowledge(); //手動(dòng)提交offset}// 配置多個(gè)topic,concurrency就是同組下的消費(fèi)者個(gè)數(shù),就是并發(fā)消費(fèi)數(shù),必須小于等于分區(qū)總數(shù)@KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}), // 從topic1的分區(qū)0和1讀取消息@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) // 從topic2的分區(qū)0讀取消息,并設(shè)置分區(qū)1的初始偏移量為100}, concurrency = "6")public void listenToMultipleTopics(String message) {// 消費(fèi)消息的邏輯System.out.println("Group: testGroup, Message: " + message);}
}
Kafka事務(wù)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());// 初始化事務(wù)
producer.initTransactions();
try {// 開啟事務(wù)producer.beginTransaction();// 發(fā)到不同的主題的不同分區(qū)producer.send(/*...*/);// 提交事務(wù)producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();
} catch (KafkaException e) {// 回滾事務(wù)producer.abortTransaction();
}
// 關(guān)閉
producer.close();
spring框架下Kafka事務(wù)
可以通過(guò)**@Transactional**實(shí)現(xiàn)
配置
可以通過(guò)在application.yml文件或KafkaConfig配置類中添加配置的方式,提供事務(wù)支持。
1. application.yml
spring:kafka:bootstrap-servers: localhost:9092 # Kafka 集群地址producer:acks: all # 確保消息被所有副本確認(rèn)transactional-id-prefix: tx- # 事務(wù)前綴,Kafka 事務(wù)需要一個(gè)事務(wù) ID 前綴consumer:group-id: test-group # 消費(fèi)者組 IDenable-auto-commit: false # 手動(dòng)提交 offsetlistener:ack-mode: manual # 設(shè)置為手動(dòng)提交確認(rèn)
2. 配置類
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {// 設(shè)置 Kafka 生產(chǎn)者的事務(wù)管理器KafkaTransactionManager<String, String> transactionManager =new KafkaTransactionManager<>(producerFactory());KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());kafkaTemplate.setTransactionManager(transactionManager);return kafkaTemplate;}@Beanpublic DefaultKafkaProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-"); // 事務(wù) IDreturn new DefaultKafkaProducerFactory<>(configProps);}
}
生產(chǎn)者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.Transactional;
import org.springframework.stereotype.Service;@Service
public class KafkaTransactionProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Transactionalpublic void sendTransactionalMessages() {try {// 發(fā)送事務(wù)消息kafkaTemplate.send("topic1", "key1", "message1");kafkaTemplate.send("topic2", "key2", "message2");// 你可以在此處加入其他業(yè)務(wù)邏輯,如果出現(xiàn)異常,會(huì)回滾事務(wù)if (someConditionFails()) {throw new RuntimeException("Simulating failure to trigger rollback");}// 如果沒有異常,事務(wù)提交,消息將被正常發(fā)送} catch (Exception e) {// 事務(wù)回滾System.out.println("Transaction failed, rolling back...");throw e;}}private boolean someConditionFails() {// 模擬某些條件下事務(wù)失敗return true;}
}
消費(fèi)者
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaTransactionConsumer {@KafkaListener(topics = "topic1", groupId = "test-group")public void listenTopic1(String message) {System.out.println("Received message from topic1: " + message);}@KafkaListener(topics = "topic2", groupId = "test-group")public void listenTopic2(String message) {System.out.println("Received message from topic2: " + message);}
}
在生產(chǎn)者的配置中啟用事務(wù),配置 transactional.id,并設(shè)置事務(wù)管理器 KafkaTransactionManager,它會(huì)自動(dòng)管理 Kafka 事務(wù)的開始、提交和回滾。
事務(wù)管理:@Transactional 注解用于標(biāo)識(shí)在發(fā)送消息的過(guò)程是一個(gè)事務(wù)操作。如果其中任何消息發(fā)送失敗,Spring Kafka 會(huì)自動(dòng)回滾事務(wù)。
回滾機(jī)制:在 sendTransactionalMessages() 中模擬了一個(gè)失敗的條件,確保事務(wù)在遇到異常時(shí)會(huì)被回滾。