如何将文件发送到 ActiveMQ 队列?
How to send a file to the ActiveMQ Queue?
我在 JBoss FUSE 中有一个简单的 Apache Camel 路由:
<?xml version="1.0"?>
<!--
  OSGi Blueprint descriptor that declares:
    * a cron-based route policy bean ("startPolicy"),
    * an ActiveMQ Camel component bean ("activemq") pointing at tcp://localhost:61616,
    * one Camel route ("testRoute") that reads from "source-queue", logs the message body
      at INFO level and forwards the message to "sink-queue".
-->
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="
             http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
             http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">

  <!-- Route policy referenced by the route below via routePolicyRef. -->
  <bean id="startPolicy" class="org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy">
    <!-- NOTE(review): presumably a Quartz cron expression firing every 3 seconds; confirm. -->
    <property name="routeStartTime" value="*/3 * * * * ?"/>
  </bean>

  <!-- ActiveMQ component used by the "activemq:" endpoint URIs in the route. -->
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="userName" value="admin" />
    <property name="password" value="admin" />
  </bean>

  <camelContext id="blueprintContext" trace="false" xmlns="http://camel.apache.org/schema/blueprint">
    <!-- autoStartup is false, so the route is not started together with the context. -->
    <route id="testRoute" routePolicyRef="startPolicy" autoStartup="false">
      <!-- A literal ampersand is not allowed inside an XML attribute value; it must be
           written as the &amp; entity, otherwise the descriptor is not well-formed. -->
      <from uri="activemq:source-queue?username=admin&amp;password=admin"></from>
      <log message="${body}" loggingLevel="INFO"></log>
      <to uri="activemq:sink-queue?username=admin&amp;password=admin"></to>
    </route>
  </camelContext>
</blueprint>
我可以使用这个独立客户端连接到 ActiveMQ 代理并将消息发送到队列:
/**
 * Stand-alone client that connects to the ActiveMQ broker at
 * {@code tcp://localhost:61616} and puts one text message on {@code source-queue}.
 */
public class MessageSender {

    /**
     * Opens a connection with the {@code admin}/{@code admin} credentials, sends a single
     * text message to {@code source-queue} and closes the connection.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if connecting to the broker or sending the message fails
     */
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");

        Connection brokerConnection = connectionFactory.createConnection();
        try {
            // Non-transacted session with automatic acknowledgement.
            Session jmsSession = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer queueProducer =
                    jmsSession.createProducer(jmsSession.createQueue("source-queue"));
            queueProducer.send(jmsSession.createTextMessage("some message to queue..."));
        } finally {
            // Runs whether or not sending succeeded.
            brokerConnection.close();
        }
    }
}
从我看到的日志来看,消息已从队列中被消费,消息体也显示在了日志中。
如何将文件发送到 ActiveMQ 队列?例如,我有一个简单的表单,其中包含 <input type="file">,并使用 multipart/form-data 编码。通过这个表单,我需要将 POST 请求的有效负载发送到 ActiveMQ 队列。
我该怎么做?
非常感谢您提供的信息。
感谢大家。
@Mary Zheng 提供了一个很好的实现示例:
ActiveMQ File Transfer Example
QueueMessageProducer 类中的以下方法将文件消息发送到 ActiveMQ Broker:
/**
 * Sends the content of a file to the broker as a single JMS bytes message.
 *
 * <p>The file name is attached as the string property {@code fileName}; the message body
 * is the byte array returned by {@code fileManager.readfileAsBytes(file)}.
 *
 * @param file the file whose name and content are sent
 * @throws JMSException if creating, populating or sending the message fails
 * @throws IOException if the file content cannot be read
 */
