user2818430 user2818430 - 14 days ago 10
C# Question

C# multithreading: process the lines of a large file in batches of 100

I have a file with 500.000.000 lines.

Each line is a string of at most 10 characters.

How can I process this file using multi threading and in batches of 100?

Answer

No additional libraries are required if you use Parallel.ForEach from the built-in TPL (Task Parallel Library) and write a couple of enumerators (listed below). Your code can look like this:

// Stream the file line by line, group the lines into batches of 100 and hand
// each batch to DoMyProcessing on a worker thread.
using (var input = new StreamReader(File.OpenRead(@"c:\path\to\my\file.txt")))
{
    var options = new ParallelOptions
    {
        // Use one worker per logical processor instead of a hard-coded 8,
        // so the degree of parallelism follows the machine the code runs on.
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    Parallel.ForEach(
        input.ReadLines().TakeChunks(100),
        options,
        batchOfLines =>
        {
            DoMyProcessing(batchOfLines);
        });
}

For this to work, you need two extension methods (ReadLines on StreamReader and TakeChunks on IEnumerable<T>) and a couple of enumerators, defined as follows:

/// <summary>
/// Extension methods, with their supporting enumerable/enumerator types, for
/// reading a <see cref="StreamReader"/> line by line and for grouping any
/// sequence into fixed-size chunks.
/// </summary>
public static class EnumerableExtensions
{
    /// <summary>
    /// Exposes the remaining lines of <paramref name="input"/> as a lazily read sequence.
    /// </summary>
    /// <param name="input">The reader to pull lines from.</param>
    /// <returns>A sequence that reads one line per step.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public static IEnumerable<string> ReadLines(this StreamReader input)
    {
        if (input == null)
        {
            throw new ArgumentNullException("input");
        }

        return new LineReadingEnumerable(input);
    }

    /// <summary>
    /// Groups <paramref name="source"/> into consecutive chunks of at most
    /// <paramref name="length"/> items; only the last chunk may be shorter.
    /// </summary>
    /// <typeparam name="T">The element type.</typeparam>
    /// <param name="source">The sequence to split.</param>
    /// <param name="length">The maximum number of items per chunk; must be at least 1.</param>
    /// <returns>A lazily evaluated sequence of chunks.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is less than 1.</exception>
    public static IEnumerable<IReadOnlyList<T>> TakeChunks<T>(this IEnumerable<T> source, int length)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        // A non-positive length would make the enumerator report "no chunks" and
        // silently drop every item, so reject it up front.
        if (length < 1)
        {
            throw new ArgumentOutOfRangeException("length", length, "Chunk length must be at least 1.");
        }

        return new ChunkingEnumerable<T>(source, length);
    }

    /// <summary>
    /// Enumerable wrapper around a <see cref="StreamReader"/>. Every enumerator it
    /// hands out reads from (and, when disposed, disposes) the same reader, so the
    /// sequence is effectively single-pass.
    /// </summary>
    public class LineReadingEnumerable : IEnumerable<string>
    {
        private readonly StreamReader _input;

        /// <summary>Creates the wrapper.</summary>
        /// <param name="input">The reader to pull lines from.</param>
        /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
        public LineReadingEnumerable(StreamReader input)
        {
            if (input == null)
            {
                throw new ArgumentNullException("input");
            }

            _input = input;
        }

        /// <summary>Returns an enumerator over the lines of the wrapped reader.</summary>
        public IEnumerator<string> GetEnumerator()
        {
            return new LineReadingEnumerator(_input);
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

    /// <summary>
    /// Enumerator that returns one line of a <see cref="StreamReader"/> per step.
    /// </summary>
    public class LineReadingEnumerator : IEnumerator<string>
    {
        private readonly StreamReader _input;
        private string _current;

        /// <summary>Creates the enumerator.</summary>
        /// <param name="input">The reader to pull lines from.</param>
        /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
        public LineReadingEnumerator(StreamReader input)
        {
            if (input == null)
            {
                throw new ArgumentNullException("input");
            }

            _input = input;
        }

        /// <summary>Disposes the underlying reader.</summary>
        public void Dispose()
        {
            _input.Dispose();
        }

        /// <summary>Reads the next line.</summary>
        /// <returns>True if a line was read; false once ReadLine returns null.</returns>
        public bool MoveNext()
        {
            _current = _input.ReadLine();
            return (_current != null);
        }

        /// <summary>Not supported; always throws.</summary>
        /// <exception cref="NotSupportedException">Always.</exception>
        public void Reset()
        {
            throw new NotSupportedException();
        }

        /// <summary>The line read by the latest MoveNext call (null before the first call or after the end).</summary>
        public string Current
        {
            get { return _current; }
        }

        object IEnumerator.Current
        {
            get { return _current; }
        }
    }

    /// <summary>
    /// Enumerable that presents an inner sequence as chunks of a fixed maximum length.
    /// </summary>
    /// <typeparam name="T">The element type.</typeparam>
    public class ChunkingEnumerable<T> : IEnumerable<IReadOnlyList<T>>
    {
        private readonly IEnumerable<T> _inner;
        private readonly int _length;

        /// <summary>Creates the chunking wrapper.</summary>
        /// <param name="inner">The sequence to split.</param>
        /// <param name="length">The maximum number of items per chunk; must be at least 1.</param>
        /// <exception cref="ArgumentNullException"><paramref name="inner"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is less than 1.</exception>
        public ChunkingEnumerable(IEnumerable<T> inner, int length)
        {
            if (inner == null)
            {
                throw new ArgumentNullException("inner");
            }

            if (length < 1)
            {
                throw new ArgumentOutOfRangeException("length", length, "Chunk length must be at least 1.");
            }

            _inner = inner;
            _length = length;
        }

        /// <summary>Starts a new enumeration of the inner sequence and chunks it.</summary>
        public IEnumerator<IReadOnlyList<T>> GetEnumerator()
        {
            return new ChunkingEnumerator<T>(_inner.GetEnumerator(), _length);
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
    }

    /// <summary>
    /// Enumerator that pulls up to a fixed number of items from an inner
    /// enumerator on every step and exposes them as one list.
    /// </summary>
    /// <typeparam name="T">The element type.</typeparam>
    public class ChunkingEnumerator<T> : IEnumerator<IReadOnlyList<T>>
    {
        private readonly IEnumerator<T> _inner;
        private readonly int _length;
        private IReadOnlyList<T> _current;
        private bool _endOfInner;

        /// <summary>Creates the chunking enumerator.</summary>
        /// <param name="inner">The enumerator to pull items from; disposed together with this instance.</param>
        /// <param name="length">The maximum number of items per chunk; must be at least 1.</param>
        /// <exception cref="ArgumentNullException"><paramref name="inner"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is less than 1.</exception>
        public ChunkingEnumerator(IEnumerator<T> inner, int length)
        {
            if (inner == null)
            {
                throw new ArgumentNullException("inner");
            }

            if (length < 1)
            {
                throw new ArgumentOutOfRangeException("length", length, "Chunk length must be at least 1.");
            }

            _inner = inner;
            _length = length;
        }

        /// <summary>Disposes the inner enumerator and drops the current chunk.</summary>
        public void Dispose()
        {
            _inner.Dispose();
            _current = null;
        }

        /// <summary>Builds the next chunk.</summary>
        /// <returns>True if at least one item was collected; false when the inner enumerator is exhausted.</returns>
        public bool MoveNext()
        {
            // Each chunk gets its own list so a consumer may keep using a chunk
            // after the enumerator has moved on.
            var currentBuffer = new List<T>();

            while (currentBuffer.Count < _length && !_endOfInner)
            {
                if (!_inner.MoveNext())
                {
                    // Remember the end so later calls never touch the inner enumerator again.
                    _endOfInner = true;
                    break;
                }

                currentBuffer.Add(_inner.Current);
            }

            if (currentBuffer.Count > 0)
            {
                _current = currentBuffer;
                return true;
            }

            _current = null;
            return false;
        }

        /// <summary>Resets the inner enumerator (which may itself throw if it does not support Reset) and clears local state.</summary>
        public void Reset()
        {
            _inner.Reset();
            _current = null;
            _endOfInner = false;
        }

        /// <summary>The chunk produced by the latest successful MoveNext call.</summary>
        /// <exception cref="InvalidOperationException">There is no current chunk.</exception>
        public IReadOnlyList<T> Current
        {
            get
            {
                if (_current != null)
                {
                    return _current;
                }

                throw new InvalidOperationException();
            }
        }

        object IEnumerator.Current
        {
            get
            {
                return this.Current;
            }
        }
    }
}