user3290807 - 21 days ago
Scala Question

Filtering dataframe based on dynamic value in Spark/Scala

I have a JSON document in the following format:

{"Request": {"TrancheList": {"Tranche": [{"TrancheId": "500192163","OwnedAmt": "26500000", "Curr": "USD" }, { "TrancheId": "500213369", "OwnedAmt": "41000000","Curr": "USD"}]},"FxRatesList": {"FxRatesContract": [{"Currency": "CHF","FxRate": "0.97919983706115"},{"Currency": "AUD", "FxRate": "1.2966804979253"},{ "Currency": "USD","FxRate": "1"},{"Currency": "SEK","FxRate": "8.1561012531034"},{"Currency": "NOK", "FxRate": "8.2454981641398"},{"Currency": "JPY","FxRate": "111.79999785344"},{"Currency": "HKD","FxRate": "7.7568025218916"},{"Currency": "GBP","FxRate": "0.69425159677867"}, {"Currency": "EUR","FxRate": "0.88991723769689"},{"Currency": "DKK", "FxRate": "6.629598372301"}]},"isExcludeDeals": "true","baseCurrency": "USD"}}


I am trying to get the FxRate value for the Currency that equals the baseCurrency field.

I am reading the JSON from an HDFS cluster:

val hdfsRequest = spark.read.json("localhost/user/request.json")
val baseCurrency = hdfsRequest.select("Request.baseCurrency")
var fxRates = hdfsRequest.select("Request.FxRatesList.FxRatesContract")
val fxRatesDF = fxRates.select(explode(fxRates("FxRatesContract"))).toDF("FxRatesContract").select("FxRatesContract.Currency", "FxRatesContract.FxRate").filter($"Currency=baseCurrency")


The error that I get when running this code is:

org.apache.spark.sql.AnalysisException: cannot resolve '`Currency=baseCurrency`' given input columns: [Currency, FxRate];


How do I use the variable baseCurrency in the filter expression of a DataFrame in Scala/Spark?

Thanks

Answer

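$"Currency=baseCurrency" is read as the name of a single column, which is why Spark cannot resolve it. If baseCurrency is just a single value, you can collect it to the driver and compare the Currency column against it: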

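import org.apache.spark.sql.functions.explode
import spark.implicits._  // for $"..." and the String encoder used by map (already imported in spark-shell)

val hdfsRequest = spark.read.json("localhost/user/request.json")
// Collect the single baseCurrency value to the driver as an Option[String]
val baseCurrency = hdfsRequest.select("Request.baseCurrency")
  .map(_.getString(0)).collect.headOption
val fxRates = hdfsRequest.select("Request.FxRatesList.FxRatesContract")
val fxRatesDF = fxRates.select(explode(fxRates("FxRatesContract")))
  .toDF("FxRatesContract")
  .select("FxRatesContract.Currency", "FxRatesContract.FxRate")
  // Compare the column with the collected value; an empty string matches no currency if it is missing
  .filter($"Currency" === baseCurrency.getOrElse(""))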
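If you would rather not collect the value to the driver, another option is to keep baseCurrency as a column next to each exploded rate and compare the two columns directly. The following is only a minimal sketch under some assumptions: it builds a local SparkSession, uses a trimmed-down sample string (hypothetical, same shape as the JSON in the question) instead of the HDFS file, and relies on spark.read.json accepting a Dataset[String], which needs Spark 2.2 or later. The names sample, request and baseRate are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Local session for illustration; in spark-shell getOrCreate() returns the existing session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("fx-filter-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical, shortened sample in the same shape as the request JSON in the question.
val sample =
  """{"Request": {"FxRatesList": {"FxRatesContract": [{"Currency": "CHF", "FxRate": "0.97919983706115"}, {"Currency": "USD", "FxRate": "1"}]}, "baseCurrency": "USD"}}"""
val request = spark.read.json(Seq(sample).toDS)

// Carry baseCurrency alongside each exploded rate, so the filter compares two columns
// instead of a column and a driver-side variable.
val baseRate = request
  .select(
    $"Request.baseCurrency".as("baseCurrency"),
    explode($"Request.FxRatesList.FxRatesContract").as("fx"))
  .filter($"fx.Currency" === $"baseCurrency")
  .select($"fx.Currency", $"fx.FxRate")

baseRate.show()
```

With this sample, only the row whose Currency matches baseCurrency should remain. The trade-off is that everything stays inside one DataFrame plan, which should also cope with a file containing several requests that each have their own baseCurrency.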