abatishchev - 3 months ago 32
C# Question

How to properly throttle access to DocumentDB from WebJobs

I have an Azure WebJob with blob and queue triggers that save data to Azure DocumentDB.

From time to time I get this error:


Microsoft.Azure.Documents.RequestRateTooLargeException: Message: {"Errors":["Request rate is large"]}


Currently I throttle requests with the following code. The WebJob function:

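public async Task ParseCategoriesFromCsv(...)
{
    // Request charges (RU) observed via ResourceResponse<Document>.RequestCharge
    // for one lookup, one insert and one replace.
    double find = 2.23, add = 5.9, replace = 10.67;

    // Worst case per document: a lookup followed by either an insert or a replace.
    double requestCharge = Math.Round(find + Math.Max(add, replace));

    await categoryProvider.SaveCategories(requestCharge, categories);
}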


The category provider, which works with the DocumentDB client:

public async Task<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
    // Spread RequestUnits / requestCharge requests evenly over a 60-second window.
    var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));

    // A new scheduler and a new client are created on every call, so concurrently
    // running functions are each throttled independently of one another.
    var scheduler = new IntervalTaskScheduler(requestDelay, Scheduler.Default); // Rx

    var client = new DocumentClient(endpoint, authorizationKey,
        new ConnectionPolicy
        {
            ConnectionMode = documentDbOptions.ConnectionMode,
            ConnectionProtocol = documentDbOptions.ConnectionProtocol
        });

    // PutDocumentToDb is a project-specific extension method, not part of the SDK.
    return await Task.WhenAll(categories.Select(d =>
        scheduler.ScheduleTask(
            () => client.PutDocumentToDb(collectionOptions.CollectionLink, d.SearchIndex, d))));
}
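The arithmetic behind requestCharge and requestDelay can be checked in isolation. This is a hypothetical sketch: ThrottleMath, EstimateCharge and RequestDelay are illustrative names, not part of the WebJobs or DocumentDB SDKs. RequestDelay generalizes the formula above to window × charge / budget, so a 60-second window reproduces 60.0 / (RequestUnits / requestCharge). One assumption worth checking: DocumentDB provisions throughput in request units per second, so if collectionOptions.RequestUnits holds the provisioned RU/s, the matching window would be one second rather than 60.

```csharp
using System;

// Hypothetical helpers for the throttling arithmetic used in the question.
public static class ThrottleMath
{
    // Worst-case charge for one document: a lookup plus the more expensive of
    // insert vs. replace, rounded the same way as in ParseCategoriesFromCsv.
    public static double EstimateCharge(double find, double add, double replace) =>
        Math.Round(find + Math.Max(add, replace));

    // Minimum spacing between request starts so that requests costing
    // `requestCharge` RU consume at most `budget` RU per `window`.
    // Computed in ticks to avoid millisecond rounding in TimeSpan.FromSeconds.
    public static TimeSpan RequestDelay(double budget, double requestCharge, TimeSpan window) =>
        TimeSpan.FromTicks((long)Math.Round(window.Ticks * requestCharge / budget));
}
```

With a hypothetical 400 RU/s budget and the 13 RU estimate, this gives 32.5 ms between request starts; with a 60-second window it gives 1.95 s. Either way the budget belongs to one scheduler, so N schedulers running side by side together allow N times as many requests.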


The task scheduler that throttles, measures and synchronizes requests:

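using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public class IntervalTaskScheduler : IDisposable
{
    private readonly Subject<Action> _requests = new Subject<Action>();
    private readonly IDisposable _observable;

    public IntervalTaskScheduler(TimeSpan requestDelay, IScheduler scheduler)
    {
        // Emit each queued action immediately, followed by an empty sequence that
        // completes only after requestDelay; Concat makes the next action wait for it.
        _observable = _requests.Select(i => Observable.Empty<Action>()
                .Delay(requestDelay)
                .StartWith(i))
            .Concat()
            .ObserveOn(scheduler)
            .Subscribe(action => action());
    }

    public Task<T> ScheduleTask<T>(Func<Task<T>> request)
    {
        var tcs = new TaskCompletionSource<T>();
        // The async lambda becomes an Action (async void), so the delay spaces out the
        // start of each request; it does not wait for the previous request to finish.
        _requests.OnNext(async () =>
        {
            try
            {
                T result = await request();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        return tcs.Task;
    }

    public void Dispose() => _observable.Dispose();
}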
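Because SaveCategories builds a fresh IntervalTaskScheduler for every invocation, eight concurrently running queue functions get eight independent schedules, each spending the full RU budget, which would explain the symptoms below. One way to get a single schedule for the whole process is to share one throttle (and one DocumentClient) across all invocations. This sketch swaps the Rx pipeline for a SemaphoreSlim plus a Stopwatch so it has no external dependencies; SharedRequestThrottle and RunAsync are hypothetical names.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: spaces out request starts by a fixed interval across
// every caller that shares the same instance (for example, a static field).
public sealed class SharedRequestThrottle
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly Stopwatch _clock = Stopwatch.StartNew(); // monotonic time source
    private readonly TimeSpan _interval;
    private TimeSpan _nextSlot = TimeSpan.Zero; // earliest time the next request may start

    public SharedRequestThrottle(TimeSpan interval)
    {
        _interval = interval;
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> request)
    {
        // Only one caller at a time may claim the next start slot.
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            TimeSpan wait = _nextSlot - _clock.Elapsed;
            if (wait > TimeSpan.Zero)
            {
                await Task.Delay(wait).ConfigureAwait(false);
            }
            _nextSlot = _clock.Elapsed + _interval;
        }
        finally
        {
            _gate.Release();
        }

        // The request runs outside the gate: like IntervalTaskScheduler, this spaces
        // out request starts and does not wait for the previous request to finish.
        return await request().ConfigureAwait(false);
    }
}
```

In the WebJob this would be a single static instance, constructed once with the delay computed from the collection's budget and used by every SaveCategories call. It only coordinates calls inside one process: if the WebJob runs on several instances, each has its own throttle, so handling 429 responses is still necessary.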


So the request charge is essentially a set of constants taken from ResourceResponse<Document>.RequestCharge, but:


  • With 1 queue triggered it works fine, but with 8 queues it throws the error.

  • If I increase the request charge 8 times, 8 queues work fine, but a single queue runs 8 times slower than it could.



What throttling/measuring/synchronization mechanism would work well here?

Answer

When you get a 429 (Request rate too large), the response tells you how long to wait: the x-ms-retry-after-ms header contains the wait time in milliseconds, and the .NET SDK exposes it as DocumentClientException.RetryAfter. Wait that long before retrying the request.

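catch (AggregateException ex) when (ex.InnerException is DocumentClientException)
{
    DocumentClientException dce = (DocumentClientException)ex.InnerException;
    switch ((int)dce.StatusCode)
    {
        case 429:
            // Throttled: RetryAfter carries the x-ms-retry-after-ms value.
            // Sleep for that long, then retry (this catch sits inside a retry loop).
            // In async code, prefer await Task.Delay(dce.RetryAfter) over Thread.Sleep.
            Thread.Sleep(dce.RetryAfter);
            break;

        default:
            Console.WriteLine("  Failed: {0}", ex.InnerException.Message);
            throw;
    }
}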
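Thread.Sleep blocks a thread-pool thread for the whole wait; an async WebJob function can instead await the delay and retry. A minimal, SDK-agnostic sketch: RetryOnThrottle, ExecuteAsync and the getRetryAfter callback are hypothetical names. The callback is where the DocumentDB-specific check would go: for a DocumentClientException whose StatusCode is 429, return its RetryAfter; otherwise return null. With await, the exception surfaces directly rather than wrapped in an AggregateException.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical retry helper that honours a server-provided retry-after delay.
public static class RetryOnThrottle
{
    // Runs `operation`; when `getRetryAfter` classifies the exception as throttling
    // (returns a delay), waits that long without blocking a thread and tries again.
    // Other exceptions, and the throttling exception after `maxRetries` retries, propagate.
    public static async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation,
        Func<Exception, TimeSpan?> getRetryAfter,
        int maxRetries)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return await operation().ConfigureAwait(false);
            }
            catch (Exception ex) when (attempt < maxRetries && getRetryAfter(ex) is TimeSpan delay)
            {
                await Task.Delay(delay).ConfigureAwait(false);
            }
        }
    }
}
```

Capping the retries keeps a persistently overloaded collection from stalling a function forever; once the cap is hit, the original exception propagates to the WebJobs runtime as before.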