subho subho - 1 month ago 19
Scala Question

Twitter data from spark

I am learning Twitter integration with Spark Streaming.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf

/**
 * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
 * stream. The stream is instantiated with credentials and optionally filters supplied by the
 * command line arguments.
 *
 * Run this on your local machine as
 *
 */
object TwitterPopularTags {
  def main(args: Array[String]): Unit = {

    if (args.length < 4) {
      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters) // DStream of twitter4j Status objects

    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}


I do not fully understand the two lines of code below:

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)


Can someone please explain these two lines to me?

Thanks and Regards,

Answer
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)

args is an Array; take(4) returns a new Array holding its first (left-most) four elements. Writing Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) on the left-hand side of val is a pattern match: consumerKey is bound to the first element, consumerSecret to the second, and so on. This is a neat Scala trick for "unpacking" an Array (it works with other collections too) into named values. The pattern has to match exactly, so an array with fewer than four elements would throw a scala.MatchError at runtime; the args.length < 4 check at the top of main rules that out.
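To see the unpacking on its own, here is a minimal sketch without any Spark or Twitter dependencies. The object name UnpackDemo and the values in sampleArgs are made up for illustration; they stand in for real command-line arguments.

```scala
import scala.util.Try

object UnpackDemo {
  // Hypothetical stand-in for the command-line arguments (four credentials, two filters).
  val sampleArgs: Array[String] =
    Array("key", "secret", "token", "tokenSecret", "#spark", "#scala")

  // take(4) yields the first four elements; the Array(...) pattern binds one name to each.
  val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = sampleArgs.take(4)

  // The pattern expects exactly four elements, so a two-element array does not match.
  // Wrapping the attempt in Try captures the resulting MatchError as a Failure.
  val tooShort: Try[String] = Try {
    val Array(first, _, _, _) = Array("key", "secret")
    first
  }

  def main(argv: Array[String]): Unit = {
    println(consumerKey)        // prints: key
    println(accessTokenSecret)  // prints: tokenSecret
    println(tooShort.isFailure) // prints: true
  }
}
```

Newer Scala compilers may warn that a pattern like this can fail at runtime, which is the same MatchError risk described above.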

val filters = args.takeRight(args.length - 4)

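takeRight(n) returns a new Array holding the last n elements of the array. Here n is args.length - 4, so everything except the first four elements is assigned to a new value named filters; args.drop(4) would give the same result. If no filters are passed on the command line, filters is simply an empty Array.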
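A small sketch of the same idea, again with made-up sample values (FiltersDemo, sampleArgs and onlyCredentials are illustrative names, not part of the Spark example):

```scala
object FiltersDemo {
  // Hypothetical stand-in for the command-line arguments (four credentials, two filters).
  val sampleArgs: Array[String] =
    Array("key", "secret", "token", "tokenSecret", "#spark", "#scala")

  // Keep the last (length - 4) elements, i.e. everything after the four credentials.
  val filters: Array[String] = sampleArgs.takeRight(sampleArgs.length - 4)

  // Dropping the first four elements is an equivalent way to write it.
  val sameFilters: Array[String] = sampleArgs.drop(4)

  // With exactly four arguments, takeRight(0) yields an empty array: no filters.
  val onlyCredentials: Array[String] = Array("key", "secret", "token", "tokenSecret")
  val noFilters: Array[String] = onlyCredentials.takeRight(onlyCredentials.length - 4)

  def main(argv: Array[String]): Unit = {
    println(filters.mkString(", "))            // prints: #spark, #scala
    println(filters.sameElements(sameFilters)) // prints: true
    println(noFilters.isEmpty)                 // prints: true
  }
}
```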