A Place For Answers

Big Data Analysis Using Regex & Multi-Threading

What would be appropriate for a first post?

A “Hello World” app, right? Luckily, an opportunity presented itself when I was tasked with creating a log analyzer. To give a bit of background on the situation: the system is a server with multiple processors and around 1700 logs, each weighing 10MB.

Like any good little developer, my first mission was to split the problem into smaller bits. The first bit was how to recognize the wanted string, and that is where regular expressions, or regex for short, came to my aid. For those who don’t know it, regex is a compact pattern language for finding and matching strings. Almost all programming languages offer regex in some way or form, and there’s a reason for that: it’s a great tool.

Getting back to our walk-through, let’s look at a couple of lines from the logs:

20/8/13,13:58:09|17|Admin|DAL.GetEntity|ID='1027234'|Start
20/8/13,13:58:17|26|Admin|DAL.GetEntity|ID='1073947'|Start
20/8/13,13:58:52|26|Admin|DAL.GetEntity|ID='1073947'|End|Success
20/8/13,13:59:03|17|Admin|DAL.GetEntity|ID='1027234'|End|Failure

Now, you can see that there is a pattern going on here. What we want to do is match the function “DAL.GetEntity” at its “End” and capture the “ID” and the “Success/Failure” result. Here’s how I did it with regex:

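int successes = 0;
// The lazy .*? stops at the first digit, so "id" captures the whole number
// (a greedy .* here would leave only the last digit for the group).
var regex = new Regex(@".*DAL\.GetEntity.*?(?<id>\d+).*End\|(?<success>.*)");
String[] lines = File.ReadAllLines(fileName);

foreach (String line in lines)
{
	// Match once and reuse the result instead of calling IsMatch and then Match.
	Match match = regex.Match(line);
	if (match.Success)
	{
		Boolean success = match.Groups["success"].Value == "Success";
		if (success) { successes++; }
	}
}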

Now, what do we have here?
First comes the definition of the regex, where we describe the pattern. Let’s go over some of it:
. – Matches any single character except a newline character.
* – Matches the preceding element zero or more times.
(pattern) – Matches pattern and remembers the match. The matched substring can be retrieved from the Groups collection of the resulting Match.
(?<name>pattern) – Matches pattern and gives the captured group a name.
\d – Matches a digit character. For plain ASCII input this is the same as [0-9].
+ – Matches the preceding element one or more times.

More of these can be found here.
While going over the lines, if we find a match, we save the named capture groups of the regex and check their values, and that’s that: we just analyzed a file… 1 down, ~1699 to go.
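One thing worth knowing about .* is that it is greedy: when it sits directly in front of a \d+ group it grabs as many characters as it can and leaves the group only the final digit. Here is a small sketch of that effect on one of the sample lines above. The class and method names (RegexDemo, Describe and friends) are made up for illustration, and the two patterns are simplified variants, not necessarily the exact ones you would ship.

```csharp
using System;
using System.Text.RegularExpressions;

static class RegexDemo
{
    // Greedy: .* swallows as much as it can before \d+ gets a turn.
    static readonly Regex Greedy = new Regex(@"DAL\.GetEntity.*(?<id>\d+).*End\|(?<success>.*)");
    // Lazy: .*? gives up characters as early as possible, so \d+ sees every digit.
    static readonly Regex Lazy = new Regex(@"DAL\.GetEntity.*?(?<id>\d+).*End\|(?<success>.*)");

    // Returns "<id> <result>" for a matching line, or null when the line does not match.
    public static string Describe(Regex regex, string line)
    {
        Match m = regex.Match(line);
        return m.Success ? m.Groups["id"].Value + " " + m.Groups["success"].Value : null;
    }

    public static string DescribeGreedy(string line) { return Describe(Greedy, line); }
    public static string DescribeLazy(string line) { return Describe(Lazy, line); }

    static void Main()
    {
        string end = "20/8/13,13:58:52|26|Admin|DAL.GetEntity|ID='1073947'|End|Success";
        string start = "20/8/13,13:58:17|26|Admin|DAL.GetEntity|ID='1073947'|Start";
        Console.WriteLine(DescribeGreedy(end));                 // 7 Success
        Console.WriteLine(DescribeLazy(end));                   // 1073947 Success
        Console.WriteLine(DescribeLazy(start) ?? "(no match)"); // (no match)
    }
}
```

The only difference between the two patterns is the single ? after the second .*, which is an easy thing to miss when the counts still look right but the captured IDs don’t.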

Reading all the files one after another would take a lot of time, by my calculations about a day’s worth of running. That’s where the multi-threaded road shines, with multiple processors sharing the load. The tactic is basically to perform the analysis above on each file and write its results to another log. To achieve that, we’ll create a function that collects the names of all the files to be analyzed, queues a thread pool work item for each of them and lets that work item do all the heavy lifting. Here’s how to do it:

String _logDirPath;
String _logName;
int _maxThreads;
Object _lock;
CountdownEvent countdownEvent;
long _successes;
 
public Boolean AnalyzeAllFiles(String logDirectoryPath, String logName = "AnalysisLog", int threadCount = 8)
{
    _successes = 0;
    _logName = logName;
    _logDirPath = logDirectoryPath;
    _maxThreads = threadCount < 1 ? 1 : threadCount;
    _lock = new Object();
    Boolean complete = false;
    try
    {
        List<String> fileNames = Directory.EnumerateFiles(_logDirPath, "log*.txt").ToList();
        countdownEvent = new CountdownEvent(fileNames.Count);
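        // Process-wide setting; returns false and changes nothing if the value is below the processor count.
        ThreadPool.SetMaxThreads(_maxThreads, _maxThreads);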
        foreach (String filePath in fileNames)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(AnalyzeFile), (Object)filePath);
        }
        countdownEvent.Wait();
        lock (_lock)
        {
            using (StreamWriter file = File.AppendText(_logDirPath + "\\" + _logName + ".log"))
            {
                file.WriteLine(String.Format("Successes: {0}", _successes));
            }
        }
        complete = true;
    }
    catch (Exception e)
    {
        // InnerException is null for most exceptions, so test it before reading Message.
        Debug.Write(e.InnerException != null ? e.InnerException.Message : e.Message, "LogAnalyzer");
    }
    return complete;
}

Let’s go over a couple of points in the code:

  • In multi-threading we need to lock down resources that are shared amongst multiple threads, so that no two threads try to change the same resource at once. Simple primitive values don’t need a full lock (more on that later), but a file is definitely not a primitive. We can’t lock the file itself, so instead we lock a dedicated object that stands in for it, and every thread takes that lock before writing.
  • The CountdownEvent class is great because it lets us wait and count down until all the threads are complete. Very simple and elegant.
  • Using the ThreadPool lets us queue up work items while capping the number of active threads. Without it, all the threads would be created at once, and 1700 threads each reading a 10MB file would kill a lot of computers. One caveat: SetMaxThreads is a process-wide setting, and it returns false and changes nothing if you ask for fewer threads than the machine has processors.
  • Queuing up a work item involves telling the thread which function it needs to run. We also pass it, as a parameter of type Object, the name of the log file it needs to analyze.
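If you would rather not touch the process-wide thread pool limits, one alternative that also ships with .NET Framework 4.0 is Parallel.ForEach with a MaxDegreeOfParallelism. This is a different technique from the one used in this post, shown only as a rough sketch: BoundedParallelDemo, ProcessAll and the file names are placeholders I made up, and the work delegate stands in for the real per-file analysis.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class BoundedParallelDemo
{
    // Runs 'work' once per item with at most 'maxWorkers' items in flight,
    // and returns how many items were processed.
    public static int ProcessAll(IEnumerable<string> items, int maxWorkers, Action<string> work)
    {
        int processed = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = Math.Max(1, maxWorkers) };

        // Parallel.ForEach blocks until every item is done, so no CountdownEvent is needed.
        Parallel.ForEach(items, options, item =>
        {
            work(item);
            Interlocked.Increment(ref processed);
        });
        return processed;
    }

    static void Main()
    {
        // Placeholder names; a real run would pass the result of Directory.EnumerateFiles.
        var fakeFiles = new List<string> { "log1.txt", "log2.txt", "log3.txt" };
        int done = ProcessAll(fakeFiles, 2, name => Thread.Sleep(10));
        Console.WriteLine("Processed: " + done); // Processed: 3
    }
}
```

The cap here applies only to this one loop rather than to every thread pool user in the process, which is the main reason to consider it. Exceptions thrown by the work delegate surface as an AggregateException from Parallel.ForEach.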

All that’s left is to see how we can combine the regex into the worker bee that is the thread:

private void AnalyzeFile(Object p)
{
    String fileName = (String)p;
    StringBuilder results = new StringBuilder();
    try
    {
        String[] lines = File.ReadAllLines(fileName);

        foreach (String line in lines)
        {
            // Match once and reuse the result instead of calling IsMatch and then Match.
            Match match = _regex.Match(line);
            if (match.Success)
            {
                Boolean success = match.Groups["success"].Value == "Success";
                if (success)
                {
                    Interlocked.Increment(ref _successes);
                }
                results.Append(match.Groups["id"].Value + " " + success + Environment.NewLine);
            }
        }
        lock (_lock)
        {
            using (StreamWriter file = File.AppendText(_logDirPath + "\\" + _logName + ".log"))
            {
                file.WriteLine(String.Format("File Processed: {0} at: {1}", fileName, DateTime.Now.ToString()));
                file.Write(results.ToString());
            }
        }
    }
    catch (Exception e)
    {
        // InnerException is null for most exceptions, so test it before reading Message.
        Debug.Write(e.InnerException != null ? e.InnerException.Message : e.Message, "LogAnalyzer");
    }
    finally
    {
        // Signal in finally so the waiting thread is released even if this file failed.
        countdownEvent.Signal();
    }
}

Seems simple enough, but there are a few things to note:

  • We need to cast the object parameter to a string to extract the name of the file to be analyzed.
  • Remember I said we didn’t have to lock primitive values to change them in a multi-threaded environment? Well, that wasn’t very exact. An increment is really a read, an add and a write, so two threads can overwrite each other’s result. We use the Interlocked class to perform the increment as a single atomic operation that can’t be interrupted by another thread.
  • Writing to the file is also done with multi-threading in mind. See how we used a StringBuilder to prepare all the data? That keeps the actual write short, so the lock on the file is held for less time and other threads get to write sooner.
  • In the end we signal the CountdownEvent: “hey papa process, I’m done here.” Behind the scenes this decrements the CountdownEvent’s counter, and when it reaches zero the main thread goes back to work.
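To see the Interlocked and CountdownEvent pieces working together without any files involved, here is a stripped-down sketch. CounterDemo and CountWith are names I invented for the example; each queued work item just bumps a shared counter and then signals that it is done.

```csharp
using System;
using System.Threading;

static class CounterDemo
{
    static long _total;

    // Queues 'workers' pool items that each perform 'perWorker' atomic increments,
    // then blocks until all of them have signalled.
    public static long CountWith(int workers, int perWorker)
    {
        _total = 0;
        using (var done = new CountdownEvent(workers))
        {
            for (int i = 0; i < workers; i++)
            {
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        for (int j = 0; j < perWorker; j++)
                        {
                            Interlocked.Increment(ref _total);
                        }
                    }
                    finally
                    {
                        // Always signal, otherwise Wait() below would never return.
                        done.Signal();
                    }
                });
            }
            done.Wait();
        }
        return Interlocked.Read(ref _total);
    }

    static void Main()
    {
        Console.WriteLine(CountWith(8, 100000)); // 800000
    }
}
```

Swap the Interlocked.Increment call for a plain _total++ and the result will usually come up short on a multi-core machine, which is exactly the lost update the atomic increment prevents.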

So before you go on ahead, I want to give you a bit of a comparison:

  • Single-threaded, the process would take about a day’s worth, roughly 24 hours.
  • Multi-threaded with a maximum of 8 active threads, the process finished in just a bit over 3 hours, close to an eightfold speedup.

Here’s the complete code for reference. It’s important to note that the code requires .NET Framework 4.0:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
 
namespace LogAnalysis
{
    class LogAnalyzer
    {
        String _logDirPath;
        String _logName;
        int _maxThreads;
        Object _lock;
        CountdownEvent countdownEvent;
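        // The lazy .*? stops at the first digit, so "id" captures the whole number rather than only its last digit.
        Regex _regex = new Regex(@".*DAL\.GetEntity.*?(?<id>\d+).*End\|(?<success>.*)");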
        long _successes;
 
        public Boolean AnalyzeAllFiles(String logDirectoryPath, String logName = "AnalysisLog", int threadCount = 8)
        {
            _successes = 0;
            _logName = logName;
            _logDirPath = logDirectoryPath;
            _maxThreads = threadCount < 1 ? 1 : threadCount;
            _lock = new Object();
            Boolean complete = false;
            try
            {
                List<String> fileNames = Directory.EnumerateFiles(_logDirPath, "log*.txt").ToList();
                countdownEvent = new CountdownEvent(fileNames.Count);
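                // Process-wide setting; returns false and changes nothing if the value is below the processor count.
                ThreadPool.SetMaxThreads(_maxThreads, _maxThreads);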
                foreach (String filePath in fileNames)
                {
                    ThreadPool.QueueUserWorkItem(new WaitCallback(AnalyzeFile), (Object)filePath);
                }
                countdownEvent.Wait(); // Wait until all threads completed
                lock (_lock)
                {
                    using (StreamWriter file = File.AppendText(_logDirPath + "\\" + _logName + ".log"))
                    {
                        file.WriteLine(String.Format("Successes: {0}", _successes));
                    }
                }
                complete = true;
            }
            catch (Exception e)
            {
                // InnerException is null for most exceptions, so test it before reading Message.
                Debug.Write(e.InnerException != null ? e.InnerException.Message : e.Message, "LogAnalyzer");
            }
            return complete;
        }
        
        private void AnalyzeFile(Object p)
        {
            String fileName = (String)p;
            StringBuilder results = new StringBuilder();
            try
            {
                String[] lines = File.ReadAllLines(fileName);
                 
                foreach (String line in lines)
                {
                    // Match once and reuse the result instead of calling IsMatch and then Match.
                    Match match = _regex.Match(line);
                    if (match.Success)
                    {
                        Boolean success = match.Groups["success"].Value == "Success";
                        if (success)
                        {
                            Interlocked.Increment(ref _successes);
                        }
                        results.Append(match.Groups["id"].Value + " " + success + Environment.NewLine);
                    }
                }
                lock (_lock)
                {
                    using (StreamWriter file = File.AppendText(_logDirPath + "\\" + _logName + ".log"))
                    {
                        file.WriteLine(String.Format("File Processed: {0} at: {1}", fileName, DateTime.Now.ToLocalTime().ToString()));
                        file.Write(results.ToString());
                    }
                }
            }
            catch (Exception e)
            {
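                // InnerException is null for most exceptions, so test it before reading Message.
                Debug.Write(e.InnerException != null ? e.InnerException.Message : e.Message, "LogAnalyzer");
            }
            finally
            {
                // Signal in finally so the waiting thread is released even if this file failed.
                countdownEvent.Signal();
            }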
        }
    }
}

I hope you learned something from this post. Stay tuned for more.
