user666 user666 - 3 months ago 13
Python Question

Compute first order derivative with MongoDB aggregation framework

Is it possible to calculate a first order derivative using the aggregate framework?

For example, I have the data :

{time_series : [10,20,40,70,110]}


I'm trying to obtain an output like:

{derivative : [10,20,30,40]}

Answer

We can do this using the aggregation framework in MongoDB 3.2 or newer because what we really need is a way to keep tracking of the index of the current and previous element in our array and fortunately starting from MongoDB 3.2 we can use the $unwind operator to deconstruct our array and include the index of each element in the array by specifying a document as operand instead of the traditional "path" prefixed by $.

From there we have two options. The first is in MongoDB 3.2 and the second in the upcoming release of MongoDB (as of this writing).

Next in the pipeline, we need to $group our documents and use the $push accumulator operator to return an array of sub-documents that look like this:

{
    "_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
    "time_series" : [
        { "value" : 10, "index" : NumberLong(0) },
        { "value" : 20, "index" : NumberLong(1) },
        { "value" : 40, "index" : NumberLong(2) },
        { "value" : 70, "index" : NumberLong(3) },
        { "value" : 110, "index" : NumberLong(4) }
    ]
}

Finally comes the $project stage. In this stage, we need to use the $map operator to apply a series of expression to each element in the the newly computed array in the $group stage.

Here is what is going on inside the $map (see $map as a for loop) in expression:

For each subdocument, we assign the value field to a variable using the $let variable operator. We then subtract it value from the value of the "value" field of the next element in the array.

Since the next element in the array is the element at the current index plus one, all we need is the help of the $arrayElemAt operator and a simple $addition of the current element's index and 1.

The $subtract expression return a negative value so we need to multiply the value by -1 using the $multiply operator.

We also need to $filter the resulted array because it the last element is None or null. The reason is that when the current element is the last element, $subtract return None because the index of the next element equal the size of the array.

db.collection.aggregate(
    [ 
        { "$unwind": { 
            "path": "$time_series", 
            "includeArrayIndex": "index" 
        }}, 
        { "$group": { 
            "_id": "$_id", 
            "time_series": { 
                "$push": { 
                    "value": "$time_series", 
                    "index": "$index" 
                } 
            } 
        }}, 
        { "$project": { 
            "time_series": { 
                "$filter": { 
                    "input": { 
                        "$map": { 
                            "input": "$time_series",  
                            "as": "el", 
                            "in": { 
                                "$multiply": [ 
                                    { "$subtract": [ 
                                        "$$el.value",  
                                        { "$let": {
                                            "vars": { 
                                                "nextElement": { 
                                                    "$arrayElemAt": [
                                                        "$time_series", 
                                                        { "$add": [
                                                            "$$el.index", 
                                                            1 
                                                        ]}
                                                    ]}
                                                }, 
                                                "in": "$$nextElement.value"
                                            }
                                        }
                                    ]}, 
                                    -1 
                                ]
                            }
                        }
                    }, 
                    "as": "item", 
                    "cond": { "$gte": [ "$$item",  0 ] } 
                }
            }
        }}
    ]
)

In the upcoming version will provide another alternative.

First in the $group stage we return two different arrays. One for the elements and the other one for their indexes then $zip the two arrays as shown here. From their, we simply access each element using integer indexing instead of assigning their value to a variable with $let.

db.collection.aggregate(
    [ 
        { "$unwind": { 
            "path": "$time_series", 
            "includeArrayIndex": "index" 
        }}, 
        { "$group": { 
            "_id": "$_id", 
            "values": { "$push": "$time_series" }, 
            "indexes": { "$push": "$index" } 
        }}, 
        { "$project": { 
            "time_series": { 
                "$filter": { 
                    "input": { 
                        "$map": { 
                            "input": { 
                                "$zip": { 
                                    "inputs": [ 
                                        "$values", 
                                        "$indexes" 
                                    ] 
                                } 
                            },  
                            "as": "el", 
                            "in": { 
                                "$multiply": [
                                    { "$subtract": [ 
                                        { "$arrayElemAt": [ 
                                            "$$el", 
                                            0 
                                        ]}, 
                                        { "$arrayElemAt": [ 
                                            "$values", 
                                            { "$add": [
                                                { "$arrayElemAt": [ 
                                                    "$$el", 
                                                    1 
                                                ]}, 
                                                1
                                            ]}
                                        ]}
                                    ]}, 
                                    -1 
                                ]
                            }
                        }
                    }, 
                    "as": "item", 
                    "cond": { "$gte": [ "$$item", 0 ]  } 
                }
            }
        }}
    ]
)

Note that we could also reverse the array early in the $project stage using $reverse as shown here to avoid using $multiply.


Both queries* yield something like:

{ 
     "_id" : ObjectId("57c11ddbe860bd0b5df6bc64"), 
     "time_series" : [ 10, 20, 30, 40 ] 
}

Another option which I think is less efficient is perform a map/reduce operation on our collection using the map_reduce method.

>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
...               function() {
...                 var derivatives = [];
...                 for (var index=1; index<this.time_series.length; index++) {
...                   derivatives.push(this.time_series[index] - this.time_series[index-1]);
...                 }
...                 emit(this._id, derivatives);
...               }
...               """)
>>> reducer = Code("""
...                function(key, value) {}
...                """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
...     print(res)  # or do something with the document.
... 
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}

You can also retrieve all the document and use the numpy.diff to return the derivative like this:

import numpy as np


for document in collection.find({}, {'time_series': 1}):
    result = np.diff(document['time_series']) 

Now how about a little benchmarking:

Machine:

OS: Ubuntu 16.04
Memory: 15.6 GiB
Processor: Intel® Xeon(R) CPU E3-1231 v3 @ 3.40GHz × 8 

The three queries run in this order on my machine give respectively the following result:

Benchmark test result with 500 documents:

MongoDB 3.2

100 loops, best of 3: 2.32 ms per loop  

MongoDB 3.3.11

1000 loops, best of 3: 1.72 ms per loop

MapReduce

100 loops, best of 3: 15.7 ms per loop

Numpy using numpy.diff

100 loops, best of 3: 3.61 ms per loop

Conclusion

Using the aggregation is the best option here as expected even if the solution is not obvious.

The mapReduce solution is trivial but very inefficient because of the JavaScript evaluation.

* You can test the second query by installing a current development version of MongoDB (as the time of this writing).

Comments