我怎样才能使骆驼路线线程安全?

How can i make a camel route thread safe?

我有一个 camel 路由,它使用来自 apache activeMQ 的任务。 当 ActiveMQ 只有一个消费者时一切正常。

但是当消费者增加到两个(或更多)时,应用程序的行为不当。

这是我的路线:

<routeContext id="myRoute"  xmlns="http://camel.apache.org/schema/spring">
    <route errorHandlerRef="myErrorHandler" id="myErrorRoute">
        <from uri="activemq:queue:{{my.queue}}" />
        <log loggingLevel="DEBUG" message="Message received from my queue : ${body}"></log>
        <multicast>
            <pipeline>
                <log loggingLevel="DEBUG" message="Adding to redis : ${body}"></log>
                <to uri="spring-redis://localhost:6379?serializer=#stringSerializer" />
            </pipeline>
            <pipeline>
                <transform>
                    <method ref="insertBean" method="myBatchInsertion"></method>
                </transform>
                <choice>
                    <when>
                        <simple> ${body.size()} == ${properties:my.batch.size}</simple>
                        <log message="Going to insert my batch in database" />
                        <to uri="mybatis:batchInsert?statementType=InsertList"></to>
          <log message="Inserted in my table : ${in.header.CamelMyBatisResult}"></log>
                        <choice>
                            <when>
                                <simple>${properties:my.write.file} == true</simple>
                                <bean beanType="com.***.***.processors.InsertToFile"
                                    method="processMy(${exchange}" />
                                <log message="Going to write to file : ${in.header.CamelFileName}" />
                                <to uri="file://?fileExist=Append&amp;bufferSize=32768"></to>
                            </when>
                        </choice>
                    </when>
                </choice>
            </pipeline>
        </multicast>
    </route>
</routeContext>

下面是豆子:

public class InsertBeanImpl {
  public  List<Out> myOutList = new CopyOnWriteArrayList<Out>();

  public List<Out> myBatchInsertion(Exchange exchange) {
        if (myOutList.size() >= myBatchSize) {
            Logger.sysLog(LogValues.info,this.getClass().getName(),"Reached max PayLoad size : "+myOutList.size() + " , going to clear batch");
            myOutList.clear();
        }
        Out myOut = exchange.getIn().getBody(Out.class);
        Logger.sysLog(LogValues.APP_INFO, this.getClass().getName(), myOut.getMasterId()+" | "+"Adding to batch masterId : "+myOut.getMasterId());
        synchronized(myOut){
            myOutList.add(myOut);
        }
        Logger.sysLog(LogValues.info, this.getClass().getName(), "Count of batch : "+myOutList.size());
        return myOutList;
    }
}



public class SaveToFile {
    static String currentFileName = null;
    static int iSub = 0;
    String path;
    String absolutePath;

    @Autowired
    private Utility utility;


    public void processMy(Exchange exchange) {
        getFileName(exchange, currentFileNameSub, iSub);
    }


    public void getFileName(Exchange exchange, String outFile, int i) {
        exchange.getIn().setBody(getFromJson(exchange));
        path = (String) exchange.getIn().getHeader("path");
        Calendar date = null;
        date = new GregorianCalendar();
        NumberFormat format = NumberFormat.getIntegerInstance();
        format.setMinimumIntegerDigits(2);
        String pathSuffix = "/" + date.get(Calendar.YEAR) + "/"
                + format.format((date.get(Calendar.MONTH) + 1)) + "/"
                    + format.format(date.get(Calendar.DAY_OF_MONTH));
        String fileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        double megabytes = 100 * 1024 * 1024;
        if (outFile != null) {
            if (!fileName.equals(outFile.split("_")[0])) {
                outFile = null;
                i = 0;
            }
        }
        if (outFile == null) {
            outFile = fileName + "_" + i;
        }
        while (new File(path + "/" + pathSuffix + "/" + outFile).length() >= megabytes) {
            outFile = fileName + "_" + (++i);
        }
        absolutePath = path + "/" + pathSuffix + "/" + outFile;
        exchange.getIn().setHeader("CamelFileName", absolutePath);
    }

    public String getFromJson(Exchange exchange) {
        synchronized(exchange){
            List<Out> body = exchange.getIn().getBody(CopyOnWriteArrayList.class);
            Logger.sysLog(LogValues.info, this.getClass().getName(), "body > "+body.size());
            String text = "";
            for (int i = 0; i < body.size(); i++) {
                Out msg = body.get(i);
                text = text.concat(utility.convertObjectToJsonStr(msg) + "\n");
            }
            return text;
        }

    }
}

由于处理器不同步且线程不安全,因此在多个消费者的情况下路由无法按预期工作。

谁能告诉我如何使我的路由线程安全或同步?

我尝试让处理器同步,但没有用。还有其他方法吗?

确保您的处理器线程安全,不要过多使用 Synchronize,最好根本不用。

为此,您根本不能在其中包含可更改的实例变量。

只有公共属性可以在那里,比如一些对所有线程都有效并且在任何方法执行期间都不会改变的设置。那么就不需要使用影响性能的Synchronize机制了。

Camel 中的所有其他内容都是线程安全的,如果处理器实现不是线程安全的,则无法告诉 Camel 是线程安全的。

显然 InsertBeanImplSaveToFile 都不是 thread-safe。一般来说,Camel 路由中使用的 bean 应该是无状态的,即它们不应该有变量字段。

对于InsertBeanImpl,看起来您真正想要做的是将多条消息聚合为一条消息。对于这样的用例,我会考虑改用 Camel 聚合器 [1],您可以使用它更轻松地实现 thread-safe 解决方案。

[1] http://camel.apache.org/aggregator.html

对于 SaveToFile,我认为没有理由将 pathabsolutePath 作为字段。将它们移动到 getFileName 方法中的局部变量中。