暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

kafka 心得体会(1)

小白逆袭之路 2021-07-28
1091
kafka 是一种属于 点对点 的分布式消息队列,用于大数据比较多。kafka 消息队列 依赖于kafka,所以在搭建kafka 集群时,需要先搭建 zookeeper 集群。

kafka 的学习 有以下几个重要的概念:
Broker:在搭建kafka 集群时,一个kafka 实例就是一个 broker;
Topic:主题,可以理解为一个数据库;
Partition:分区,可以理解为数据库的分表;
Offset:偏移量,可以理解为数据库表的主键;
Replication: 副本数,可以理解为数据备份
Consumer Group:消费组,一个或者多个消费者构成1个消费组,两个消费组之间是隔离的

关于 分区,副本的参数设置
首先kafka 有个特点,在同一个消费组下,同一时间,一个分区只能被1个消费者消费,一个消费者可以有多个分区
分区:例如,kafka 消费者集群里 消费者的实例数为3,设置topic的分区数应为3,6,9等为3的倍数(这样在消费消息时可以达到负载均衡),一般为3,这样每个消费者占用1个分区,若分区数为2,则将会有一个消费者没分区,会空闲,不能最大程度利用实例;
副本:例如副本数为3,则将会有1个leader,2个follwer;在没有副本的情况下,如果broker宕机,其上面所有分区的数据将不能被消费,同时生产者也不能将消息发送到该分区上

关于acks 配置的理解,acks 值有3个

0:生产者只管往leader 发数据,不管有没有收到,此时效率最快,安全最低

1:生产者往leader发数据,leader确认收到,但是不管followers 有没有收到数据

all:则是两者都有收到数据

确保消息不丢失,需要将 acks 设置为 all,重试retries的值需要大于0,副本数也要大于1


       关于消费组的注意事项:

经过测试:消费组之间是相互隔离的,一条消息在某些情况下可以被重复消费,例如:消费组 a有消费者1,  消费组b有消费者2,消费者1和消费者2 都监听同一个topic ,若此时生产者往这个topic 发送消息c,则消费者1和消费者2 都可以 消费 消息c;并不会出现 消息c被 消费者1和消费者2 的某一个消费后,另一个不消费的情况;会出现重复消费 


kafka 消息的模式属于推拉模式,生产者将消息推到broker,消费者从服务端broker 拉取消息

kafka 的主要应用场景
日志收集;用户活动跟踪;
运营指标;流式处理
 
Kafka 集成到 springcloud 里面:
将kafka的消费者和生产者区分为2个模块,kafka 的提供者 作为 springCloud里面的一个提供者(这样子可以利用断路器 可以对kafka 进行服务降级和)

 



