1.方法1,使用自带工具:
- 查看消费信息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.253.134:9092 --property print.key=true --topic connect-offsets --from-beginning
- 往topic生产消息
bin/kafka-console-producer.sh --broker-list 192.168.253.135:9092 --topic connect-offsets --property parse.key=true
(注意:parse.key=true 时,key 与 value 之间默认用 Tab 分隔,引号必须是英文半角引号)
["connector-mysql_new",{"server":"testadmin"}] {"ts_sec":1606132362,"file":"mysql-bin1og.000064","pos":4,"gtids":"5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-23655","row":1,"server_id":3306100,"event":2}
2.方法2,使用python
1)python3版
from kafka import KafkaProducer
import json
from kafka.errors import KafkaError
def kafka_producer():
    """Publish one keyed JSON record to the ``connect-offsets`` topic.

    The key is ``["connector-mysql_new", {"server": "testadmin"}]`` and the
    value is a dict of binlog position fields (``file``, ``pos``, ``gtids``,
    ...).  Both are serialized as compact JSON encoded to UTF-8.

    NOTE(review): presumably this overrides the stored offset of a Kafka
    Connect MySQL connector -- confirm the key matches what the connector
    itself writes before running against a live cluster.

    Any ``KafkaError`` raised while sending or while waiting for the broker
    acknowledgement is printed rather than propagated.  Errors raised while
    constructing the producer still propagate to the caller.
    """

    def _to_compact_json(obj):
        # Compact separators emit JSON without spaces after ',' and ':'.
        return json.dumps(obj, separators=(',', ':')).encode('utf-8')

    producer = KafkaProducer(
        key_serializer=_to_compact_json,
        value_serializer=_to_compact_json,
        bootstrap_servers=[
            '192.168.253.133:9092',
            '192.168.253.134:9092',
            '192.168.253.135:9092',
        ],
    )
    data = {
        "ts_sec": 1605529559,
        "file": "mysql-bin1og.000041",
        "pos": 259,
        "gtids": "5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-22650",
        "row": 1,
        "server_id": 3306100,
        "event": 2,
    }
    keynm = ["connector-mysql_new", {"server": "testadmin"}]
    try:
        # send() is asynchronous and only returns a future; block on it so
        # that a failed delivery is raised here instead of being lost.
        # The timeout bounds the wait if the cluster is unreachable.
        producer.send('connect-offsets', key=keynm, value=data).get(timeout=30)
    except KafkaError as e:
        print(e)
    finally:
        # Always release the producer's network resources, even on failure.
        producer.close()
# Run the producer only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    kafka_producer()
- python2 版本
from kafka import KafkaProducer
import json
from kafka.errors import KafkaError
def kafka_producer():
    """Publish one keyed JSON record to the ``connect-offsets`` topic.

    Python 2 variant; the body only uses syntax that is valid on Python 2.

    The key is ``["connector-mysql_new", {"server": "testadmin"}]`` and the
    value is a dict of binlog position fields (``file``, ``pos``, ``gtids``,
    ...).  Both are serialized as compact JSON encoded to UTF-8.

    NOTE(review): presumably this overrides the stored offset of a Kafka
    Connect MySQL connector -- confirm the key matches what the connector
    itself writes before running against a live cluster.

    Any ``KafkaError`` raised while sending or while waiting for the broker
    acknowledgement is printed rather than propagated.  Errors raised while
    constructing the producer still propagate to the caller.
    """

    def _to_compact_json(obj):
        # Compact separators emit JSON without spaces after ',' and ':'.
        return json.dumps(obj, separators=(',', ':')).encode('utf-8')

    producer = KafkaProducer(
        key_serializer=_to_compact_json,
        value_serializer=_to_compact_json,
        bootstrap_servers=[
            '192.168.253.133:9092',
            '192.168.253.134:9092',
            '192.168.253.135:9092',
        ],
    )
    data = {
        "ts_sec": 1606132362,
        "file": "mysql-bin1og.000083",
        "pos": 4,
        "gtids": "5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-23672",
        "row": 1,
        "server_id": 3306100,
        "event": 2,
    }
    keynm = ["connector-mysql_new", {"server": "testadmin"}]
    try:
        # send() is asynchronous and only returns a future; block on it so
        # that a failed delivery is raised here instead of being lost.
        # The timeout bounds the wait if the cluster is unreachable.
        producer.send('connect-offsets', key=keynm, value=data).get(timeout=30)
    except KafkaError as e:
        # print(e) with a single argument prints the same text on Python 2
        # as the statement form `print e`.
        print(e)
    finally:
        # Always release the producer's network resources, even on failure.
        producer.close()
# Run the producer only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    kafka_producer()




