0:生产者只管往leader 发数据,不管有没有收到,此时效率最快,安全最低
1:生产者往leader发数据,leader确认收到,但是不管followers 有没有收到数据
all:生产者往leader发数据,leader以及所有同步副本(ISR)中的followers都确认收到数据后才算发送成功,此时安全性最高,效率最低
确保消息不丢失,需要将 acks 设置为 all,重试retries的值需要大于0,副本数也要大于1
关于消费组的注意事项:
经过测试:消费组之间是相互隔离的,同一条消息会被每个订阅了该topic的消费组各消费一次。例如:消费组a有消费者1,消费组b有消费者2,消费者1和消费者2都监听同一个topic,若此时生产者往这个topic发送消息c,则消费者1和消费者2都会消费消息c;并不会出现消息c被消费者1和消费者2中的某一个消费后,另一个就不再消费的情况。也就是说,从业务角度看消息c会被处理两次;如果希望一条消息只被处理一次,应当把这些消费者放在同一个消费组中。
Kafka 集成到 springcloud 里面:


consumer-kafka-provide配置

kafka 提供者的pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Kafka producer service (consumer-kafka-provide). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>consumer</artifactId>
        <groupId>com.xhj</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>consumer-kafka-provide</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- hystrix -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>

        <!-- begin: register this provider-side microservice with Nacos -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
        </dependency>
        <!-- end: register this provider microservice with Nacos -->

        <!-- Nacos configuration center -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
            <version>${spring-cloud-starter-alibaba-nacos.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.core</groupId>
            <artifactId>jersey-server</artifactId>
            <version>2.25.1</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <!-- Must name this module's own @SpringBootApplication class.
                         The previous value, com.xhj.ConsumerKafkaNewApplication, is the
                         entry point of the separate consumer module. -->
                    <mainClass>com.top.ConsumerKafkaProvideApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
package com.top.init;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration for a transactional Kafka producer with String keys and values.
 *
 * <p>The producer factory is exposed as a bean so that the {@link KafkaTemplate} and the
 * {@link KafkaTransactionManager} are built on the same factory instance instead of each
 * creating its own.
 *
 * @author
 * 2021/7/27 20:54
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /**
     * Builds the Kafka client properties used by the producer factory.
     *
     * @return a new mutable map of producer settings
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Broker list is hard-coded here; it is not read from external configuration.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.16.144:9092,192.168.16.145:9092,192.168.16.146:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // 16 KiB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32 MiB
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Creates the producer factory with transactions enabled.
     *
     * @return the producer factory shared by the template and the transaction manager
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        // Setting a transaction id prefix is what switches the factory to transactional mode.
        factory.setTransactionIdPrefix("tx-");
        return factory;
    }

    /**
     * @return a template backed by the {@link #producerFactory()} bean
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * @return a transaction manager backed by the {@link #producerFactory()} bean
     */
    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}
package com.top.kafka;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.sql.Timestamp;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * REST controller that publishes to Kafka through spring-kafka, configured with annotations.
 *
 * @author 谢宏基
 * 2021/7/20 23:03
 */
@RestController
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends one fixed message to {@code testtopic} via
     * {@code KafkaTemplate.executeInTransaction}.
     *
     * <p>The checked exceptions below are part of the declared signature; nothing in this
     * method body throws them directly.
     *
     * @throws InterruptedException declared by the signature
     * @throws ExecutionException declared by the signature
     * @throws TimeoutException declared by the signature
     */
    @GetMapping("/kafka/normal/3")
    public void sendMessage3() throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("testtopic", "test executeInTransaction");
            // The callback result is not used by the caller.
            return null;
        });
    }
}
package com.top;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * Boot entry point of the Kafka producer service.
 *
 * <p>Enables transaction management, service discovery and the circuit breaker via the
 * annotations on this class.
 *
 * @author
 * 2021/7/27 20:24
 */
@SpringBootApplication
@EnableTransactionManagement
@EnableDiscoveryClient
@EnableCircuitBreaker
public class ConsumerKafkaProvideApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ConsumerKafkaProvideApplication.class, args);
    }
}
spring:
  application:
    name: consumer-kafka-provide
  cloud:
    nacos:
      config:
        # nacos.top.com:80 is the local address; it fronts a Nacos cluster
        server-addr: nacos.top.com:80
        namespace: 236b9804-b53a-4bcf-92c3-451fbbdba073
        group: develop
        file-extension: yaml
        prefix: ${spring.application.name}
        extension-configs[0]:
          dataId: discovery_nacos.yaml
          group: develop
          refresh: true