consumer-kafka-provide配置


  1. kafka 提供者的pom 文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
    <artifactId>consumer</artifactId>
    <groupId>com.xhj</groupId>
    <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>consumer-kafka-provide</artifactId>
    <properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    </properties>


    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>


    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
    <exclusion>
    <groupId>org.junit.vintage</groupId>
    <artifactId>junit-vintage-engine</artifactId>
    </exclusion>
    </exclusions>
    </dependency>


    <!-- hystrix -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    </dependency>
    <!-- 将微服务provider侧注册进nacos开始 -->
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    </dependency>
    <!-- 将微服务provider注册进nacos结束 -->
    <!-- nacos配置中心 -->
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    <version>${spring-cloud-starter-alibaba-nacos.version}</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-server</artifactId>
    <version>2.25.1</version>
    </dependency>


    </dependencies>


    <dependencyManagement>
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>${spring-boot.version}</version>
    <type>pom</type>
    <scope>import</scope>
    </dependency>
    </dependencies>
    </dependencyManagement>


    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.1</version>
    <configuration>
    <source>1.8</source>
    <target>1.8</target>
    <encoding>UTF-8</encoding>
    </configuration>
    </plugin>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <version>2.3.7.RELEASE</version>
    <configuration>
    <mainClass>com.xhj.ConsumerKafkaNewApplication</mainClass>
    </configuration>
    <executions>
    <execution>
    <id>repackage</id>
    <goals>
    <goal>repackage</goal>
    </goals>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>


    </project>
    2. kafka 提供者 配置文件 KafkaProducerConfig (这里有集成了kafka 事务,开启了事务后,kafka发送信息都需要使用executeInTransaction 的形式发送信息
      package com.top.init;


      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.serialization.StringSerializer;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.annotation.EnableKafka;
      import org.springframework.kafka.core.DefaultKafkaProducerFactory;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.kafka.core.ProducerFactory;
      import org.springframework.kafka.transaction.KafkaTransactionManager;


      import java.util.HashMap;
      import java.util.Map;


      /**
      * 功能:
      *
      * @author
      * 2021/7/27 20:54
      **/


      @Configuration
      @EnableKafka
      public class KafkaProducerConfig {




      public Map<String, Object> producerConfigs() {
      Map<String, Object> props = new HashMap<>();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.16.144:9092,192.168.16.145:9092,192.168.16.146:9092");
      props.put(ProducerConfig.RETRIES_CONFIG, 3);
      props.put(ProducerConfig.ACKS_CONFIG, "all");
      props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
      props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
      props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return props;
      }


      public ProducerFactory<String, String> producerFactory() {
      DefaultKafkaProducerFactory producerFactory=new DefaultKafkaProducerFactory<>(producerConfigs());
      //设置事务Id前缀 开启事务
      producerFactory.setTransactionIdPrefix("tx-");
      return producerFactory;
      }


      @Bean
      public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<String, String>(producerFactory());
      }




      @Bean
      public KafkaTransactionManager<Integer, String> kafkaTransactionManager() {
      return new KafkaTransactionManager(producerFactory());
      }




      }


      KafkaProducer  文件
        package com.top.kafka;


        import org.springframework.kafka.core.KafkaOperations;
        import org.springframework.kafka.core.KafkaTemplate;
        import org.springframework.transaction.annotation.Transactional;
        import org.springframework.util.concurrent.ListenableFutureCallback;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.RestController;


        import javax.annotation.Resource;
        import java.sql.Timestamp;
        import java.util.concurrent.ExecutionException;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;


        /**
        * 功能: kafka 生产者,使用的是注解的方式
        * spring-kafka
        * @author 谢宏基
        * 2021/7/20 23:03
        **/


        @RestController
        public class KafkaProducer {
        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate;
        /**
        * kafka 事务
        * @throws InterruptedException
        * @throws ExecutionException
        * @throws TimeoutException
        */
        @GetMapping("/kafka/normal/3")
        public void sendMessage3() throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){
        @Override
        public Object doInOperations(KafkaOperations operations) {
        operations.send("testtopic","test executeInTransaction");
        return null;
        }
        });
        }





        }


        3. kafka 提供者 启动类 ConsumerKafkaProvideApplication
          package com.top;


          import org.springframework.boot.SpringApplication;
          import org.springframework.boot.autoconfigure.SpringBootApplication;
          import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
          import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
          import org.springframework.transaction.annotation.EnableTransactionManagement;


          /**
          * 功能:kafka 生产者00
          *
          * @author
          * 2021/7/27 20:24
          **/
          @SpringBootApplication
          @EnableTransactionManagement
          @EnableDiscoveryClient
          @EnableCircuitBreaker
          public class ConsumerKafkaProvideApplication {


          public static void main(String[] args) {
          SpringApplication.run(ConsumerKafkaProvideApplication.class, args);
          }
          }


          4. kafka 提供者 bootstrap.yml 文件
            spring:
            application:
            name: consumer-kafka-provide
            cloud:
            nacos:
            config:
            #nacos.top.com:80 是本地的,内含nacos集群
            server-addr: nacos.top.com:80
            namespace: 236b9804-b53a-4bcf-92c3-451fbbdba073
            group: develop
            file-extension: yaml
            prefix: ${spring.application.name}
            extension-configs[0]:
            dataId: discovery_nacos.yaml
            group: develop
            refresh: true


            在这里是以nacos 为springCloud 的配置中心和注册中心,nacos.top.com 是事先搭建的nacos 集群,经由nginx配置负载均衡
            5. kafka 提供者 application.yml 文件
              server.port: 6011
              6. nacos 上discovery_nacos.yaml 文件(注册中心配置)
                spring:
                cloud:
                nacos:
                # Nacos帮助文档: https://nacos.io/zh-cn/docs/concepts.html
                # Nacos认证信息
                      discovery:
                server-addr: nacos.top.com
                namespace: 236b9804-b53a-4bcf-92c3-451fbbdba073
                springcloud-api-test的配置


                1. pom 文件配置


                    <?xml version="1.0" encoding="UTF-8"?>
                    <project xmlns="http://maven.apache.org/POM/4.0.0"
                    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                    <parent>
                    <artifactId>learn-api</artifactId>
                    <groupId>org.top</groupId>
                    <version>1.0-SNAPSHOT</version>
                    </parent>
                    <modelVersion>4.0.0</modelVersion>


                    <artifactId>springcloud-api-test</artifactId>
                    <dependencies>
                    <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-starter-feign</artifactId>
                    <version>${spring-cloud-starter-feign.version}</version>
                    </dependency>
                    <dependency>
                    <groupId>org.top</groupId>
                    <artifactId>springcloud-model</artifactId>
                    <version>${top-version}</version>
                    </dependency>
                    </dependencies>




                    </project>
                  2.KafkaClientService 文件
                    package com.xhj.springcloud;


                    import org.springframework.cloud.openfeign.FeignClient;
                    import org.springframework.web.bind.annotation.RequestMapping;
                    import org.springframework.web.bind.annotation.RequestMethod;


                    @FeignClient(value = "consumer-kafka-provide" ,fallbackFactory=KafkaClientServiceFallbackFactory.class)
                    public interface KafkaClientService {
                    @RequestMapping(value = "/kafka/normal/3", method = RequestMethod.GET)
                    public void testKafka();
                    }



                    3.KafkaClientServiceFallbackFactory 文件
                      package com.xhj.springcloud;


                      import feign.hystrix.FallbackFactory;
                      import org.springframework.stereotype.Component;


                      import java.util.List;


                      @Component // 不要忘记添加,不要忘记添加
                      public class KafkaClientServiceFallbackFactory implements FallbackFactory<KafkaClientService>
                      {
                      @Override
                      public KafkaClientService create(Throwable throwable)
                      {
                      return new KafkaClientService() {
                      @Override
                      public void testKafka() {
                      System.out.println("kafka降级。。。。。。");
                      }
                      };
                      }
                      }


                      springcloud-consumer-testnew 配置


                      1. pom 文件

                        <?xml version="1.0" encoding="UTF-8"?>
                        <project xmlns="http://maven.apache.org/POM/4.0.0"
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                        <parent>
                        <artifactId>springcloud-consumer</artifactId>
                        <groupId>org.top</groupId>
                        <version>1.0-SNAPSHOT</version>
                        </parent>
                        <modelVersion>4.0.0</modelVersion>


                        <artifactId>springcloud-consumer-testnew</artifactId>
                        <dependencies>
                        <!-- Ribbon相关 -->
                        <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
                        <version>${spring-cloud-starter-netflix-ribbon.version}</version>
                        </dependency>


                        <!--主要添加对feign的支持-->
                        <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-starter-feign</artifactId>
                        <version>${spring-cloud-starter-feign.version}</version>
                        </dependency>
                        <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-jetty</artifactId>
                        </dependency>
                        <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                        </dependency>
                        <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>


                        </dependency>
                        <!-- 修改后立即生效,热部署 -->
                        <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>springloaded</artifactId>
                        <version>${springloaded.version}</version>


                        </dependency>
                        <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-devtools</artifactId>
                        </dependency>
                        <dependency>
                        <groupId>org.top</groupId>
                        <artifactId>model-test</artifactId>
                        <version>${top-version}</version>
                        <scope>compile</scope>
                        </dependency>
                        <dependency>
                        <groupId>org.top</groupId>
                        <artifactId>springcloud-model</artifactId>
                        <version>${top-version}</version>
                        <scope>compile</scope>
                        </dependency>
                        <dependency>
                        <groupId>org.top</groupId>
                        <artifactId>springcloud-api-test</artifactId>
                        <version>${top-version}</version>
                        <scope>compile</scope>
                        </dependency>


                        <!-- 将微服务provider侧注册进nacos开始 -->
                        <dependency>
                        <groupId>com.alibaba.cloud</groupId>
                        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
                        </dependency>
                        <dependency>
                        <groupId>com.alibaba.nacos</groupId>
                        <artifactId>nacos-client</artifactId>
                        </dependency>
                        <!-- 将微服务provider注册进nacos结束 -->
                        <!-- nacos配置中心 -->
                        <dependency>
                        <groupId>com.alibaba.cloud</groupId>
                        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
                        <version>${spring-cloud-starter-alibaba-nacos.version}</version>
                        </dependency>
                        </dependencies>


                        </project>
                        2.bootstrap.yml 文件
                           # Nacos帮助文档: https://nacos.io/zh-cn/docs/concepts.html
                          # Nacos认证信息
                          spring:
                          application:
                          name: springcloud-consumer
                          cloud:
                          nacos:
                          config:
                                  #nnacos.top.com:80是本地的,内含nacos集群
                          server-addr: nacos.top.com:80
                          namespace: 236b9804-b53a-4bcf-92c3-451fbbdba073
                          group: develop
                          file-extension: yaml
                          extension-configs[0]:
                          dataId: discovery_nacos.yaml
                          group: develop
                          refresh: true




                          3.application.yml
                            server.port: 8007
                            feign:
                            hystrix:
                            enabled: true #开启断路器(需要在api层配置)








                            4.SpringCloudConsumerNewApplication 启动类
                              package com.xhj;






                              import com.rule.MyRibbonRule;
                              import org.slf4j.Logger;
                              import org.slf4j.LoggerFactory;
                              import org.springframework.boot.SpringApplication;
                              import org.springframework.boot.autoconfigure.SpringBootApplication;
                              import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
                              import org.springframework.cloud.netflix.ribbon.RibbonClient;
                              import org.springframework.cloud.openfeign.EnableFeignClients;
                              import org.springframework.context.annotation.ComponentScan;


                              import java.io.IOException;






                              @SpringBootApplication
                              // 使用Feign 调用服务端
                              @EnableFeignClients
                              @EnableDiscoveryClient
                              public class SpringCloudConsumerNewApplication {
                              private static Logger LOGGER= LoggerFactory.getLogger(SpringCloudConsumerNewApplication.class);
                              public static void main(String[] args) throws IOException {
                                  SpringApplication.run(SpringCloudConsumerNewApplication.class, args);
                              }
                              }


                              5.StudyConsumerController  控制层
                                package com.xhj.controller;


                                //import com.xhj.base.Result;
                                //
                                //import com.xhj.springcloud.Dept;
                                //import com.xhj.DeptClientService;
                                import com.xhj.springcloud.DeptClientService;
                                import com.xhj.base.Result;
                                import com.xhj.springcloud.Dept;
                                import com.xhj.springcloud.KafkaClientService;
                                import org.slf4j.Logger;
                                import org.slf4j.LoggerFactory;


                                import org.springframework.cloud.client.ServiceInstance;


                                import org.springframework.core.ParameterizedTypeReference;
                                import org.springframework.http.HttpMethod;
                                import org.springframework.stereotype.Controller;
                                import org.springframework.web.bind.annotation.GetMapping;
                                import org.springframework.web.bind.annotation.RequestMapping;
                                import org.springframework.web.bind.annotation.ResponseBody;
                                import org.springframework.web.client.RestTemplate;
                                import org.springframework.web.client.RestTemplate;


                                import javax.annotation.Resource;
                                import java.util.List;


                                /**
                                * 功能:一个全部用来学习,写测试语法的控制层
                                *
                                 * @author 
                                * 2021/4/27 23:19
                                **/
                                @Controller
                                @RequestMapping("/springcloud/consumer")
                                public class StudyConsumerController {
                                private static Logger LOGGER = LoggerFactory.getLogger(StudyConsumerController.class);
                                   
                                @Resource
                                private KafkaClientService kafkaClientService;



                                /**
                                * 测试kafka
                                * @return
                                * @throws Exception
                                */
                                @GetMapping("/testKafka")
                                @ResponseBody
                                public Result testKafka() throws Exception {
                                LOGGER.error("测试kafka");
                                kafkaClientService.testKafka();
                                return new Result("success");
                                }
                                }



                                consumer-kafka-consumer 配置

                                1. pom 文件


                                    <?xml version="1.0" encoding="UTF-8"?>
                                    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                                    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
                                    <modelVersion>4.0.0</modelVersion>
                                    <groupId>com.xhj</groupId>
                                    <artifactId>consumer-kafka-consumer</artifactId>
                                    <version>0.0.1-SNAPSHOT</version>
                                    <name>consumer-kafka-consumer</name>
                                    <description>Demo project for Spring Boot</description>
                                    <parent>
                                    <artifactId>consumer</artifactId>
                                    <groupId>com.xhj</groupId>
                                    <version>0.0.1-SNAPSHOT</version>
                                    </parent>
                                    <properties>
                                    <java.version>1.8</java.version>
                                    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                                    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
                                    <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
                                    </properties>


                                    <dependencies>
                                    <dependency>
                                    <groupId>org.springframework.boot</groupId>
                                    <artifactId>spring-boot-starter-web</artifactId>
                                    </dependency>


                                    <dependency>
                                    <groupId>org.springframework.kafka</groupId>
                                    <artifactId>spring-kafka</artifactId>
                                    </dependency>
                                    <dependency>
                                    <groupId>org.springframework.boot</groupId>
                                    <artifactId>spring-boot-starter-test</artifactId>
                                    <scope>test</scope>
                                    <exclusions>
                                    <exclusion>
                                    <groupId>org.junit.vintage</groupId>
                                    <artifactId>junit-vintage-engine</artifactId>
                                    </exclusion>
                                    </exclusions>
                                    </dependency>
                                    </dependencies>


                                    <dependencyManagement>
                                    <dependencies>
                                    <dependency>
                                    <groupId>org.springframework.boot</groupId>
                                    <artifactId>spring-boot-dependencies</artifactId>
                                    <version>${spring-boot.version}</version>
                                    <type>pom</type>
                                    <scope>import</scope>
                                    </dependency>
                                    </dependencies>
                                    </dependencyManagement>


                                    <build>
                                    <plugins>
                                    <plugin>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-compiler-plugin</artifactId>
                                    <version>3.8.1</version>
                                    <configuration>
                                    <source>1.8</source>
                                    <target>1.8</target>
                                    <encoding>UTF-8</encoding>
                                    </configuration>
                                    </plugin>
                                    <plugin>
                                    <groupId>org.springframework.boot</groupId>
                                    <artifactId>spring-boot-maven-plugin</artifactId>
                                    <version>2.3.7.RELEASE</version>
                                    <configuration>
                                    <mainClass>com.xhj.ConsumerKafkaNewApplication</mainClass>
                                    </configuration>
                                    <executions>
                                    <execution>
                                    <id>repackage</id>
                                    <goals>
                                    <goal>repackage</goal>
                                    </goals>
                                    </execution>
                                    </executions>
                                    </plugin>
                                    </plugins>
                                    </build>


                                    </project>


                                  2.application.properties文件
                                    # 应用名称
                                    spring.application.name=consumer-kafka-new
                                    # 应用服务 WEB 访问端口
                                    spring.profiles.active: kafka1


                                    ###########【Kafka集群】###########
                                    spring.kafka.bootstrap-servers=192.168.16.144:9092,192.168.16.145:9092,192.168.16.146:9092




                                    ###########【初始化消费者配置】###########
                                    # 默认的消费组ID
                                    spring.kafka.consumer.properties.group.id=defaultConsumerGroup


                                    spring.kafka.consumer.properties.group.id1=consumerGroup1


                                    spring.kafka.consumer.properties.group.id2=consumerGroup2
                                    # 是否自动提交offset
                                    spring.kafka.consumer.enable-auto-commit=true
                                    # 提交offset延时(接收到消息后多久提交offset)
                                    spring.kafka.consumer.auto.commit.interval.ms=1000
                                    # 当kafka中没有初始offset或offset超出范围时将自动重置offset
                                    # earliest:重置为分区中最小的offset;
                                    # latest:重置为分区中最新的offset(消费分区中新产生的数据);
                                    # none:只要有一个分区不存在已提交的offset,就抛出异常;
                                    spring.kafka.consumer.auto-offset-reset=latest
                                    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
                                    spring.kafka.consumer.properties.session.timeout.ms=120000
                                    # 消费请求超时时间
                                    spring.kafka.consumer.properties.request.timeout.ms=1000
                                    # Kafka提供的序列化和反序列化类
                                    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
                                    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
                                    # 消费端监听的topic不存在时,项目启动会报错(关掉)
                                    spring.kafka.listener.missing-topics-fatal=false
                                    # 设置批量消费
                                    spring.kafka.listener.type=batch
                                    # 批量消费每次最多消费多少条消息
                                    spring.kafka.consumer.max-poll-records=3








                                    3.application-kafka1.yml 文件 (这个是为了在只有一个项目情况下,设置不同端口号,在启动时,指定配置文件,模拟出多个kafka消费者的情况)
                                      server.port: 6001
                                      4.ConsumerKafkaNewApplication 启动类
                                        package com.xhj;


                                        import org.springframework.boot.SpringApplication;
                                        import org.springframework.boot.autoconfigure.SpringBootApplication;


                                        @SpringBootApplication
                                        public class ConsumerKafkaNewApplication {


                                        public static void main(String[] args) {
                                        SpringApplication.run(ConsumerKafkaNewApplication.class, args);
                                        }


                                        }


                                        5.KafkaConsumer  文件
                                          package com.xhj.consumerkafka;


                                          import org.apache.kafka.clients.consumer.ConsumerRecord;
                                          import org.apache.kafka.clients.consumer.ConsumerRecords;
                                          import org.springframework.beans.factory.annotation.Autowired;
                                          import org.springframework.kafka.annotation.KafkaListener;
                                          import org.springframework.stereotype.Component;


                                          import java.util.Random;
                                          import java.util.concurrent.ExecutorService;
                                          import java.util.concurrent.Executors;
                                          import java.util.concurrent.TimeUnit;


                                          /**
                                          * 功能:
                                          *
                                          * @author 谢宏基
                                          * 2021/7/20 23:11
                                          **/


                                          @Component
                                          public class KafkaConsumer {






                                          // 消费监听
                                          @KafkaListener(topics = {"testtopic","taoguba1"})
                                          public void onMessage1(ConsumerRecord<?, ?> record){
                                          // 消费的哪个topic、partition的消息,打印出消息内容
                                          int random=new Random().nextInt(10) + 1;
                                          System.out.println("-----------------"+random);
                                          // if(random>5){
                                          // int b=10/0;
                                          // }
                                          // try {
                                          // Thread.sleep(2000);
                                          // }catch (Exception e){
                                          //
                                          //


                                          System.out.println("简单消费:"+System.currentTimeMillis()+";topic"+record.topic()+";分区"+record.partition()+";数据"+record.value()+";偏移"+record.offset()+";key值"+record.key());
                                          }








                                          }



                                          文章转载自小白逆袭之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                          评论