This is a code snippet that demonstrates how a multi-threaded application works.
There are N files, and the same action must be performed on every one of them.
The code shows how multiple files are read on concurrent threads, processed,
and written out to new files.
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.Writer; import java.util.List; public class FileProcessorThread implements Runnable { /** The constant DONE. */ private static final String CONSTANT_DONE = "DONE".trim(); /** The file details list. */ private List<FileDeatils> fileDeatilsList; /** * The constructor for the thread. * @param fileDeatilsList */ public FileProcessorThread(List<FileDeatils> fileDeatilsList) { super(); this.fileDeatilsList = fileDeatilsList; } @Override public void run() { Boolean stopIndicator = false; FileDeatils fileToProcess; while (!stopIndicator) { fileToProcess = getUnProcessedFile(fileDeatilsList); if (fileToProcess != null) { lockFile(fileToProcess); readProcessWrite(fileToProcess); } else { // if nothing to process stop indicator would be set to true and Thread will come to END. System.out.println(Thread.currentThread().getName() + "_ Thread Stopping.."); stopIndicator = true; } } } /** * to check processed or not. * @param fileDeatils * @return boolean */ private boolean isProcessedChkViaFlg(FileDeatils fileDeatils) { boolean isProcessed = false; synchronized (fileDeatils) { if (fileDeatils.isProcessed()) { isProcessed = true; } Thread t = Thread.currentThread(); System.out.println("Scan to verify processed record " + t.getName() + " Processed " + isProcessed); } return isProcessed; } /** get unprocessed file * @param fileDeatilsList * @return */ private FileDeatils getUnProcessedFile(List<FileDeatils> fileDeatilsList) { FileDeatils fileToProcess = null; for (FileDeatils fileDeatils : fileDeatilsList) { if (!isProcessedChkViaFlg(fileDeatils)) { fileToProcess = fileDeatils; break; } } return fileToProcess; } /** * Lock the file by putting a DONE at the end of the file. 
* @param fileDeatils */ public void lockFile(FileDeatils fileDeatils) { try { synchronized (fileDeatils) { if (!fileDeatils.isProcessed()) { Writer inPutFileWriter = new BufferedWriter(new FileWriter( fileDeatils.getInputFileName(), true)); inPutFileWriter.append(CONSTANT_DONE+"_"+Thread.currentThread().getName()); fileDeatils.setProcessed(true); inPutFileWriter.close(); } } } catch (IOException e) { e.printStackTrace(); } } /** * Read the file and process. * @param fileDeatils * @return */ public boolean readProcessWrite(FileDeatils fileDeatils) { boolean processed = true; File inputFile = new File(fileDeatils.getInputFileName()); File outPutFile = new File(fileDeatils.getOutPutFileName()); Writer output = null; BufferedReader input = null; try { output = new BufferedWriter(new FileWriter(outPutFile)); input = new BufferedReader(new FileReader(inputFile)); String line = null; while ((line = input.readLine()) != null) { if (line.contains("X")) { //Thread t = Thread.currentThread(); //System.out.println(t.getName()+"_"+line); line = line.replace("X", "XXXXXXXXXXXXXX"); } output.write(line); output.write("n"); } } catch (IOException e) { e.printStackTrace(); processed = false; } finally { try { output.close(); input.close(); } catch (IOException e) { e.printStackTrace(); processed = false; } } return processed; } } import java.util.List; public class ControllerThread implements Runnable{ /** The file details list. */ private List<FileDeatils> fileDeatilsList; /** To name the new thread. */ private int count = 0; /** All files are processed or not. */ private boolean isProcessCompleted = false; /** * The Constructor. 
* @param fileDeatilsList */ public ControllerThread(List<FileDeatils> fileDeatilsList) { super(); this.fileDeatilsList = fileDeatilsList; } @Override public void run() { while (!isProcessCompleted) { isProcessCompleted = isProcessingCompleted(); /* Allow thread to sleep for 1.2 seconds */ if (!isProcessCompleted) { try { Thread.sleep(1250); } catch (InterruptedException e) { e.printStackTrace(); } if (noOfProcessedFiles() < 20) { System.out.println("File processed is less than 20 so starting one more thread"); StringBuilder threadName = new StringBuilder(); threadName.append("File Copy Adhoc_ThreadIndex:"); threadName.append(count++); ThreadGroup threadGroup = new ThreadGroup("MultipleFileProcessingThread"); FileProcessorThread fileProcessorThread = new FileProcessorThread(fileDeatilsList); Thread thread = new Thread(threadGroup, fileProcessorThread, threadName.toString()); thread.start(); } } else { System.out.println("Stopping Controller Thread !!! ..."); } } } /** * to find the processing completed. * @return */ private boolean isProcessingCompleted() { boolean isCompleted = true; synchronized (fileDeatilsList) { for (FileDeatils fileDeatils : fileDeatilsList) { if (!fileDeatils.isProcessed()) { isCompleted = false; break; } } } return isCompleted; } /** * to find the processing completed. 
* @return */ private int noOfProcessedFiles() { int completedListCount = 0; synchronized (fileDeatilsList) { for (FileDeatils fileDeatils : fileDeatilsList) { if (fileDeatils.isProcessed()) { completedListCount++; } } } return completedListCount; } } public class FileDeatils { private String inputFileName; private String outPutFileName; private boolean processed; public String getInputFileName() { return inputFileName; } public void setInputFileName(String inputFileName) { this.inputFileName = inputFileName; } public String getOutPutFileName() { return outPutFileName; } public void setOutPutFileName(String outPutFileName) { this.outPutFileName = outPutFileName; } public boolean isProcessed() { return processed; } public void setProcessed(boolean processed) { this.processed = processed; } } /* the initiator which kiks of the action */ import java.util.List; public class Initiator { /** * To initiate the action. * @param noOfConcurrentOperations * @param fileDeatilsList */ public void initiate(int noOfConcurrentOperations, List<FileDeatils> fileDeatilsList) { for (int i = 1; i < noOfConcurrentOperations; i++) { StringBuilder threadName = new StringBuilder(); threadName.append("File Copy_ThreadIndex:"); threadName.append(i); ThreadGroup threadGroup = new ThreadGroup("MultipleFileProcessingThread"); FileProcessorThread fileProcessorThread = new FileProcessorThread(fileDeatilsList); Thread thread = new Thread(threadGroup, fileProcessorThread, threadName.toString()); thread.start(); } ControllerThread controllerThread = new ControllerThread(fileDeatilsList); StringBuilder threadName = new StringBuilder(); threadName.append("File Copy Control Thread_ThreadIndex "); ThreadGroup threadGroup = new ThreadGroup("MultipleFileProcessingThread"); Thread thread = new Thread(threadGroup, controllerThread, threadName.toString()); thread.start(); } } import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.Writer; 
import java.util.ArrayList; import java.util.List; import java.util.Random; public class Init { public static void main(String[] args) { System.out.println("Started.. creating input file to test"); Init i = new Init(); List<FileDeatils> fileDeatilsList = null; try { fileDeatilsList = i.createInputOutPut(); } catch (IOException e) { e.printStackTrace(); } writeToFile(fileDeatilsList); new Initiator().initiate(2, fileDeatilsList); } /** * just to create some data for testing, create 22 input files to c:InputTest Folder * @return * @throws IOException */ private List<FileDeatils> createInputOutPut() throws IOException { List<FileDeatils> fileDeatilsList = new ArrayList<FileDeatils>(); int noOfInputFiles = 23; for (int i = 1; i < noOfInputFiles; i++) { FileDeatils fileDeatils = new FileDeatils(); String inputFileName = "C:\InputTest\Input\INPUT" + i + ".txt"; String outPutFileName = "C:\InputTest\Output\OUTPUT_" + i + ".txt"; fileDeatils.setInputFileName(inputFileName); fileDeatils.setOutPutFileName(outPutFileName); fileDeatilsList.add(fileDeatils); } return fileDeatilsList; } /** * this writes some data to the test input file. * * @param fileDeatilsList */ public static void writeToFile(List<FileDeatils> fileDeatilsList) { String INPUT_STRING = "INPUT"; int fileCount = 1; for (FileDeatils fileDeatils : fileDeatilsList) { File inputFile = new File(fileDeatils.getInputFileName()); Writer output = null; int count = 1; try { output = new BufferedWriter(new FileWriter(inputFile)); while (true) { output.write(INPUT_STRING + "_" + fileCount + "_X"); output.write("n"); count++; if ((count + (int) new Random().nextInt(5000)) > 1000000) { break; } } output.close(); fileCount++; } catch (IOException e) { e.printStackTrace(); } } } }