private void sendFileAsBytesMessage(File file) throws JMSException, IOException {
    BytesMessage fileMessage = session.createBytesMessage();
    fileMessage.setStringProperty("fileName", file.getName());

    // The file is read only after the message and its property have been set up.
    byte[] payload = fileManager.readfileAsBytes(file);
    fileMessage.writeBytes(payload);

    msgProducer.send(fileMessage);
}
其中:
// Connection setup used by the message producer. username, password and
// activeMqBrokerUri are supplied from outside this fragment.
ConnectionFactory connFactory =
new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
Connection connection = connFactory.createConnection();
// Non-transacted session (first argument is false) with automatic acknowledgement;
// the result is cast to the ActiveMQ-specific session type.
ActiveMQSession session =
(ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
FileAsByteArrayManager 类对文件执行底层操作:
/**
 * Reads whole files into byte arrays and writes byte arrays out as files.
 */
public class FileAsByteArrayManager {

    /**
     * Reads the entire content of a file into memory.
     *
     * @param file the file to read
     * @return the complete file content; an empty array when the file is empty
     * @throws IOException if the file cannot be opened or fully read, or if it is larger
     *     than {@link Integer#MAX_VALUE} bytes and so cannot fit in a single array
     */
    public byte[] readfileAsBytes(File file) throws IOException {
        try (RandomAccessFile accessFile = new RandomAccessFile(file, "r")) {
            long length = accessFile.length();
            // Without this check the int cast below would silently wrap for huge files.
            if (length > Integer.MAX_VALUE) {
                throw new IOException(
                        "File is too large to read into a byte array (" + length + " bytes): " + file);
            }
            byte[] bytes = new byte[(int) length];
            accessFile.readFully(bytes);
            return bytes;
        }
    }

    /**
     * Writes {@code bytes} to the file named {@code fileName}, so that afterwards the file
     * contains exactly those bytes.
     *
     * @param bytes the content to write
     * @param fileName path of the file to create or overwrite
     * @throws IOException if the file cannot be opened or written
     */
    public void writeFile(byte[] bytes, String fileName) throws IOException {
        File file = new File(fileName);
        try (RandomAccessFile accessFile = new RandomAccessFile(file, "rw")) {
            accessFile.write(bytes);
            // "rw" mode does not truncate: cut off any bytes left over from a longer,
            // previously existing file.
            accessFile.setLength(bytes.length);
        }
    }
}
我在 JBoss FUSE 中有一个简单的 Apache Camel 路由:
<?xml version="1.0"?>
<!--
  OSGi Blueprint descriptor that declares:
    * a cron-based route policy bean ("startPolicy"),
    * an ActiveMQ Camel component bean ("activemq") pointing at tcp://localhost:61616,
    * one Camel route ("testRoute") that reads from "source-queue", logs the message body
      at INFO level and forwards the message to "sink-queue".
-->
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="
             http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
             http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">

  <!-- Route policy referenced by the route below via routePolicyRef. -->
  <bean id="startPolicy" class="org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy">
    <!-- NOTE(review): presumably a Quartz cron expression firing every 3 seconds; confirm. -->
    <property name="routeStartTime" value="*/3 * * * * ?"/>
  </bean>

  <!-- ActiveMQ component used by the "activemq:" endpoint URIs in the route. -->
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="userName" value="admin" />
    <property name="password" value="admin" />
  </bean>

  <camelContext id="blueprintContext" trace="false" xmlns="http://camel.apache.org/schema/blueprint">
    <!-- autoStartup is false, so the route is not started together with the context. -->
    <route id="testRoute" routePolicyRef="startPolicy" autoStartup="false">
      <!-- A literal ampersand is not allowed inside an XML attribute value; it must be
           written as the &amp; entity, otherwise the descriptor is not well-formed. -->
      <from uri="activemq:source-queue?username=admin&amp;password=admin"></from>
      <log message="${body}" loggingLevel="INFO"></log>
      <to uri="activemq:sink-queue?username=admin&amp;password=admin"></to>
    </route>
  </camelContext>
</blueprint>
我可以使用这个独立客户端连接到 ActiveMQ 代理并将消息发送到队列:
/**
 * Stand-alone client that connects to the ActiveMQ broker at
 * {@code tcp://localhost:61616} and puts one text message on {@code source-queue}.
 */
public class MessageSender {

    /**
     * Opens a connection with the {@code admin}/{@code admin} credentials, sends a single
     * text message to {@code source-queue} and closes the connection.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if connecting to the broker or sending the message fails
     */
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");

        Connection brokerConnection = connectionFactory.createConnection();
        try {
            // Non-transacted session with automatic acknowledgement.
            Session jmsSession = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer queueProducer =
                    jmsSession.createProducer(jmsSession.createQueue("source-queue"));
            queueProducer.send(jmsSession.createTextMessage("some message to queue..."));
        } finally {
            // Runs whether or not sending succeeded.
            brokerConnection.close();
        }
    }
}
从我看到的日志来看,消息已从队列中被消费,消息体也显示在了日志中。
如何将文件发送到 ActiveMQ 队列?例如,我有一个简单的表单,其中包含 <input type="file">,并使用 multipart/form-data 编码。通过这个表单,我需要将 POST 请求的有效负载发送到 ActiveMQ 队列。
我该怎么做?
非常感谢您提供的信息。
感谢大家。
@Mary Zheng 提供了一个很好的实现示例:
ActiveMQ File Transfer Example
QueueMessageProducer 类中的以下方法将文件消息发送到 ActiveMQ Broker:
/**
 * Sends the content of a file to the broker as a single JMS bytes message.
 *
 * <p>The file name is attached as the string property {@code fileName}; the message body
 * is the byte array returned by {@code fileManager.readfileAsBytes(file)}.
 *
 * @param file the file whose name and content are sent
 * @throws JMSException if creating, populating or sending the message fails
 * @throws IOException if the file content cannot be read
 */
