暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:sarama kafka client(part III:client的角色)

理解client的角色对我们理解kafka和sarama非常有帮助。下面将一一详细介绍:

        我们用到了各种各样的client,返回的对象都是一个Broker的指针,本质上讲,我们通过kafka client 最终都是和broker通信,所以用Broker对象封装和kafka的连接,表示Client。不同场景下,Client有不同的角色,角色是通过元数据来确定的。

    func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) 
    metadata := client.cachedMetadata(topic, partitionID)
    err := client.RefreshMetadata(topic)
      func (client *client) RefreshMetadata(topics ...string) error
      return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
        func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error 
        broker := client.any()
        response, err := broker.GetMetadata(req)
        err := b.sendAndReceive(request, response)
            shouldRetryerr := client.updateMetadata(response, allKnownMetaData)
        client.deregisterBroker(broker)
        _ = broker.Close()

        可以看到,获取元数据的过程是随机选择一个broker(没有元数据不知道身份),然后获取元数据,存储下来

        1,Controller

                首先是Controller,Kafka集群中,首先会选举出一个broker作为controller,然后该controller负责跟其他broker进行协调topic创建,partition主副本选举,topic删除等事务

          func (client *client) Controller() (*Broker, error){
          client.refreshMetadata()
                 controller := client.cachedController()
             _ = controller.Open(client.conf)
          }
            func (client *clientcachedController() *Broker {
            return client.brokers[client.controllerID]
             }

            通过元数据,获取controllerId,然后通过controllerID找到Controller,和前面介绍的一样,topic和partation相关的增删操作会用到controller,主要在admin.go

              func NewClusterAdminFromClient(client Client) (ClusterAdmin, error)
              _, err := client.Controller()
              func (ca *clusterAdmin) Controller() (*Broker, error)
                  return ca.client.Controller()
              func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
              b, err := ca.Controller()
              func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
              controller, err := ca.Controller()
              func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
              b, err := ca.Controller()
              func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error
              b, err := ca.Controller()

              2,Coordinator

              每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。

                func (client *client) Coordinator(consumerGroup string) (*Broker, error) 
                _ = coordinator.Open(client.conf)

                获取元数据后,就会获取Coordinator

                  func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) 
                  broker := client.any();
                      request := new(FindCoordinatorRequest)
                  request.CoordinatorKey = consumerGroup
                  request.CoordinatorType = CoordinatorGroup
                  response, err := broker.FindCoordinator(request)

                  调整消费组的地方会用到,admin.go

                    func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error)
                    controller, err := ca.client.Coordinator(group)
                    response, err := broker.DescribeGroups(&DescribeGroupsRequest{
                    Groups: brokerGroups,
                    })
                    func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
                    return coordinator.FetchOffset(request)
                    func (ca *clusterAdmin) DeleteConsumerGroup(group string) error
                    coordinator, err := ca.client.Coordinator(group)
                    resp, err := coordinator.DeleteGroups(request)

                    创建session的时候也会用到consumer_group.go

                      func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
                      coordinator, err := c.client.Coordinator(c.groupID)
                      return c.newSession(ctx, topics, handler, retries)
                      }

                      离开也会用到

                        func (c *consumerGroup) leave() error {
                        c.lock.Lock()
                        defer c.lock.Unlock()
                        if c.memberID == "" {
                        return nil
                        }


                        coordinator, err := c.client.Coordinator(c.groupID)

                        心跳维持

                          func (s *consumerGroupSession) heartbeatLoop() {
                          for {
                          coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
                              }
                          }

                          3,any 任意broker

                            func (client *client) any() *Broker
                            _ = client.seedBrokers[0].Open(client.conf)
                            _ = broker.Open(client.conf)

                            4,Leader

                            写相关的操作都是先写到leader partation,然后同步到副本分区。

                              func (client *client) Leader(topic string, partitionID int32) (*Broker, error)
                              leader, err := client.cachedLeader(topic, partitionID)
                                func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error)
                                _ = b.Open(client.conf)

                                使用的地方如下:

                                admin.go

                                  func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error  

                                  async_producer.go

                                    func (pp *partitionProducer) dispatch() 
                                    func (pp *partitionProducer) updateLeader() error
                                    func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) 

                                    consumer.go

                                      func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) 
                                      child.broker = c.refBrokerConsumer(leader)
                                      bc = c.newBrokerConsumer(broker)
                                      func (child *partitionConsumer) preferredBroker() (*Broker, error)  

                                      上面就是client的四个角色和应用场景,理解他们的含义对理解kafka和sarama有重要的意义。


                                      文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                      评论