在开发与使用连接器架构的组件时,可以通过注解来定义和使用,也可以通过中间件来配置相关组件来使用,下面我们就以消息队列的连接器来简单介绍一下各自使用情况。其中flyingserver-jms-kafka-rar-1.0连接器是实现了Jakarta连接器架构,同时也实现了JMS规范,这样可以通过JMS规范来使用kafka消息服务。
连接器部署
无论是注解使用还是中间件托管使用,首先都是要安装部署连接器的。下面我们以亚信科技的中间件产品 AISWare FlyingServer Web中间件为例。
进入FlyingServer控制台页面后,点击菜单【应用管理】-->点击【新增应用】,填写好应用名称,文件,部署目标,点击确定开始部署

部署完成后,可以在页面上看到记录与运行状态,至此连接器部署便完成了。

后续便可以通过注解定义与使用连接器了。但是中间件配置与使用的话,还需要做其他配置。
开发注解定义与使用
注解使用的前提是中间件服务已经部署并启用了对应的连接器,如下面的flyingserver-jms-kafka-rar-1.0
消息发送
定义ConnectionFactory可以通过注解@ConnectionFactoryDefinition来使用,如下:
|
@ConnectionFactoryDefinition(name
= "java:module/env/ConnectionFactory", description = "Kafka Conn
Factory", interfaceName =
"javax.jms.ConnectionFactory", resourceAdapter = "flyingserver-jms-kafka-rar-1.0", transactionSupport =
TransactionSupportLevel.NoTransaction, properties = { }) |
然后在需要使用的地方,可以通过@Resource注解来使用,如下:
|
@Resource(lookup="java:module/env/ConnectionFactory") ConnectionFactory factory; |
其中ConnectionFactoryDefinition注解的name为资源的JNDI名称,要和Resource注解需要lookup的JNDI名称一致。然后就可以在代码中使用。
完整样例如下:
|
@ConnectionFactoryDefinition(name
= "java:module/env/ConnectionFactory", description = "Kafka Conn
Factory", interfaceName =
"javax.jms.ConnectionFactory", resourceAdapter = " flyingserver -jms-kafka-rar-1.0", transactionSupport =
TransactionSupportLevel.NoTransaction, properties = { }) @Stateless public class
SendKafkaMessage { private static Connection connection;
@Resource(lookup="java:module/env/ConnectionFactory") ConnectionFactory factory; @Schedule(second = "*/10",
hour="*", minute="*",persistent = false) public void sendMessage() throws Exception
{ try { if (connection == null) { connection =
factory.createConnection(); } connection.start(); Session topicSession =
connection.createSession(false, 1); MessageProducer producer =
topicSession.createProducer(topicSession.createTopic("test")); String mes = "hello +"
+ UUID.randomUUID().toString(); producer.send(topicSession.createTextMessage(mes)); } catch (Exception ex) {
Logger.getLogger(SendKafkaMessage.class.getName()).log(Level.SEVERE,
null, ex); } } } |
消息消费
类似消息发送的使用方式:
|
{ ... MessageConsumer
consumer = topicSession.createConsumer(topicSession.createTopic("test")); consumer.setMessageListener(new
MessageListener() { @Override public void onMessage(Message
message) { } }); Utils.threadPool.submit(new
MessageHandler(“test”, consumer)); ... } class MessageHandler implements Runnable
{ String topicName; private MessageConsumer inconsumer; public MessageHandler(String
topicName, MessageConsumer inconsumer) { this.topicName = topicName; this.inconsumer = inconsumer; } @Override public void run() { System.out.println("Subscribed
to topic " + topicName); while (true) { TextMessage receive = null; try { receive =
(TextMessage)inconsumer.receive(5000); } catch (JMSException e) { e.printStackTrace(); } try { if(receive!=null){ System.out.println("成功接收消息:" +
receive.getText()); } } catch (JMSException e) { e.printStackTrace(); } } } } |
管理对象使用
类似ConnectionFactoryDefinition,管理对象通过@AdministeredObjectDefinition来定义,然后通过@Resource来注入使用,
样例如下:
|
。。。。。 @AdministeredObjectDefinition( name =
"java:global/jms/adminObjTopic", resourceAdapter =
"flyingserver-jms-kafka-rar-1.0", interfaceName =
"javax.jms.Topic", className =
"com.asiainfo.mw.flyingserver.adapter.jms.kafka.KafkaJmsTopic", properties =
{"topicName=test3"} ) 。。。。。
@Resource(lookup="java:global/jms/adminObjTopic") Topic topic; 。。。。。。 MessageProducer
producer = topicSession.createProducer(topic); 。。。。。。 |
中间件配置与使用
一般来讲,符合Jakarta EE规范的中间件,都支持连接器的使用,只是部署、配置方式大同小异,下面我们以亚信科技的中间件产品
AISWare FlyingServer Web中间件为例,来介绍一下如何在中间件产品中配置与使用连接器。
新建资源适配器
点击菜单【资源适配】-->点击【新增资源适配】,填写资源适配名称,配置线程池和属性,点击保存,完成资源适配器配置,可在列表上查看到数据。

