Java - 使用 FileWriter 写入同一个文件 - 介绍 multi-threading

Java - write to same file with FileWriter - introduce multi-threading

编辑2: 我很清楚两次票数接近的原因:OP中没有multi-threading。所以我编辑了标题。

线程不在 OP 中,但在我的答案中。

编辑:

好吧,我放弃编辑我的原创 post 而是让我这样问:

此日志模拟器旨在生成用户和设备活动的日志。每行包含多个键和值,以 ;; 分隔,格式为 key=value。键是:User-NameNAS-IdentifierAcct-Input-OctetsAcct-Output-OctetsdateAcct-Status-Type

User-Name 应该是随机的,如 user_<random_number>NAS-Identifier 一样。 Acct-Input-OctetsAcct-Output-Octets是整数,从0开始,按升序排列;增加的数量是任意的。 date 也应该增加。 Acct-Status-TypeStartInterim-UpdateStop之一,代表一个用户session的开始、持续状态和结束。应该有一个 Start,然后是多个 Interim-Update,随机数,和一个 Stop。 session.

期间用户和nas相同

因此,它应该生成如下行:

date=lun, 22 ene 2018 18:39:42.052;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Start;;Acct-Input-Octets=0;;Acct-Output-Octets=0;;
date=lun, 22 ene 2018 18:39:43.827;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=0;;Acct-Output-Octets=0;;
date=lun, 22 ene 2018 18:39:44.463;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=268;;Acct-Output-Octets=42;;
date=lun, 22 ene 2018 18:39:43.968;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=428;;Acct-Output-Octets=143;;
date=lun, 22 ene 2018 18:39:44.039;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=519;;Acct-Output-Octets=294;;
date=lun, 22 ene 2018 18:39:46.276;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Stop;;Acct-Input-Octets=727;;Acct-Output-Octets=535;;
...

但是,我们这里可以看到,同一个用户的台词都聚集在一起了,不太真实。我想要的是这样的:

User A Start
User B Start
User C Start
User A Interim-Update
User B Stop
User D Start
User C Interim-Update
User A Interim-Update
...

我想创建几个线程,比如4个,相互竞争;一个抓住文件并写一行,然后释放它。然后,另一个线程再次写入文件,一行并退出。


OP

我有这个简单的 Java class 来生成 Radius 日志的模拟:

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

public class RadiusGenerator {
    private static final String LOG_PATH = "radius-simulation.log";
    private static FileWriter writer;
    private static volatile Map<String, String> user_nas; //ensure the newest value read from main heap; use volatile because we read but not write.
    private static final SimpleDateFormat fmt_in = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss.SSS");
    /* keys in db*/
//    private static final String KEY_TIMESTAMP_HOUR = "timestamp_hour";
//    private static final String KEY_USERID = "user_id";
//    private static final String KEY_NASID = "nas_id";
//    private static final String KEY_TYPE = "type";
//    private static final String KEY_BALANCES = "balances";
//    private static final String KEY_INPUT = "input";
//    private static final String KEY_OUTPUT = "output";

    /* fields in tuple */
    private static final String FIELD_DATE = "date";
    private static final String FIELD_USERNAME = "User-Name";
    private static final String FIELD_NAS_IDENTIFIER = "NAS-Identifier";
    private static final String FIELD_METHOD = "Acct-Status-Type";
    private static final String FIELD_BALANCE_INPUT = "Acct-Input-Octets";
    private static final String FIELD_BALANCE_OUTPUT = "Acct-Output-Octets";

    /* methods */
    private static final String METHOD_START = "Start";
    private static final String METHOD_UPDATE = "Interim-Update";
    private static final String METHOD_STOP = "Stop";
    private static String name;
    private static String nas;
    private static String method;
    private static String result;
    private static int input;
    private static int output;
    private static Date now;


