This is a short
code snippet that shows how a multi-threaded application works.
There are N files,
and the same action must be performed on each of them. The code
shows how multiple files are read on concurrent threads,
processed, and written to new files.
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.List;
/**
 * Worker that repeatedly picks an unprocessed file from a shared list, claims
 * it, and copies it to its output file with every "X" expanded.
 *
 * <p>Several instances are run on separate threads over the same list. A file
 * is claimed while holding the monitor of its {@code FileDeatils} object, so
 * only the thread that actually flips the processed flag goes on to copy the
 * file. The flag is set when the file is claimed, not when the copy finishes.
 */
public class FileProcessorThread implements Runnable {
    /** Marker text appended to an input file when a thread claims it. */
    private static final String CONSTANT_DONE = "DONE";

    /** The file details list shared by all worker threads. */
    private final List<FileDeatils> fileDeatilsList;

    /**
     * The constructor for the thread.
     *
     * @param fileDeatilsList the shared list of files to work through
     */
    public FileProcessorThread(List<FileDeatils> fileDeatilsList) {
        super();
        this.fileDeatilsList = fileDeatilsList;
    }

    /**
     * Processes files until the scan finds none that are still unprocessed,
     * then prints a stop message and returns.
     */
    @Override
    public void run() {
        boolean stopIndicator = false;
        while (!stopIndicator) {
            FileDeatils fileToProcess = getUnProcessedFile(fileDeatilsList);
            if (fileToProcess == null) {
                // Nothing left to process: let the thread come to an end.
                System.out.println(Thread.currentThread().getName() + "_ Thread Stopping..");
                stopIndicator = true;
            } else if (claimFile(fileToProcess)) {
                readProcessWrite(fileToProcess);
            }
            // Otherwise another thread claimed the file between the scan and
            // the claim attempt; rescan instead of copying it a second time.
        }
    }

    /**
     * Checks, under the file's monitor, whether the file is already flagged
     * as processed, and prints the result of the check.
     *
     * @param fileDeatils the file to check
     * @return {@code true} if the processed flag is set
     */
    private boolean isProcessedChkViaFlg(FileDeatils fileDeatils) {
        boolean isProcessed;
        synchronized (fileDeatils) {
            isProcessed = fileDeatils.isProcessed();
            Thread t = Thread.currentThread();
            System.out.println("Scan to verify processed record " + t.getName()
                    + " Processed " + isProcessed);
        }
        return isProcessed;
    }

    /**
     * Returns the first file in the list that is not flagged as processed.
     *
     * @param fileDeatilsList the list to scan
     * @return the first unprocessed file, or {@code null} if there is none
     */
    private FileDeatils getUnProcessedFile(List<FileDeatils> fileDeatilsList) {
        for (FileDeatils fileDeatils : fileDeatilsList) {
            if (!isProcessedChkViaFlg(fileDeatils)) {
                return fileDeatils;
            }
        }
        return null;
    }

    /**
     * Locks the file by flagging it as processed and appending a DONE marker
     * (with the claiming thread's name) to the end of the input file. Does
     * nothing if the file is already flagged.
     *
     * @param fileDeatils the file to lock
     */
    public void lockFile(FileDeatils fileDeatils) {
        claimFile(fileDeatils);
    }

    /**
     * Atomically claims the file for the calling thread.
     *
     * <p>The flag is set before the marker is written, so a file whose marker
     * cannot be appended (for example a missing input file) is still taken out
     * of circulation instead of being retried forever by every worker.
     *
     * @param fileDeatils the file to claim
     * @return {@code true} if this call set the processed flag, {@code false}
     *         if the file had already been claimed
     */
    private boolean claimFile(FileDeatils fileDeatils) {
        synchronized (fileDeatils) {
            if (fileDeatils.isProcessed()) {
                return false;
            }
            fileDeatils.setProcessed(true);
            Writer inPutFileWriter = null;
            try {
                // 'true' opens the input file in append mode.
                inPutFileWriter = new BufferedWriter(new FileWriter(
                        fileDeatils.getInputFileName(), true));
                inPutFileWriter.append(CONSTANT_DONE + "_" + Thread.currentThread().getName());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeAndReport(inPutFileWriter);
            }
            return true;
        }
    }

