SuperJMN - 12 days ago
C# Question

Observable.Using and async streams getting corrupted data

I have a flow of streams whose goal is to calculate a simple "checksum" of the contents in a set of .zip files.

To do it, I have set up an observable that:


  1. takes all files in a given folder

  2. reads the contents of each file (reading as a ZipArchive)

  3. for each entry in each file, performs the calculation of the checksum



To illustrate it, I have created this example:

NOTICE the usage of AsyncContext.Run (http://stackoverflow.com/a/9212343/1025407) to make the Main method await GetChecksums, since it's a Console Application.

namespace DisposePoC
{
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Compression;
    using System.Reactive.Linq;
    using Nito.AsyncEx;
    using System.Linq;
    using System.Threading.Tasks;

    class Program
    {
        private static void Main()
        {
            AsyncContext.Run(GetChecksums);
        }

        private static async Task<IList<byte>> GetChecksums()
        {
            var bytes = Directory.EnumerateFiles("FolderWithZips")
                .ToObservable()
                .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
                .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));

            return await bytes.ToList();
        }

        private static ZipArchive CreateZipArchive(string path)
        {
            return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
        }

        private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
        {
            var bytes = await GetBytesFromStream(stream, entryLength);
            return bytes.Aggregate((b1, b2) => (byte) (b1 ^ b2));
        }

        private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
        {
            byte[] bytes = new byte[entryLength];
            await stream.ReadAsync(bytes, 0, (int)entryLength);
            return bytes;
        }
    }
}


Running the application, I get all kinds of errors:


'System.IO.InvalidDataException': A local file header is corrupt.
'System.NotSupportedException': Stream does not support reading.
'System.ObjectDisposedException' : Cannot access a disposed object.
'System.IO.InvalidDataException' : Block length does not match with its complement.


What am I doing wrong?

Is there a problem with the observable itself or is it because ZipArchive isn't thread-safe? If it isn't, how do I make the code work?

Answer

There appears to be nothing "Rx" about your problem.

If you rewrite the whole thing as an imperative set of nested loops, it works fine:

private static async Task<IList<byte>> GetChecksums()
{
    var bytes = new List<byte>();
    foreach (var path in Directory.EnumerateFiles("FolderWithZips"))
    {
        using (var archive = CreateZipArchive(path))
        {
            foreach (var entry in archive.Entries)
            {
                using (var stream = entry.Open())
                {
                    var checksum = await CalculateChecksum(stream, entry.Length);
                    bytes.Add(checksum);
                }
            }
        }
    }

    return bytes;
}

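So I would imagine you have a set of race conditions (concurrency) and/or out-of-order disposal issues. In the Rx version, the first Observable.Using disposes each ZipArchive as soon as its entries have been emitted, not once they have been processed, and SelectMany subscribes to every inner sequence without waiting for the previous one to finish, so reads of several entries can overlap on the same underlying file stream.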
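If you would rather keep the query in Rx, one way to get the same ordering as the loops is to nest the entry pipeline inside the archive's Using and replace SelectMany with Select followed by Concat. The following is only a sketch under those assumptions, not a tested drop-in replacement: the class name SequentialChecksums and the folder parameter are made up for illustration, and CalculateChecksum is changed to read in a loop because ReadAsync may return fewer bytes than requested.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical class name; the helpers mirror the ones in the question.
public static class SequentialChecksums
{
    public static async Task<IList<byte>> GetChecksums(string folder)
    {
        var checksums = Directory.EnumerateFiles(folder)
            .ToObservable()
            // Select + Concat instead of SelectMany: the next archive is
            // subscribed to (and opened) only after the previous one completes.
            .Select(path => Observable.Using(
                () => CreateZipArchive(path),
                archive => archive.Entries
                    .ToObservable()
                    // The entry pipeline lives inside the archive's Using, so
                    // the archive stays open until every entry has been read.
                    .Select(entry => Observable.Using(
                        () => entry.Open(),
                        stream => Observable.FromAsync(() => CalculateChecksum(stream))))
                    // One entry stream at a time per archive.
                    .Concat()))
            .Concat();

        return await checksums.ToList();
    }

    private static ZipArchive CreateZipArchive(string path)
    {
        // Disposing the archive also disposes the underlying FileStream.
        return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
    }

    public static async Task<byte> CalculateChecksum(Stream stream)
    {
        var buffer = new byte[81920];
        byte checksum = 0;
        int read;

        // Keep reading until the stream reports end of data (0 bytes read).
        while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            for (var i = 0; i < read; i++)
            {
                checksum ^= buffer[i];
            }
        }

        return checksum;
    }
}
```

You would call it the same way as before, for example AsyncContext.Run(() => SequentialChecksums.GetChecksums("FolderWithZips")). Note that this version returns 0 for an empty entry, whereas Aggregate in the original throws on an empty sequence.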