Java:使用 JaxB 编组到 XML,如何正确多线程

Java: Marshalling using JaxB to XML, how to properly multithread

我正在尝试获取一个非常长的字符串文件,并根据我得到的模式将其转换为 XML。我使用 jaxB 从该模式创建 classes。由于文件非常大,我创建了一个线程池来提高性能,但从那时起它只处理文件的一行并将其编组到每个线程的 XML 文件。

下面是我的家class,我在那里阅读了文件。每一行都是一个事务的记录,对于遇到的每个新用户,都会创建一个列表来存储该用户的所有事务,并将每个列表放入一个 HashMap 中。我将其设为 ConcurrentHashMap,因为多个线程将同时在地图上工作,这是正确的做法吗?

创建列表后,将为每个用户创建一个线程。每个线程都运行下面的 ProcessCommands 方法,并从家里接收其用户的事务列表。

public class home{
  public static File XMLFile = new File("LogFile.xml");
  Map<String,List<String>> UserMap= new ConcurrentHashMap<String,List<String>>();
  String[] UserNames =  new String[5000];
    int numberOfUsers = 0;
    try{
        BufferedReader reader = new BufferedReader(new FileReader("test.txt"));
            String line;
            while ((line = reader.readLine()) != null)
            {
                parsed = line.split(",|\s+");
                if(!parsed[2].equals("./testLOG")){
                    if(Utilities.checkUserExists(parsed[2], UserNames) == false){ //User does not already exist
                        System.out.println("New User: " + parsed[2]);
                        UserMap.put(parsed[2],new ArrayList<String>());         //Create list of transactions for new user
                        UserMap.get(parsed[2]).add(line);                       //Add First Item to new list
                        UserNames[numberOfUsers] = parsed[2];                   //Add new user
                        numberOfUsers++;
                    }
                    else{                                                           //User Already Existed
                        UserMap.get(parsed[2]).add(line);
                    }
                }
            }
            reader.close();
    } catch (IOException x) {
        System.err.println(x);
    }

    //get start time
    long startTime = new Date().getTime();
    tCount = numberOfUsers;
    ExecutorService threadPool = Executors.newFixedThreadPool(tCount);
    for(int i = 0; i < numberOfUsers; i++){
        System.out.println("Starting Thread " + i + " for user " + UserNames[i]);
        Runnable worker = new ProcessCommands(UserMap.get(UserNames[i]),UserNames[i], XMLfile);
        threadPool.execute(worker);
    }
    threadPool.shutdown();
    while(!threadPool.isTerminated()){

    }
    System.out.println("Finished all threads");

}

这是 ProcessCommands class。该线程为其用户接收列表并创建编组器。据我所知,编组不是线程安全的,因此最好为每个线程创建一个编组,这是最好的方法吗?

当我创建 marshaller 时,我知道每个 from(来自每个线程)都想访问创建的文件导致冲突,我使用了 synchronized,对吗?

当线程遍历它的列表时,每一行都需要一个特定的案例。有很多,所以为了清楚起见,我只是制作了伪案例。每个案例调用下面的函数。

