Kajal Gupta - 2 months ago
Python Question

Creating a custom source for reading from Cloud Datastore using the latest Python apache_beam Cloud Dataflow SDK

The Cloud Dataflow Python SDK was recently made available, and I decided to use it. Unfortunately, support for reading from Cloud Datastore is yet to come, so I have to fall back on writing a custom source in order to get the promised benefits of dynamic splitting, progress estimation, etc. I studied the documentation thoroughly but am unable to put the pieces together to speed up my entire process.

To be clearer, my first approach was:

  1. Querying Cloud Datastore.

  2. Creating a ParDo function and passing the returned query to it.

With this approach, it took 13 minutes to iterate over 200k entries.

So I decided to write a custom source that would read the entities efficiently, but I have not managed to because I do not understand how the pieces fit together. Can anyone please help me with how to create a custom source for reading from Datastore?

For the first approach, the link to my gist is:

Thank you.


In the code you provided, the access to Datastore happens before the pipeline is even constructed:

query = client.query(kind='User').fetch()

This executes the whole query and reads all entities before the Beam SDK gets involved at all.

More precisely, fetch() returns a lazy iterable over the query results, and they get iterated over when you construct the pipeline, at beam.Create(query) - but, once again, this happens in your main program, before the pipeline starts. Most likely, this is what's taking 13 minutes, rather than the pipeline itself (but please feel free to provide a job ID so we can take a deeper look). You can verify this by making a small change to your code that forces all the reads to happen on this one line, so you can time it separately from the pipeline:

query = list(client.query(kind='User').fetch())
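To make the lazy-versus-eager point concrete, here is a minimal sketch that needs neither Datastore nor Beam. The generator fake_fetch is a hypothetical stand-in for client.query(kind='User').fetch(), and the counter only exists to show when reads actually happen; it is an illustration of the behaviour described above, not the client library's implementation.

```python
import time

reads = 0  # counts how many entities have actually been pulled


def fake_fetch(n):
    """Hypothetical stand-in for client.query(kind='User').fetch()."""
    global reads
    for i in range(n):
        reads += 1
        yield {"id": i}


query = fake_fetch(1000)        # lazy: nothing has been read yet
reads_after_fetch = reads

start = time.perf_counter()
entities = list(query)          # iterating is what performs the reads
elapsed = time.perf_counter() - start
reads_after_list = reads
```

With the real client, timing the list(...) line in the same way should show whether the 13 minutes are spent there rather than in the pipeline.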

However, I think your intention was to both read and process the entities in parallel.

For Cloud Datastore in particular, the custom source API is not the best choice to do that. The reason is that the underlying Cloud Datastore API itself does not currently provide the properties necessary to implement the custom source "goodies" such as progress estimation and dynamic splitting, because its querying API is very generic (unlike, say, Cloud Bigtable, which always returns results ordered by key, so e.g. you can estimate progress by looking at the current key).
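As an illustration of why key-ordered results help, the following is a rough sketch of progress estimation over a sorted key range. It is not Cloud Bigtable's or Beam's API: the function names are made up for this example, keys are assumed to be byte strings, and the estimate assumes keys are spread roughly evenly across the range.

```python
def key_to_fraction(key, width=8):
    """Map a byte-string key to a float in [0, 1) using its first `width` bytes."""
    prefix = key[:width].ljust(width, b"\x00")
    return int.from_bytes(prefix, "big") / float(2 ** (8 * width))


def estimate_progress(start_key, end_key, current_key):
    """Fraction of the sorted key range [start_key, end_key) consumed so far."""
    start = key_to_fraction(start_key)
    end = key_to_fraction(end_key)
    current = key_to_fraction(current_key)
    if end <= start:
        return 1.0  # empty or degenerate range: nothing left to read
    # Clamp so keys outside the range never report <0% or >100%.
    return min(1.0, max(0.0, (current - start) / (end - start)))


# Reading a range from b"a" to b"z" and currently positioned at b"m".
progress = estimate_progress(b"a", b"z", b"m")
```

A generic Datastore query gives no such ordering guarantee, so there is no "current position" from which to interpolate in this way.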

We are currently rewriting the Java Cloud Datastore connector to use a different approach, which uses a ParDo to split the query and a ParDo to read each of the sub-queries. Please see this pull request for details.
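The shape of that approach can be sketched in plain Python. This is only a stand-in, with no Beam or Datastore calls: FAKE_ENTITIES, split_query and read_sub_query are hypothetical names, integer keys are an assumption, and a thread pool plays the role of the workers that would run the second ParDo. It is not the connector's actual implementation.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for a Datastore kind: 20 entities keyed 0..19.
FAKE_ENTITIES = {key: {"id": key, "name": "user-%d" % key} for key in range(20)}


def split_query(start, end, num_splits):
    """Stage 1: split the key range [start, end) into contiguous sub-ranges."""
    step = max(1, -(-(end - start) // num_splits))  # ceiling division
    return [(lo, min(lo + step, end)) for lo in range(start, end, step)]


def read_sub_query(key_range):
    """Stage 2: read only the entities whose key falls in one sub-range."""
    lo, hi = key_range
    return [FAKE_ENTITIES[k] for k in range(lo, hi) if k in FAKE_ENTITIES]


sub_queries = split_query(0, 20, num_splits=4)
with ThreadPoolExecutor(max_workers=4) as pool:
    # Each sub-query is read independently, as separate workers would do.
    batches = list(pool.map(read_sub_query, sub_queries))
entities = [entity for batch in batches for entity in batch]
```

The point of the design is that the reads happen inside the parallel stage instead of in the main program, so no single process has to iterate over every entity.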