    public static void main(String[] args) {
        // TODO Auto-generated method stub
        try {
            RadiusGenerator.writer = new FileWriter(new File(LOG_PATH), false);
            System.out.println("Initializing with parameters: number of users: " + args[0] + ", number of NAS: " + args[1]);
            int numUsers = Integer.parseInt(args[0]);
            int numNAS = Integer.parseInt(args[1]);
//            List<String> usernames = new ArrayList<String>();
//            IntStream.range(1, numUsers).forEach(i -> {
//                usernames.add("User_" + i);
//            });
//            List<String> nasNames = new ArrayList<String>();
//            IntStream.range(1, numNAS).forEach(i -> {
//                nasNames.add("NAS_" + i);
//            });


            FileWriter writer = new FileWriter(new File(LOG_PATH));
            try {
                for (int i=0; i < generateRandom(20); i++) {
                    int indexUser = generateRandom(numUsers-1);
                    name = "User_" + String.valueOf(indexUser);
                    int indexNas = generateRandom(numNAS-1);
                    nas = "NAS_" + String.valueOf(indexNas);
                    user_nas = new HashMap<String, String>();
                    method = "";
                    result = "";
                    now = new Date();
                    input = 0;
                    output = 0;

                    //first line
                    method = METHOD_START;
                    user_nas.put(name, nas);
                    formLine();
                    //update
                    method = METHOD_UPDATE;
                    do {
                        if (generateRandom(10) == 9) {
                            method = "Stop";
                        }
                        formLine();
                        input += generateRandom(300);
                        output += generateRandom(250);
                    } while (!method.equals(METHOD_STOP));
                    //stop
                    user_nas.remove(name);
                    input = 0;
                    output = 0;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            writer.write(result);
            writer.flush();
            writer.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            System.out.println("The paths is not correct. ");
            e.printStackTrace();
        } catch (NumberFormatException e1) {
            System.out.println("The parameters coming in have wrong formats. Double-check them!");
            e1.printStackTrace();
        }
    }


    private static String concat(String a, String b) {
        return a + "=" + b + ";;";
    }

    private static int generateRandom(int range) {
        return (int) (Math.random() * range);
    }

    private static void formLine() throws IOException {
        result += concat(FIELD_DATE, fmt_in.format(new Date(now.getTime() + (long)generateRandom(5000)))); //randomly add 0-5 seconds
        result += concat(FIELD_USERNAME, name);
        result += concat(FIELD_NAS_IDENTIFIER, nas);
        result += concat(FIELD_METHOD, method);
        result += concat(FIELD_BALANCE_INPUT, String.valueOf(input));
        result += concat(FIELD_BALANCE_OUTPUT, String.valueOf(output));
        result += System.lineSeparator();
    }
}

生成如下输出:

date=lun, 22 ene 2018 18:39:42.052;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Start;;Acct-Input-Octets=0;;Acct-Output-Octets=0;;
date=lun, 22 ene 2018 18:39:43.827;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=0;;Acct-Output-Octets=0;;
date=lun, 22 ene 2018 18:39:44.463;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=268;;Acct-Output-Octets=42;;
date=lun, 22 ene 2018 18:39:43.968;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=428;;Acct-Output-Octets=143;;
date=lun, 22 ene 2018 18:39:44.039;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=519;;Acct-Output-Octets=294;;
date=lun, 22 ene 2018 18:39:46.276;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=727;;Acct-Output-Octets=535;;
date=lun, 22 ene 2018 18:39:46.126;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=810;;Acct-Output-Octets=694;;
date=lun, 22 ene 2018 18:39:43.908;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=888;;Acct-Output-Octets=848;;
date=lun, 22 ene 2018 18:39:41.839;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=1007;;Acct-Output-Octets=1078;;
date=lun, 22 ene 2018 18:39:42.163;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=1127;;Acct-Output-Octets=1323;;
date=lun, 22 ene 2018 18:39:43.395;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=1367;;Acct-Output-Octets=1543;;
date=lun, 22 ene 2018 18:39:44.430;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=1487;;Acct-Output-Octets=1656;;
date=lun, 22 ene 2018 18:39:44.760;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Interim-Update;;Acct-Input-Octets=1529;;Acct-Output-Octets=1688;;
date=lun, 22 ene 2018 18:39:43.329;;User-Name=User_703;;NAS-Identifier=NAS_0;;Acct-Status-Type=Stop;;Acct-Input-Octets=1561;;Acct-Output-Octets=1746;;

我们注意到时间戳的顺序不正确;我认为这是因为多个线程同时写入这个文件。我们想要的是这些行严格按照时间戳排序(写入时),但是每个用户的 START 应该排在第一位,然后是 UPDATE,最后是 STOP,就像一个真实的日志。

Java同步的话题一直是个痛点。如果有人擅长这个,请帮忙指出这里应该锁定哪个块或什么object。如果有一些好的参考,更好;同时我会在家里再次研究这个..

感谢所有评论,由于您的帮助,我在我的代码中发现了一些问题并修复了它们。 线程不在 OP 中,但在这个答案中我有。

从结果来看,我认为:

  • 当访问同一个文件时,写入行是原子性的:一旦某个线程占用了文件,其他线程不能干涉,直到所有者完成他的行写入,因为none 的行由来自两个线程的内容组成。
  • 当一个线程完成一行时,它会释放文件,其他等待的线程可以立即获取文件并写入他的行。 这里是比赛,但是没有条件比赛,导致不一致。

所以我猜 FileWriter 是线程安全的?

最终的工作代码如下:

  • 创建多个Runnable写入同一个文件
  • 每次Runnable打开文件写一行,立即做一个flush(),没有close()。要写入的日期是当前时间。在每次写作之间我添加了一些间隔 Thread.sleep(xxx).

现在我使用4个launch参数:

100
10
30
D:\temp\radius-simulator.log

class 现在变成:

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * A generator simulative of Radius log. Generate lines of log and save to a file. 
 * The parameters expected, in order, are: 
 *  - Number of users, integer(will be number of threads running simultaneously)
 *  - Number of NAS, integer
 *  - Number of seconds to sleep between the previous Stop and the next Start of each thread, integer
 *  - The full path of log file, string (the app in Windows has right to create file, but not the parent folder,
 *      so be sure to create the parent folder first)
 *  
 *
 */
public class RadiusSimulator implements Runnable {
    private static File log;
    private static volatile Map<String, String> user_nas; //ensure the newest value read from main heap; use volatile because we read but not write.
    private static final SimpleDateFormat fmt_in = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss.SSS");

    /* fields in tuple */
    private static final String FIELD_DATE = "date";
    private static final String FIELD_USERNAME = "User-Name";
    private static final String FIELD_NAS_IDENTIFIER = "NAS-Identifier";
    private static final String FIELD_METHOD = "Acct-Status-Type";
    private static final String FIELD_BALANCE_INPUT = "Acct-Input-Octets";
    private static final String FIELD_BALANCE_OUTPUT = "Acct-Output-Octets";

    /* methods */
    private static final String METHOD_START = "Start";
    private static final String METHOD_UPDATE = "Interim-Update";
    private static final String METHOD_STOP = "Stop";

    /* parameters */
    private static int numUsers;
    private static int numNAS;
    private static int sleepTime;

    private String username;
    private String nas;
    private String method;
    private String result;
    private int input;
    private int output;

    private FileWriter writer;
    private int status;
    private boolean stop;
    private String name;


    public RadiusSimulator(String name) {
        this.name = name;
        this.method = METHOD_START;
        status = 0;
        stop = false;
        // just open the file without appending, to clear content
        try {
            writer = new FileWriter(log, false);
            writer.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }


    public static void main(String[] args) {
        try {
            System.out.println("/**\r\n" + 
                    " * A simulated generator of Radius log. Generate lines of log and save to a file. \r\n" + 
                    " * The parameters expected, in order, are: \r\n" + 
                    " *  - Number of users, integer(will be number of threads running simultaneously)\r\n" + 
                    " *  - Number of NAS, integer\r\n" + 
                    " *  - Number of maximum of seconds to sleep between the previous Stop and the next Start of each thread, integer(>60)\r\n" + 
                    " *  - The full path of log file, string (be sure to create the parent folder first)\r\n" + 
                    " *  \r\n" + 
                    " * (require Java 1.7 or above) \r\n\"" +
                    " *  \r\n" + 
                    " *\r\n" + 
                    " */");

            System.out.println("Initializing with parameters: number of users: " + args[0] + ", number of NAS: " + args[1]);
            numUsers = Integer.parseInt(args[0]);
            numNAS = Integer.parseInt(args[1]);
            sleepTime = Integer.parseInt(args[2]);
            log = new File(args[3]);
            for (int i=0; i<numUsers; i++) {
                new Thread(new RadiusSimulator("User_" + i)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }



    }

    private int generateRandom(int min, int max) {
        if (max < min) max = min;
        int randomNum = ThreadLocalRandom.current().nextInt(min, max + 1);
        return randomNum;
    }

    private static String concat(String a, String b) {
        return a + "=" + b + ";;";
    }

    private void formLine() throws IOException {
        result += concat(FIELD_DATE, fmt_in.format(new Date())); //print always the date of now
        result += concat(FIELD_USERNAME, name);
        result += concat(FIELD_NAS_IDENTIFIER, nas);
        result += concat(FIELD_METHOD, method);
        result += concat(FIELD_BALANCE_INPUT, String.valueOf(input));
        result += concat(FIELD_BALANCE_OUTPUT, String.valueOf(output));
        result += System.lineSeparator();
    }
    @Override
    public void run() {
        try {
            writer = new FileWriter(log, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
//        while (!stop) {
        while (true) {
            try {
                Thread.sleep(generateRandom(0, 3000));
                if (status == 0) {
                    // stage 1
                    int indexUser = generateRandom(0, numUsers-1);
                    username = "User_" + String.valueOf(indexUser);
                    int indexNas = generateRandom(0, numNAS-1);
                    nas = "NAS_" + String.valueOf(indexNas);
                    user_nas = new HashMap<String, String>();
                    result = "";
                    input = 0;
                    output = 0;
                    //first line
                    user_nas.put(username, nas);
                    formLine();
                    System.out.println(result);
                    writer.write(result);
                    writer.flush();
                    status ++; //next stage
                    result = "";
                } else if (status == 1) {
                    method = METHOD_UPDATE;
                    input += generateRandom(0, 400);
                    output += generateRandom(0, 500);
                    formLine();
                    System.out.println(result);
                    writer.write(result);
                    writer.flush();
                    // 1 out of 10 of probability to enter stage 3
                    if (generateRandom(0, 10) == 9) {
                        status = 2;
                    }
                    result = "";
                } else if (status == 2) {
                    method = METHOD_STOP;
                    input += generateRandom(0, 400);
                    output += generateRandom(0, 500);
                    formLine();
                    System.out.println(result);
                    /* write inmediately after constructing the line */
                    writer.write(result);
                    writer.flush();
                    user_nas.remove(username);
                    input = 0;
                    output = 0;
//                    stop = true;
                    result = "";
                    status = 0; //go back to stage 1
                    Thread.sleep(generateRandom(60*1000, sleepTime*1000));
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}