Scala Question

My feature column becomes null in the dataframe

I am new to Spark and I need to do some machine learning on my data to predict the "count" value. Here is my raw data:

05:49:56.604899 00:00:00:00:00:02 > 00:00:00:00:00:03, ethertype IPv4 (0x0800), length 10202: 10.0.0.2.54880 > 10.0.0.3.5001: Flags [.], seq 3641977583:3641987719, ack 129899328, win 58, options [nop,nop,TS val 432623 ecr 432619], length 10136
05:49:56.604908 00:00:00:00:00:03 > 00:00:00:00:00:02, ethertype IPv4 (0x0800), length 66: 10.0.0.3.5001 > 10.0.0.2.54880: Flags [.], ack 10136, win 153, options [nop,nop,TS val 432623 ecr 432623], length 0


I made a DataFrame with the columns time_stamp_0, sender_ip_1, and receiver_ip_2 using the following code:

import scala.util.Try
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import session.implicits._ // for the $"..." column syntax

val customSchema = StructType(Array(
  StructField("time_stamp_0", StringType, true),
  StructField("sender_ip_1", StringType, true),
  StructField("receiver_ip_2", StringType, true)))

// Make the training DataFrame
val Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/Test/trace1.txt")
val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {
  // token 0 before the first ">" is the timestamp; token 6 after it is the
  // sender ip.port; token 0 after the second ">" is the receiver ip.port
  val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
  val second = Try(array(1).trim.split(" ")(6)) getOrElse ""
  val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""

  // drop everything from the last "." on (fractional seconds, port numbers)
  val firstFixed = first.take(first.lastIndexOf("."))
  val secondFixed = second.take(second.lastIndexOf("."))
  val thirdFixed = third.take(third.lastIndexOf("."))
  Row.fromSeq(Seq(firstFixed, secondFixed, thirdFixed))
})
val First_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
  .toDF("time_stamp_0", "sender_ip_1", "receiver_ip_2")
val columns1and2 = Window.partitionBy("sender_ip_1", "receiver_ip_2") // <-- matches groupBy


// Add the per-(sender_ip_1, receiver_ip_2) connection count to the DataFrame
val Dataframe = First_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2)
Dataframe.show()


Here is the output:

+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.2| 10.0.0.3| 19|
| 05:49:56| 10.0.0.3| 10.0.0.2| 10|
+------------+-----------+-------------+-----+
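
As a sanity check, the same count column can be produced without a window function, by aggregating and joining back onto the original rows; a minimal sketch using the same First_Dataframe (Dataframe_check is just an illustrative name):

// Per-pair counts via groupBy, joined back so every row carries its count;
// the resulting "count" column should match the window version above
val pairCounts = First_Dataframe
  .groupBy("sender_ip_1", "receiver_ip_2")
  .count()
val Dataframe_check = First_Dataframe
  .join(pairCounts, Seq("sender_ip_1", "receiver_ip_2"))
Dataframe_check.show()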


I would like to predict the number of connections between two IPs, which is why I added the count column. I also tried to build features and label columns to start the prediction, and I need to split the data into training and testing parts. I used the following code:

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

// Encode the two IPs as numeric codes and pack all four values into a vector
val toVec4 = udf[Vector, Int, Int, String, String] { (a, b, c, d) =>
  val e3 = c match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  val e4 = d match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  Vectors.dense(a, b, e3, e4)
}

//val encodeLabel = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )

val final_df = Dataframe.withColumn(
  "features",
  toVec4(
    Dataframe("time_stamp_0"),
    Dataframe("count"),
    Dataframe("sender_ip_1"),
    Dataframe("receiver_ip_2")
  )
).withColumn("label", Dataframe("count")).select("features", "label")

final_df.show()

// randomSplit(Array(0.3, 0.7)) puts roughly 30% of the rows in the first split
val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
val TrainingDF = trainingTest(0)
val TestingDF = trainingTest(1)
//TrainingDF.show()
//TestingDF.show()
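
For context, once the features column works I plan to fit something like the following (a minimal sketch using spark.ml's LinearRegression; any regressor that takes featuresCol/labelCol would do):

import org.apache.spark.ml.regression.LinearRegression

// Sketch: fit a regression on (features, label) and predict on the held-out split
val lr = new LinearRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")
val model = lr.fit(TrainingDF)
model.transform(TestingDF).select("features", "label", "prediction").show()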


However, the problem is that the features column becomes null!

+--------+-----+
|features|label|
+--------+-----+
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 19|
| null| 10|
+--------+-----+


Can anybody help me solve this problem? Thanks in advance.

Answer

The problem here is that your UDF expects the four input columns to be of types Int, Int, String, String, and you're passing a String as the first column (time_stamp_0).

You can fix that either by adjusting the UDF or by casting the column to an Int:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val final_df = df.withColumn(
  "features",
  toVec4(
    // casting into Timestamp to parse the string, and then into Int
    $"time_stamp_0".cast(TimestampType).cast(IntegerType),
    $"count",
    $"sender_ip_1",
    $"receiver_ip_2"
  )
)
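
Alternatively, you can adjust the UDF itself to take the timestamp as a String and parse it there. This also avoids relying on the string-to-timestamp cast, which can come back null if the bare HH:mm:ss format isn't what the cast expects. A minimal sketch, encoding the time as seconds since midnight (toVec4String is just an illustrative name):

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

// Alternative fix: accept the timestamp as a String and parse "HH:mm:ss"
// into seconds since midnight inside the UDF
val toVec4String = udf[Vector, String, Int, String, String] { (t, b, c, d) =>
  val Array(h, m, s) = t.split(":").map(_.toInt)
  val e3 = c match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  val e4 = d match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  Vectors.dense(h * 3600 + m * 60 + s, b, e3, e4)
}

One more thing worth checking: count over a window produces a LongType column, while the UDF declares Int for its second argument; if your Spark version doesn't insert that cast implicitly, add $"count".cast(IntegerType) as well.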

I must say I would expect a proper exception rather than a null result, but apparently that's the current behavior.
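
Separately, note that the pattern matches inside toVec4 are not exhaustive: once the type issue is fixed, any IP other than the three listed will throw a scala.MatchError at runtime. A small defensive sketch (ipCode is just an illustrative helper, and 0 an arbitrary fallback):

// Map known IPs to codes, falling back instead of throwing scala.MatchError
def ipCode(ip: String): Int = ip match {
  case "10.0.0.1" => 1
  case "10.0.0.2" => 2
  case "10.0.0.3" => 3
  case _          => 0 // arbitrary fallback; pick what suits your model
}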
