Vijay_Shinde - 4 months ago
Java Question

Getting exception: IOException input buffer is closed exception while extracting a tar file

I have some tar files on HDFS. My aim is to extract those files and store the extracted files back on HDFS.

For example, this is my input directory structure (HDFS):

Path: /data/160823

160823
|--- 00
|    |----- xyz.tar
|    |----- xyz2.tar
|--- 01
|    |----- xyz3.tar
|    |----- abc2.tar
|--- 02
|    |----- abc3.tar
|    |----- abc4.tar
.
.
.
|--- 23
     |----- pqr.tar
     |----- pqr2.tar


Expected output will be:

--------
| Output |
--------
|----- xyz.gz
|----- xyz2.gz


My code extracts those tar files and stores the extracted files to a path on HDFS.

I'm able to extract the first .tar file and store its output on HDFS, but while reading the next .tar file I get this exception:

java.io.IOException: input buffer is closed
at org.apache.commons.compress.archivers.tar.TarBuffer.readRecord(TarBuffer.java:190)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getRecord(TarArchiveInputStream.java:302)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:230)
at com.lsr.TarMapper.call(TarMapper.java:53)
at com.lsr.TarMapper.call(TarMapper.java:1)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


This is my code snippet:

import java.util.ArrayList;
import java.util.List;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.utils.FileWrapper;

public class TarMapper implements FlatMapFunction<String, String> {

    public Iterable<String> call(String arg0) throws Exception {
        System.out.println("Arg0 : " + arg0);
        List<String> untarFile = new ArrayList<String>();
        FileSystem fileSystem = LTar.fs;
        FSDataInputStream fsin = null;
        TarArchiveInputStream tarin = null;
        OutputStream outstr = null;
        TarArchiveEntry tarentry = null;
        FSDataOutputStream fsDataOutputStream = null;
        Path outputPath = null;
        try {
            fileSystem = FileSystem.get(LTar.conf);
            fsin = fileSystem.open(new Path(arg0));
            tarin = new TarArchiveInputStream(fsin);
            tarentry = tarin.getNextTarEntry();
            while (tarentry != null) {
                if (!tarentry.isDirectory()) {
                    System.out.println("TAR ENTRY : " + tarentry);
                    outputPath = new Path("/data/tar/" + tarentry.getName().substring(2));
                    fsDataOutputStream = fileSystem.create(outputPath);
                    System.out.println("Name : " + tarentry.getName() + "Other : ");
                    IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf);
                }
                tarentry = tarin.getNextTarEntry();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (tarin != null) {
                tarin.close();
            }
            if (fsin != null) {
                fsin.close();
            }
            if (fileSystem != null) {
                fileSystem.close();
            }
            if (outstr != null) {
                outstr.close();
            }
            if (fsDataOutputStream != null) {
                fsDataOutputStream.close();
            }
        }
        return untarFile;
    }
}


Please provide your suggestions on this issue.

EJP
Answer

The overload of copyBytes() you are calling closes the input stream at the end of the copy. That is why getNextTarEntry() fails on the second entry: the TarArchiveInputStream has already been closed.

Use another one.
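Assuming the IOUtils in the question is Hadoop's org.apache.hadoop.io.IOUtils: the three-argument copyBytes(in, out, conf) overload closes both streams when the copy finishes, while the four-argument overload copyBytes(in, out, buffSize, close) lets you pass close = false and keep the streams open. The close-flag semantics can be illustrated with a minimal self-contained sketch in plain java.io (CopyBytesDemo is a hypothetical stand-in for the Hadoop method, not the real class):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyBytesDemo {

    // Stand-in for Hadoop's IOUtils.copyBytes(in, out, buffSize, close):
    // copies everything, then closes both streams only when close == true.
    // The 3-argument copyBytes(in, out, conf) overload behaves as if close were true.
    static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
            throws IOException {
        byte[] buf = new byte[buffSize];
        int n;
        while ((n = in.read(buf)) > 0) {
            out.write(buf, 0, n);
        }
        if (close) {
            in.close();
            out.close();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream("entry-1".getBytes());
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // close == false: the input stream survives the copy ...
        copyBytes(in, out, 4096, false);
        System.out.println(out);    // prints entry-1

        // ... so it can still be read afterwards, which is exactly what the
        // tar-extraction loop needs between getNextTarEntry() calls.
        in.reset();
        ByteArrayOutputStream out2 = new ByteArrayOutputStream();
        copyBytes(in, out2, 4096, false);
        System.out.println(out2);   // prints entry-1
    }
}
```

In the question's loop, the corresponding change would be IOUtils.copyBytes(tarin, fsDataOutputStream, 4096, false); followed by fsDataOutputStream.close(); at the end of the if block, so each extracted file is flushed while tarin stays open for the next entry.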