实现Maxwell自定义Producer发送数据到指定HTTP接口
在数据同步和实时处理的场景中,Maxwell作为一个开源的MySQL binlog解析工具,可以将数据库变更捕获并输出到不同的目标系统。为了满足特定的业务需求,有时我们需要将这些数据发送到自定义的HTTP接口。本篇博客将通过扩展Maxwell的Producer功能,详细介绍如何开发自定义的Producer,将捕获的数据发送到指定的HTTP接口。
文章将从Maxwell源码的编译配置入手,逐步讲解如何定义自定义的参数、修改Maxwell的核心类、创建自定义Producer类,并最终实现自定义数据发送。最后,我们将展示如何部署、运行、测试该扩展功能。通过本文,你将掌握Maxwell自定义Producer开发的全过程,并能够根据自己的业务需求扩展其他目标Producer。
1. 功能介绍
通过扩展Maxwell Producer的功能,将同步的数据发送到指定的HTTP接口。这种扩展方式具有很强的灵活性,后续可以很方便地扩展为发送数据到其他不同类型的Producer。
2. 项目地址
GitHub地址: Maxwell
本次修改基于Maxwell
源码版本:1.38.0
3. 开始编译
3.1 配置IDEA的JDK版本为JDK 11
在进行编译前,首先需要确认你的项目JDK版本是11,否则会导致编译报错。以下是具体步骤:
错误示例
如果你使用的是低于JDK 11的版本,可能会看到如下错误提示:
错误提示
3.2 配置JDK 11步骤
1. 打开 File -> Project Structure。
2. 添加JDK版本,并选择 JDK 11。
3. 确保插件和编译器均使用相同版本的JDK。

4. 开发自定义Producer
本文重点介绍如何为Maxwell项目新增一个自定义的Producer功能,并将数据发送到指定的HTTP接口。通过阅读此内容,你将掌握如何扩展Maxwell Producer并适应不同的需求。
4.1 定义自定义Producer的必要参数
我们首先需要定义自定义Producer使用的参数:
| 参数 | 说明 |
| http_url | 请求接口的URL |
| header | 设置请求头 |
注意:不要与现有的Maxwell参数冲突,这些参数应被添加到 com.zendesk.maxwell.MaxwellConfig
类中。
4.2 将参数加入Maxwell源码
4.2.1 修改 com.zendesk.maxwell.MaxwellConfig
类
首先添加自定义的参数变量:
public String httpUrl;
public String header;
然后在 buildOptionParser
方法中,添加参数的定义:
parser.section("restful");
parser.accepts( "http_url", "URL for sending data" ).withRequiredArg();
parser.accepts( "header", "Request headers for HTTP requests" ).withRequiredArg();
最后,在 setup
方法中初始化这些参数:
this.httpUrl = fetchStringOption("http_url", options, properties, null);
this.header = fetchStringOption("header", options, properties, null);
4.2.2 修改 com.zendesk.maxwell.MaxwellContext
类
在 getProducer
方法中添加我们自定义的Producer逻辑:
case "restful":
this.producer = new MaxwellRestFulProducer(this);
break;
4.3 创建自定义的Producer类
为了创建一个自定义Producer类,你需要继承 AbstractProducer
并实现 StoppableTask
接口。
自定义Producer代码
public class MaxwellRestFulProducer extends AbstractProducer implements StoppableTask {
private static final Logger logger = LoggerFactory.getLogger(MaxwellRestFulProducer.class);
private final String httpUrl;
private final String header;
private List<BasicHeader> headerList;
public MaxwellRestFulProducer(MaxwellContext context) {
super(context);
this.httpUrl = context.getConfig().httpUrl;
this.header = context.getConfig().header;
this.headerList = parseHeaders(header);
}
private List<BasicHeader> parseHeaders(String header) {
List<BasicHeader> headerList = new ArrayList<>();
if (StringUtils.isNotBlank(header)) {
JSONObject jsonObject = new JSONObject(new String(Base64.getDecoder().decode(header), StandardCharsets.UTF_8));
jsonObject.keys().forEachRemaining(key -> {
headerList.add(new BasicHeader(key, jsonObject.getString(key)));
});
}
return headerList;
}
@Override
public void push(RowMap r) throws Exception {
if (!r.shouldOutput(outputConfig)) return;
String messageStr = r.toJSON(outputConfig);
sendToRestFul(messageStr);
}
private void sendToRestFul(String message) throws IOException {
try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) {
HttpPost httpPost = new HttpPost(httpUrl);
headerList.forEach(httpPost::addHeader);
httpPost.setEntity(new StringEntity(message, StandardCharsets.UTF_8));
try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
logger.info("HTTP响应内容为: {}", EntityUtils.toString(response.getEntity()));
}
}
}
@Override
public void requestStop() { }
@Override
public void awaitStop(Long timeout) { }
}
4.4 打包项目
项目配置完成后,使用 maven
进行打包:
mvn clean package
打包成功后会生成 maxwell-1.38.0.tar.gz
文件。
5. 开发消息接收接口
我们需要实现一个简单的RESTful接口来接收Maxwell Producer发送的消息。
@RestController
public class MessageController {
@PostMapping("/data/send")
public String receiveData(@RequestBody String json) {
System.out.println("收到的数据: " + json);
return "OK";
}
}
6. 运行和测试
6.1 部署Java接收服务
将上述代码部署到服务器中,启动服务。
6.2 启动Maxwell并指定Producer参数
使用以下命令启动Maxwell:
bin/maxwell --user='maxwell' --password='123456' --host='192.168.4.143' --producer=restful --http_url=http://127.0.0.1:20081/data/send
在控制台中,你将看到如下日志输出,表示服务启动成功:
INFO MaxwellRestFulProducer - HTTP响应内容为: OK
6.3 测试数据同步
修改MySQL数据库中的数据,观察日志,确保Producer将数据成功发送到指定接口。
7. 总结
本文介绍了如何通过扩展Maxwell Producer,将数据同步到指定HTTP接口。通过这些步骤,可以灵活地将Maxwell Producer扩展到多种应用场景。希望本文对你有所帮助!
欢迎关注我的公众号“编程与架构”,原创技术文章第一时间推送。





