暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

基于DolphinScheduler抽取通用EventBus组件:支持延迟与事件驱动

海豚调度 2025-05-20
101

一、思路来源

虽然guava中的eventbus已经很方便了,但是还是想要实现一个更为方便,同时支持延迟事件、同时带eventbus的组件。在Apache DolphinScheduler项目中,有一个eventbus的组件,这个组件写得挺好的,想着用在业务系统上,因此自己抽取了一下,拿到业务系统中来用。话不多说,我们把它抽取出来吧,同时进行demo的运行。还是要感谢Apache DolphinScheduler的开源,让这个很简单,但是很高效的组件能够让我们便捷地使用。

二、具体实现过程

首先是定义事件接口:

public interface IEvent {
}

针对事件接口,我们抽象出共性方法接口:延迟时间和过期时间

public abstractclass AbstractDelayEvent implements IEventDelayed {
    privatefinallong delayTime;
    privatefinallong expireTime;


    public long getDelayTime() {
        return delayTime;
    }

    public long getExpireTime() {
        return expireTime;
    }

    public AbstractDelayEvent(long delayTime) {
        this.delayTime = delayTime;
        this.expireTime = System.currentTimeMillis() + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = expireTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.expireTime < ((AbstractDelayEvent) o).expireTime) {
            return -1;
        }
        if (this.expireTime > ((AbstractDelayEvent) o).expireTime) {
            return1;
        }
        return0;
    }
}

主要的信息:

定义eventbus中,我们需要使用的方法:

public interface IEventBus<T extends IEvent{

    void publish(T event);

    Optional<T> poll() throws InterruptedException;

    Optional<T> peek();

    Optional<T> remove();

    boolean isEmpty();

    int size();
}

可以看到主要是:发布事件、消费、移除、删除、判断当前的事件是否为空,以及事件大小等方法。其中最重要的方法为发布事件和消费处理事件方法。

针对当前的事件bus接口进行抽象,抽取出共性方法,方便复用:

public abstractclass AbstractDelayEventBus<T extends AbstractDelayEventimplements IEventBus<T{

    protectedfinal DelayQueue<T> delayEventQueue = new DelayQueue<>();

    @Override
    public void publish(T event) {
        delayEventQueue.put(event);
    }

    @Override
    public Optional<T> poll() throws InterruptedException {
        // 使用带超时的 poll 方法,等待事件到期
        return Optional.ofNullable(delayEventQueue.poll(1000, TimeUnit.MILLISECONDS));
    }

    @Override
    public Optional<T> peek() {
        return Optional.ofNullable(delayEventQueue.peek());
    }

    @Override
    public Optional<T> remove() {
        return Optional.ofNullable(delayEventQueue.poll());
    }

    @Override
    public boolean isEmpty() {
        return delayEventQueue.isEmpty();
    }

    @Override
    public int size() {
        return delayEventQueue.size();
    }
}

三、测试运行效果

接下来,我们使用它,来进行处理:

定义自己的延迟事件:

如果是在业务中,可以定义自己的业务数据信息事件对象

public class MyDelayEvent extends AbstractDelayEvent {
    private final String message;

    public MyDelayEvent(long delayTime, String message) {
        super(delayTime);
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}

定义事件延迟事件bus

当然也可以进行自己的可定制化特性。

public class MyDelayEventBus extends AbstractDelayEventBus<MyDelayEvent{
    // 不需要额外的修改
}

进行测试:

思路:创建事件总线、发布事件,然后针对发布的事件信息,进行消费,然后等待延迟时间的到来,从而实现消费,从而进行业务的处理。

import java.util.Optional;

publicclass EventBusExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建事件总线
        IEventBus<MyDelayEvent> eventBus = new MyDelayEventBus();

        // 发布单个事件
        eventBus.publish(new MyDelayEvent(100"Single Event"));
        System.out.println("After publish, event bus size: " + eventBus.size());

        // 持续尝试消费事件
        while (true) {
            Optional<MyDelayEvent> event = eventBus.poll();
            if (event.isPresent()) {
                System.out.println("Received event: " + event.get().getMessage());
            } else {
                System.out.println("No event received within the timeout.");
                break;
            }
        }

        // 检查总线大小
        System.out.println("Event bus size: " + eventBus.size());
    }
}

运行结果:

可以看到实现自己的业务逻辑还是很方便的,可以自己实现吧,这里给出的代码是可以运行的。

源码地址:https://gitee.com/null_713_2407/pratice

参考

github:https://github.com/apache/dolphinscheduler

转载自后端技术学习

作者 | 刘亚洲




用户案例



网易邮箱 每日互动 惠生工程  作业帮 
博世智驾 蔚来汽车 长城汽车集度长安汽车
思科网讯食行生鲜联通医疗联想
新网银行唯品富邦消费金融 
自如有赞伊利当贝大数据
珍岛集团传智教育Bigo
YY直播  三合一太美医疗
Cisco Webex兴业证券




迁移实战



Azkaban   Ooize(当贝迁移案例)
Airflow (有赞迁移案例)
Air2phin(迁移工具)
Airflow迁移实践



发版消息




Apache DolphinScheduler 3.2.2版本正式发布!
Apache DolphinScheduler 3.2.1 版本发布:增强功能与安全性的全面升级
Apache DolphinScheduler 3.3.0 Alpha发布,功能增强与性能优化大升级!




加入社区



关注社区的方式有很多:

  • GitHub: https://github.com/apache/dolphinscheduler
  • 官网:https://dolphinscheduler.apache.org/en-us
  • 订阅开发者邮件:dev@dolphinscheduler@apache.org
  • X.com:@DolphinSchedule
  • YouTube:https://www.youtube.com/@apachedolphinscheduler
  • Slack:https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1cmrxsio1-nJHxRJa44jfkrNL_Nsy9Qg

同样地,参与Apache DolphinScheduler 有非常多的参与贡献的方式,主要分为代码方式和非代码方式两种。

📂非代码方式包括:

完善文档、翻译文档;翻译技术性、实践性文章;投稿实践性、原理性文章;成为布道师;社区管理、答疑;会议分享;测试反馈;用户反馈等。

👩‍💻代码方式包括:

查找Bug;编写修复代码;开发新功能;提交代码贡献;参与代码审查等。

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3A%22first+time+contributor%22

优先级问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3Apriority%3Ahigh

如何参与贡献链接https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/%E8%B4%A1%E7%8C%AE%E6%8C%87%E5%8D%97_menu/%E5%A6%82%E4%BD%95%E5%8F%82%E4%B8%8E_menu

如果你❤️小海豚,就来为我点亮Star吧!

https://github.com/apache/dolphinscheduler


你的好友秀秀子拍了拍你

并请你帮她点一下“分享”

文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论