
Spark shell - How can I retrieve rows from my dataset based on a time period, i.e. between two given dates or years?

I recently started with Spark and am practicing in the Spark shell.

I have a dataset "movies.dat" in the following format:

MovieID,Title,Genres

Sample record:

2,Jumanji (1995),Adventure|Children|Fantasy


I want to generate the list of "Horror" movies released between 1985 and 1995.

Here is my approach.

scala> val movies_data = sc.textFile("file:///home/cloudera/cs/movies.dat")

scala> val tags = movies_data.map(line => line.split(","))

scala> tags.take(5)
res3: Array[Array[String]] = Array(Array(1, Toy Story (1995), Adventure|Animation|Children|Comedy|Fantasy), Array(2, Jumanji (1995), Adventure|Children|Fantasy), Array(3, Grumpier Old Men (1995), Comedy|Romance), Array(4, Waiting to Exhale (1995), Comedy|Drama|Romance), Array(5, Father of the Bride Part II (1995), Comedy))

scala> val horrorMovies = tags.filter(genre => genre.contains("Horror"))

scala> horrorMovies.take(5)
res4: Array[Array[String]] = Array(Array(177, Lord of Illusions (1995), Horror), Array(220, Castle Freak (1995), Horror), Array(841, Eyes Without a Face (Les Yeux sans visage) (1959), Horror), Array(1105, Children of the Corn IV: The Gathering (1996), Horror), Array(1322, Amityville 1992: It's About Time (1992), Horror))


I want to retrieve the data using the Spark shell only. I am able to retrieve all the movies of the "Horror" genre.
Now, is there any way to filter those movies further and keep only the ones whose release year is between 1985 and 1995?

Answer

You can write logic to extract the year from the second element of the split line (the title) and compare it with the range you want, as below:

scala> val movies_data = sc.textFile("file:///home/cloudera/cs/movies.dat")
movies_data: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/cs/movies.dat MapPartitionsRDD[5] at textFile at <console>:25

scala> val tags = movies_data.map(line => line.split(","))
tags: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:27

scala> val horrorMovies = tags.filter(genre => {
     |   // the year is the text between the last '(' and the last ')' in the title, e.g. "1995"
     |   val date = genre(1).substring(genre(1).lastIndexOf("(") + 1, genre(1).lastIndexOf(")")).toInt
     |   date >= 1985 && date <= 1995 && genre(2).contains("Horror")
     | })
horrorMovies: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:29

scala> horrorMovies.take(3)
res1: Array[Array[String]] = Array(Array(177, " Lord of Illusions (1995)", " Horror"), Array(220, " Castle Freak (1995)", " Horror"), Array(1322, " Amityville 1992: It's About Time (1992)", " Horror"))
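One note on the filter in your question: `contains` on the split Array[String] looks for an element that is exactly "Horror", so it only matched single-genre rows and would miss entries like Horror|Sci-Fi. That is why the filter above calls `contains` on the genres string (genre(2)) instead. A quick illustration with a made-up record:

val record = Array("1214", "Alien (1979)", "Horror|Sci-Fi")

record.contains("Horror")     // false: tests for an array element equal to "Horror"
record(2).contains("Horror")  // true:  substring match on the genres field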

I hope the answer is helpful.

Edit

You can do the above with a regex too. This also guards against two edge cases: the substring version throws if a title has no parentheses, and concatenating every digit run in the title (as `"""(\d+)""".r.findAllIn(genre(1)).mkString` would do) breaks on titles that contain other digits. For example, "Amityville 1992: It's About Time (1992)" yields "19921992", so a movie the first version kept would be silently dropped. Matching only the last parenthesized "(yyyy)" group avoids both problems:

scala> val yearRegex = """\((\d{4})\)""".r
yearRegex: scala.util.matching.Regex = \((\d{4})\)

scala> val horrorMovies = tags.filter(genre => {
     |   // take the last "(yyyy)" match in the title; default to 0 when there is no year
     |   val date = yearRegex.findAllMatchIn(genre(1)).toList.lastOption
     |     .map(_.group(1).toInt).getOrElse(0)
     |   date >= 1985 && date <= 1995 && genre(2).contains("Horror")
     | })
horrorMovies: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:33

The rest of the code is the same as above.
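For completeness: if you are on Spark 2.x, where spark-shell also exposes a SparkSession as `spark`, the same filter can be written against the DataFrame API. This is only a sketch under the same assumptions as above (comma-separated MovieID,Title,Genres lines, release year as the last "(yyyy)" group in the title); the `collect { case Array(...) }` step also silently drops any malformed line that does not split into exactly three fields.

import spark.implicits._

val yearRegex = """\((\d{4})\)""".r

val horrorDF = sc.textFile("file:///home/cloudera/cs/movies.dat")
  .map(_.split(","))
  .collect { case Array(id, title, genres) =>
    // last "(yyyy)" group in the title; 0 when the title carries no year
    val year = yearRegex.findAllMatchIn(title).toList.lastOption
      .map(_.group(1).toInt).getOrElse(0)
    (id.trim, title.trim, year, genres.trim)
  }
  .toDF("movieId", "title", "year", "genres")

horrorDF.filter($"genres".contains("Horror") && $"year".between(1985, 1995)).show(5)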

I hope the answer is helpful.
