I recently started with Spark and am practicing in the Spark shell.
I have a dataset "movies.dat" in the following format:
MovieID,Title,Genres
Sample record:
2,Jumanji (1995),Adventure|Children|Fantasy
scala> val movies_data = sc.textFile("file:///home/cloudera/cs/movies.dat")
scala> val tags=movies_data.map(line=>line.split(","))
scala> tags.take(5)
res3: Array[Array[String]] = Array(Array(1, Toy Story (1995), Adventure|Animation|Children|Comedy|Fantasy), Array(2, Jumanji (1995), Adventure|Children|Fantasy), Array(3, Grumpier Old Men (1995), Comedy|Romance), Array(4, Waiting to Exhale (1995), Comedy|Drama|Romance), Array(5, Father of the Bride Part II (1995), Comedy))
scala> val horrorMovies = tags.filter(genre=>genre.contains("Horror"))
scala> horrorMovies.take(5)
res4: Array[Array[String]] = Array(Array(177, Lord of Illusions (1995), Horror), Array(220, Castle Freak (1995), Horror), Array(841, Eyes Without a Face (Les Yeux sans visage) (1959), Horror), Array(1105, Children of the Corn IV: The Gathering (1996), Horror), Array(1322, Amityville 1992: It's About Time (1992), Horror))
You can write logic to extract the year from the second element of the split line (the Array) and compare it with the range you have, as below:
scala> val movies_data = sc.textFile("file:///home/cloudera/cs/movies.dat")
movies_data: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/cs/movies.dat MapPartitionsRDD[5] at textFile at <console>:25
scala> val tags=movies_data.map(line=>line.split(","))
tags: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:27
scala> val horrorMovies = tags.filter(genre => {
     |   // take the 4-digit year between the last pair of parentheses in the title
     |   val date = genre(1).substring(genre(1).lastIndexOf("(") + 1, genre(1).lastIndexOf(")")).toInt
     |   date >= 1985 && date <= 1995 && genre(2).contains("Horror")
     | })
horrorMovies: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:29
scala> horrorMovies.take(3)
res1: Array[Array[String]] = Array(Array(177, " Lord of Illusions (1995)", " Horror"), Array(220, " Castle Freak (1995)", " Horror"), Array(1322, " Amityville 1992: It's About Time (1992)", " Horror"))
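One caveat with `line.split(",")`: MovieLens titles can themselves contain commas (e.g. "American President, The (1995)"), which shifts the genre column into the wrong array slot. A minimal sketch of a stricter parser that splits only on the first and last comma (`parse` is a hypothetical helper, not part of the shell session above):

```scala
// Assumes the line shape MovieID,Title,Genres where genres never contain commas.
// Greedy (.+) keeps any commas inside the title; ([^,]+) grabs the genre field.
val Line = """(\d+),(.+),([^,]+)""".r

def parse(line: String): Option[Array[String]] =
  line match {
    case Line(id, title, genres) => Some(Array(id, title, genres))
    case _                       => None // skip malformed lines instead of crashing
  }
```

You could then use `movies_data.flatMap(line => parse(line))` instead of the plain `map`/`split`, so malformed lines are silently dropped.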
I hope the answer is helpful.
Edit: You can do the above logic with a regex too:
scala> val horrorMovies = tags.filter(genre => {
     |   // match the 4-digit year inside parentheses, so titles that contain
     |   // other digits (e.g. "Amityville 1992: ...") are still parsed correctly
     |   val str = """\((\d{4})\)""".r.findAllMatchIn(genre(1)).map(_.group(1)).mkString
     |   val date = if (str.length == 4) str.toInt else 0
     |   date >= 1985 && date <= 1995 && genre(2).contains("Horror")
     | })
horrorMovies: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:33
The rest of the code is the same as above.
I hope the answer is helpful.