Sunday, 30 June 2013

Fun with Concurrent Collections...

Recently, I was writing a program which used a collection accessed by many threads. The obvious choice was one of the .NET thread-safe collections, so I chose ConcurrentBag and stumbled upon something: things can easily go wrong if you don't properly understand how it works. I was using the Concat operator to combine the collections returned from my threads, but the resulting bag did not contain any items. Let's play around with this and see how it works, starting with the following self-contained program:


using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MyConsoleApplication
{
    public class Trade
    {
        public string TradeId { get; set; }
        public string Ticker { get; set; }
        public double Price { get; set; }
    }

    public class CSVLoader
    {
        public string path { get; set; }

        public List<Trade> LoadBlotter()
        {
            var trades = new List<Trade>();

            using (var reader = new StreamReader(File.OpenRead(path)))
            {
                while (!reader.EndOfStream)
                {
                    var tline = reader.ReadLine().Split(',');
                    Trade t = new Trade() { TradeId = tline[0], Ticker = tline[1], Price = Convert.ToDouble(tline[2]) };
                    trades.Add(t);
                }
            }

            return trades;
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            List<CSVLoader> loaders = new List<CSVLoader>();
            loaders.Add(new CSVLoader { path = "Blotters\\blotter1.txt"});
            loaders.Add(new CSVLoader { path = "Blotters\\blotter2.txt" });
            loaders.Add(new CSVLoader { path = "Blotters\\blotter3.txt" });

            //List to store resultant trades
            List<Trade> alltrades = new List<Trade>();
            loaders.ForEach(l => alltrades.AddRange(l.LoadBlotter()));

            Console.WriteLine("Total trades read="+alltrades.Count);

            Console.ReadLine();

        }
    }
}




The blotter files are nothing but a bunch of dummy trades in a text file. I copy-pasted the following lines multiple times:

1,AAAA,0.8
2,BBBB,0.95


Now we have a working program which runs great and loads trades, but not in parallel. Let's change the code from:

            loaders.ForEach(l => alltrades.AddRange(l.LoadBlotter()));

to:

            loaders.AsParallel().ForAll(l => alltrades.AddRange(l.LoadBlotter()));

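Bingo!! It runs great (if you are lucky). But run it a few more times and it will surprise you with an exception.


Now what's that? We half expected something to blow up, since we never used a thread-safe collection. But the error says index out of range!!! Really??

This is because List<T> is not thread-safe. Internally it keeps its items in an array, and whenever that array fills up it allocates a new one of double the size (4, 8, 16, 32 and so on) and copies the items across. AddRange checks the capacity, grows the array if needed and then copies the new items in, and none of these steps is synchronized. So one thread can decide where to write based on the size and array it saw, while another thread grows or fills that array underneath it; the first thread then writes past the end of the array and gets this exception. Even when no exception is thrown, trades can be silently lost or overwritten.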
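To see the doubling for yourself, here is a small sketch (the CapacityDemo class and its method are my own illustrative names, not part of the program above) that adds items to an empty List<int> one at a time and records each Capacity value it passes through. The exact growth policy is a runtime implementation detail rather than a documented guarantee; on the .NET versions I know of it starts at 4 and doubles.

```csharp
using System;
using System.Collections.Generic;

public static class CapacityDemo
{
    // Adds n items to an empty List<int> and records every distinct Capacity value observed.
    public static List<int> ObservedCapacities(int n)
    {
        var list = new List<int>();
        var capacities = new List<int>();
        for (int i = 0; i < n; i++)
        {
            list.Add(i);
            // Capacity only changes when the backing array is replaced by a larger one.
            if (capacities.Count == 0 || capacities[capacities.Count - 1] != list.Capacity)
                capacities.Add(list.Capacity);
        }
        return capacities;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", ObservedCapacities(33)));
    }
}
```

Each step in that sequence is a moment where the backing array is swapped for a new one, which is exactly the window that concurrent AddRange calls can fall into.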


Now what?? Use ConcurrentBag! But let's see how it works.

Let's include:
using System.Collections.Concurrent;

and change the code from:
List<Trade> alltrades = new List<Trade>();
to:
ConcurrentBag<Trade> alltrades = new ConcurrentBag<Trade>();


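The immediate problem is that ConcurrentBag doesn't have an AddRange method, so I thought I would use Concat instead. I changed the call to use Concat and, to my surprise, the bag came back empty: the total trade count was zero.

So it doesn't work. The reason is that Concat is a LINQ extension method on IEnumerable<T>, and it never modifies the collection it is called on. It returns a new, lazily evaluated sequence that yields the bag's items followed by the other collection's items. Since that return value was thrown away, nothing was ever added to the bag.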
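A minimal sketch of what went wrong, assuming a plain list of CSV lines stands in for what LoadBlotter returns (ConcatDemo and the sample data are illustrative only): calling Concat on the bag leaves the bag untouched, and even when you keep the return value, what you get back is a separate IEnumerable<string>, not the bag.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class ConcatDemo
{
    // Returns the bag's count after calling Concat, and the count of the sequence Concat produced.
    public static (int BagCount, int ConcatCount) Run()
    {
        var bag = new ConcurrentBag<string>();
        var loaded = new List<string> { "1,AAAA,0.8", "2,BBBB,0.95" };

        // Result discarded: this builds a lazy sequence and throws it away; the bag is unchanged.
        bag.Concat(loaded);

        // Result kept: this is a new IEnumerable<string> combining both, but still not the bag itself.
        IEnumerable<string> combined = bag.Concat(loaded);

        return (bag.Count, combined.Count());
    }

    public static void Main()
    {
        var (bagCount, concatCount) = Run();
        Console.WriteLine($"bag.Count={bagCount}, combined.Count()={concatCount}");
    }
}
```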

So what are our options? Union doesn't work either, for the same reason: it also returns a new sequence instead of modifying the bag. My way of coping with this problem is as follows.


Change the LoadBlotter method to accept the ConcurrentBag and add trades to it directly:


        public void LoadBlotter(ConcurrentBag<Trade> alltrades)
        {
            using (var reader = new StreamReader(File.OpenRead(path)))
            {
                while (!reader.EndOfStream)
                {
                    var tline = reader.ReadLine().Split(',');
                    Trade t = new Trade() { TradeId = tline[0], Ticker = tline[1], Price = Convert.ToDouble(tline[2]) };
                    // ConcurrentBag<T>.Add is thread-safe, so several loaders can add to the same bag at once
                    alltrades.Add(t);
                }
            }
        }



And in the calling code:
            ConcurrentBag<Trade> alltrades = new ConcurrentBag<Trade>();
            loaders.AsParallel().ForAll(l => l.LoadBlotter(alltrades));



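So basically, you pass the same bag to every loader (ConcurrentBag is a reference type, so they all share one instance), and each thread adds its trades to it directly. ConcurrentBag<T>.Add is thread-safe; internally each thread mostly adds to its own thread-local list, so the threads don't have to queue up behind a single lock. This works great and is probably the right way of using a concurrent collection here.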
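Here is a self-contained sketch of the same approach that runs without the blotter files. InMemoryLoader is a hypothetical stand-in for CSVLoader that reads from a list of strings instead of a path, and BagDemo.LoadAll is an illustrative helper. I also swapped Convert.ToDouble for double.Parse with CultureInfo.InvariantCulture; that is my own assumption, so that 0.8 parses the same regardless of machine locale.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public class Trade
{
    public string TradeId { get; set; }
    public string Ticker { get; set; }
    public double Price { get; set; }
}

// Hypothetical stand-in for CSVLoader: parses lines held in memory instead of reading a file.
public class InMemoryLoader
{
    public List<string> Lines { get; set; }

    // Same idea as LoadBlotter(ConcurrentBag<Trade>): add each parsed trade straight into the shared bag.
    public void LoadBlotter(ConcurrentBag<Trade> alltrades)
    {
        foreach (var line in Lines)
        {
            var tline = line.Split(',');
            alltrades.Add(new Trade
            {
                TradeId = tline[0],
                Ticker = tline[1],
                Price = double.Parse(tline[2], CultureInfo.InvariantCulture)
            });
        }
    }
}

public static class BagDemo
{
    // Builds loaderCount loaders of linesPerLoader dummy trades each, loads them in parallel, returns the total.
    public static int LoadAll(int loaderCount, int linesPerLoader)
    {
        var loaders = Enumerable.Range(0, loaderCount)
            .Select(_ => new InMemoryLoader
            {
                Lines = Enumerable.Range(0, linesPerLoader)
                    .Select(i => i % 2 == 0 ? "1,AAAA,0.8" : "2,BBBB,0.95")
                    .ToList()
            })
            .ToList();

        var alltrades = new ConcurrentBag<Trade>();
        // Every loader receives the same bag instance; Add is safe to call from many threads.
        loaders.AsParallel().ForAll(l => l.LoadBlotter(alltrades));
        return alltrades.Count;
    }

    public static void Main()
    {
        Console.WriteLine("Total trades read=" + LoadAll(3, 1000));
    }
}
```

With 3 loaders of 1000 lines each it should report 3000 trades on every run, which is the check the List<T> version could not pass reliably.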

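There is one more collection which could be used here: BlockingCollection<T>. It is related to ConcurrentBag<T>, but not because both implement the same interface: ConcurrentBag<T> implements IProducerConsumerCollection<T>, whereas BlockingCollection<T> wraps an IProducerConsumerCollection<T> (a ConcurrentQueue<T> by default). The difference between the two is very important.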
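A quick sketch of that relationship (WrapperDemo is just an illustrative name): it checks via reflection which of the two types implements IProducerConsumerCollection<T>, and passes a ConcurrentBag<int> to BlockingCollection<int>'s constructor so that the bag becomes its underlying store.

```csharp
using System;
using System.Collections.Concurrent;

public static class WrapperDemo
{
    public static (bool BagIsPcc, bool BlockingIsPcc, int UnderlyingCount) Run()
    {
        // Does each type implement IProducerConsumerCollection<int>?
        bool bagIsPcc = typeof(IProducerConsumerCollection<int>).IsAssignableFrom(typeof(ConcurrentBag<int>));
        bool blockingIsPcc = typeof(IProducerConsumerCollection<int>).IsAssignableFrom(typeof(BlockingCollection<int>));

        var bag = new ConcurrentBag<int>();
        int underlyingCount;
        // The BlockingCollection stores its items in the collection handed to its constructor.
        using (var blocking = new BlockingCollection<int>(bag))
        {
            blocking.Add(1);
            blocking.Add(2);
            // Items added through the wrapper end up in the bag.
            underlyingCount = bag.Count;
        }
        return (bagIsPcc, blockingIsPcc, underlyingCount);
    }

    public static void Main()
    {
        var (bagIsPcc, blockingIsPcc, count) = Run();
        Console.WriteLine($"ConcurrentBag implements IProducerConsumerCollection: {bagIsPcc}");
        Console.WriteLine($"BlockingCollection implements IProducerConsumerCollection: {blockingIsPcc}");
        Console.WriteLine($"Items in the underlying bag: {count}");
    }
}
```

Once a collection is wrapped, add and take through the BlockingCollection only; as far as I know, modifying the underlying bag directly bypasses the wrapper's blocking and bounding bookkeeping.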


ConcurrentBag<T> Vs BlockingCollection<T>

Let's look at how the documentation defines BlockingCollection<T>:
BlockingCollection<T> is used as a wrapper for an IProducerConsumerCollection<T> instance, allowing removal attempts from the collection to block until data is available to be removed. Similarly, a BlockingCollection<T> can be created to enforce an upper-bound on the number of data elements allowed in the IProducerConsumerCollection<T>.

This makes it clear that if we are only adding to the collection and will read it only after everything has been added, ConcurrentBag is the better choice; it is also likely to be faster, since it doesn't carry the blocking and bounding machinery. BlockingCollection is only needed when consumers should block until an item becomes available (or producers should block once an upper bound is reached). In other words, it fits a producer/consumer setup where we know items are on their way and want to process them concurrently, as they arrive.
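A minimal producer/consumer sketch of that second case (PipelineDemo, the item count and the bound are illustrative choices): one task adds items to a BlockingCollection<int> bounded to a few items, and another task consumes them with GetConsumingEnumerable, which blocks while the collection is empty and finishes once CompleteAdding has been called and every item has been taken.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineDemo
{
    // One producer, one consumer; returns the items in the order the consumer received them.
    public static List<int> Run(int itemCount, int boundedCapacity)
    {
        var consumed = new List<int>(); // touched only by the consumer task until WaitAll returns
        // Default backing store is a ConcurrentQueue<int> (FIFO); Add blocks once boundedCapacity items are waiting.
        using (var queue = new BlockingCollection<int>(boundedCapacity))
        {
            var producer = Task.Run(() =>
            {
                for (int i = 0; i < itemCount; i++)
                    queue.Add(i);          // blocks while the collection is full
                queue.CompleteAdding();    // signals that no more items are coming
            });

            var consumer = Task.Run(() =>
            {
                // Blocks while empty; the loop ends after CompleteAdding once the queue is drained.
                foreach (var item in queue.GetConsumingEnumerable())
                    consumed.Add(item);
            });

            Task.WaitAll(producer, consumer);
        }
        return consumed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(10, 2)));
    }
}
```

Because there is a single producer and the default backing store is FIFO, the consumer should see the items in the order they were produced; with a ConcurrentBag<T> passed in as the backing store, that order would not be guaranteed.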


Any further suggestions or explanations regarding the above behavior of concurrent collections are welcome!


