请教 java怎么向activemq发送消息?

huitongrr 2008-02-21 02:22:02
有没有很简单的一段代码吗? 只要求 发送消息的也可以 能给我简单讲一下吗????谢谢啊!! 我对发送消息这没怎么接触过!!呵呵
...全文
1925 29 打赏 收藏 转发到动态 举报
写回复
用AI写文章
29 条回复
切换为时间正序
请发表友善的回复…
发表回复
乳臭未干 2011-07-15
  • 打赏
  • 举报
回复
我也粘一段吧~

package lzTest1;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

public class TryActiveMQ_Send {
public static void main(String args[])throws Exception{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory
("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
Message message = session.createTextMessage("我叫AAA!");
producer.send(message);
//session.commit();
session.close();
connection.close();

}
}
--------------------------------------------------------
package lzTest1;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TryActiveMQ_Receive {
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory
("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("收到消息: " + text);
} else {
System.out.println("收到消息: " + message);
}
//session.commit();
session.close();
connection.close();
}
}

mouse5s306 2008-02-29
  • 打赏
  • 举报
回复

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;


public class ReceiveMessage {

private static final String url = "tcp://172.16.168.167:61616";
private static final String QUEUE_NAME = "szf.queue";


public void receiveMessage(){
Connection connection = null;
try{
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
}catch(Exception e){
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// connection = connectionFactory.createConnection();
}
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
consumeMessagesAndClose(connection,session,consumer);
}catch(Exception e){

}
}

protected void consumeMessagesAndClose(Connection connection,
Session session, MessageConsumer consumer) throws JMSException {
for (int i = 0; i < 1;) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
}

public void onMessage(Message message){
try{
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage)message;
String msg = txtMsg.getText();
System.out.println("Received: " + msg);
}
}catch(Exception e){
e.printStackTrace();
}
}

public static void main(String args[]){
ReceiveMessage rm = new ReceiveMessage();
rm.receiveMessage();
}
}


这个是我自己做测试用的
适用收取TextMessage消息
希望对LZ有用
huitongrr 2008-02-27
  • 打赏
  • 举报
回复
我自己试了啊 不好用 我只是想和大家讨论一下 是什么问题 那你说 你上网找资料 和问别人有什么不一样! 问别人也是只是思路 那要是不用问的话 那这个 论坛就别开了!

你也可以永远都不问人了! 你也可以永远都别上网看别人的答案了!

你也别上网了!谢谢!
ganghua 2008-02-27
  • 打赏
  • 举报
回复
永远都问别人么?
睿音 2008-02-27
  • 打赏
  • 举报
回复

public void receive() {
try {
receiver = session.createReceiver(queue_test);
TextMessage tm = (TextMessage) receiver.receive();
System.out.println("receive successful!");
messageContent = tm.getText();
System.out.println("The content of the receiving message is:" + messageContent);

} catch (JMSException jmse1) {
System.out.println("JMSException occured while receiving!");
jmse1.printStackTrace();
System.exit( -1);
}
}


我贴的是我自己的一段应用。lz 自己整理下吧。
另:为什么不整合spring.
huitongrr 2008-02-27
  • 打赏
  • 举报
回复

大家好 那怎么从qctivemq队列里面取信息呢!

我用的是下面这个 我在


对方给我一个队列名 queue_test


下面是我写的
private Destination destination;
if (topic) {
destination = session.createTopic("queue_test");
} else {
destination = session.createQueue("queue_test");
}

MessageConsumer consumer = session.createConsumer(destination);


consumer.receive();

以上是取的代码 部分!
但是 我好象没取出来 也可能取出来了 但是 我没看见!! 怎么 可以 显示出来啊 我能看见啊 !

发的我会了 谢谢大家啊!!!
huitongrr 2008-02-27
  • 打赏
  • 举报
回复
那怎么从队列里取得信息呢 发送的我会了 感谢大家!
mouse5s306 2008-02-26
  • 打赏
  • 举报
回复
给个简单的吧

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;

//发送TextMessage
public class SendMessage {

private static final String url = "tcp://localhost:61616";;
private static final String QUEUE_NAME = "choice.queue";
protected String expectedBody = "<hello>world!</hello>";

public void sendMessage() throws JMSException{

Connection connection = null;

try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();

connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(expectedBody);
message.setStringProperty("headname", "remoteB");
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
connection.close();
}
}