private void sendFileAsBytesMessage(File file) throws JMSException, IOException {
    BytesMessage fileMessage = session.createBytesMessage();
    fileMessage.setStringProperty("fileName", file.getName());

    // The file is read only after the message and its property have been set up.
    byte[] payload = fileManager.readfileAsBytes(file);
    fileMessage.writeBytes(payload);

    msgProducer.send(fileMessage);
}
其中:
// Connection setup used by the message producer. username, password and
// activeMqBrokerUri are supplied from outside this fragment.
ConnectionFactory connFactory =
new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
Connection connection = connFactory.createConnection();
// Non-transacted session (first argument is false) with automatic acknowledgement;
// the result is cast to the ActiveMQ-specific session type.
ActiveMQSession session =
(ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
FileAsByteArrayManager 类对文件执行底层操作:
/**
 * Reads whole files into byte arrays and writes byte arrays out as files.
 */
public class FileAsByteArrayManager {

    /**
     * Reads the entire content of a file into memory.
     *
     * @param file the file to read
     * @return the complete file content; an empty array when the file is empty
     * @throws IOException if the file cannot be opened or fully read, or if it is larger
     *     than {@link Integer#MAX_VALUE} bytes and so cannot fit in a single array
     */
    public byte[] readfileAsBytes(File file) throws IOException {
        try (RandomAccessFile accessFile = new RandomAccessFile(file, "r")) {
            long length = accessFile.length();
            // Without this check the int cast below would silently wrap for huge files.
            if (length > Integer.MAX_VALUE) {
                throw new IOException(
                        "File is too large to read into a byte array (" + length + " bytes): " + file);
            }
            byte[] bytes = new byte[(int) length];
            accessFile.readFully(bytes);
            return bytes;
        }
    }

    /**
     * Writes {@code bytes} to the file named {@code fileName}, so that afterwards the file
     * contains exactly those bytes.
     *
     * @param bytes the content to write
     * @param fileName path of the file to create or overwrite
     * @throws IOException if the file cannot be opened or written
     */
    public void writeFile(byte[] bytes, String fileName) throws IOException {
        File file = new File(fileName);
        try (RandomAccessFile accessFile = new RandomAccessFile(file, "rw")) {
            accessFile.write(bytes);
            // "rw" mode does not truncate: cut off any bytes left over from a longer,
            // previously existing file.
            accessFile.setLength(bytes.length);
        }
    }
}