
golang source code analysis: sarama kafka client (part I: the producer)

https://github.com/Shopify/sarama is a Kafka client written in pure Go, and it is a great resource for gophers who want to learn Kafka. Frankly, sarama's code organization is poor: a dense pile of source files all sit in a single directory, which makes it hard to know where to start. Part of the listing is shown below:

    examples
    mocks
    tools // kafka client tools built on top of the client library
    tools/kafka-producer-performance
    tools/kafka-console-producer
    tools/kafka-console-partitionconsumer
    tools/kafka-console-consumer
    vagrant // config files for spinning up test VMs
    acl_xx.go // ACL (permission) related logic
    add_partitions_to_txn_response.go
    add_partitions_to_txn_request.go
    add_offsets_to_txn_response.go
    add_offsets_to_txn_request.go
    admin.go
    alter_xx.go // alter-related logic
    async_producer.go
    balance_strategy.go
    broker.go
    client.go
    config.go
    consumer_group.go
    consumer_group_members.go
    consumer.go
    create_xx.go
    fetch_request.go
    delete_xx.go
    describe_xx.go
    list_xx.go
    offset_xx.go
    partitioner.go
    sarama.go
    sync_producer.go
    produce_request.go
    produce_response.go
    utils.go

In fact, we only need to focus on the following files:

      admin.go
      async_producer.go
      broker.go
      client.go
      consumer_group.go
      consumer.go
      sync_producer.go

As usual, let's start with an example.


Producer

    package main

    import (
        "fmt"
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        // Build the producer.
        // Create a producer config.
        config := sarama.NewConfig()
        // Required ack level for produced messages: 0 (none), 1 (leader), or all replicas.
        config.Producer.RequiredAcks = sarama.NoResponse // sarama.WaitForAll
        // kafka server: Replication-factor is invalid.
        // Return successfully delivered messages on the Successes channel.
        config.Producer.Return.Successes = true
        // Choose how the producer picks a partition.
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        // Build a message.
        msg := &sarama.ProducerMessage{}
        msg.Topic = "test"
        msg.Value = sarama.StringEncoder("123")

        // Connect to kafka.
        producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            log.Print(err)
            return
        }
        defer producer.Close()
        // Send the message; SendMessage returns the partition and offset it landed on.
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            log.Println(err)
            return
        }
        fmt.Println(partition, " ", offset)
    }

1. Creating the producer: sarama.NewSyncProducer

The code lives in sync_producer.go:

    func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
        if err := verifyProducerConfig(config); err != nil {
            return nil, err
        }
        p, err := NewAsyncProducer(addrs, config)
        if err != nil {
            return nil, err
        }
        return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
    }

It first validates the config, then calls NewAsyncProducer to create a producer, and wraps it into a syncProducer:

    type syncProducer struct {
        producer *asyncProducer
        wg       sync.WaitGroup
    }

As you can see, a syncProducer is essentially just an asyncProducer; the synchronous behaviour is layered on top of it, with the sync.WaitGroup tracking messages that are still in flight.
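Conceptually, each synchronous send pushes the message into the asynchronous pipeline and then blocks until that particular message has been acknowledged, while the WaitGroup lets Close wait for anything still in flight. Here is a minimal, self-contained sketch of that sync-over-async pattern (this is not sarama's actual code; asyncSender, item and the channel names are made up for illustration):

    package main

    import (
        "fmt"
        "sync"
    )

    // item is one unit of work; done carries the result back to the blocked caller.
    type item struct {
        value string
        done  chan error
    }

    // asyncSender stands in for the asynchronous producer: a background goroutine
    // drains input, and callers never touch the network directly.
    type asyncSender struct {
        input chan *item
        wg    sync.WaitGroup // tracks in-flight items, like syncProducer.wg
    }

    func newAsyncSender() *asyncSender {
        s := &asyncSender{input: make(chan *item, 16)}
        go func() {
            for it := range s.input {
                // pretend the item was sent over the network, then report success
                it.done <- nil
            }
        }()
        return s
    }

    // SendSync turns the asynchronous pipeline into a blocking call: enqueue the
    // item, then wait on its private done channel for the outcome.
    func (s *asyncSender) SendSync(value string) error {
        it := &item{value: value, done: make(chan error, 1)}
        s.wg.Add(1)
        defer s.wg.Done()
        s.input <- it
        return <-it.done
    }

    func main() {
        s := newAsyncSender()
        fmt.Println("send result:", s.SendSync("hello"))
    }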

NewAsyncProducer itself is implemented in async_producer.go:

    func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
        client, err := NewClient(addrs, conf)
        if err != nil {
            return nil, err
        }
        return newAsyncProducer(client)
    }

It first creates a client. Client is a wrapper around the connections to the kafka brokers; both producers and consumers talk to the brokers through a client. The code is in client.go:

    // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
    // You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
    // automatically when it passes out of scope. It is safe to share a client amongst many
    // users, however Kafka will process requests from a single client strictly in serial,
    // so it is generally more efficient to use the default one client per producer/consumer.
    type Client interface {
        // Config returns the Config struct of the client. This struct should not be
        // altered after it has been created.
        Config() *Config

        // Controller returns the cluster controller broker. It will return a
        // locally cached value if it's available. You can call RefreshController
        // to update the cached value. Requires Kafka 0.10 or higher.
        Controller() (*Broker, error)

        // RefreshController retrieves the cluster controller from fresh metadata
        // and stores it in the local cache. Requires Kafka 0.10 or higher.
        RefreshController() (*Broker, error)

        // Brokers returns the current set of active brokers as retrieved from cluster metadata.
        Brokers() []*Broker

        // Broker returns the active Broker if available for the broker ID.
        Broker(brokerID int32) (*Broker, error)

        // Topics returns the set of available topics as retrieved from cluster metadata.
        Topics() ([]string, error)

        // Partitions returns the sorted list of all partition IDs for the given topic.
        Partitions(topic string) ([]int32, error)

        // WritablePartitions returns the sorted list of all writable partition IDs for
        // the given topic, where "writable" means "having a valid leader accepting
        // writes".
        WritablePartitions(topic string) ([]int32, error)

        // Leader returns the broker object that is the leader of the current
        // topic/partition, as determined by querying the cluster metadata.
        Leader(topic string, partitionID int32) (*Broker, error)

        // Replicas returns the set of all replica IDs for the given partition.
        Replicas(topic string, partitionID int32) ([]int32, error)

        // InSyncReplicas returns the set of all in-sync replica IDs for the given
        // partition. In-sync replicas are replicas which are fully caught up with
        // the partition leader.
        InSyncReplicas(topic string, partitionID int32) ([]int32, error)

        // OfflineReplicas returns the set of all offline replica IDs for the given
        // partition. Offline replicas are replicas which are offline
        OfflineReplicas(topic string, partitionID int32) ([]int32, error)

        // RefreshBrokers takes a list of addresses to be used as seed brokers.
        // Existing broker connections are closed and the updated list of seed brokers
        // will be used for the next metadata fetch.
        RefreshBrokers(addrs []string) error

        // RefreshMetadata takes a list of topics and queries the cluster to refresh the
        // available metadata for those topics. If no topics are provided, it will refresh
        // metadata for all topics.
        RefreshMetadata(topics ...string) error

        // GetOffset queries the cluster to get the most recent available offset at the
        // given time (in milliseconds) on the topic/partition combination.
        // Time should be OffsetOldest for the earliest available offset,
        // OffsetNewest for the offset of the message that will be produced next, or a time.
        GetOffset(topic string, partitionID int32, time int64) (int64, error)

        // Coordinator returns the coordinating broker for a consumer group. It will
        // return a locally cached value if it's available. You can call
        // RefreshCoordinator to update the cached value. This function only works on
        // Kafka 0.8.2 and higher.
        Coordinator(consumerGroup string) (*Broker, error)

        // RefreshCoordinator retrieves the coordinator for a consumer group and stores it
        // in local cache. This function only works on Kafka 0.8.2 and higher.
        RefreshCoordinator(consumerGroup string) error

        // InitProducerID retrieves information required for Idempotent Producer
        InitProducerID() (*InitProducerIDResponse, error)

        // Close shuts down all broker connections managed by this client. It is required
        // to call this function before a client object passes out of scope, as it will
        // otherwise leak memory. You must close any Producers or Consumers using a client
        // before you close the client.
        Close() error

        // Closed returns true if the client has already had Close called on it
        Closed() bool
    }
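As an aside: because a Client is safe to share, you can build several producers and consumers on one client instead of letting every constructor dial its own connections. A hedged example using sarama's NewClient / NewSyncProducerFromClient / NewConsumerFromClient (the broker address is a placeholder):

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        config := sarama.NewConfig()
        config.Producer.Return.Successes = true // the sync producer requires this

        // One client, shared by a producer and a consumer.
        client, err := sarama.NewClient([]string{"localhost:9092"}, config)
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close() // deferred first, so it runs after the producer/consumer close

        producer, err := sarama.NewSyncProducerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        // The metadata methods from the interface above are available directly.
        topics, err := client.Topics()
        if err != nil {
            log.Fatal(err)
        }
        log.Println("topics:", topics)
    }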

Back in newAsyncProducer, an asyncProducer object is then created:

    type asyncProducer struct {
        client Client
        conf   *Config

        errors                    chan *ProducerError
        input, successes, retries chan *ProducerMessage
        inFlight                  sync.WaitGroup

        brokers    map[*Broker]*brokerProducer
        brokerRefs map[*brokerProducer]int
        brokerLock sync.Mutex

        txnmgr *transactionManager
    }

One of its fields is a transactionManager:

    type transactionManager struct {
        producerID      int64
        producerEpoch   int16
        sequenceNumbers map[string]int32
        mutex           sync.Mutex
    }
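These fields back the idempotent producer: InitProducerID (from the Client interface above) fills in producerID and producerEpoch, and sequenceNumbers hands out a per topic/partition sequence number for each record. Turning it on is purely a config matter; a hedged sketch of the settings sarama's Config.Validate expects (the broker address is a placeholder, and the exact requirements can differ slightly between sarama versions):

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        config := sarama.NewConfig()
        config.Producer.Idempotent = true
        config.Producer.RequiredAcks = sarama.WaitForAll // all in-sync replicas must ack
        config.Producer.Return.Successes = true
        config.Net.MaxOpenRequests = 1    // one in-flight request, to preserve ordering
        config.Version = sarama.V0_11_0_0 // idempotence needs Kafka 0.11+

        if err := config.Validate(); err != nil {
            log.Fatal(err)
        }

        producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()
    }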

After the producer object is created, two goroutines are started:

    func newAsyncProducer(client Client) (AsyncProducer, error) {
        ....
        go withRecover(p.dispatcher)
        go withRecover(p.retryHandler)
    }
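withRecover lives in sarama's utils.go; it simply runs the function and, if a package-level PanicHandler has been installed, converts panics in these background goroutines into a callback instead of crashing the process. A simplified, self-contained rendition of the pattern (not a verbatim copy of sarama's helper):

    package main

    import "fmt"

    // PanicHandler mirrors sarama's package-level hook: if non-nil, it receives
    // whatever value a wrapped goroutine panicked with.
    var PanicHandler func(interface{})

    // withRecover runs fn and routes any panic to PanicHandler.
    func withRecover(fn func()) {
        defer func() {
            if handler := PanicHandler; handler != nil {
                if err := recover(); err != nil {
                    handler(err)
                }
            }
        }()
        fn()
    }

    func main() {
        PanicHandler = func(v interface{}) { fmt.Println("recovered:", v) }

        done := make(chan struct{})
        go func() {
            defer close(done)
            withRecover(func() { panic("boom") })
        }()
        <-done
    }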

Pay particular attention to the producer's input field:

    input, successes, retries chan *ProducerMessage

The dispatcher goroutine keeps consuming messages from input and forwards each one to the matching topicProducer's input channel. So to send a message, all we have to do is keep writing into the producer's input channel.

    func (p *asyncProducer) dispatcher() {
        for msg := range p.input {
            handler := p.newTopicProducer(msg.Topic)
            handler <- msg
        }
    }

This happens in two steps:

1. Get the topicProducer; what comes back is the write end of that topicProducer's input channel (and, as the sketch below shows, the real dispatcher caches these handlers per topic).
2. Send the message into that channel.
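One detail the trimmed dispatcher above hides: the real code keeps a handlers map so that only one topicProducer is created per topic and its channel is reused for every later message. A self-contained sketch of that lazily-cached-worker pattern (message, newTopicWorker and so on are made-up names, not sarama's):

    package main

    import (
        "fmt"
        "sync"
    )

    type message struct {
        topic string
        value string
    }

    // dispatch lazily creates one worker channel per topic and forwards each
    // message to it, mirroring what the real dispatcher does with its handlers map.
    func dispatch(input <-chan *message, wg *sync.WaitGroup) {
        handlers := make(map[string]chan<- *message)
        for msg := range input {
            handler := handlers[msg.topic]
            if handler == nil {
                handler = newTopicWorker(msg.topic, wg)
                handlers[msg.topic] = handler
            }
            handler <- msg
        }
    }

    // newTopicWorker mimics newTopicProducer: start a goroutine and hand back
    // the write end of its input channel.
    func newTopicWorker(topic string, wg *sync.WaitGroup) chan<- *message {
        input := make(chan *message, 16)
        go func() {
            for msg := range input {
                fmt.Printf("topic %s got %q\n", topic, msg.value)
                wg.Done()
            }
        }()
        return input
    }

    func main() {
        var wg sync.WaitGroup
        input := make(chan *message)
        go dispatch(input, &wg)

        for _, m := range []*message{{"test", "a"}, {"test", "b"}, {"other", "c"}} {
            wg.Add(1)
            input <- m
        }
        wg.Wait()
        close(input)
    }

The topicProducer behind each of those handler channels looks like this: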

    // one per topic
    // partitions messages, then dispatches them by partition
    type topicProducer struct {
        parent *asyncProducer
        topic  string
        input  <-chan *ProducerMessage

        breaker     *breaker.Breaker
        handlers    map[int32]chan<- *ProducerMessage
        partitioner Partitioner
    }

Each topicProducer starts a goroutine of its own as well:

    func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
        input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
        go withRecover(tp.dispatch)
        return input
    }

Its dispatch method looks very similar: it forwards the messages it receives on to a partitionProducer:

    func (tp *topicProducer) dispatch() {
        for msg := range tp.input {
            handler := tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
            handler <- msg
        }
    }
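Also trimmed out of the snippet: before forwarding, the real dispatch first runs each message through the topic's Partitioner to decide msg.Partition. If sarama's built-in random/hash/manual partitioners don't fit, you can supply your own via the config; a hedged sketch of a custom key-hashing partitioner (keyHashPartitioner is a made-up name):

    package main

    import (
        "hash/fnv"

        "github.com/Shopify/sarama"
    )

    // keyHashPartitioner hashes the message key with FNV-1a and maps it onto
    // the partition count, so the same key always lands on the same partition.
    type keyHashPartitioner struct{}

    func (keyHashPartitioner) Partition(msg *sarama.ProducerMessage, numPartitions int32) (int32, error) {
        if msg.Key == nil {
            return 0, nil // keyless messages all go to partition 0 in this sketch
        }
        key, err := msg.Key.Encode()
        if err != nil {
            return -1, err
        }
        h := fnv.New32a()
        h.Write(key)
        return int32(h.Sum32() % uint32(numPartitions)), nil
    }

    // RequiresConsistency tells sarama that a given key must always map to the
    // same partition, so it must not silently fall back to a different one.
    func (keyHashPartitioner) RequiresConsistency() bool { return true }

    func main() {
        config := sarama.NewConfig()
        // Producer.Partitioner is a constructor: sarama calls it once per topic.
        config.Producer.Partitioner = func(topic string) sarama.Partitioner {
            return keyHashPartitioner{}
        }
        _ = config // pass the config to NewSyncProducer / NewAsyncProducer as usual
    }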

Next, let's see what the partitionProducer does:

    // one per partition per topic
    // dispatches messages to the appropriate broker
    // also responsible for maintaining message order during retries
    type partitionProducer struct {
        parent    *asyncProducer
        topic     string
        partition int32
        input     <-chan *ProducerMessage

        leader         *Broker
        breaker        *breaker.Breaker
        brokerProducer *brokerProducer

        // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
        // all other messages get buffered in retryState[msg.retries].buf to preserve ordering
        // retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
        // therefore whether our buffer is complete and safe to flush)
        highWatermark int
        retryState    []partitionRetryState
    }
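The highWatermark/retryState machinery exists so that messages stay in order while earlier ones are being retried; how often and how quickly retries happen is driven by the config. A hedged illustration of the relevant knobs (the values are arbitrary, and Retry.BackoffFunc may not exist in very old sarama releases):

    package main

    import (
        "time"

        "github.com/Shopify/sarama"
    )

    func main() {
        config := sarama.NewConfig()
        config.Producer.Retry.Max = 5                           // how many times a message may be retried
        config.Producer.Retry.Backoff = 200 * time.Millisecond  // fixed wait between attempts
        // Or compute the backoff per attempt instead of using a fixed value.
        config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
            return time.Duration(retries+1) * 100 * time.Millisecond
        }
        _ = config // pass to NewSyncProducer / NewAsyncProducer as usual
    }

Its constructor follows the same channel-plus-goroutine pattern as the previous layers: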

    func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
        input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
        go withRecover(pp.dispatch)
        return input
    }

That's right, it also starts a goroutine and hands back an input channel for receiving messages. Let's look at what its dispatch actually does:

    func (pp *partitionProducer) dispatch() {
        pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
        pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
        pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
        for msg := range pp.input {
            pp.brokerProducer.input <- msg
        }
    }

Yes, you read that right: it in turn obtains a brokerProducer and pushes msg into the brokerProducer's input channel. getBrokerProducer is keyed on the leader of this partition, because messages are always produced to the partition leader; the leader is discovered from the cluster metadata kept by kafka, which we will look at in more detail later.
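The same metadata is exposed through the public Client interface, so you can check for yourself which broker leads each partition. A hedged example (the topic name and broker address are placeholders):

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        partitions, err := client.Partitions("test")
        if err != nil {
            log.Fatal(err)
        }
        for _, p := range partitions {
            // the same call partitionProducer.dispatch uses to find its leader
            leader, err := client.Leader("test", p)
            if err != nil {
                log.Fatal(err)
            }
            log.Printf("partition %d -> leader %s (id %d)", p, leader.Addr(), leader.ID())
        }
    }

Back in the pipeline, getBrokerProducer hands out one brokerProducer per leader broker (the brokers map on asyncProducer caches them); trimmed down: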

    func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
        bp := p.newBrokerProducer(broker)
        return bp
    }

    // groups messages together into appropriately-sized batches for sending to the broker
    // handles state related to retries etc
    type brokerProducer struct {
        parent *asyncProducer
        broker *Broker

        input     chan *ProducerMessage
        output    chan<- *produceSet
        responses <-chan *brokerProducerResponse
        abandoned chan struct{}
        stopchan  chan struct{}

        buffer     *produceSet
        timer      <-chan time.Time
        timerFired bool

        closing        error
        currentRetries map[string]map[int32]error
    }

The brokerProducer likewise starts two goroutines:

    func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
        go withRecover(bp.run)

        // minimal bridge to make the network response `select`able
        go withRecover(func() {
            for set := range bridge {
                request := set.buildRequest()

                response, err := broker.Produce(request)

                responses <- &brokerProducerResponse{
                    set: set,
                    err: err,
                    res: response,
                }
            }
            close(responses)
        })
    }

run is a loop that keeps consuming messages from input, buffering them in preparation for a request to kafka.

    func (bp *brokerProducer) run() {
        for {
            select {
            case msg, ok := <-bp.input:
                bp.buffer.add(msg)
            case output <- bp.buffer:
                bp.rollOver()
            case response, ok := <-bp.responses:
                if ok {
                    bp.handleResponse(response)
                }
            }
        }
    }
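When the `output <- bp.buffer` case fires is governed by the flush settings in the config (together with the timer field seen in the struct above). A hedged illustration of those knobs (the values are arbitrary):

    package main

    import (
        "time"

        "github.com/Shopify/sarama"
    )

    func main() {
        config := sarama.NewConfig()
        config.Producer.Flush.Frequency = 500 * time.Millisecond // flush at least this often
        config.Producer.Flush.Messages = 100                     // ...or once this many messages are buffered
        config.Producer.Flush.Bytes = 1 << 20                    // ...or once the batch reaches about 1 MiB
        config.Producer.Flush.MaxMessages = 0                    // 0 = no hard cap per request
        _ = config // pass to NewAsyncProducer as usual
    }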

The second goroutine does three things: build the request, send it to the broker, and pass the response back.

    func (ps *produceSet) buildRequest() *ProduceRequest {
        for topic, partitionSets := range ps.msgs {
            for partition, set := range partitionSets {
                req.AddBatch(topic, partition, rb)        // record batches (newer message format)
                req.AddMessage(topic, partition, compMsg) // or legacy message sets
            }
        }
        return req
    }

The Message type is defined in message.go (the ProducerMessage we have been passing around lives in async_producer.go):

    type Message struct {
        Codec            CompressionCodec // codec used to compress the message contents
        CompressionLevel int              // compression level
        LogAppendTime    bool             // the used timestamp is LogAppendTime
        Key              []byte           // the message key, may be nil
        Value            []byte           // the message contents
        Set              *MessageSet      // the message set a message might wrap
        Version          int8             // v1 requires Kafka 0.10
        Timestamp        time.Time        // the timestamp of the message (version 1+ only)

        compressedCache []byte
        compressedSize  int // used for computing the compression ratio metrics
    }
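The Codec and CompressionLevel on the wire-level message come straight from the producer config; a hedged example enabling gzip compression:

    package main

    import "github.com/Shopify/sarama"

    func main() {
        config := sarama.NewConfig()
        config.Producer.Compression = sarama.CompressionGZIP
        config.Producer.CompressionLevel = 6 // gzip level; ignored by codecs that have no levels
        _ = config // pass to NewSyncProducer / NewAsyncProducer as usual
    }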

Next comes actually sending the message:

    func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
        response = new(ProduceResponse)
        err = b.sendAndReceive(request, response)
    }

    func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
        promise, err := b.send(req, res != nil, responseHeaderVersion)
    }

    func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
        bytes, err := b.write(buf)
    }

which ultimately writes the bytes over the TCP connection:

    func (b *Broker) write(buf []byte) (n int, err error) {
        return b.conn.Write(buf)
    }

That is the whole path a message travels.


2. Sending a message: producer.SendMessage

    func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
        sp.producer.Input() <- msg
    }

SendMessage itself is then very simple: it just drops the message into the producer's Input() channel (and, being the sync variant, waits for that message's result before returning).
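For comparison, using the AsyncProducer directly means writing into Input() yourself and draining Successes()/Errors() yourself; a hedged example against a local broker:

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        config := sarama.NewConfig()
        config.Producer.Return.Successes = true // acks are delivered on Successes()

        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.AsyncClose()

        // Fire the message into the pipeline described above...
        producer.Input() <- &sarama.ProducerMessage{
            Topic: "test",
            Value: sarama.StringEncoder("123"),
        }

        // ...then collect the ack (or the error) yourself; the sync producer does
        // exactly this bookkeeping for you, per message.
        select {
        case msg := <-producer.Successes():
            log.Printf("delivered to partition %d at offset %d", msg.Partition, msg.Offset)
        case err := <-producer.Errors():
            log.Println("delivery failed:", err)
        }
    }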


That is sarama's message production flow. To summarize, the core pipeline is:

syncProducer -> asyncProducer (dispatcher) -> topicProducer -> partitionProducer -> brokerProducer

Messages flow down through the input channels of these objects and are finally written to kafka over a TCP connection.



