暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

连接器架构组件的使用

原创 老板,不要香菜 2023-12-11
224

在开发与使用连接器架构的组件时,可以通过注解来定义和使用,也可以通过中间件来配置相关组件来使用,下面我们就以消息队列的连接器来简单介绍一下各自使用情况。其中flyingserver-jms-kafka-rar-1.0连接器是实现了Jakarta连接器架构,同时也实现了JMS规范,这样可以通过JMS规范来使用kafka消息服务。

连接器部署

无论是注解使用还是中间件托管使用,首先都是要安装部署连接器的。下面我们以亚信科技的中间件产品 AISWare FlyingServer Web中间件为例。

进入FlyingServer控制台页面后,点击菜单【应用管理】-->点击【新增应用】,填写好应用名称,文件,部署目标,点击确定开始部署


部署完成后,可以在页面上看到记录与运行状态,至此连接器部署便完成了。


后续便可以通过注解定义与使用连接器了。但是中间件配置与使用的话,还需要做其他配置。

 

 

开发注解定义与使用

注解使用的前提是中间件服务已经部署并启用了对应的连接器,如下面的flyingserver-jms-kafka-rar-1.0

消息发送

定义ConnectionFactory可以通过注解@ConnectionFactoryDefinition来使用,如下:

@ConnectionFactoryDefinition(name = "java:module/env/ConnectionFactory",

  description = "Kafka Conn Factory",

  interfaceName = "javax.jms.ConnectionFactory",

  resourceAdapter = "flyingserver-jms-kafka-rar-1.0",

  transactionSupport = TransactionSupportLevel.NoTransaction,

  properties = { })

然后在需要使用的地方,可以通过@Resource注解来使用,如下:

@Resource(lookup="java:module/env/ConnectionFactory")

    ConnectionFactory factory;

其中ConnectionFactoryDefinition注解的name为资源的JNDI名称,要和Resource注解需要lookup的JNDI名称一致。然后就可以在代码中使用。

完整样例如下:

@ConnectionFactoryDefinition(name = "java:module/env/ConnectionFactory",

  description = "Kafka Conn Factory",

  interfaceName = "javax.jms.ConnectionFactory",

  resourceAdapter = " flyingserver -jms-kafka-rar-1.0",

  transactionSupport = TransactionSupportLevel.NoTransaction,

  properties = { })

@Stateless

public class SendKafkaMessage {

    private static Connection connection;

    @Resource(lookup="java:module/env/ConnectionFactory")

    ConnectionFactory factory;

 

    @Schedule(second = "*/10", hour="*", minute="*",persistent = false)

    public void sendMessage() throws Exception {

        try {

            if (connection == null) {

                connection = factory.createConnection();

            }

            connection.start();

            Session topicSession = connection.createSession(false, 1);

            MessageProducer producer = topicSession.createProducer(topicSession.createTopic("test"));

            String mes = "hello +" + UUID.randomUUID().toString();

            producer.send(topicSession.createTextMessage(mes));

        } catch (Exception ex) {

            Logger.getLogger(SendKafkaMessage.class.getName()).log(Level.SEVERE, null, ex);

        }

    }

}

 

消息消费

类似消息发送的使用方式:

{

...

MessageConsumer consumer = topicSession.createConsumer(topicSession.createTopic("test"));

            consumer.setMessageListener(new MessageListener() {

                @Override

                public void onMessage(Message message) {

 

                }

            });

Utils.threadPool.submit(new MessageHandler(“test”, consumer));

...

}

    class MessageHandler implements Runnable {

        String topicName;

        private MessageConsumer inconsumer;

 

        public MessageHandler(String topicName, MessageConsumer inconsumer) {

            this.topicName = topicName;

            this.inconsumer = inconsumer;

        }

 

        @Override

        public void run() {

            System.out.println("Subscribed to topic " + topicName);

            while (true) {

                TextMessage receive = null;

                try {

                    receive = (TextMessage)inconsumer.receive(5000);

                } catch (JMSException e) {

                    e.printStackTrace();

                }

                try {

                    if(receive!=null){

 

                    System.out.println("成功接收消息:" + receive.getText());

                    }

                } catch (JMSException e) {

                    e.printStackTrace();

                }

               

            }

        }

    }

 