    /**
     * Reads the input file line by line, replaces every "X" with
     * "XXXXXXXXXXXXXX", and writes each line followed by a newline to the
     * output file. Any marker previously appended to the input file is read
     * and copied like ordinary content.
     *
     * @param fileDeatils holds the input and output file names
     * @return {@code true} if the copy and both closes succeeded,
     *         {@code false} if an {@link IOException} occurred
     */
    public boolean readProcessWrite(FileDeatils fileDeatils) {
        boolean processed = true;
        File inputFile = new File(fileDeatils.getInputFileName());
        File outPutFile = new File(fileDeatils.getOutPutFileName());
        Writer output = null;
        BufferedReader input = null;
        try {
            output = new BufferedWriter(new FileWriter(outPutFile));
            input = new BufferedReader(new FileReader(inputFile));
            String line;
            while ((line = input.readLine()) != null) {
                output.write(line.replace("X", "XXXXXXXXXXXXXX"));
                output.write("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
            processed = false;
        } finally {
            // Close each stream independently so that a failure (or a stream
            // that was never opened) cannot prevent the other from closing.
            if (!closeAndReport(output)) {
                processed = false;
            }
            if (!closeAndReport(input)) {
                processed = false;
            }
        }
        return processed;
    }

    /**
     * Closes the resource if it was opened, printing any failure.
     *
     * @param resource the stream to close; may be {@code null}
     * @return {@code false} only if closing threw an {@link IOException}
     */
    private static boolean closeAndReport(java.io.Closeable resource) {
        if (resource == null) {
            return true;
        }
        try {
            resource.close();
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
import java.util.List;
/**
 * Supervisor that polls the shared file list until every file is flagged as
 * processed, starting an additional {@code FileProcessorThread} after each
 * poll interval while fewer than {@link #ADHOC_THREAD_THRESHOLD} files are
 * flagged.
 */
public class ControllerThread implements Runnable {
    /** Pause between two polls of the file list, in milliseconds (1.25 s). */
    private static final long POLL_INTERVAL_MILLIS = 1250L;

    /** An extra worker is started while fewer files than this are processed. */
    private static final int ADHOC_THREAD_THRESHOLD = 20;

    /** The file details list. */
    private final List<FileDeatils> fileDeatilsList;

    /** To name the new thread. */
    private int count = 0;

    /** All files are processed or not. */
    private boolean isProcessCompleted = false;

    /**
     * The constructor.
     *
     * @param fileDeatilsList the list of files shared with the worker threads
     */
    public ControllerThread(List<FileDeatils> fileDeatilsList) {
        super();
        this.fileDeatilsList = fileDeatilsList;
    }

    /**
     * Polls until all files are flagged as processed. The loop also ends if
     * the thread is interrupted while sleeping; the interrupt status is
     * restored before returning.
     */
    @Override
    public void run() {
        while (!isProcessCompleted) {
            isProcessCompleted = isProcessingCompleted();
            if (isProcessCompleted) {
                System.out.println("Stopping Controller Thread !!! ...");
            } else {
                try {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop: continuing would make every
                    // later sleep fail immediately and spawn threads in a
                    // tight loop.
                    Thread.currentThread().interrupt();
                    System.out.println("Controller Thread interrupted, stopping ...");
                    return;
                }
                if (noOfProcessedFiles() < ADHOC_THREAD_THRESHOLD) {
                    System.out.println("File processed is less than 20 so starting one more thread");
                    StringBuilder threadName = new StringBuilder();
                    threadName.append("File Copy Adhoc_ThreadIndex:");
                    threadName.append(count++);
                    ThreadGroup threadGroup = new ThreadGroup("MultipleFileProcessingThread");
                    FileProcessorThread fileProcessorThread = new FileProcessorThread(fileDeatilsList);
                    Thread thread = new Thread(threadGroup, fileProcessorThread, threadName.toString());
                    thread.start();
                }
            }
        }
    }

    /**
     * Tells whether every file in the list is flagged as processed.
     *
     * @return {@code true} if no unprocessed file remains
     */
    private boolean isProcessingCompleted() {
        synchronized (fileDeatilsList) {
            for (FileDeatils fileDeatils : fileDeatilsList) {
                if (!isFlaggedProcessed(fileDeatils)) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Counts the files that are flagged as processed.
     *
     * @return the number of flagged files
     */
    private int noOfProcessedFiles() {
        int completedListCount = 0;
        synchronized (fileDeatilsList) {
            for (FileDeatils fileDeatils : fileDeatilsList) {
                if (isFlaggedProcessed(fileDeatils)) {
                    completedListCount++;
                }
            }
        }
        return completedListCount;
    }

    /**
     * Reads the processed flag while holding the file object's own monitor.
     * The worker threads set the flag under that same monitor, so reading it
     * here under only the list's monitor would not guarantee that their
     * update is visible.
     *
     * @param fileDeatils the file to inspect
     * @return the current value of the processed flag
     */
    private static boolean isFlaggedProcessed(FileDeatils fileDeatils) {
        synchronized (fileDeatils) {
            return fileDeatils.isProcessed();
        }
    }
}
/**
 * Describes one unit of work: the input file to read, the output file to
 * write, and whether the file has been flagged as processed.
 */
public class FileDeatils {
    /** Path of the file to read. */
    private String inputFileName;

    /** Path of the file to write. */
    private String outPutFileName;

    /**
     * Whether the file has been flagged as processed. Declared volatile
     * because the flag is written and read by different threads that do not
     * all hold the same lock when they access it.
     */
    private volatile boolean processed;

    /**
     * @return the input file path, or {@code null} if none has been set
     */
    public String getInputFileName() {
        return inputFileName;
    }

    /**
     * @param inputFileName the input file path
     */
    public void setInputFileName(String inputFileName) {
        this.inputFileName = inputFileName;
    }

    /**
     * @return the output file path, or {@code null} if none has been set
     */
    public String getOutPutFileName() {
        return outPutFileName;
    }

    /**
     * @param outPutFileName the output file path
     */
    public void setOutPutFileName(String outPutFileName) {
        this.outPutFileName = outPutFileName;
    }

    /**
     * @return {@code true} if the file has been flagged as processed
     */
    public boolean isProcessed() {
        return processed;
    }

    /**
     * @param processed the new value of the processed flag
     */
    public void setProcessed(boolean processed) {
        this.processed = processed;
    }
}
/* The initiator, which kicks off the action. */
import java.util.List;
/**
 * Starts the worker threads and the controller thread for a list of files.
 */
public class Initiator {
    /**
     * Launches the file-processing threads followed by one controller thread.
     *
     * <p>The worker index runs from 1 up to, but not including,
     * {@code noOfConcurrentOperations}, so {@code noOfConcurrentOperations - 1}
     * workers are started. Every thread is placed in its own newly created
     * thread group named "MultipleFileProcessingThread".
     *
     * @param noOfConcurrentOperations exclusive upper bound of the 1-based worker index
     * @param fileDeatilsList the files handed to every worker and to the controller
     */
    public void initiate(int noOfConcurrentOperations, List<FileDeatils> fileDeatilsList) {
        int workerIndex = 1;
        while (workerIndex < noOfConcurrentOperations) {
            String workerName = "File Copy_ThreadIndex:" + workerIndex;
            launch(new FileProcessorThread(fileDeatilsList), workerName);
            workerIndex++;
        }
        launch(new ControllerThread(fileDeatilsList), "File Copy Control Thread_ThreadIndex ");
    }

    /**
     * Starts the task on a new thread inside a fresh thread group.
     *
     * @param task the work to run
     * @param threadName the name given to the new thread
     */
    private void launch(Runnable task, String threadName) {
        ThreadGroup group = new ThreadGroup("MultipleFileProcessingThread");
        new Thread(group, task, threadName).start();
    }
}
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class Init {
public static void main(String[] args) {
System.out.println("Started.. creating input file to test");
Init i = new Init();
List<FileDeatils> fileDeatilsList = null;
try {
fileDeatilsList = i.createInputOutPut();
} catch (IOException e) {
e.printStackTrace();
}
writeToFile(fileDeatilsList);
new Initiator().initiate(2, fileDeatilsList);
}
/**
* just to create some data for testing, create 22 input files to c:InputTest Folder
* @return
* @throws IOException
*/
private List<FileDeatils> createInputOutPut() throws IOException {
List<FileDeatils> fileDeatilsList = new ArrayList<FileDeatils>();
int noOfInputFiles = 23;
for (int i = 1; i < noOfInputFiles; i++) {
FileDeatils fileDeatils = new FileDeatils();
String inputFileName = "C:\InputTest\Input\INPUT" + i + ".txt";
String outPutFileName = "C:\InputTest\Output\OUTPUT_" + i + ".txt";
fileDeatils.setInputFileName(inputFileName);
fileDeatils.setOutPutFileName(outPutFileName);
fileDeatilsList.add(fileDeatils);
}
return fileDeatilsList;
}
/**
* this writes some data to the test input file.
*
* @param fileDeatilsList
*/
public static void writeToFile(List<FileDeatils> fileDeatilsList) {
String INPUT_STRING = "INPUT";
int fileCount = 1;
for (FileDeatils fileDeatils : fileDeatilsList) {
File inputFile = new File(fileDeatils.getInputFileName());
Writer output = null;
int count = 1;
try {
output = new BufferedWriter(new FileWriter(inputFile));
while (true) {
output.write(INPUT_STRING + "_" + fileCount + "_X");
output.write("n");
count++;
if ((count + (int) new Random().nextInt(5000)) > 1000000) {
break;
}
}
output.close();
fileCount++;
} catch (IOException e) {
e.printStackTrace();
}
}
}
}