How to Read ORC Files from S3 or an InputStream in Java

Published Dec 20, 2021  ∙  Updated Jan 3, 2022

There doesn’t seem to be a nice way to read ORC files directly from S3 in a Java class not running in a Spark context.

The problem with OrcFile.createReader

For instance, given an S3Object from the AWS SDK for Java that contains an ORC file, how can we read the contents of that file in a plain Java class?

The Apache-provided OrcFile class and its createReader() method are designed to read from a Hadoop filesystem path, not directly from S3.
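
For comparison, here's roughly what the usual call looks like; it resolves a Path against a Hadoop filesystem such as HDFS (the path below is illustrative), and there's no obvious way to hand it an S3Object or a plain InputStream.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

// Typical usage: the Path is resolved against a Hadoop filesystem (e.g. HDFS).
Configuration conf = new Configuration();
Reader reader = OrcFile.createReader(
  new Path("hdfs:///data/example.orc"),
  OrcFile.readerOptions(conf)
);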

Let’s see how we can use a mock filesystem to imitate HDFS and read an ORC file from an S3Object.

Converting an ORC file to JSON using a mock filesystem

We’re going to use a mock filesystem found in Apache’s ORC test implementations.

Suppose we want to write a function that takes in an S3Object and returns a JSON string of the data in the ORC file.

Let’s look at the steps and the top-level function first to get an idea of how we plan to achieve this, then we’ll go through the four extra files needed to make it happen.

  1. Obtain byte array of the ORC file contents in S3
  2. Add this byte array to our mock filesystem
  3. Read the byte array from the mock filesystem
  4. Convert contents to a JSON string
  5. Profit

import com.amazonaws.services.s3.model.S3Object;
import java.io.InputStream;
import java.io.StringWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.impl.MockFileSystem;
import org.apache.orc.tools.convert.OrcToJson;

String orcToJsonString(S3Object object) throws Exception {
  // Step 1: read the ORC file contents from S3 into a byte array
  byte[] streamBytes;
  try (InputStream stream = object.getObjectContent()) {
    streamBytes = stream.readAllBytes();
  }
  // Step 2: hand the byte array to our mock filesystem
  Configuration conf = new Configuration();
  MockFileSystem fs = new MockFileSystem(conf, streamBytes);
  // Step 3: read the "file" back through the mock filesystem (the path is arbitrary)
  Reader reader = OrcFile.createReader(
    new Path("/foobar"),
    OrcFile.readerOptions(conf).filesystem(fs)
  );
  // Step 4: convert the contents to a newline-delimited JSON string
  StringWriter contents = new StringWriter();
  OrcToJson.printJsonData(contents, reader);
  // Step 5: profit
  return contents.toString();
}
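
For reference, a call site might look like the following sketch; the bucket name, key, and default client setup are placeholders rather than anything prescribed by the approach above.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;

// Hypothetical bucket and key, purely for illustration.
AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
S3Object object = s3.getObject("my-bucket", "path/to/file.orc");
String json = orcToJsonString(object);
System.out.println(json);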

The following four files can be used to read an ORC file from S3.

MockFileSystem

Our mock filesystem will extend Hadoop’s abstract FileSystem class, implementing just enough of it to serve a single in-memory file for testing purposes.

We’ll copy this MockFileSystem class from Apache’s ORC repository.

However, we’ll modify the constructor to accept a byte array streamBytes, which will represent the contents of the S3Object (our ORC file).

Let’s follow Apache and place this MockFileSystem.java class in a package org.apache.orc.impl.

package org.apache.orc.impl;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

public class MockFileSystem extends FileSystem {
  final List<MockInputStream> streams = new ArrayList<>();
  byte[] streamBytes;
  public MockFileSystem(Configuration conf, byte[] streamBytes) {
    setConf(conf);
    this.streamBytes = streamBytes;
  }
  @Override
  public URI getUri() {
    try {
      return new URI("mock:///");
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("bad uri", e);
    }
  }
  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    // Every path "opens" the same in-memory ORC file.
    MockInputStream result = new MockInputStream(this, streamBytes);
    streams.add(result);
    return result;
  }
  @Override
  public FileStatus getFileStatus(Path path) {
    // Report the byte array's length as the file length.
    return new FileStatus(streamBytes.length, false, 1, 4096, 0, path);
  }
  void removeStream(MockInputStream stream) {
    streams.remove(stream);
  }

  int streamCount() {
    return streams.size();
  }
  @Override
  public FSDataOutputStream create(
    Path path,
    FsPermission fsPermission,
    boolean b,
    int i,
    short i1,
    long l,
    Progressable progressable
  ) throws IOException {
    throw new IOException("Can't create");
  }
  @Override
  public FSDataOutputStream append(Path path, int i, Progressable progressable)
    throws IOException {
    throw new IOException("Can't append");
  }
  @Override
  public boolean rename(Path path, Path path1) {
    return false;
  }
  @Override
  public boolean delete(Path path, boolean b) {
    return false;
  }
  @Override
  public FileStatus[] listStatus(Path path) {
    return new FileStatus[0];
  }
  @Override
  public void setWorkingDirectory(Path path) {}
  @Override
  public Path getWorkingDirectory() {
    return new Path("/");
  }
  @Override
  public boolean mkdirs(Path path, FsPermission fsPermission) {
    return false;
  }
}

MockInputStream

You might have noticed that MockFileSystem references a MockInputStream object. This is also defined in the ORC tests.

In the ORC tests, the byte array of the ORC file is defined explicitly inside the test class. Since we’re passing the byte array in through the MockFileSystem constructor, we can drop that definition and simply pass our array along to MockInputStream.

Let’s follow Apache and place this MockInputStream.java class in the same package as above: org.apache.orc.impl.

package org.apache.orc.impl;

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

public class MockInputStream extends FSDataInputStream {
  MockFileSystem fs;
  public MockInputStream(MockFileSystem fs, byte[] streamBytes) throws IOException {
    super(new SeekableByteArrayInputStream(streamBytes));
    this.fs = fs;
  }
  @Override
  public void close() {
    // The wrapped ByteArrayInputStream needs no cleanup, so just deregister from the mock filesystem.
    fs.removeStream(this);
  }
}

SeekableByteArrayInputStream

Once again, you might have noticed that MockInputStream references a SeekableByteArrayInputStream object. This is also defined in the ORC tests.

Let’s follow Apache and place this SeekableByteArrayInputStream.java class in the same package, org.apache.orc.impl.

package org.apache.orc.impl;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;

public class SeekableByteArrayInputStream
  extends ByteArrayInputStream
  implements Seekable, PositionedReadable {
  public SeekableByteArrayInputStream(byte[] buf) {
    super(buf);
  }
  @Override
  public void seek(long pos) throws IOException {
    // reset() rewinds to the buffer's mark (initially the start),
    // then skip() advances to the requested position.
    this.reset();
    this.skip(pos);
  }
  @Override
  public long getPos() throws IOException {
    return pos;
  }
  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    return false;
  }
  @Override
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
    long oldPos = getPos();
    int nread = -1;
    try {
      seek(position);
      nread = read(buffer, offset, length);
    } finally {
      seek(oldPos);
    }
    return nread;
  }
  @Override
  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
    int nread = 0;
    while (nread < length) {
      int nbytes = read(position + nread, buffer, offset + nread, length - nread);
      if (nbytes < 0) {
        throw new EOFException("End of file reached before reading fully.");
      }
      nread += nbytes;
    }
  }
  @Override
  public void readFully(long position, byte[] buffer) throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }
}

OrcToJson

Lastly, we need to convert from ORC to JSON, which we can (once again) adapt from the utility Apache’s ORC tools use to print file data.

Let’s follow Apache and place this OrcToJson.java class in a package org.apache.orc.tools.convert.

package org.apache.orc.tools.convert;

import java.io.IOException;
import java.io.Writer;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONWriter;

public class OrcToJson {
  public static void printJsonData(Writer out, Reader reader)
    throws IOException, JSONException {
    RecordReader rows = reader.rows();
    try {
      TypeDescription schema = reader.getSchema();
      VectorizedRowBatch batch = schema.createRowBatch();
      // Emit each row as one JSON object on its own line (newline-delimited JSON).
      while (rows.nextBatch(batch)) {
        for (int r = 0; r < batch.size; ++r) {
          JSONWriter writer = new JSONWriter(out);
          printRow(writer, batch, schema, r);
          out.write("\n");
          out.flush();
        }
      }
    } finally {
      rows.close();
    }
  }
  static void printRow(
    JSONWriter writer,
    VectorizedRowBatch batch,
    TypeDescription schema,
    int row
  ) throws JSONException {
    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
      List<TypeDescription> fieldTypes = schema.getChildren();
      List<String> fieldNames = schema.getFieldNames();
      writer.object();
      for (int c = 0; c < batch.cols.length; ++c) {
        writer.key(fieldNames.get(c));
        printValue(writer, batch.cols[c], fieldTypes.get(c), row);
      }
      writer.endObject();
    } else {
      printValue(writer, batch.cols[0], schema, row);
    }
  }
  static void printMap(
    JSONWriter writer,
    MapColumnVector vector,
    TypeDescription schema,
    int row
  ) throws JSONException {
    writer.array();
    TypeDescription keyType = schema.getChildren().get(0);
    TypeDescription valueType = schema.getChildren().get(1);
    int offset = (int) vector.offsets[row];
    for (int i = 0; i < vector.lengths[row]; ++i) {
      writer.object();
      writer.key("_key");
      printValue(writer, vector.keys, keyType, offset + i);
      writer.key("_value");
      printValue(writer, vector.values, valueType, offset + i);
      writer.endObject();
    }
    writer.endArray();
  }
  static void printList(
    JSONWriter writer,
    ListColumnVector vector,
    TypeDescription schema,
    int row
  ) throws JSONException {
    writer.array();
    int offset = (int) vector.offsets[row];
    TypeDescription childType = schema.getChildren().get(0);
    for (int i = 0; i < vector.lengths[row]; ++i) {
      printValue(writer, vector.child, childType, offset + i);
    }
    writer.endArray();
  }
  private static void printUnion(
    JSONWriter writer,
    UnionColumnVector vector,
    TypeDescription schema,
    int row
  ) throws JSONException {
    int tag = vector.tags[row];
    printValue(writer, vector.fields[tag], schema.getChildren().get(tag), row);
  }
  static void printStruct(
    JSONWriter writer,
    StructColumnVector batch,
    TypeDescription schema,
    int row
  ) throws JSONException {
    writer.object();
    List<String> fieldNames = schema.getFieldNames();
    List<TypeDescription> fieldTypes = schema.getChildren();
    for (int i = 0; i < fieldTypes.size(); ++i) {
      writer.key(fieldNames.get(i));
      printValue(writer, batch.fields[i], fieldTypes.get(i), row);
    }
    writer.endObject();
  }
  static void printBinary(JSONWriter writer, BytesColumnVector vector, int row)
    throws JSONException {
    writer.array();
    int offset = vector.start[row];
    for (int i = 0; i < vector.length[row]; ++i) {
      writer.value(0xff & (int) vector.vector[row][offset + i]);
    }
    writer.endArray();
  }
  static void printValue(
    JSONWriter writer,
    ColumnVector vector,
    TypeDescription schema,
    int row
  ) throws JSONException {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      switch (schema.getCategory()) {
        case BOOLEAN:
          writer.value(((LongColumnVector) vector).vector[row] != 0);
          break;
        case BYTE:
        case SHORT:
        case INT:
        case LONG:
          writer.value(((LongColumnVector) vector).vector[row]);
          break;
        case FLOAT:
        case DOUBLE:
          writer.value(((DoubleColumnVector) vector).vector[row]);
          break;
        case STRING:
        case CHAR:
        case VARCHAR:
          writer.value(((BytesColumnVector) vector).toString(row));
          break;
        case BINARY:
          printBinary(writer, (BytesColumnVector) vector, row);
          break;
        case DECIMAL:
          writer.value(((DecimalColumnVector) vector).vector[row].toString());
          break;
        case DATE:
          writer.value(
            new DateWritable((int) ((LongColumnVector) vector).vector[row]).toString()
          );
          break;
        case TIMESTAMP:
        case TIMESTAMP_INSTANT:
          writer.value(
            ((TimestampColumnVector) vector).asScratchTimestamp(row).toString()
          );
          break;
        case LIST:
          printList(writer, (ListColumnVector) vector, schema, row);
          break;
        case MAP:
          printMap(writer, (MapColumnVector) vector, schema, row);
          break;
        case STRUCT:
          printStruct(writer, (StructColumnVector) vector, schema, row);
          break;
        case UNION:
          printUnion(writer, (UnionColumnVector) vector, schema, row);
          break;
        default:
          throw new IllegalArgumentException("Unknown type " + schema.toString());
      }
    } else {
      writer.value(null);
    }
  }
}
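
That’s everything. Note that printJsonData emits newline-delimited JSON: every row in the ORC file becomes one JSON object on its own line. For a hypothetical file with schema struct<id:int,name:string>, the string returned by orcToJsonString would look something like this:

{"id":1,"name":"foo"}
{"id":2,"name":"bar"}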