管理对象使用

类似ConnectionFactoryDefinition,管理对象通过@AdministeredObjectDefinition来定义,然后通过@Resource来注入使用,

样例如下:

。。。。。

@AdministeredObjectDefinition(

                name = "java:global/jms/adminObjTopic",

                resourceAdapter = "flyingserver-jms-kafka-rar-1.0",

                interfaceName = "javax.jms.Topic",

                className = "com.asiainfo.mw.flyingserver.adapter.jms.kafka.KafkaJmsTopic",

                properties = {"topicName=test3"}

        )

。。。。。

    @Resource(lookup="java:global/jms/adminObjTopic")

Topic topic;

。。。。。。

MessageProducer producer = topicSession.createProducer(topic);

。。。。。。

 

 

 

中间件配置与使用

一般来讲,符合Jakarta EE规范的中间件,都支持连接器的使用,只是部署、配置方式大同小异,下面我们以亚信科技的中间件产品 AISWare FlyingServer Web中间件为例,来介绍一下如何在中间件产品中配置与使用连接器。

新建资源适配器

点击菜单【资源适配】-->点击【新增资源适配】,填写资源适配名称,配置线程池和属性,点击保存,完成资源适配器配置,可在列表上查看到数据。


需要注意的是:这里配置的属性是连接器的全局属性,优先级较低。

 


 

连接池

与EIS系统创建于销毁连接是非常消耗性能的,一般来说,我们会通过连接池来优化处理,所以,这里我们创建与消息中间件的连接池。

点击菜单【资源配置-连接器管理-连接池】-->点击【新增连接池】,填写连接池名称,适配器,连接类以及附加属性配置,点击保存,完成连接池的创建



需要注意的是,这里的属性会覆盖资源适配器的属性,优先级高于资源适配器。

 


创建连接器JNDI

我们在使用的时候,主要是通过JNDI来获取我们的管理对象。

点击菜单【资源配置-连接器管理-连接器】-->点击【新增连接器】,同时填写,连接器名称,选择创建的连接池,

点击保存完成连接器的创建。创建完成后,可在列表页面看到创建的数据。


 


 

 

管理对象创建(非必须)

如果需要创建管理对象使用,可在页面【资源配置—连接器管理—管理对象】中创建,点击【新增管理对象】,填写JNDI名称,选择资源适配器、资源类型、类名称,添加响应的属性,点击保存,完成管理对象的创建。如:


然后代码中通过JNDI获取,

Context ctx=new InitialContext();

            topic topic=( topic)ctx.lookup("jms/topic");

 

代码使用样例

ConnectionFactory可以通过JNDI来获取,所以代码中可以通过以下方式来使用。

以一个servlet为例。

public class Producer2Servlet extends HttpServlet {

    private static Connection connection;

    @Override

    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

        try {

            if (connection == null) {

                Context ctx = new InitialContext();

                ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("jms/kafkaJMSConnectionFactory");

                connection = (Connection) connectionFactory.createConnection();

            }

            connection.start();

            Session topicSession = connection.createSession(false, 1);

 

            MessageProducer producer = topicSession.createProducer(topicSession.createTopic("test2"));

            String mes = "hello +" + UUID.randomUUID().toString();

            producer.send(topicSession.createTextMessage(mes));

            resp.getWriter().println(mes);

        } catch (NamingException | JMSException e) {

            e.printStackTrace();

        }

    }

}

通过代码 (ConnectionFactory) ctx.lookup("jms/kafkaJMSConnectionFactory");来获取中间件中管理的ConnectionFactory,这样就可以使用在中间件中配置好的连接器信息,来完成消息的发送。

当然,中间件上配置好的JNDI资源也可以通过注解@Resource(lookup="jms/kafkaJMSConnectionFactory")来获取

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论