Vijay_Shinde - 3 months ago
Java Question

Getting "IOException: input buffer is closed" while extracting a tar file

I have some tar files on HDFS. My aim is to extract those files and store the extracted files on HDFS.

For example:

This is my input directory structure (HDFS):

Path: /data/160823

160823
|--- 00
|    |----- xyz.tar
|    |----- xyz2.tar
|--- 01
|    |----- xyz3.tar
|    |----- abc2.tar
|--- 02
|    |----- abc3.tar
|    |----- abc4.tar
.
.
.
|--- 23
     |----- pqr.tar
     |----- pqr2.tar


Expected output will be:

Output
|----- xyz.gz
|----- xyz2.gz


My code extracts those tar files and stores the extracted files to a path on HDFS.

I'm able to extract the first .tar file and store its output on HDFS, but while reading the next .tar file I get this exception:

java.io.IOException: input buffer is closed
at org.apache.commons.compress.archivers.tar.TarBuffer.readRecord(TarBuffer.java:190)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getRecord(TarArchiveInputStream.java:302)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:230)
at com.lsr.TarMapper.call(TarMapper.java:53)
at com.lsr.TarMapper.call(TarMapper.java:1)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


This is my code snippet:

import java.util.ArrayList;
import java.util.List;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.utils.FileWrapper;

public class TarMapper implements FlatMapFunction<String, String> {

    public Iterable<String> call(String arg0) throws Exception {
        System.out.println("Arg0 : " + arg0);
        List<String> untarFile = new ArrayList<String>();
        FileSystem fileSystem = LTar.fs;
        FSDataInputStream fsin = null;
        TarArchiveInputStream tarin = null;
        OutputStream outstr = null;
        TarArchiveEntry tarentry = null;
        FSDataOutputStream fsDataOutputStream = null;
        Path outputPath = null;
        try {
            fileSystem = FileSystem.get(LTar.conf);
            fsin = fileSystem.open(new Path(arg0));
            tarin = new TarArchiveInputStream(fsin);
            tarentry = tarin.getNextTarEntry();
            while (tarentry != null) {
                if (!tarentry.isDirectory()) {
                    System.out.println("TAR ENTRY : " + tarentry);
                    outputPath = new Path("/data/tar/" + tarentry.getName().substring(2));
                    fsDataOutputStream = fileSystem.create(outputPath);
                    System.out.println("Name : " + tarentry.getName() + "Other : ");
                    IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf);
                }
                tarentry = tarin.getNextTarEntry();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (tarin != null) {
                tarin.close();
            }
            if (fsin != null) {
                fsin.close();
            }
            if (fileSystem != null) {
                fileSystem.close();
            }
            if (outstr != null) {
                outstr.close();
            }
            if (fsDataOutputStream != null) {
                fsDataOutputStream.close();
            }
        }
        return untarFile;
    }
}


Please share your suggestions on this issue.

EJP
Answer

The overload of copyBytes() you are calling closes the input stream at the end of the copy, so by the time the loop calls getNextTarEntry() again, the TarArchiveInputStream has already been closed.

Use an overload that leaves the input stream open.
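For illustration, here is a minimal sketch of the entry loop, adapted from the snippet in the question. It keeps the question's names (tarin, fileSystem) and uses Hadoop's four-argument IOUtils.copyBytes(InputStream, OutputStream, int, boolean) with close set to false; the 4096 buffer size is an arbitrary choice:

TarArchiveEntry tarentry = tarin.getNextTarEntry();
while (tarentry != null) {
    if (!tarentry.isDirectory()) {
        Path outputPath = new Path("/data/tar/" + tarentry.getName().substring(2));
        FSDataOutputStream out = fileSystem.create(outputPath);
        try {
            // close == false: copy this entry's bytes but leave tarin open,
            // so the next getNextTarEntry() call can still read the archive
            IOUtils.copyBytes(tarin, out, 4096, false);
        } finally {
            out.close(); // close each entry's output stream yourself
        }
    }
    tarentry = tarin.getNextTarEntry();
}

The three-argument copyBytes(InputStream, OutputStream, Configuration) delegates to the closing variant, which is why the first entry copies fine and the next getNextTarEntry() fails with "input buffer is closed".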