server.port: 6011
spring:
  cloud:
    nacos:
      # Nacos documentation: https://nacos.io/zh-cn/docs/concepts.html
      # Nacos authentication info
      discovery:
        server-addr: nacos.top.com
        namespace: 236b9804-b53a-4bcf-92c3-451fbbdba073

pom 文件配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the shared Feign API module (springcloud-api-test). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>learn-api</artifactId>
        <groupId>org.top</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>springcloud-api-test</artifactId>

    <dependencies>
        <!-- NOTE(review): the Feign client interface in this module imports
             org.springframework.cloud.openfeign.FeignClient; confirm that the version
             resolved for this starter actually provides that package. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-feign</artifactId>
            <version>${spring-cloud-starter-feign.version}</version>
        </dependency>
        <dependency>
            <groupId>org.top</groupId>
            <artifactId>springcloud-model</artifactId>
            <version>${top-version}</version>
        </dependency>
    </dependencies>
</project>
package com.xhj.springcloud;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

/**
 * Feign client for the {@code consumer-kafka-provide} service.
 *
 * <p>Fallback instances are supplied by {@link KafkaClientServiceFallbackFactory}.
 */
@FeignClient(
        value = "consumer-kafka-provide",
        fallbackFactory = KafkaClientServiceFallbackFactory.class)
public interface KafkaClientService {

    /**
     * Issues {@code GET /kafka/normal/3} against the target service.
     */
    @RequestMapping(method = RequestMethod.GET, value = "/kafka/normal/3")
    void testKafka();
}
package com.xhj.springcloud;

import feign.hystrix.FallbackFactory;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Produces the fallback {@link KafkaClientService} used when the remote call is degraded.
 *
 * <p>The {@code @Component} annotation is required so that the factory is registered as a bean.
 */
@Component
public class KafkaClientServiceFallbackFactory implements FallbackFactory<KafkaClientService> {

    /**
     * Returns a fresh fallback client on every call.
     *
     * @param throwable the failure that triggered the fallback; not inspected here
     * @return a client whose {@code testKafka()} only prints a degradation notice
     */
    @Override
    public KafkaClientService create(Throwable throwable) {
        return new DegradedKafkaClient();
    }

    /** Fallback implementation that prints a notice to stdout instead of calling the service. */
    private static final class DegradedKafkaClient implements KafkaClientService {

        @Override
        public void testKafka() {
            System.out.println("kafka降级。。。。。。");
        }
    }
}

pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Feign-based consumer web application (springcloud-consumer-testnew). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-consumer</artifactId>
        <groupId>org.top</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>springcloud-consumer-testnew</artifactId>

    <dependencies>
        <!-- Ribbon -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
            <version>${spring-cloud-starter-netflix-ribbon.version}</version>
        </dependency>

        <!-- Feign support -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-feign</artifactId>
            <version>${spring-cloud-starter-feign.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <!-- Hot reload: changes take effect immediately -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>springloaded</artifactId>
            <version>${springloaded.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency>

        <dependency>
            <groupId>org.top</groupId>
            <artifactId>model-test</artifactId>
            <version>${top-version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.top</groupId>
            <artifactId>springcloud-model</artifactId>
            <version>${top-version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.top</groupId>
            <artifactId>springcloud-api-test</artifactId>
            <version>${top-version}</version>
            <scope>compile</scope>
        </dependency>

        <!-- begin: register this provider-side microservice with Nacos -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
        </dependency>
        <!-- end: register this provider microservice with Nacos -->

        <!-- Nacos configuration center -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
            <version>${spring-cloud-starter-alibaba-nacos.version}</version>
        </dependency>
    </dependencies>
</project>
# Nacos documentation: https://nacos.io/zh-cn/docs/concepts.html
# Nacos authentication info
spring:
  application:
    name: springcloud-consumer
  cloud:
    nacos:
      config:
        # nacos.top.com:80 is the local address; it fronts a Nacos cluster
        server-addr: nacos.top.com:80
        namespace: 236b9804-b53a-4bcf-92c3-451fbbdba073
        group: develop
        file-extension: yaml
        extension-configs[0]:
          dataId: discovery_nacos.yaml
          group: develop
          refresh: true
server.port: 8007
feign:
  hystrix:
    enabled: true # enable the circuit breaker (must be configured in the API layer)
package com.xhj;

import com.rule.MyRibbonRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;

import java.io.IOException;

/**
 * Boot entry point of the consumer web application.
 *
 * <p>{@code @EnableFeignClients} lets the application call remote services through Feign.
 */
@SpringBootApplication
@EnableFeignClients
@EnableDiscoveryClient
public class SpringCloudConsumerNewApplication {

    private static Logger LOGGER =
            LoggerFactory.getLogger(SpringCloudConsumerNewApplication.class);

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws IOException declared by the signature
     */
    public static void main(String[] args) throws IOException {
        SpringApplication.run(SpringCloudConsumerNewApplication.class, args);
    }
}
package com.xhj.controller;

import com.xhj.base.Result;
import com.xhj.springcloud.Dept;
import com.xhj.springcloud.DeptClientService;
import com.xhj.springcloud.KafkaClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Resource;
import java.util.List;

/**
 * Scratch controller used purely for learning and trying things out.
 *
 * @author
 * 2021/4/27 23:19
 */
@Controller
@RequestMapping("/springcloud/consumer")
public class StudyConsumerController {

    private static Logger LOGGER = LoggerFactory.getLogger(StudyConsumerController.class);

    @Resource
    private KafkaClientService kafkaClientService;

    /**
     * Kafka smoke test: logs a marker line, calls the Kafka Feign client, and reports success.
     *
     * @return a {@code Result} constructed with {@code "success"}
     * @throws Exception declared by the signature
     */
    @GetMapping("/testKafka")
    @ResponseBody
    public Result testKafka() throws Exception {
        LOGGER.error("测试kafka");
        kafkaClientService.testKafka();
        Result outcome = new Result("success");
        return outcome;
    }
}

pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Kafka consumer service (consumer-kafka-consumer). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xhj</groupId>
    <artifactId>consumer-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>consumer-kafka-consumer</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <artifactId>consumer</artifactId>
        <groupId>com.xhj</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <mainClass>com.xhj.ConsumerKafkaNewApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
# Application name
spring.application.name=consumer-kafka-new
# Active Spring profile
spring.profiles.active=kafka1

########### [Kafka cluster] ###########
spring.kafka.bootstrap-servers=192.168.16.144:9092,192.168.16.145:9092,192.168.16.146:9092

########### [Consumer initialization] ###########
# Default consumer group ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
spring.kafka.consumer.properties.group.id1=consumerGroup1
spring.kafka.consumer.properties.group.id2=consumerGroup2
# Whether offsets are committed automatically
spring.kafka.consumer.enable-auto-commit=true
# Offset commit delay in milliseconds (how long after receiving a message the offset is committed).
# Spring Boot's key for this setting is auto-commit-interval; the former key
# spring.kafka.consumer.auto.commit.interval.ms is not a Spring Boot property and was ignored.
spring.kafka.consumer.auto-commit-interval=1000
# What to do when Kafka has no initial offset or the offset is out of range:
# earliest: reset to the smallest offset in the partition;
# latest: reset to the newest offset in the partition (consume only newly produced data);
# none: throw an exception if any partition has no committed offset;
spring.kafka.consumer.auto-offset-reset=latest
# Session timeout (a rebalance is triggered if the consumer sends no heartbeat within this time)
spring.kafka.consumer.properties.session.timeout.ms=120000
# Request timeout
spring.kafka.consumer.properties.request.timeout.ms=1000
# Deserializer classes provided by Kafka
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Startup fails when a listened-to topic does not exist; this turns that check off
spring.kafka.listener.missing-topics-fatal=false
# Enable batch consumption
spring.kafka.listener.type=batch
# Maximum number of messages consumed per batch
spring.kafka.consumer.max-poll-records=3
# Web server port
server.port: 6001
package com.xhj;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Boot entry point of the Kafka consumer service.
 */
@SpringBootApplication
public class ConsumerKafkaNewApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ConsumerKafkaNewApplication.class, args);
    }
}
package com.xhj.consumerkafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Kafka listener component that prints every record it receives to stdout.
 *
 * @author 谢宏基
 * 2021/7/20 23:11
 */
@Component
public class KafkaConsumer {

    /**
     * Listener for topics {@code testtopic} and {@code taoguba1}.
     *
     * <p>Prints a random number between 1 and 10, then a line with the current time, topic,
     * partition, value, offset and key of the record.
     *
     * <p>NOTE(review): the accompanying application properties set
     * {@code spring.kafka.listener.type=batch}; confirm that this single-record signature is
     * what is intended with that setting.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = {"testtopic", "taoguba1"})
    public void onMessage1(ConsumerRecord<?, ?> record) {
        // The random draw was used by failure/slow-consumer experiments that are now disabled.
        int draw = new Random().nextInt(10) + 1;
        System.out.println("-----------------" + draw);

        StringBuilder line = new StringBuilder("简单消费:");
        line.append(System.currentTimeMillis());
        line.append(";topic").append(record.topic());
        line.append(";分区").append(record.partition());
        line.append(";数据").append(record.value());
        line.append(";偏移").append(record.offset());
        line.append(";key值").append(record.key());
        System.out.println(line.toString());
    }
}
文章转载自小白逆袭之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




