Beril Aydemir Beril Aydemir - 9 months ago 76
JSON Question

Updating column in spark dataframe with json schema

I have json files, and I'm trying to hash one field of it with SHA 256. These files are on AWS S3. I am currently using spark with python on Apache Zeppelin.

Here is my json schema, I am trying to hash 'mac' field;

|-- Document: struct (nullable = true)
| |-- data: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- mac: string (nullable = true)

I've tried couple of things;

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import hashlib

hcData ="inferSchema","true").json(inputPath)

name = 'Document'
udf = UserDefinedFunction(lambda x: hashlib.sha256(str(x).encode('utf-8')).hexdigest(), StringType())
new_df =*[udf(column).alias(name) if column == name else column for column in hcData.columns])

This code works fine. But when I try to hash mac field and change name variable nothing happens;

name = '[0].mac'
name = 'mac'

I guess it is because, it couldn't find column with given name.

I've tried to change the code a bit;

def valueToCategory(value):
return hashlib.sha256(str(value).encode('utf-8')).hexdigest()

udfValueToCategory = udf(valueToCategory, StringType())
df = hcData.withColumn("[0].mac",udfValueToCategory(""))

This code hashes "" and creates new column with hashed mac addresses. I want to update existing column. For those variables not nested it can update, there is no problem, but for nested variables I couldn't find a way to update.

So basically, I want to hash a field in nested json file with spark python. Can anyone knows how to update spark dataframe with schema?

Answer Source

Well, I've found a solution for my question with scala. There can be redundant codes but it worked anyway.

import scala.util.matching.Regex

val inputPath = ""
val outputPath = ""

//finds mac addresses with given regex
def find(s: String, r: Regex): List[String] = {
    val l = r.findAllIn(s).toList
        return l
    } else {
        val lis: List[String] = List("null")
        return lis

//hashes given string with sha256
def hash(s: String): String = {
    return MessageDigest.getInstance("SHA-256").digest(s.getBytes).map(0xFF & _).map { "%02x".format(_) }.foldLeft(""){_ + _}

//hashes given line
def hashAll(s: String, r:Regex): String = {
    var st = s
    val macs = find(s, r)
    for (mac <- macs){
        st = st.replaceAll(mac, hash(mac))
    return st

//read data
val rdd = sc.textFile(inputPath)

//mac address regular expression
val regex = "(([0-9A-Z]{1,2}[:-]){5}([0-9A-Z]{1,2})|([A-Z]+[-][0-9A-Z]+[-][0-9A-Z]{12}))".r

//hash data
val hashed_rdd = => hashAll(line, regex))

//write hashed data