暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深入解析:如何轻松扩展Maxwell,教你实现二次开发实现自定义功能

编程与架构 2024-08-24
130

实现Maxwell自定义Producer发送数据到指定HTTP接口

在数据同步和实时处理的场景中,Maxwell作为一个开源的MySQL binlog解析工具,可以将数据库变更捕获并输出到不同的目标系统。为了满足特定的业务需求,有时我们需要将这些数据发送到自定义的HTTP接口。本篇博客将通过扩展Maxwell的Producer功能,详细介绍如何开发自定义的Producer,将捕获的数据发送到指定的HTTP接口。

文章将从Maxwell源码的编译配置入手,逐步讲解如何定义自定义的参数、修改Maxwell的核心类、创建自定义Producer类,并最终实现自定义数据发送。最后,我们将展示如何部署、运行、测试该扩展功能。通过本文,你将掌握Maxwell自定义Producer开发的全过程,并能够根据自己的业务需求扩展其他目标Producer。

1. 功能介绍

通过扩展Maxwell Producer的功能,将同步的数据发送到指定的HTTP接口。这种扩展方式具有很强的灵活性,后续可以很方便地扩展为发送数据到其他不同类型的Producer。

2. 项目地址

GitHub地址: Maxwell

本次修改基于Maxwell
源码版本:1.38.0

3. 开始编译

3.1 配置IDEA的JDK版本为JDK 11

在进行编译前,首先需要确认你的项目JDK版本是11,否则会导致编译报错。以下是具体步骤:

错误示例

如果你使用的是低于JDK 11的版本,可能会看到如下错误提示:

错误提示

3.2 配置JDK 11步骤

  1. 1. 打开 File -> Project Structure

  2. 2. 添加JDK版本,并选择 JDK 11

  3. 3. 确保插件和编译器均使用相同版本的JDK。

添加JDK

4. 开发自定义Producer

本文重点介绍如何为Maxwell项目新增一个自定义的Producer功能,并将数据发送到指定的HTTP接口。通过阅读此内容,你将掌握如何扩展Maxwell Producer并适应不同的需求。

4.1 定义自定义Producer的必要参数

我们首先需要定义自定义Producer使用的参数:

参数说明
http_url请求接口的URL
header设置请求头

注意:不要与现有的Maxwell参数冲突,这些参数应被添加到 com.zendesk.maxwell.MaxwellConfig
 类中。

4.2 将参数加入Maxwell源码

4.2.1 修改 com.zendesk.maxwell.MaxwellConfig
 类

首先添加自定义的参数变量:

public String httpUrl;
public String header;

然后在 buildOptionParser
 方法中,添加参数的定义:

parser.section("restful");
parser.accepts( "http_url""URL for sending data" ).withRequiredArg();
parser.accepts( "header""Request headers for HTTP requests" ).withRequiredArg();

最后,在 setup
 方法中初始化这些参数:

this.httpUrl = fetchStringOption("http_url", options, properties, null);
this.header = fetchStringOption("header", options, properties, null);

4.2.2 修改 com.zendesk.maxwell.MaxwellContext
 类

在 getProducer
 方法中添加我们自定义的Producer逻辑:

case "restful":
    this.producer = new MaxwellRestFulProducer(this);
    break;

4.3 创建自定义的Producer类

为了创建一个自定义Producer类,你需要继承 AbstractProducer
 并实现 StoppableTask
 接口。

自定义Producer代码

public class MaxwellRestFulProducer extends AbstractProducer implements StoppableTask {
    private static final Logger logger = LoggerFactory.getLogger(MaxwellRestFulProducer.class);
    private final String httpUrl;
    private final String header;
    private List<BasicHeader> headerList;

    public MaxwellRestFulProducer(MaxwellContext context) {
        super(context);
        this.httpUrl = context.getConfig().httpUrl;
        this.header = context.getConfig().header;
        this.headerList = parseHeaders(header);
    }

    private List<BasicHeader> parseHeaders(String header) {
        List<BasicHeader> headerList = new ArrayList<>();
        if (StringUtils.isNotBlank(header)) {
            JSONObject jsonObject = new JSONObject(new String(Base64.getDecoder().decode(header), StandardCharsets.UTF_8));
            jsonObject.keys().forEachRemaining(key -> {
                headerList.add(new BasicHeader(key, jsonObject.getString(key)));
            });
        }
        return headerList;
    }

    @Override
    public void push(RowMap r) throws Exception {
        if (!r.shouldOutput(outputConfig)) return;
        String messageStr = r.toJSON(outputConfig);
        sendToRestFul(messageStr);
    }

    private void sendToRestFul(String message) throws IOException {
        try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) {
            HttpPost httpPost = new HttpPost(httpUrl);
            headerList.forEach(httpPost::addHeader);
            httpPost.setEntity(new StringEntity(message, StandardCharsets.UTF_8));
            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
                logger.info("HTTP响应内容为: {}", EntityUtils.toString(response.getEntity()));
            }
        }
    }

    @Override
    public void requestStop() { }

    @Override
    public void awaitStop(Long timeout) { }
}

4.4 打包项目

项目配置完成后,使用 maven
 进行打包:

mvn clean package

打包成功后会生成 maxwell-1.38.0.tar.gz
 文件。

5. 开发消息接收接口

我们需要实现一个简单的RESTful接口来接收Maxwell Producer发送的消息。

@RestController
public class MessageController {
    @PostMapping("/data/send")
    public String receiveData(@RequestBody String json) {
        System.out.println("收到的数据: " + json);
        return "OK";
    }
}

6. 运行和测试

6.1 部署Java接收服务

将上述代码部署到服务器中,启动服务。

6.2 启动Maxwell并指定Producer参数

使用以下命令启动Maxwell:

bin/maxwell --user='maxwell' --password='123456' --host='192.168.4.143' --producer=restful --http_url=http://127.0.0.1:20081/data/send 

在控制台中,你将看到如下日志输出,表示服务启动成功:

INFO MaxwellRestFulProducer - HTTP响应内容为: OK

6.3 测试数据同步

修改MySQL数据库中的数据,观察日志,确保Producer将数据成功发送到指定接口。

7. 总结

本文介绍了如何通过扩展Maxwell Producer,将数据同步到指定HTTP接口。通过这些步骤,可以灵活地将Maxwell Producer扩展到多种应用场景。希望本文对你有所帮助!


欢迎关注我的公众号“编程与架构”,原创技术文章第一时间推送。


文章转载自编程与架构,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论