***************************************************************************************
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
//发送BytesMessage
public class SendMessage {

private String url = "tcp://localhost:61616";

public void sendMessage() throws JMSException{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = session.createBytesMessage();
byte[] content = getFileByte("d://test.jar");
message.writeBytes(content);
try{
producer.send(message);
System.out.println("successful send message");
}catch(Exception e){
e.printStackTrace();
e.getMessage();
}finally{
session.close();
connection.close();
}
}

private byte[] getFileByte(String filename){
byte[] buffer = null;
FileInputStream fin = null;
try {
File file = new File(filename);
fin = new FileInputStream(file);
buffer = new byte[fin.available()];
fin.read(buffer);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return buffer;
}



LZ发送完消息后可以访问
http://localhost:8161/admin/queues.jsp
看到相应的queue中是否有消息

eefox 2008-02-25
  • 打赏
  • 举报
回复
不太清楚 火狐
huitongrr 2008-02-22
  • 打赏
  • 举报
回复
我运行时出现 这个错误javax.naming.NameNotFoundException: MyJMSConnectionFactory
at org.apache.activemq.jndi.ReadOnlyContext.lookup(ReadOnlyContext.java:215)
at javax.naming.InitialContext.lookup(InitialContext.java:351)
at com.QueueBase.<init>(QueueBase.java:19)
at com.QueueComsumer.<init>(QueueComsumer.java:14)
at com.QueueComsumer.main(QueueComsumer.java:46)
java.lang.NullPointerException
at com.QueueComsumer.<init>(QueueComsumer.java:17)
at com.QueueComsumer.main(QueueComsumer.java:46)


我用的是tomcat
我的属性文件(jmsSetting.properties) 是这样写的

#jndi factory
jndi.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory

#jndi provider url
jndi.provider.url=tcp://10.64.8.54:51618/

#jms factory
jms.factory=MyJMSConnectionFactory

#queue name
queue.name=MyJMSQueue


我想错误在这jms.factory=MyJMSConnectionFactory 但是 MyJMSConnectionFactory是什么啊
还有#jndi provider url
jndi.provider.url=t3://127.0.0.1:7001/

这个t3是什么啊?


不好意思 太打扰 朋友了!!!!!!
吐司vivi 2008-02-22
  • 打赏
  • 举报
回复
基于weblogic的
.属性文件(jmsSetting.properties)
#jndi factory
jndi.factory=weblogic.jndi.WLInitialContextFactory

#jndi provider url
jndi.provider.url=t3://127.0.0.1:7001/

#jms factory
jms.factory=MyJMSConnectionFactory

#queue name
queue.name=MyJMSQueue
huitongrr 2008-02-22
  • 打赏
  • 举报
回复
那个 你有jmsSetting.properties 文件没 那个MQ的段口号 还有什么属性 是不是在那里设置的啊!!!! 能发个过来吗 谢谢啊@*@
lcm16001080 2008-02-22
  • 打赏
  • 举报
回复
我补充几句。所有的mq都是基于jdk 的jms的api来开发的。消息类型为:发布/订阅,点对点。一般的mq传送机制是:生产者生产出消息后,交由jms provider去维护并发送给消费者。其它的略。。。。。.

欢迎加入http://www.ityouku.com讨论,java菜鸟群33897438, msn:lichunmei2006@hotmail.com
huitongrr 2008-02-22
  • 打赏
  • 举报
回复
十分感谢 我试试 一.Prop类(用来读取属性文件,单例)
是做什么用的啊>?
huitongrr 2008-02-22
  • 打赏
  • 举报
回复
哈哈哈哈 感谢CSDN, 感谢斑竹, 感谢所有回帖的朋友 !!!
吐司vivi 2008-02-21
  • 打赏
  • 举报
回复
以前保存的一段:
一.Prop类(用来读取属性文件,单例)


package com.sitinspring.standardWeblogicJms;

import java.io.FileInputStream;
import java.util.Hashtable;
import java.util.Properties;

import javax.naming.Context;
import javax.naming.InitialContext;

public class Props {
private static final String File_Name = "jmsSetting.properties";

private static Properties propts;

public static void makeProptsInstance() {
propts = new Properties();

try {
propts.load(new FileInputStream(File_Name));
} catch (Exception ex) {
ex.printStackTrace();
}
}

public static String get(String name){
if(propts==null){
makeProptsInstance();
}

return (String)propts.get(name);
}

@SuppressWarnings("unchecked")
public static Context getInitialContext(){
Context context=null;

String jndiFactory=Props.get("jndi.factory");
String providerUrl=Props.get("jndi.provider.url");

Hashtable env=new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
env.put(Context.PROVIDER_URL, providerUrl);

try {
context=new InitialContext(env);
} catch (Exception ex) {
ex.printStackTrace();
}

return context;
}
}
二.QueueBase类(QueueComsumer和QueueSupplier的基类,用于归纳一些两类共通的东西)
package com.sitinspring.standardWeblogicJms;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;

public class QueueBase{
protected QueueConnectionFactory queueConnectionFactory;
protected QueueConnection queueConnection;
protected QueueSession queueSession;
protected Queue queue;

public QueueBase(Context context){
try{
String jmsFactory=Props.get("jms.factory");
queueConnectionFactory=(QueueConnectionFactory)context.lookup(jmsFactory);
queueConnection=queueConnectionFactory.createQueueConnection();
queueSession=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

String queueName=Props.get("queue.name");
queue=(Queue)context.lookup(queueName);
}
catch(Exception ex){
ex.printStackTrace();
}
}
}
三.QueueComsumer类(用于接收消息)
package com.sitinspring.standardWeblogicJms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.QueueReceiver;
import javax.jms.TextMessage;
import javax.naming.Context;

public class QueueComsumer extends QueueBase implements MessageListener {
private QueueReceiver queueReceiver;

public QueueComsumer(Context context) {
super(context);

try {
queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(this);
queueConnection.start();
} catch (Exception ex) {
ex.printStackTrace();
}
}

public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage txtmsg = (TextMessage) message;

try {
System.out.println("I have received the TextMassage:"
+ txtmsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

public void close() throws JMSException {
queueReceiver.close();
queueSession.close();
queueConnection.close();
}

public static void main(String[] args) {
Context context = Props.getInitialContext();
QueueComsumer queueComsumer = new QueueComsumer(context);

synchronized (queueComsumer) {
while (true) {
try {
queueComsumer.wait();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();

try {
queueComsumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
}
四.QueueSupplier类(用于发送消息)
package com.sitinspring.standardWeblogicJms;

import javax.jms.JMSException;
import javax.jms.QueueSender;
import javax.jms.TextMessage;
import javax.naming.Context;

public class QueueSupplier extends QueueBase {
private QueueSender queueSender;

public QueueSupplier(Context context) {
super(context);

try {
queueSender = queueSession.createSender(queue);
} catch (Exception ex) {
ex.printStackTrace();
}
}

public void sendMsg(String msg) throws JMSException {
TextMessage txtMsg = queueSession.createTextMessage();
txtMsg.setText(msg);

queueConnection.start();
queueSender.send(txtMsg);
}

public void close() throws JMSException {
queueSender.close();
queueSession.close();
queueConnection.close();
}

public static void main(String[] args) {
Context context = Props.getInitialContext();
QueueSupplier queueSupplier = new QueueSupplier(context);

try {
queueSupplier.sendMsg("Hello World");

System.out.println("A message have been sent!");
} catch (JMSException ex) {
ex.printStackTrace();
} finally {
try {
queueSupplier.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
}
huitongrr 2008-02-21
  • 打赏
  • 举报
回复
??>???在哪 你有没 我的邮箱是rtb2005@163.com 要是 能把那段代码发上来 更好 !!谢谢!
baitianhai 2008-02-21
  • 打赏
  • 举报
回复
...不是jar里面有,是下载的zip文件里有
huitongrr 2008-02-21
  • 打赏
  • 举报
回复
真的吗??? 你是说jar里的java文件吗>???/ 怎么给大家分啊 谢谢大家! 我找找看看啊!!!不知道能不能找到 呵呵!
baitianhai 2008-02-21
  • 打赏
  • 举报
回复
你下载activemq,里面就有代码例子吧,很简单的


加载更多回复(9)

67,513

社区成员

发帖
与我相关
我的任务
社区描述
J2EE只是Java企业应用。我们需要一个跨J2SE/WEB/EJB的微容器,保护我们的业务核心组件(中间件),以延续它的生命力,而不是依赖J2SE/J2EE版本。
社区管理员
  • Java EE
加入社区
  • 近7日
  • 近30日
  • 至今
社区公告
暂无公告

试试用AI创作助手写篇文章吧