
Debuting a Modern C++ API for Apache Kafka

大数据从业者 2021-05-16

Morgan Stanley uses Apache Kafka® to publish market data to internal clients and to persist it for replay purposes. We started out using librdkafka’s C++ API, which maintains C++98 compatibility. C++ is evolving quickly, and we wanted to break away from this compatibility requirement so we could take advantage of new C++ features. This led us to create a new C++ API for Kafka that uses modern C++ features (i.e., C++14 and later). We’ve open sourced this client and hope you enjoy it.


An example producer from librdkafka

First, let’s take a look at an example from the librdkafka project, slightly stripped for brevity:

    // https://github.com/edenhill/librdkafka/blob/master/examples/producer.cpp
    #include <iostream>
    #include <string>

    #include "librdkafka/rdkafkacpp.h"

    int main (int argc, char **argv) {
        if (argc != 3) {
            std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
            return 1;
        }

        std::string brokers = argv[1];
        std::string topic = argv[2];

        // Create configuration object
        RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

        std::string errstr;

        // Set bootstrap broker(s).
        conf->set("bootstrap.servers", brokers, errstr);

        // Set the delivery report callback.
        // The callback is only triggered from ::poll() and ::flush().
        struct ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
            void dr_cb (RdKafka::Message &message) {
                /* If message.err() is non-zero the message delivery failed permanently for the message. */
                if (message.err())
                    std::cerr << "% Message delivery failed: " << message.errstr() << std::endl;
                else
                    std::cerr << "% Message delivered to topic " << message.topic_name() <<
                        " [" << message.partition() << "] at offset " << message.offset() << std::endl;
            }
        } ex_dr_cb;

        conf->set("dr_cb", &ex_dr_cb, errstr);

        // Create a producer instance.
        RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);

        delete conf;

        // Read messages from stdin and produce to the broker.
        std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

        for (std::string line; std::getline(std::cin, line);) {
            // Send/Produce message. This is an asynchronous call,
            // on success it will only enqueue the message on the internal producer queue.
        retry:
            RdKafka::ErrorCode err =
                producer->produce(
                    /* Topic name */
                    topic,
                    /* Any Partition */
                    RdKafka::Topic::PARTITION_UA,
                    /* Make a copy of the value */
                    RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                    /* Value */
                    const_cast<char*>(line.c_str()), line.size(),
                    /* Key */
                    NULL, 0,
                    /* Timestamp (defaults to current time) */
                    0,
                    /* Message headers, if any */
                    NULL,
                    /* Per-message opaque value passed to delivery report */
                    NULL);

            if (err != RdKafka::ERR_NO_ERROR) {
                std::cerr << "% Failed to produce to topic " << topic << ": " <<
                    RdKafka::err2str(err) << std::endl;

                if (err == RdKafka::ERR__QUEUE_FULL) {
                    // If the internal queue is full, wait for messages to be delivered and then retry.
                    producer->poll(1000 /* block for max 1000ms */);
                    goto retry;
                }
            } else {
                std::cout << "% Enqueued message (" << line.size() << " bytes) " <<
                    "for topic " << topic << std::endl;
            }

            // A producer application should continually serve the delivery report
            // queue by calling poll() at frequent intervals.
            producer->poll(0);

            if (line.empty()) break;
        }

        /* Wait for final messages to be delivered or fail. */
        std::cout << "% Flushing final messages..." << std::endl;
        producer->flush(10*1000 /* wait for max 10 seconds */);

        if (producer->outq_len() > 0)
            std::cerr << "% " << producer->outq_len() << " message(s) were not delivered" << std::endl;

        delete producer;
    }


This program configures a Kafka producer, sends user-specified messages using the producer, and then waits until all messages are delivered or a timeout occurs. Finally, it closes the producer.

Although this works, it doesn’t take advantage of modern C++ features:

• Manual resource management (raw pointers) instead of “Resource Acquisition Is Initialization” (RAII)

• Use of an event loop with goto, instead of continuations popularized by Asio

• Callbacks require subclassing RdKafka::DeliveryReportCb, where a lambda would have been enough
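
To make the RAII point concrete, here is a minimal sketch that does not depend on any Kafka library. `LegacyConf`, `makeConf`, and `liveWhileInScope` are hypothetical names invented for this illustration: `LegacyConf` merely stands in for a type that is handed out as a raw pointer by a factory function and released with `delete`.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for a legacy type created by a factory function and
// released with `delete` (not part of librdkafka or modern-cpp-kafka).
struct LegacyConf {
    static int liveCount;                        // number of objects currently alive
    static LegacyConf* create() { return new LegacyConf(); }
    std::string bootstrapServers;
    ~LegacyConf() { --liveCount; }
private:
    LegacyConf() { ++liveCount; }
};
int LegacyConf::liveCount = 0;

// RAII: the unique_ptr takes ownership of the raw pointer and deletes it
// automatically on every exit path, including early returns and exceptions.
inline std::unique_ptr<LegacyConf> makeConf(const std::string& brokers) {
    std::unique_ptr<LegacyConf> conf(LegacyConf::create());
    conf->bootstrapServers = brokers;
    return conf;
}

// Returns how many LegacyConf objects are alive while one is held in scope.
inline int liveWhileInScope() {
    auto conf = makeConf("localhost:9092");
    return LegacyConf::liveCount;   // the object is destroyed after this return
}
```

With this shape there is no `delete` to forget: once the `std::unique_ptr` goes out of scope, the live count drops back to zero.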

Modern C++ features let us improve both performance and usability. For example:

• Smart pointers make lifetime management much easier for shallow-copied messages

• Encapsulation hides internal queue management and complicated polling rules

• Object-oriented interfaces replace long function parameter lists
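
As a rough illustration of the last point, the sketch below groups what would otherwise be a long positional parameter list into a single value type. `Record` and `describe` are hypothetical names chosen for this example only; they are not the modern-cpp-kafka API.

```cpp
#include <string>

// Hypothetical record type: groups what would otherwise be a long positional
// parameter list (topic, partition, key, value, ...).
struct Record {
    std::string topic;
    int         partition = -1;   // -1 means "let the partitioner decide"
    std::string key;              // empty means "no key"
    std::string value;
};

// One self-describing argument instead of many positional ones.
inline std::string describe(const Record& r) {
    return r.topic + "["
         + (r.partition < 0 ? std::string("any") : std::to_string(r.partition)) + "] "
         + (r.key.empty() ? std::string("<null>") : r.key) + "=" + r.value;
}
```

A call site only names the fields it cares about, so optional parameters such as the key or partition no longer need placeholder arguments like `NULL, 0`.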

A modern Kafka producer

Let’s dive into the modern C++ API that we built: modern-cpp-kafka, available on GitHub.

Let’s reimplement the previously shown functionality using the modern-cpp-kafka API. First, let’s use the synchronous producer:

    // https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_sync_producer.cc
    #include "kafka/KafkaProducer.h"

    #include <iostream>
    #include <string>

    int main(int argc, char **argv)
    {
        if (argc != 3) {
            std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
            return 1;
        }

        std::string brokers = argv[1];
        kafka::Topic topic = argv[2];

        // Create configuration object
        kafka::Properties props({
            {"bootstrap.servers", brokers},
            {"enable.idempotence", "true"},
        });

        // Create a producer instance.
        kafka::KafkaSyncProducer producer(props);

        // Read messages from stdin and produce to the broker.
        std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

        for (std::string line; std::getline(std::cin, line);) {
            // The ProducerRecord doesn't own `line`, it is just a thin wrapper
            auto record = kafka::ProducerRecord(topic,
                                                kafka::NullKey,
                                                kafka::Value(line.c_str(), line.size()));

            // Send the message.
            try {
                kafka::Producer::RecordMetadata metadata = producer.send(record);
                std::cout << "% Message delivered: " << metadata.toString() << std::endl;
            } catch (const kafka::KafkaException& e) {
                std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;
            }

            if (line.empty()) break;
        }

        // producer.close(); // No explicit close is needed, RAII will take care of it
    }


There are several key differences:

• RAII is used for lifetime management

• Exceptions are used for error handling

• Polling and queue management are now hidden

• Naming matches the Java API, making it easier to learn if you already know the Java client

• The Properties instance has enable.idempotence=true configured, so the producer ensures that messages are written exactly once and in their original order within each partition

But this isn’t perfect yet! Because each send blocks until delivery completes, we cannot send multiple messages concurrently, and a slower network will quickly degrade the performance of our application. This brings us to the asynchronous producer:

    // Create a producer instance.
    kafka::KafkaAsyncProducer producer(props);

    // Read messages from stdin and produce to the broker.
    std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;

    for (std::string line; std::getline(std::cin, line);) {
        // The ProducerRecord doesn't own `line`, it is just a thin wrapper
        auto record = kafka::ProducerRecord(topic,
                                            kafka::NullKey,
                                            kafka::Value(line.c_str(), line.size()));

        // Send the message.
        producer.send(record,
                      // The delivery report handler
                      [](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                          if (!ec)
                              std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                          else
                              std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                      },
                      // The memory block given by record.value() will be copied
                      kafka::KafkaProducer::SendOption::ToCopyRecordValue);

        if (line.empty()) break;
    }


With the asynchronous producer, we can have multiple messages in flight. We no longer have to derive a new class from a library-defined callback type: a regular lambda can be used instead, which makes the code more concise and easier to read. The callback now takes std::error_code instead of RdKafka::ErrorCode, a more idiomatic choice for modern C++ applications.
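
The callback shape described above can be sketched with the standard library alone. Everything in this block (`Metadata`, `DeliveryCallback`, `fakeSend`, `sendAndReport`) is a hypothetical, simplified stand-in rather than the modern-cpp-kafka types; the point is only that any callable taking the metadata and a `std::error_code` will do, and that a default-constructed `std::error_code` means success.

```cpp
#include <functional>
#include <string>
#include <system_error>

// Hypothetical, simplified stand-ins (not the modern-cpp-kafka types).
struct Metadata { std::string topic; long offset; };
using DeliveryCallback = std::function<void(const Metadata&, std::error_code)>;

// Pretends to deliver a message and then reports the outcome to the callback.
// A default-constructed std::error_code means "no error".
inline void fakeSend(const std::string& topic, bool succeed, const DeliveryCallback& cb) {
    Metadata md{topic, succeed ? 7 : -1};
    std::error_code ec = succeed ? std::error_code{}
                                 : std::make_error_code(std::errc::timed_out);
    cb(md, ec);
}

// Sends one message and returns the text the callback produced.
inline std::string sendAndReport(bool succeed) {
    std::string report;
    // Any callable with the right signature works: no subclass is needed.
    fakeSend("quotes", succeed, [&report](const Metadata& md, std::error_code ec) {
        if (ec)
            report = "failed";
        else
            report = "delivered at offset " + std::to_string(md.offset);
    });
    return report;
}
```

Because `std::error_code` converts to `bool` in a condition, the success check inside the lambda is a plain `if (ec)`, the same pattern used in the delivery report handler above.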

If the internal queue is full (ERR__QUEUE_FULL), producer.send(...) blocks, but only until one message is delivered or a timeout occurs, which frees up space in the internal queue.

Unfortunately, now we have to copy the message. Let’s fix that:

    for (auto line = std::make_shared<std::string>();
         std::getline(std::cin, *line);
         line = std::make_shared<std::string>())
    {
        // The ProducerRecord doesn't own `line`, it is just a thin wrapper
        auto record = kafka::ProducerRecord(topic,
                                            kafka::NullKey,
                                            kafka::Value(line->c_str(), line->size()));

        // Send the message.
        producer.send(record,
                      // The delivery report handler
                      // Note: Here we capture the shared_pointer of `line`,
                      // which holds the content for `record.value()`.
                      // It makes sure the memory block is valid until the lambda finishes.
                      [line](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
                          if (!ec)
                              std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                          else
                              std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
                      });

        if (line->empty()) break;
    }


Now the message is owned by a smart pointer that is captured by the lambda (the delivery callback), which is invoked after the message is delivered or after an error occurs. The message therefore stays alive for as long as it is needed.
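
The lifetime argument can be checked in isolation. In the sketch below, `PendingCallbacks` and `enqueue` are hypothetical helpers written for this illustration (they are not part of modern-cpp-kafka): the first stores callbacks and runs them later, roughly the way delivery reports arrive after `send` has returned, and the second hands back a `std::weak_ptr` so the payload’s lifetime can be observed from outside.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a producer's internal queue: it stores callbacks
// and invokes them later, the way delivery reports arrive after send() returns.
class PendingCallbacks {
public:
    void add(std::function<void()> cb) { callbacks_.push_back(std::move(cb)); }
    void runAll() {
        for (auto& cb : callbacks_) cb();
        callbacks_.clear();            // destroying a callback releases its captures
    }
private:
    std::vector<std::function<void()>> callbacks_;
};

// Enqueues a callback that captures the payload's shared_ptr by value and
// returns a weak_ptr so the caller can observe the payload's lifetime.
inline std::weak_ptr<std::string> enqueue(PendingCallbacks& q, std::string& sink) {
    auto line = std::make_shared<std::string>("hello");
    const char* data = line->c_str();             // non-owning view of the payload
    q.add([line, data, &sink] { sink = data; });  // `line` keeps `data` valid
    return line;                                  // the local shared_ptr dies here
}
```

After `enqueue` returns, the only remaining owner of the string is the stored callback, so the non-owning pointer stays valid until the callback has run and been destroyed.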

A modern Kafka consumer

So far, we have managed to send some messages. Let’s see if we can consume them! KafkaAutoCommitConsumer is the simplest of all:

    // https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_auto_commit_consumer.cc
    #include "kafka/KafkaConsumer.h"

    #include <chrono>
    #include <iostream>
    #include <string>

    int main(int argc, char **argv)
    {
        if (argc != 3) {
            std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
            return 1;
        }

        std::string brokers = argv[1];
        kafka::Topic topic = argv[2];

        // Create configuration object
        kafka::Properties props({
            {"bootstrap.servers", brokers},
        });

        // Create a consumer instance.
        kafka::KafkaAutoCommitConsumer consumer(props);

        // Subscribe to topics
        consumer.subscribe({topic});

        // Read messages from the topic.
        std::cout << "% Reading messages from topic: " << topic << std::endl;
        while (true) {
            auto records = consumer.poll(std::chrono::milliseconds(100));
            for (const auto& record: records) {
                // In this example, quit on empty message
                if (record.value().size() == 0) return 0;

                if (!record.error()) {
                    std::cout << "% Got a new message..." << std::endl;
                    std::cout << " Topic : " << record.topic() << std::endl;
                    std::cout << " Partition: " << record.partition() << std::endl;
                    std::cout << " Offset : " << record.offset() << std::endl;
                    std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
                    std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
                    std::cout << " Key [" << record.key().toString() << "]" << std::endl;
                    std::cout << " Value [" << record.value().toString() << "]" << std::endl;
                } else {
                    // Errors are typically informational, thus no special handling is required
                    std::cerr << record.toString() << std::endl;
                }
            }
        }

        // consumer.close(); // No explicit close is needed, RAII will take care of it
    }


This example initializes a KafkaAutoCommitConsumer, subscribes to a given topic, and consumes messages until it receives an empty one. As expected, the destructor of the consumer properly cleans up its resources.

An interesting detail of this consumer is the scheduling of commits, that is, when the consumer signals to the broker that a given message was successfully consumed. KafkaAutoCommitConsumer commits its position before each poll (not after the poll), effectively acknowledging the messages received during the previous poll. This ensures that even if the consumer crashes, unprocessed messages will not be acknowledged (assuming that processing of each batch completes between polls).
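
The commit-before-poll ordering described above can be modeled in a few lines of plain C++. `CommitOnNextPollConsumer` is a hypothetical in-memory model written for this illustration, not the library’s implementation: the "log" is just a range of offsets, and "committing" records the offset a restarted consumer would resume from.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical in-memory model of "commit on the next poll" (not the
// modern-cpp-kafka implementation): offsets are plain indices into a log.
class CommitOnNextPollConsumer {
public:
    explicit CommitOnNextPollConsumer(std::size_t logSize) : logSize_(logSize) {}

    // Commits the batch returned by the previous poll, then fetches up to
    // `maxRecords` new offsets.
    std::vector<std::size_t> poll(std::size_t maxRecords) {
        committed_ = position_;                    // acknowledge the previous batch
        std::vector<std::size_t> batch;
        while (position_ < logSize_ && batch.size() < maxRecords)
            batch.push_back(position_++);
        return batch;
    }

    // Offset a restarted consumer would resume from.
    std::size_t committed() const { return committed_; }

private:
    std::size_t logSize_;
    std::size_t position_ = 0;
    std::size_t committed_ = 0;
};
```

In this model, a crash between the first and second `poll` leaves the committed offset at the start of the first batch, so that batch would be delivered again after a restart rather than silently skipped.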

To get more control over the scheduling of commits, KafkaManualCommitConsumer can be used:

    // Create a consumer instance.
    kafka::KafkaManualCommitConsumer consumer(props);

    // Subscribe to topics
    consumer.subscribe({topic});

    auto lastTimeCommitted = std::chrono::steady_clock::now();

    // Read messages from the topic.
    std::cout << "% Reading messages from topic: " << topic << std::endl;
    bool allCommitted = true;
    bool running = true;
    while (running) {
        auto records = consumer.poll(std::chrono::milliseconds(100));
        for (const auto& record: records) {
            // In this example, quit on empty message
            if (record.value().size() == 0) {
                running = false;
                break;
            }

            if (!record.error()) {
                std::cout << "% Got a new message..." << std::endl;
                std::cout << " Topic : " << record.topic() << std::endl;
                std::cout << " Partition: " << record.partition() << std::endl;
                std::cout << " Offset : " << record.offset() << std::endl;
                std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
                std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
                std::cout << " Key [" << record.key().toString() << "]" << std::endl;
                std::cout << " Value [" << record.value().toString() << "]" << std::endl;

                allCommitted = false;
            } else {
                // No special handling is required,
                // since the consumer will attempt to auto-recover.
                std::cerr << record.toString() << std::endl;
            }
        }

        if (!allCommitted) {
            auto now = std::chrono::steady_clock::now();
            if (now - lastTimeCommitted > std::chrono::seconds(1)) {
                // Commit offsets for messages polled
                std::cout << "% syncCommit offsets: " << kafka::Utility::getCurrentTime() << std::endl;
                consumer.commitSync(); // or commitAsync()

                lastTimeCommitted = now;
                allCommitted = true;
            }
        }
    }


In this example, a manual commit happens roughly once per second. commitSync waits for commit acknowledgement, while commitAsync does not.
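
The "roughly once per second" bookkeeping from the loop above can be factored into a small helper, which also makes it testable without a broker. `CommitThrottle` is a hypothetical class introduced here for illustration; it is not part of modern-cpp-kafka, and the time is passed in explicitly so the behavior does not depend on the wall clock.

```cpp
#include <chrono>

// Hypothetical helper (not part of modern-cpp-kafka): decides when a commit
// is due, given that more than `interval` must have passed since the last
// commit and that something new has been consumed.
class CommitThrottle {
public:
    using Clock = std::chrono::steady_clock;

    CommitThrottle(Clock::duration interval, Clock::time_point start)
        : interval_(interval), lastCommit_(start) {}

    // Call after each successfully processed record.
    void recordConsumed() { dirty_ = true; }

    // Returns true when the caller should commit now, and resets the state.
    bool shouldCommit(Clock::time_point now) {
        if (!dirty_ || now - lastCommit_ <= interval_) return false;
        lastCommit_ = now;
        dirty_ = false;
        return true;
    }

private:
    Clock::duration   interval_;
    Clock::time_point lastCommit_;
    bool              dirty_ = false;
};
```

In a consumer loop, one would call `recordConsumed()` for each processed record and invoke `commitSync()` or `commitAsync()` whenever `shouldCommit(Clock::now())` returns true.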

Summary of modern-cpp-kafka and basic concepts

The examples above provide an overview of the modern-cpp-kafka API.

Let’s summarize the basic concepts:

There are three Kafka clients: KafkaProducer, KafkaConsumer, and AdminClient (not shown in this article).

1. KafkaProducer

  • ProducerRecord: The “message type” for a KafkaProducer to send, constructed with Topic, Partition, Key, Value, and Headers.

  • Producer::Callback: The callback used to handle request completion asynchronously. It is called when the record sent to the server has been acknowledged.

  • KafkaAsyncProducer: Publishes records to the Kafka cluster asynchronously. Each send operation requires a per-message Producer::Callback.

  • KafkaSyncProducer: Publishes records to the Kafka cluster synchronously. The send operation does not return until the delivery is completed.

  • Producer::RecordMetadata: The metadata for a record that has been acknowledged by the server. It contains Topic, Partition, Offset, KeySize, ValueSize, Timestamp, and PersistedStatus. A KafkaAsyncProducer passes this metadata as an input parameter of the Producer::Callback. A KafkaSyncProducer returns the metadata from the synchronous send method.

2. KafkaConsumer

  • ConsumerRecord: The message type returned by a KafkaConsumer instance. It contains Topic, Partition, Offset, Key, Value, Timestamp, and Headers.

  • KafkaAutoCommitConsumer: Automatically commits previously polled offsets on each poll operation.

  • KafkaManualCommitConsumer: Provides manual commitAsync and commitSync methods to acknowledge messages.

3. AdminClient: The administrative client for Kafka that supports managing and inspecting topics. Examples can be found on GitHub.

Conclusion

modern-cpp-kafka is a header-only C++ library that uses idiomatic C++ features to provide a safe, efficient, and easy way of producing and consuming Kafka messages.

The modern-cpp-kafka project on GitHub has been thoroughly tested within Morgan Stanley. After we replaced a legacy implementation with it, throughput for a key middleware system improved by 26%.

We are actively maintaining and improving the project. For example, the transactional interface is on the way, and new components, such as streamer and connector, are also on the roadmap. If you’re interested in contributing, we’d be very happy to have you involved in the project, whether it’s raising an issue or submitting a PR.


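This article is reposted from 大数据从业者. If you believe it infringes your rights, please email contact@modb.pro with supporting evidence; once verified, 墨天轮 will promptly remove the content.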
