JackR - 12 days ago
Python Question

Spark query for max count

I'm a beginner with Spark and I'm trying to write a query that lets me retrieve the most visited web pages.

My query is the following:

mostPopularWebPageDF = logDF.groupBy("webPage") \
    .agg(functions.count("webPage").alias("cntWebPage")) \
    .agg(functions.max("cntWebPage")) \
    .show()


With this query I only retrieve a DataFrame with the max count, but I want a DataFrame with both this count and the web page that holds it.

Something like this:

webPage      max(cntWebPage)
google.com   2


How can I fix my problem?

Thanks a lot.

Answer

In PySpark + SQL:

logDF.registerTempTable("logDF")

mostPopularWebPageDF = sqlContext.sql("""select webPage, cntWebPage from (
                                            select webPage, count(*) as cntWebPage, max(count(*)) over () as maxcnt 
                                            from logDF 
                                            group by webPage) as tmp
                                            where tmp.cntWebPage = tmp.maxcnt""")

Maybe I can make it cleaner, but it works. I will try to optimize it.

My result:

webPage      cntWebPage
google.com   2

For the dataset:

webPage    usersid
google.com 1
google.com 3
bing.com   10
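
To reproduce this end to end, here is a minimal, self-contained sketch (assuming a Spark 1.x-style SQLContext, to match registerTempTable above; the app name and setup lines are my own additions, and the sample rows are the dataset just shown):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="mostPopularWebPage")
sqlContext = SQLContext(sc)

# Build the sample dataset shown above
logDF = sqlContext.createDataFrame(
    [("google.com", 1), ("google.com", 3), ("bing.com", 10)],
    ["webPage", "usersid"])

logDF.registerTempTable("logDF")

mostPopularWebPageDF = sqlContext.sql("""select webPage, cntWebPage from (
                                            select webPage, count(*) as cntWebPage,
                                                   max(count(*)) over () as maxcnt
                                            from logDF
                                            group by webPage) as tmp
                                            where tmp.cntWebPage = tmp.maxcnt""")

# Should print a single row: google.com, 2
mostPopularWebPageDF.show()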

Explanation: the per-page counts are done via grouping + the count(*) function. The max of all these counts is calculated via a window function, so for the dataset above the intermediate DataFrame (without dropping the maxCount column) is:

webPage    count  maxCount
google.com 2      2
bing.com   1      2

Then we select the rows whose count equals maxCount.

EDIT: I have deleted the DSL version - it does not support window over () and the ordering was changing the result. Sorry for this bug. The SQL version is correct.
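
If you still want to stay in the DataFrame API, a rough window-free sketch (my own alternative, not the deleted DSL version: it computes the max count separately and then filters on it) could look like this:

from pyspark.sql import functions as F

# Count visits per page
counts = logDF.groupBy("webPage").agg(F.count("*").alias("cntWebPage"))

# Bring the maximum count back to the driver (a single-row aggregation)
maxCnt = counts.agg(F.max("cntWebPage")).collect()[0][0]

# Keep only the pages that reach that maximum
mostPopularWebPageDF = counts.filter(F.col("cntWebPage") == maxCnt)

The collect() is cheap because the aggregation returns a single row, but it does trigger an extra job compared to the single-pass window-function query.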