Linwoodc3 Linwoodc3 - 1 year ago 112
Python Question

How do you transpose a dask dataframe (convert columns to rows) to approach tidy data principles

TLDR: I created a dask dataframe from a dask bag. The dask dataframe treats every observation (event) as a column. So, instead of having rows of data for each event, I have a column for each event. The goal is to transpose the columns to rows in the same way that pandas can transpose a dataframe using df.T.

I have sample twitter data from my timeline here. To get to my starting point, here is the code to read a json from disk into a

and then convert that into a

import dask.bag as db
import dask.dataframe as dd
import json

b = db.read_text('./sampleTwitter.json').map(json.loads)
df = b.to_dataframe()

The Problem All my individual events (i.e. tweets) are recorded as columns vice rows. In keeping with
principles, I would like to have rows for each event.
has a transpose method for dataframes
and dask.array has a transpose method for arrays. My goal is to do the same transpose operation, but on a dask dataframe. How would I do that?

  1. Convert rows to columns

Edit for solution

This code cleans Twitter json files by defining the columns you want to keep and dropping the rest. Then, we write a MUCH smaller, cleaned file to disk.

import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
from dask.diagnostics import ProgressBar,Profiler, ResourceProfiler, CacheProfiler
import pandas as pd
import json
import glob

# pull in all files..
filenames = glob.glob('~/worldTweet*.json')

# df = ... # do work with dask.dataframe
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
df = dd.from_delayed(dfs)

# see all the fields of the dataframe
fields = list(df.columns)

# identify the fields we want to keep
keepers = ['coordinates','id','user','created_at','lang']

# remove the fields i don't want from column list
for f in keepers:
if f in fields:

# drop the fields i don't want and only keep whats necessary
df = df.drop(fields,axis=1)

clean = df.coordinates.apply(lambda x: (x['coordinates'][0],x['coordinates'][1]), meta= ('coords',tuple))
df['coords'] = clean

# making new filenames from old filenames to save cleaned files
import re
newfilenames = []
for l in filenames:

# custom saver function for dataframes using newfilenames
def saver(frame,filename):
return frame.to_json('./'+filename)

# converting back to a delayed object
dfs = df.to_delayed()
writes = [(delayed((saver)(df, fn))) for df, fn in zip(dfs, newfilenames)]

# writing the cleaned, MUCH smaller objects back to disk

Answer Source

I think you can get the result you want by bypassing bag altogether, with code like

import glob

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

filenames = glob.glob('sampleTwitter*.json')
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
ddf = dd.from_delayed(dfs)