暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka原生JavaScript客户端来了

 

Confluent 官方宣布将推出 Kafka 原生官方 JavaScript 客户端 CJSK,它将与 Apache Kafka 保持同步更新。

Confluent 官方仅支持 Java 客户端,但维护了非 Java 版的 librdkafka,因此在 C、C++、Rust、Python、Go 等语言中也有很好的集成。这些语言无一例外引用了 librdkafka,且是服务器端语言,在客户端语言如 JavaScript 确实是空白的。

虽然有一些 Node.js Kafka 客户端,比如 KafkaJS、node-rdkafka、kafka-node,但要么有些库不再更新,或者使用方式与官方 API 差异较大,因此官方决定推出 Kafka JavaScript 客户端。

对于开发者可以选择基于Promise的API,比较符合习惯用法。

使用 Node.js 创建一个生产者 producer.js:

const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

const {
AvroSerializer,
SerdeType,
SchemaRegistryClient,
} = require("@confluentinc/schemaregistry");

asyncfunctionproducerDemo() {
const schemaRegistryClient = newSchemaRegistryClient({
   baseURLs: ["<SR endpoint>"],
   basicAuthCredentials: {
     credentialsSource"USER_INFO",
     userInfo:
       "<SR_API_KEY>:<SR_API_SECRET",
   },
 });

const kafka = newKafka({
   kafkaJS: {
     clientId"cjsk-blog-demo-producer",
     brokers: ["<bootstrap_server_url>"],
     ssltrue,
     sasl: {
       mechanism"plain",
       username"<API_KEY>",
       password:
         "API_SECRET",
     },
   },
 });

const producer = kafka.producer();

# schema 
const schemaString = JSON.stringify({
   type"record",
   name"Order",
   fields: [
     { name"region"type"string" },
     { name"item_type"type"string" },
   ],
 });

const schemaInfo = {
   schemaType"AVRO",
   schema: schemaString,
 };

const userTopic = "cjsk-blog-demo-topic";
await schemaRegistryClient.register(userTopic + "-value", schemaInfo);

 # 生产消息
const orderInfo = {
   region"CA",
   item_type"accessory",
 };

const avroSerializerConfig = { useLatestVersiontrue };

const serializer = newAvroSerializer(
   schemaRegistryClient,
   SerdeType.VALUE,
   avroSerializerConfig
 );

const outgoingMessage = {
   key"User4",
   valueawait serializer.serialize(userTopic, orderInfo),
 };

console.log("Outgoing message: ", outgoingMessage);

await producer.connect();

await producer.send({
   topic: userTopic,
   messages: [outgoingMessage],
 });

await producer.disconnect();
}

producerDemo();

安装并允许:

npm install @confluentinc/kafka-javascript @confluentinc/schema-registry
node producer.js

消费部分就不演示了。

文章转载自虞大胆的叽叽喳喳,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论