public class ProcessCommands implements Runnable{
private static final boolean DEBUG = false;
private List<String> list = null;
private String threadName;
private File XMLfile = null;
public Thread myThread;


public ProcessCommands(List<String> list, String threadName, File XMLfile){
    this.list = list;
    this.threadName = threadName;
    this.XMLfile = XMLfile;
}

public void run(){
    Date start = null;
    int transactionNumber = 0;
    String[] parsed = new String[8];
    String[] quoteParsed = null;
    String[] universalFormatCommand = new String[9];
    String userCommand = null;
    Connection connection = null;
    Statement stmt = null;
    Map<String, UserObject> usersMap = null;
    Map<String, Stack<BLO>> buyMap = null;
    Map<String, Stack<SLO>> sellMap = null;
    Map<String, QLO> stockCodeMap = null;
    Map<String, BTO> buyTriggerMap = null;
    Map<String, STO> sellTriggerMap = null;
    Map<String, USO> usersStocksMap = null;
    String SQL = null;
    int amountToAdd = 0;
    int tempDollars = 0;
    UserObject tempUO = null;
    BLO tempBLO = null;
    SLO tempSLO = null;
    Stack<BLO> tempStBLO = null;
    Stack<SLO> tempStSLO = null;
    BTO tempBTO = null;
    STO tempSTO = null;
    USO tempUSO = null;
    QLO tempQLO = null;
    String stockCode = null;
    String quoteResponse = null;
    int usersDollars = 0;
    int dollarAmountToBuy = 0;
    int dollarAmountToSell = 0;
    int numberOfSharesToBuy = 0;
    int numberOfSharesToSell = 0;
    int quoteStockInDollars = 0;
    int shares = 0;
    Iterator<String> itr = null;

    int transactionCount = list.size();
    System.out.println("Starting "+threadName+" - listSize = "+transactionCount);

    //UO dollars, reserved
    usersMap  = new HashMap<String, UserObject>(3);  //userName -> UO

    //USO shares
    usersStocksMap = new HashMap<String, USO>(); //userName+stockCode -> shares

    //BLO code, timestamp, dollarAmountToBuy, stockPriceInDollars
    buyMap = new HashMap<String, Stack<BLO>>();  //userName -> Stack<BLO>

    //SLO code, timestamp, dollarAmountToSell, stockPriceInDollars
    sellMap = new HashMap<String, Stack<SLO>>();  //userName -> Stack<SLO>

    //BTO code, timestamp, dollarAmountToBuy, stockPriceInDollars
    buyTriggerMap = new ConcurrentHashMap<String, BTO>();  //userName+stockCode -> BTO

    //STO code, timestamp, dollarAmountToBuy, stockPriceInDollars
    sellTriggerMap = new HashMap<String, STO>();  //userName+stockCode -> STO

    //QLO timestamp, stockPriceInDollars
    stockCodeMap = new HashMap<String, QLO>();  //stockCode -> QLO



    //create user object and initialize stacks
    usersMap.put(threadName, new UserObject(0, 0));
    buyMap.put(threadName, new Stack<BLO>());
    sellMap.put(threadName, new Stack<SLO>());
    try {
        //Marshaller marshaller = getMarshaller();
        synchronized (this){
            Marshaller marshaller = init.jc.createMarshaller();
            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
            marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
            marshaller.marshal(LogServer.Root,XMLfile);
            marshaller.marshal(LogServer.Root,System.out);
        }
    } catch (JAXBException M) {
        M.printStackTrace();
    }

    Date timing = new Date();
    //universalFormatCommand = new String[8];
    parsed = new String[8];
    //iterate through workload file
    itr = this.list.iterator();
    while(itr.hasNext()){
        userCommand = (String) itr.next(); 
        itr.remove();
        parsed = userCommand.split(",|\s+");
        transactionNumber = Integer.parseInt(parsed[0].replaceAll("\[", "").replaceAll("\]", ""));
        universalFormatCommand = Utilities.FormatCommand(parsed, parsed[0]);
        if(transactionNumber % 100 == 0){
            System.out.println(this.threadName + " - " +transactionNumber+ " - "+(new Date().getTime() - timing.getTime())/1000);
        }
        /*System.out.print("UserCommand " +transactionNumber + ": ");
        for(int i = 0;i<8;i++)System.out.print(universalFormatCommand[i]+ " ");
        System.out.print("\n");*/
        //switch for user command
        switch (parsed[1].toLowerCase()) {

        case "One"
            *Do Stuff"
            LogServer.create_Log(universalFormatCommand, transactionNumber, CommandType.ADD);
            break;
        case "Two"
            *Do Stuff"
            LogServer.create_Log(universalFormatCommand, transactionNumber, CommandType.ADD);
            break;
        }
     }
  }

函数 create_Log 和以前一样有多个案例,为了清楚起见,我只留下一个。 case "QUOTE" 只调用一个对象创建函数,其他的other case 可以创建多个对象。类型 'log' 是一个复杂的 XML 类型,它定义了所有其他对象类型,因此在每次调用 create_Log 时,我都会创建一个名为 Root 的日志类型。 JaxB 生成的 class 'log' 包含一个创建对象列表的函数。声明:

Root.getUserCommandOrQuoteServerOrAccountTransaction().add(quote_QuoteType);

获取我创建的根元素,创建一个列表并将新创建的对象 'quote_QuoteType' 添加到该列表。在我添加线程之前,此方法成功地创建了一个包含尽可能多的对象的列表,然后对它们进行了编组。所以我非常肯定 class 'LogServer' 中的位不是问题所在。这与上面的 ProcessCommands class 中的编组和同步有关。

public class LogServer{
    public static log Root = new log();

