
pandas.io.common.CParserError: Error tokenizing data. C error: Buffer overflow caught - possible malformed input file

I have about 50+ large CSV files, each more than 10 MB in size. They have more than 25 columns and more than 50K rows each.

All of them have the same headers, and I am trying to merge them into one CSV with the header appearing only once.

Option: One
Code: works for small CSVs -- 25+ columns, but file size only in KBs.

import pandas as pd
import glob

interesting_files = glob.glob("*.csv")
df_list = []
for filename in sorted(interesting_files):
    df_list.append(pd.read_csv(filename))

full_df = pd.concat(df_list)

full_df.to_csv('output.csv')


But the above code does not work for the larger files and gives the error.

Error:

Traceback (most recent call last):
File "merge_large.py", line 6, in <module>
all_files = glob.glob("*.csv", encoding='utf8', engine='python')
TypeError: glob() got an unexpected keyword argument 'encoding'
lakshmi@lakshmi-HP-15-Notebook-PC:~/Desktop/Twitter_Lat_lon/nasik_rain/rain_2$ python merge_large.py
Traceback (most recent call last):
File "merge_large.py", line 10, in <module>
df = pd.read_csv(file_,index_col=None, header=0)
File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 562, in parser_f
return _read(filepath_or_buffer, kwds)
File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 325, in _read
return parser.read()
File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 815, in read
ret = self._engine.read(nrows)
File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 1314, in read
data = self._reader.read(nrows)
File "pandas/parser.pyx", line 805, in pandas.parser.TextReader.read (pandas/parser.c:8748)
File "pandas/parser.pyx", line 827, in pandas.parser.TextReader._read_low_memory (pandas/parser.c:9003)
File "pandas/parser.pyx", line 881, in pandas.parser.TextReader._read_rows (pandas/parser.c:9731)
File "pandas/parser.pyx", line 868, in pandas.parser.TextReader._tokenize_rows (pandas/parser.c:9602)
File "pandas/parser.pyx", line 1865, in pandas.parser.raise_parser_error (pandas/parser.c:23325)
pandas.io.common.CParserError: Error tokenizing data. C error: Buffer overflow caught - possible malformed input file.


Option: Two
Code: 25+ columns, but file size more than 10 MB.

import pandas as pd
import glob

interesting_files = glob.glob("*.csv")
df_list = []
for filename in sorted(interesting_files):
    df_list.append(pd.read_csv(filename))

full_df = pd.concat(df_list)

full_df.to_csv('output.csv')


Error:

Traceback (most recent call last):
File "merge_large.py", line 6, in <module>
allFiles = glob.glob("*.csv", sep=None)
TypeError: glob() got an unexpected keyword argument 'sep'


I have searched extensively, but I am not able to find a way to concatenate large CSV files with the same headers into one file.

Edit:

Code:

import dask.dataframe as dd

ddf = dd.read_csv('*.csv')

ddf.to_csv('master.csv',index=False)


Error:

Traceback (most recent call last):
File "merge_csv_dask.py", line 5, in <module>
ddf.to_csv('master.csv',index=False)
File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/core.py", line 792, in to_csv
return to_csv(self, filename, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/io.py", line 762, in to_csv
compute(*values)
File "/usr/local/lib/python2.7/dist-packages/dask/base.py", line 179, in compute
results = get(dsk, keys, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/dask/threaded.py", line 58, in get
**kwargs)
File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 481, in get_async
raise(remote_exception(res, tb))
dask.async.ValueError: could not convert string to float: {u'type': u'Point', u'coordinates': [4.34279, 50.8443]}

Traceback
---------
File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 263, in execute_task
result = _execute_task(task, data)
File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 245, in _execute_task
return func(*args2)
File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 49, in bytes_read_csv
coerce_dtypes(df, dtypes)
File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 73, in coerce_dtypes
df[c] = df[c].astype(dtypes[c])
File "/usr/local/lib/python2.7/dist-packages/pandas/core/generic.py", line 2950, in astype
raise_on_error=raise_on_error, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2938, in astype
return self.apply('astype', dtype=dtype, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2890, in apply
applied = getattr(b, f)(**kwargs)
File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 434, in astype
values=values, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 477, in _astype
values = com._astype_nansafe(values.ravel(), dtype, copy=True)
File "/usr/local/lib/python2.7/dist-packages/pandas/core/common.py", line 1920, in _astype_nansafe
return arr.astype(dtype)

Answer

If I understand your problem correctly, you have large CSV files with the same structure that you want to merge into one big CSV file.

My suggestion is to use dask from Continuum Analytics to handle this job. It can merge your files, and it can also perform out-of-core computations and analysis of the data, much like pandas.

### make sure you include the [complete] tag
pip install dask[complete]

Edit: updated solution

Here's the code to read in ALL your CSVs. I did a test with your sample data (I made two copies of the data with different names), and it read in fine.

import dask.dataframe as dd
from dask.delayed import delayed
import pandas as pd
import glob

# grab the paths of all the CSVs (adjust the pattern to your own directory)
filenames = glob.glob('/Users/linwood/Downloads/rio2016*.csv')

# build one lazy dask dataframe out of delayed pandas reads
dfs = [delayed(pd.read_csv)(fn) for fn in filenames]
df = dd.from_delayed(dfs)
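
If your end goal is still a single merged CSV (as in the question), the combined dataframe can be written back out, just as the previous solution below does in its step 2. A minimal sketch, with an illustrative output name:

# 'merged_output.csv' is an illustrative name; depending on your dask version
# this may write a single file or one file per partition
df.to_csv('merged_output.csv', index=False)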

The next step is doing some pandas-like analysis. Here is some code that first "cleans" your data in the 'tweetFavoriteCt' column. Not all of the values are integers, so I replace the strings with "0" and convert everything else to an integer. Once I have the integer conversion, I show a simple analytic where I filter the entire dataframe to include only the rows where the cleaned favorite count is greater than 3.

# function to convert values to integers, replacing anything unparseable with 0
def conversion(value):
    try:
        return int(value)
    except (ValueError, TypeError):
        return 0

# apply the function to the column and create a new column of cleaned data
clean = df['tweetFavoriteCt'].apply(conversion, meta=('tweetFavoriteCt', int))
df['cleanedFavoriteCt'] = clean

# filter the merged dataframe to retrieve rows with > 3 favorites
df[(df.cleanedFavoriteCt > 3)].compute()
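
As a follow-up, a small sketch (with illustrative names) showing how the filtered result can be pulled into an in-memory pandas DataFrame and saved:

# materialize the filtered rows as a regular pandas DataFrame
popular = df[df.cleanedFavoriteCt > 3].compute()

# 'popular_tweets.csv' is an illustrative output name
popular.to_csv('popular_tweets.csv', index=False)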

You can stop here if the above solves your problem.

Previous solution; still a good example of using dask to merge CSVs

What the code below does:

  • A function to generate synthetic pandas dataframes with 25 columns and 50,000 rows and write them to disk as CSVs
  • A serialized naming convention for the CSV files (e.g. my18.csv, my19.csv, etc.)
  • One line of code to load and combine all the CSVs into an out-of-memory dask.dataframe
  • One line of code to write those CSVs out to a joined CSV file
  • Optional code to show how dask can perform pandas-like dataframe operations on dataframes that are too big for in-memory computation

The first step is making sure you have dask installed. There are install instructions on the dask documentation page, but this should work:
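
### make sure you include the [complete] tag (same command as shown above)
pip install dask[complete]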

With dask installed, it's easy to read in the files. First, here's my code to build the dataframes with 50,000 rows and 25 columns and write them out as CSVs.

def create_csvs(iters, rows, columns):
    import os
    import random
    import pandas as pd
    import numpy as np
    import string
    import json

    # Make sure the output directory exists
    if not os.path.isdir('./daskTest'):
        os.makedirs('./daskTest')

    # Initialize counter to build a specific number of csv files
    count = 0
    setseed = 1
    np.random.seed(seed=setseed)
    random.seed(setseed)

    # Dictionary mapping each csv filename to a short file key
    fileKeys = {}

    while count <= iters:

        # randomly pick a number to use in the csv filename
        saver = 'my' + str(np.random.randint(100)) + '.csv'

        # Make a dataframe; `rows` rows, `columns` columns, headers are letters of the alphabet (a-y for 25 columns)
        df = pd.DataFrame(
            np.random.randint(1, 300, (rows, columns)),
            columns=list(string.ascii_lowercase)[:columns]
        )

        # Write the synthetic csv to the directory
        df.to_csv("./daskTest/" + saver, sep=',', index=False)
        fileKeys[saver] = saver.split('.')[0]
        count += 1
        setseed += 1

        # Save the dictionary of file keys and filenames for testing
        with open('csvKeys.json', 'w') as f:
            f.write(json.dumps(fileKeys))

    # Return the dictionary with file keys and filenames
    return fileKeys

Now we can create however many CSV files we want. Here, I create 11 CSVs (the counter is zero-indexed and inclusive) with 50,000 rows and 25 columns each.

create_csvs(10,50000,25)

This gets us to the point where you are: multiple CSV files in a directory, all with the same headers, that we want to merge into one. dask makes this simple and efficient.

Some housekeeping first. My directory for storing all the CSVs is /daskTest/, and my CSV files have serialized names such as my18.csv, my19.csv, my20.csv, etc. Name standardization and a single directory location are key. This works if you put your CSV files in one directory and serialize the names in some way.
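
Before merging, a quick sanity check (a small sketch; adjust the pattern to your own directory) confirms the wildcard pattern picks up the files you expect:

import glob

# list the files the wildcard pattern will match
print(sorted(glob.glob('./daskTest/my*.csv')))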

Read here for the quick answer to the question

In steps:

  1. Import dask and read in all the CSV files using a wildcard. This merges all the CSVs into one single dask.dataframe object.

import dask.dataframe as dd
ddf = dd.read_csv('./daskTest/my*.csv')

  2. Write the merged dataframe to disk in the same directory as the original files and name it master.csv.

ddf.to_csv('./daskTest/master.csv',index=False)

  3. Optionally, read master.csv, which is much bigger in size, back into a dask.dataframe object for computations. This can also be done right after step one above; dask can perform pandas-like operations on the staged files... this is one way to do "big data" in Python.

# read the merged file back in as one BIG out-of-core dataframe; it supports pandas-like functions
newddf = dd.read_csv('./daskTest/master.csv')

# check the length; this is now the length of all the merged files. In this example, 50,000 rows times 11 files = 550,000 rows.
len(newddf)

# perform pandas-like summary stats on entire dataframe
newddf.describe().compute()

Hopefully this helps answer your question. In three steps, you read in all the files, merge them into a single dataframe, and write that massive dataframe to disk with only one header and all of your rows.
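
Putting the three steps together, here is a minimal end-to-end sketch (assuming your CSVs share the same header and live in one directory; the ./daskTest/ paths are the ones used above):

import dask.dataframe as dd

# step 1: read every matching CSV into one lazy dask dataframe
ddf = dd.read_csv('./daskTest/my*.csv')

# step 2: write the merged data out (depending on your dask version this may
# produce a single file or one file per partition)
ddf.to_csv('./daskTest/master.csv', index=False)

# step 3 (optional): sanity-check the merged data
print(len(ddf))
print(ddf.describe().compute())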