需要注意的是:这里配置的属性是连接器的全局属性,优先级较低。

连接池
与EIS系统创建于销毁连接是非常消耗性能的,一般来说,我们会通过连接池来优化处理,所以,这里我们创建与消息中间件的连接池。
点击菜单【资源配置-连接器管理-连接池】-->点击【新增连接池】,填写连接池名称,适配器,连接类以及附加属性配置,点击保存,完成连接池的创建


需要注意的是,这里的属性会覆盖资源适配器的属性,优先级高于资源适配器。

创建连接器JNDI
我们在使用的时候,主要是通过JNDI来获取我们的管理对象。
点击菜单【资源配置-连接器管理-连接器】-->点击【新增连接器】,同时填写,连接器名称,选择创建的连接池,
点击保存完成连接器的创建。创建完成后,可在列表页面看到创建的数据。


管理对象创建(非必须)
如果需要创建管理对象使用,可在页面【资源配置—连接器管理—管理对象】中创建,点击【新增管理对象】,填写JNDI名称,选择资源适配器、资源类型、类名称,添加响应的属性,点击保存,完成管理对象的创建。如:

然后代码中通过JNDI获取,
|
Context
ctx=new InitialContext(); topic topic=(
topic)ctx.lookup("jms/topic"); |
代码使用样例
ConnectionFactory可以通过JNDI来获取,所以代码中可以通过以下方式来使用。
以一个servlet为例。
|
public class
Producer2Servlet extends HttpServlet { private static Connection connection; @Override protected void service(HttpServletRequest
req, HttpServletResponse resp) throws ServletException, IOException { try { if (connection == null) { Context ctx = new
InitialContext(); ConnectionFactory
connectionFactory = (ConnectionFactory)
ctx.lookup("jms/kafkaJMSConnectionFactory"); connection = (Connection)
connectionFactory.createConnection(); } connection.start(); Session topicSession =
connection.createSession(false, 1); MessageProducer producer =
topicSession.createProducer(topicSession.createTopic("test2")); String mes = "hello +"
+ UUID.randomUUID().toString();
producer.send(topicSession.createTextMessage(mes)); resp.getWriter().println(mes); } catch (NamingException |
JMSException e) { e.printStackTrace(); } } } |
通过代码 (ConnectionFactory)
ctx.lookup("jms/kafkaJMSConnectionFactory");来获取中间件中管理的ConnectionFactory,这样就可以使用在中间件中配置好的连接器信息,来完成消息的发送。
当然,中间件上配置好的JNDI资源也可以通过注解@Resource(lookup="jms/kafkaJMSConnectionFactory")来获取