    public static QuoteServerType Log_Quote(String[] input, int TransactionNumber){
    ObjectFactory factory = new ObjectFactory();
    QuoteServerType quoteCall = factory.createQuoteServerType();

    **Populate the QuoteServerType object called quoteCall**

    return quoteCall;
    }

    public static void create_Log(String[] input, int TransactionNumber, CommandType Command){
    System.out.print("TRANSACTION "+TransactionNumber + " is " + Command + ": ");
    for(int i = 0; i<input.length;i++) System.out.print(input[i] + " ");
    System.out.print("\n");
    switch(input[1]){
    case "QUOTE":
        System.out.print("QUOTE CASE");
        QuoteServerType quote_QuoteType = Log_Quote(input,TransactionNumber);
        Root.getUserCommandOrQuoteServerOrAccountTransaction().add(quote_QuoteType);
        break;
        }
      }

所以你写了很多代码,但是你试过它是否真的有效吗?快速浏览后,我对此表示怀疑。你应该一部分一部分地测试你的代码逻辑,不要一直测试到最后。看来你只是盯着 Java。我建议首先在简单的单线程应用程序上练习。对不起,如果我听起来很刺耳,但我也会尽量保持建设性:

  1. 按照惯例,classes 名称以大写字母开头,变量以小写开头,您可以采用其他方式。
  2. 你应该在你的家 (Home) 创建一个方法 class 而不是将你所有的代码都放在静态块中。
  3. 您是将整个文件读入内存,您没有逐行处理它。在 Home 初始化后,文件的全部内容将在 UserMap 变量下。如果文件真的很大,你会 运行 堆内存不足。如果您假设文件很大,那么您将无法做到这一点,并且您必须重新设计您的应用程序以将部分结果存储在某个地方。如果你的文件小于内存,你可以保持这样(但你说它很大)。
  4. 不需要用户名,UserMap.containsKey 就可以了
  5. 你的线程池大小应该在你的核心范围内而不是用户数量,因为你会得到线程垃圾(如果你的代码中有阻塞操作,如果不保持它作为处理器数量,则使 tCount = 2*processors ).一旦一个 ProcessCommand 完成,执行器将启动另一个 ProcessCommand,直到您完成所有,您将有效地使用所有处理器内核。
  6. 不要 while(!threadPool.isTerminated()),这一行将完全消耗一个处理器,因为它会不断检查,而是调用 awaitTermination
  7. 您的 ProcessCommand 具有视图映射变量,如您所说,只有一个条目原因,每个变量将处理来自一个用户的数据。
  8. synchronized(this) is Process 将不起作用,因为每个线程将在不同的对象(不同的进程实例)上同步。
  9. 我相信创建 marshaller 是线程安全的(检查它)所以根本不需要同步
  10. 您在对交易列表进行实际处理之前保存您的日志(无论它是什么)
  11. 编组将覆盖当前状态为 LogServer.Root 的文件内容。如果它在您的 proccsCommand 之间共享(似乎是这样),那么将它保存在每个线程中有什么意义。完成后立即执行。
  12. 你不需要 itr.remove();
  13. 日志 class(对于 ROOT 变量 !!!)需要是线程安全的,因为所有线程都会调用它的操作(因此日志 class 中的列表必须是并发列表等)。
  14. 等等……

我会推荐给

  1. 从实际有效的简单单线程版本开始。
  2. 逐行处理,(将每个用户的结果存储在不同的文件中,您可以为最近使用的用户使用事务缓存,这样就不必一直写入磁盘(请参阅番石榴缓存)
  3. 处理多线程每个用户事务到您的用户日志对象(同样,如果它很多,您必须将它们保存到磁盘而不是全部保存在内存中)。
  4. 编写代码,将来自不同用户的日志组合起来创建一个(同样,您可能希望多线程执行),尽管它主要是 IO 操作,因此收益不大,而且做起来更棘手。

祝你好运 覆盖 cont