How to Read ORC Files from S3 or an InputStream in Java
There doesn’t seem to be a nice way to read ORC files directly from S3 in a Java class not running in a Spark context.
The problem with OrcFile.createReader
For instance, given an S3Object from the AWS SDK for Java that contains an ORC file, how can we evaluate the contents of the file in a Java class? The Apache-provided OrcFile class and its createReader() method are designed to read from a Hadoop filesystem, not directly from S3.
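To make the mismatch concrete, here is roughly what standard usage looks like: createReader() resolves a Hadoop Path against a filesystem, and there doesn’t seem to be a variant that accepts an InputStream or a byte array. The HDFS path below is just a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

// Typical usage: the reader is created from a Path, so the bytes must live
// on some Hadoop-visible filesystem.
Configuration conf = new Configuration();
Reader reader = OrcFile.createReader(
    new Path("hdfs:///path/to/file.orc"),  // placeholder path
    OrcFile.readerOptions(conf)
);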
Let’s see how we can use a mock filesystem to imitate HDFS and read an ORC file from an S3Object.
Converting an ORC file to JSON using a mock filesystem
We’re going to use a mock filesystem found in Apache’s ORC test implementations.
Suppose we want to write a function that takes in an S3Object and returns a JSON string of the data in the ORC file. Let’s take a look at the steps and the function to get an idea of how we plan to achieve this, and then we’ll go through the four extra files needed to make it happen.
- Obtain byte array of the ORC file contents in S3
- Add this byte array to our mock filesystem
- Read the byte array from the mock filesystem
- Convert contents to a JSON string
- Profit
import com.amazonaws.services.s3.model.S3Object;
import java.io.InputStream;
import java.io.StringWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.impl.MockFileSystem;
import org.apache.orc.tools.convert.OrcToJson;
String orcToJsonString(S3Object object) throws Exception {
  // Step 1: obtain a byte array of the ORC file contents in S3
  InputStream stream = object.getObjectContent();
  byte[] streamBytes = stream.readAllBytes();

  // Step 2: add this byte array to our mock filesystem
  Configuration conf = new Configuration();
  MockFileSystem fs = new MockFileSystem(conf, streamBytes);

  // Step 3: read the byte array from the mock filesystem
  Reader reader = OrcFile.createReader(
      new Path("/foobar"),  // any path works; the mock always serves our bytes
      OrcFile.readerOptions(conf).filesystem(fs)
  );

  // Step 4: convert the contents to a JSON string
  StringWriter contents = new StringWriter();
  OrcToJson.printJsonData(contents, reader);

  // Step 5: profit
  return contents.toString();
}
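To put orcToJsonString in context, here’s a minimal usage sketch. The client setup, bucket name, and object key below are placeholders rather than part of the example above.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;

AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
// Placeholder bucket and key pointing at an ORC file
S3Object object = s3.getObject("my-bucket", "path/to/data.orc");
String json = orcToJsonString(object);
System.out.println(json);

Each line of the returned string is one row of the ORC file rendered as a JSON object, as we’ll see in OrcToJson below.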
The following four files can be used to read an ORC file from S3.
MockFileSystem
Our mock filesystem will extend Hadoop’s FileSystem class, which can be subclassed as a simple in-memory filesystem for testing purposes. We’ll copy this MockFileSystem class from Apache’s ORC repository. However, we’ll modify the constructor to accept a byte array streamBytes, which will represent the contents of the S3Object (our ORC file).
Let’s follow Apache and place this MockFileSystem.java class in the package org.apache.orc.impl.
package org.apache.orc.impl;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

public class MockFileSystem extends FileSystem {
  final List<MockInputStream> streams = new ArrayList<>();
  byte[] streamBytes;

  // Unlike Apache's test version, the file contents are injected here.
  public MockFileSystem(Configuration conf, byte[] streamBytes) {
    setConf(conf);
    this.streamBytes = streamBytes;
  }

  @Override
  public URI getUri() {
    try {
      return new URI("mock:///");
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("bad uri", e);
    }
  }

  @Override
  public FSDataInputStream open(Path path, int i) throws IOException {
    // Every path "opens" the same byte array.
    MockInputStream result = new MockInputStream(this, streamBytes);
    streams.add(result);
    return result;
  }

  @Override
  public FileStatus getFileStatus(Path path) {
    return new FileStatus(streamBytes.length, false, 1, 4096, 0, path);
  }

  void removeStream(MockInputStream stream) {
    streams.remove(stream);
  }

  int streamCount() {
    return streams.size();
  }

  // The remaining FileSystem methods are unsupported; the mock is read-only.
  @Override
  public FSDataOutputStream create(
      Path path,
      FsPermission fsPermission,
      boolean b,
      int i,
      short i1,
      long l,
      Progressable progressable
  ) throws IOException {
    throw new IOException("Can't create");
  }

  @Override
  public FSDataOutputStream append(Path path, int i, Progressable progressable)
      throws IOException {
    throw new IOException("Can't append");
  }

  @Override
  public boolean rename(Path path, Path path1) {
    return false;
  }

  @Override
  public boolean delete(Path path, boolean b) {
    return false;
  }

  @Override
  public FileStatus[] listStatus(Path path) {
    return new FileStatus[0];
  }

  @Override
  public void setWorkingDirectory(Path path) {}

  @Override
  public Path getWorkingDirectory() {
    return new Path("/");
  }

  @Override
  public boolean mkdirs(Path path, FsPermission fsPermission) {
    return false;
  }
}
MockInputStream
You might have noticed that MockFileSystem references a MockInputStream object. This is also defined in the ORC tests. In the ORC tests, the byte array of the ORC file is defined explicitly in the test class. Since we’re passing the byte array through the MockFileSystem constructor, we can skip that definition and simply pass the array into MockInputStream.
Let’s follow Apache and place this MockInputStream.java class in the same package as above: org.apache.orc.impl.
package org.apache.orc.impl;

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

public class MockInputStream extends FSDataInputStream {
  MockFileSystem fs;

  public MockInputStream(MockFileSystem fs, byte[] streamBytes) throws IOException {
    super(new SeekableByteArrayInputStream(streamBytes));
    this.fs = fs;
  }

  // Deregister the stream from the owning filesystem when it is closed.
  public void close() {
    fs.removeStream(this);
  }
}
SeekableByteArrayInputStream
Once again, you might have noticed that MockInputStream references a SeekableByteArrayInputStream object, which is also defined in the ORC tests. Let’s follow Apache and place this SeekableByteArrayInputStream.java class in the same package, org.apache.orc.impl.
package org.apache.orc.impl;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;

public class SeekableByteArrayInputStream
    extends ByteArrayInputStream
    implements Seekable, PositionedReadable {

  public SeekableByteArrayInputStream(byte[] buf) {
    super(buf);
  }

  @Override
  public void seek(long pos) throws IOException {
    // Rewind to the start of the buffer, then skip forward to the target offset.
    this.reset();
    this.skip(pos);
  }

  @Override
  public long getPos() throws IOException {
    return pos;
  }

  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    return false;
  }

  @Override
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    // Positioned read: remember the current offset, read at the requested
    // position, then restore the original offset.
    long oldPos = getPos();
    int nread = -1;
    try {
      seek(position);
      nread = read(buffer, offset, length);
    } finally {
      seek(oldPos);
    }
    return nread;
  }

  @Override
  public void readFully(long position, byte[] buffer, int offset, int length)
      throws IOException {
    int nread = 0;
    while (nread < length) {
      int nbytes = read(position + nread, buffer, offset + nread, length - nread);
      if (nbytes < 0) {
        throw new EOFException("End of file reached before reading fully.");
      }
      nread += nbytes;
    }
  }

  @Override
  public void readFully(long position, byte[] buffer) throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }
}
OrcToJson
Lastly, we need to convert from ORC to JSON, which we can (once again) adapt from Apache’s ORC utility classes for printing data. Let’s follow Apache and place this OrcToJson.java class in the package org.apache.orc.tools.convert.
package org.apache.orc.tools.convert;

import java.io.IOException;
import java.io.Writer;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONWriter;

public class OrcToJson {
  // Writes each row of the ORC file as a JSON object, one per line.
  public static void printJsonData(Writer out, Reader reader)
      throws IOException, JSONException {
    RecordReader rows = reader.rows();
    try {
      TypeDescription schema = reader.getSchema();
      VectorizedRowBatch batch = schema.createRowBatch();
      while (rows.nextBatch(batch)) {
        for (int r = 0; r < batch.size; ++r) {
          JSONWriter writer = new JSONWriter(out);
          printRow(writer, batch, schema, r);
          out.write("\n");
          out.flush();
        }
      }
    } finally {
      rows.close();
    }
  }

  static void printRow(
      JSONWriter writer,
      VectorizedRowBatch batch,
      TypeDescription schema,
      int row
  ) throws JSONException {
    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
      List<TypeDescription> fieldTypes = schema.getChildren();
      List<String> fieldNames = schema.getFieldNames();
      writer.object();
      for (int c = 0; c < batch.cols.length; ++c) {
        writer.key(fieldNames.get(c));
        printValue(writer, batch.cols[c], fieldTypes.get(c), row);
      }
      writer.endObject();
    } else {
      printValue(writer, batch.cols[0], schema, row);
    }
  }

  static void printMap(
      JSONWriter writer,
      MapColumnVector vector,
      TypeDescription schema,
      int row
  ) throws JSONException {
    writer.array();
    TypeDescription keyType = schema.getChildren().get(0);
    TypeDescription valueType = schema.getChildren().get(1);
    int offset = (int) vector.offsets[row];
    for (int i = 0; i < vector.lengths[row]; ++i) {
      writer.object();
      writer.key("_key");
      printValue(writer, vector.keys, keyType, offset + i);
      writer.key("_value");
      printValue(writer, vector.values, valueType, offset + i);
      writer.endObject();
    }
    writer.endArray();
  }

  static void printList(
      JSONWriter writer,
      ListColumnVector vector,
      TypeDescription schema,
      int row
  ) throws JSONException {
    writer.array();
    int offset = (int) vector.offsets[row];
    TypeDescription childType = schema.getChildren().get(0);
    for (int i = 0; i < vector.lengths[row]; ++i) {
      printValue(writer, vector.child, childType, offset + i);
    }
    writer.endArray();
  }

  private static void printUnion(
      JSONWriter writer,
      UnionColumnVector vector,
      TypeDescription schema,
      int row
  ) throws JSONException {
    int tag = vector.tags[row];
    printValue(writer, vector.fields[tag], schema.getChildren().get(tag), row);
  }

  static void printStruct(
      JSONWriter writer,
      StructColumnVector batch,
      TypeDescription schema,
      int row
  ) throws JSONException {
    writer.object();
    List<String> fieldNames = schema.getFieldNames();
    List<TypeDescription> fieldTypes = schema.getChildren();
    for (int i = 0; i < fieldTypes.size(); ++i) {
      writer.key(fieldNames.get(i));
      printValue(writer, batch.fields[i], fieldTypes.get(i), row);
    }
    writer.endObject();
  }

  static void printBinary(JSONWriter writer, BytesColumnVector vector, int row)
      throws JSONException {
    writer.array();
    int offset = vector.start[row];
    for (int i = 0; i < vector.length[row]; ++i) {
      writer.value(0xff & (int) vector.vector[row][offset + i]);
    }
    writer.endArray();
  }

  static void printValue(
      JSONWriter writer,
      ColumnVector vector,
      TypeDescription schema,
      int row
  ) throws JSONException {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      switch (schema.getCategory()) {
        case BOOLEAN:
          writer.value(((LongColumnVector) vector).vector[row] != 0);
          break;
        case BYTE:
        case SHORT:
        case INT:
        case LONG:
          writer.value(((LongColumnVector) vector).vector[row]);
          break;
        case FLOAT:
        case DOUBLE:
          writer.value(((DoubleColumnVector) vector).vector[row]);
          break;
        case STRING:
        case CHAR:
        case VARCHAR:
          writer.value(((BytesColumnVector) vector).toString(row));
          break;
        case BINARY:
          printBinary(writer, (BytesColumnVector) vector, row);
          break;
        case DECIMAL:
          writer.value(((DecimalColumnVector) vector).vector[row].toString());
          break;
        case DATE:
          writer.value(
              new DateWritable((int) ((LongColumnVector) vector).vector[row]).toString()
          );
          break;
        case TIMESTAMP:
        case TIMESTAMP_INSTANT:
          writer.value(
              ((TimestampColumnVector) vector).asScratchTimestamp(row).toString()
          );
          break;
        case LIST:
          printList(writer, (ListColumnVector) vector, schema, row);
          break;
        case MAP:
          printMap(writer, (MapColumnVector) vector, schema, row);
          break;
        case STRUCT:
          printStruct(writer, (StructColumnVector) vector, schema, row);
          break;
        case UNION:
          printUnion(writer, (UnionColumnVector) vector, schema, row);
          break;
        default:
          throw new IllegalArgumentException("Unknown type " + schema.toString());
      }
    } else {
      writer.value(null);
    }
  }
}