Recently the Cloud Dataflow Python SDK was made available, and I decided to use it. Unfortunately, support for reading from Cloud Datastore is yet to come, so I have to fall back on writing a custom source so that I can utilize the benefits of dynamic splitting, progress estimation, etc., as promised. I did study the documentation thoroughly, but I am unable to put the pieces together so that I can speed up my entire process.
To be clearer, my first approach was:
In the code you provided, the access to Datastore happens before the pipeline is even constructed:
query = client.query(kind='User').fetch()
This line does not execute the query by itself: fetch() returns a lazy iterable over the query results, and the results only get iterated over when you construct the pipeline, at beam.Create(query). But, once again, this happens in your main program, before the pipeline starts. Most likely this is what's taking 13 minutes, rather than the pipeline itself (but please feel free to provide a job ID so we can take a deeper look). You can verify this by making a small change to your code:
query = list(client.query(kind='User').fetch())
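To see why `list()` makes the cost visible in your main program, here is a minimal, Datastore-free sketch of how a lazy iterable behaves. The `fake_fetch` generator is a hypothetical stand-in for `fetch()`, not the real `google.cloud.datastore` API:

```python
import time

def fake_fetch(n):
    """Stand-in for Datastore's fetch(): a lazy generator that
    simulates a slow RPC for each result."""
    for i in range(n):
        time.sleep(0.01)  # pretend each result costs a network round-trip
        yield {'id': i}

# Lazy: returns instantly, no work has been done yet.
start = time.time()
results = fake_fetch(100)
lazy_elapsed = time.time() - start

# Eager: list() drains the generator, so all the simulated RPCs happen
# here, in the main program -- exactly where beam.Create() would drain it.
start = time.time()
materialized = list(results)
eager_elapsed = time.time() - start

print(lazy_elapsed < eager_elapsed)  # the cost is in materialization
print(len(materialized))
```

If the 13 minutes move from pipeline construction to this `list()` call, you know the query itself, not the pipeline, is the bottleneck.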
However, I think your intention was to both read and process the entities in parallel.
For Cloud Datastore in particular, the custom source API is not the best way to do that. The reason is that the underlying Cloud Datastore API itself does not currently provide the properties necessary to implement the custom source "goodies" such as progress estimation and dynamic splitting, because its querying API is very generic (unlike, say, Cloud Bigtable, which always returns results ordered by key, so you can, e.g., estimate progress by looking at the current key).
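To illustrate why key-ordered results matter: if a scan always proceeds in key order within a known range, progress can be estimated from the current key's position in the keyspace. The helpers below are a simplified sketch of that idea (hypothetical names, not any connector's real API); a generic Datastore query offers no such ordered position to measure.

```python
def key_fraction(key: bytes, num_bytes: int = 8) -> float:
    """Map a byte-string key to a fraction of the keyspace in [0, 1)
    by interpreting its first num_bytes bytes as a big-endian integer."""
    padded = key[:num_bytes].ljust(num_bytes, b'\x00')
    return int.from_bytes(padded, 'big') / float(1 << (8 * num_bytes))

def estimate_progress(current_key: bytes, start_key: bytes, end_key: bytes) -> float:
    """Fraction of the [start_key, end_key) range already consumed,
    clamped to [0, 1]."""
    lo, hi, cur = (key_fraction(k) for k in (start_key, end_key, current_key))
    if hi <= lo:
        return 0.0
    return min(max((cur - lo) / (hi - lo), 0.0), 1.0)

# Roughly halfway through the full keyspace:
print(estimate_progress(b'\x80', b'\x00', b'\xff' * 8))
```

This estimate is what also makes dynamic splitting possible: the unread tail of the key range can be handed to another worker. Without key ordering, there is no cheap, monotonic measure of "how far along" a query is.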
We are currently rewriting the Java Cloud Datastore connector to use a different approach: one ParDo splits the query into sub-queries, and another ParDo reads each of the sub-queries. Please see this pull request for details.