aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/jdbc/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/jdbc/db')
-rw-r--r--src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java276
-rw-r--r--src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java452
-rw-r--r--src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java391
-rw-r--r--src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java83
4 files changed, 1202 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java
new file mode 100644
index 0000000..1bb0786
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * A container for configuration property names for jobs with DB input/output. <br> The job can be
+ * configured using the static methods in this class, {@link DBInputFormat}, and {@link
+ * DBOutputFormat}. <p/> Alternatively, the properties can be set in the configuration with proper
+ * values.
+ */
+public class DBConfiguration {
+
+ /** The JDBC Driver class name */
+ public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class";
+
+ /** JDBC Database access URL */
+ public static final String URL_PROPERTY = "mapred.jdbc.url";
+
+ /** User name to access the database */
+ public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+
+ /** Password to access the database */
+ public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+
+ /** Input table name */
+ public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name";
+
+ /** Field names in the Input table */
+ public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names";
+
+ /** WHERE clause in the input SELECT statement */
+ public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions";
+
+ /** ORDER BY clause in the input SELECT statement */
+ public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby";
+
+ /** Whole input query, exluding LIMIT...OFFSET */
+ public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+
+ /** The number of records to LIMIT, useful for testing */
+ public static final String INPUT_LIMIT = "mapred.jdbc.input.limit";
+
+ /** Input query to get the count of records */
+ public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query";
+
+ /** Class name implementing DBWritable which will hold input tuples */
+ public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+
+ /** Output table name */
+ public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name";
+
+ /** Field names in the Output table */
+ public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";
+
+ /** Field names in the Output table */
+ public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY =
+ "mapred.jdbc.output.update.field.names";
+
+ /** The number of statements to batch before executing */
+ public static final String BATCH_STATEMENTS_PROPERTY = "mapred.jdbc.batch.statements.num";
+
+ /** The number of splits allowed, becomes max concurrent reads. */
+ public static final String CONCURRENT_READS_PROPERTY = "mapred.jdbc.concurrent.reads.num";
+
+ /**
+ * Sets the DB access related fields in the Configuration.
+ *
+ * @param job the job
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL.
+ * @param userName DB access username
+ * @param passwd DB access passwd
+ */
+ public static void configureDB(Configuration job, String driverClass, String dbUrl,
+ String userName, String passwd) {
+ job.set(DRIVER_CLASS_PROPERTY, driverClass);
+ job.set(URL_PROPERTY, dbUrl);
+
+ if (userName != null) { job.set(USERNAME_PROPERTY, userName); }
+
+ if (passwd != null) { job.set(PASSWORD_PROPERTY, passwd); }
+ }
+
+ /**
+ * Sets the DB access related fields in the Configuration.
+ *
+ * @param job the job
+ * @param driverClass JDBC Driver class name
+ * @param dbUrl JDBC DB access URL.
+ */
+ public static void configureDB(Configuration job, String driverClass, String dbUrl) {
+ configureDB(job, driverClass, dbUrl, null, null);
+ }
+
+ private Configuration job;
+
+ DBConfiguration(Configuration job) {
+ this.job = job;
+ }
+
+ /**
+ * Returns a connection object to the DB
+ *
+ * @throws ClassNotFoundException
+ * @throws SQLException
+ */
+ Connection getConnection() throws IOException {
+ try {
+ Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
+ } catch (ClassNotFoundException exception) {
+ throw new IOException("unable to load conection driver", exception);
+ }
+
+ try {
+ if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
+ return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
+ } else {
+ return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job
+ .get(DBConfiguration.USERNAME_PROPERTY), job
+ .get(DBConfiguration.PASSWORD_PROPERTY));
+ }
+ } catch (SQLException exception) {
+ throw new IOException("unable to create connection", exception);
+ }
+ }
+
+ String getInputTableName() {
+ return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
+ }
+
+ void setInputTableName(String tableName) {
+ job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+ }
+
+ String[] getInputFieldNames() {
+ return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
+ }
+
+ void setInputFieldNames(String... fieldNames) {
+ job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
+ }
+
+ String getInputConditions() {
+ return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
+ }
+
+ void setInputConditions(String conditions) {
+ if (conditions != null && conditions.length() > 0) {
+ job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
+ }
+ }
+
+ String getInputOrderBy() {
+ return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
+ }
+
+ void setInputOrderBy(String orderby) {
+ if (orderby != null && orderby.length() > 0) {
+ job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
+ }
+ }
+
+ String getInputQuery() {
+ return job.get(DBConfiguration.INPUT_QUERY);
+ }
+
+ void setInputQuery(String query) {
+ if (query != null && query.length() > 0) { job.set(DBConfiguration.INPUT_QUERY, query); }
+ }
+
+ long getInputLimit() {
+ return job.getLong(DBConfiguration.INPUT_LIMIT, -1);
+ }
+
+ void setInputLimit(long limit) {
+ job.setLong(DBConfiguration.INPUT_LIMIT, limit);
+ }
+
+ String getInputCountQuery() {
+ return job.get(DBConfiguration.INPUT_COUNT_QUERY);
+ }
+
+ void setInputCountQuery(String query) {
+ if (query != null && query.length() > 0) {
+ job.set(DBConfiguration.INPUT_COUNT_QUERY, query);
+ }
+ }
+
+ Class<?> getInputClass() {
+ return job
+ .getClass(DBConfiguration.INPUT_CLASS_PROPERTY, DBInputFormat.NullDBWritable.class);
+ }
+
+ void setInputClass(Class<? extends DBWritable> inputClass) {
+ job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
+ }
+
+ String getOutputTableName() {
+ return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
+ }
+
+ void setOutputTableName(String tableName) {
+ job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+ }
+
+ String[] getOutputFieldNames() {
+ return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
+ }
+
+ void setOutputFieldNames(String... fieldNames) {
+ job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
+ }
+
+ String[] getOutputUpdateFieldNames() {
+ return job.getStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY);
+ }
+
+ void setOutputUpdateFieldNames(String... fieldNames) {
+ job.setStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY, fieldNames);
+ }
+
+ int getBatchStatementsNum() {
+ return job.getInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, 1000);
+ }
+
+ void setBatchStatementsNum(int batchStatementsNum) {
+ job.setInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, batchStatementsNum);
+ }
+
+ int getMaxConcurrentReadsNum() {
+ return job.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 0);
+ }
+
+ void setMaxConcurrentReadsNum(int maxConcurrentReads) {
+ if (maxConcurrentReads < 0) {
+ throw new IllegalArgumentException("maxConcurrentReads must be a positive value");
+ }
+
+ job.setInt(DBConfiguration.CONCURRENT_READS_PROPERTY, maxConcurrentReads);
+ }
+
+}
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java
new file mode 100644
index 0000000..115d9bd
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java
@@ -0,0 +1,452 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.*;
+
+/**
+ * A InputFormat that reads input data from an SQL table. <p/> DBInputFormat emits LongWritables
+ * containing the record number as key and DBWritables as value. <p/> The SQL query, and input class
+ * can be using one of the two setInput methods.
+ */
+public class DBInputFormat<T extends DBWritable>
+ implements InputFormat<LongWritable, T>, JobConfigurable {
+ /** Field LOG */
+ private static final Logger LOG = LoggerFactory.getLogger(DBInputFormat.class);
+
+ /**
+ * A RecordReader that reads records from a SQL table. Emits LongWritables containing the record
+ * number as key and DBWritables as value.
+ */
+ protected class DBRecordReader implements RecordReader<LongWritable, T> {
+ private ResultSet results;
+ private Statement statement;
+ private Class<T> inputClass;
+ private JobConf job;
+ private DBInputSplit split;
+ private long pos = 0;
+
+ /**
+ * @param split The InputSplit to read data for
+ * @throws SQLException
+ */
+ protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job)
+ throws SQLException, IOException {
+ this.inputClass = inputClass;
+ this.split = split;
+ this.job = job;
+
+ statement =
+ connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+ //statement.setFetchSize(Integer.MIN_VALUE);
+ String query = getSelectQuery();
+ try {
+ LOG.info(query);
+ results = statement.executeQuery(query);
+ LOG.info("done executing select query");
+ } catch (SQLException exception) {
+ LOG.error("unable to execute select query: " + query, exception);
+ throw new IOException("unable to execute select query: " + query, exception);
+ }
+ }
+
+ /**
+ * Returns the query for selecting the records, subclasses can override this for custom
+ * behaviour.
+ */
+ protected String getSelectQuery() {
+ LOG.info("Executing select query");
+ StringBuilder query = new StringBuilder();
+
+ if (dbConf.getInputQuery() == null) {
+ query.append("SELECT ");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append(fieldNames[i]);
+
+ if (i != fieldNames.length - 1)
+ query.append(", ");
+ }
+
+ query.append(" FROM ").append(tableName);
+ query.append(" AS ").append(tableName); //in hsqldb this is necessary
+
+ if (conditions != null && conditions.length() > 0)
+ query.append(" WHERE (").append(conditions).append(")");
+
+ String orderBy = dbConf.getInputOrderBy();
+
+ if (orderBy != null && orderBy.length() > 0)
+ query.append(" ORDER BY ").append(orderBy);
+
+ }
+ else
+ query.append(dbConf.getInputQuery());
+
+ try {
+ // Only add limit and offset if you have multiple chunks
+ if(split.getChunks() > 1) {
+ query.append(" LIMIT ").append(split.getLength());
+ query.append(" OFFSET ").append(split.getStart());
+ }
+ } catch (IOException ex) {
+ //ignore, will not throw
+ }
+
+ return query.toString();
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ try {
+ connection.commit();
+ results.close();
+ statement.close();
+ } catch (SQLException exception) {
+ throw new IOException("unable to commit and close", exception);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+ /** {@inheritDoc} */
+ public T createValue() {
+ return ReflectionUtils.newInstance(inputClass, job);
+ }
+
+ /** {@inheritDoc} */
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ /** {@inheritDoc} */
+ public float getProgress() throws IOException {
+ return pos / (float) split.getLength();
+ }
+
+ /** {@inheritDoc} */
+ public boolean next(LongWritable key, T value) throws IOException {
+ try {
+ if (!results.next())
+ return false;
+
+ // Set the key field value as the output key value
+ key.set(pos + split.getStart());
+
+ value.readFields(results);
+
+ pos++;
+ } catch (SQLException exception) {
+ throw new IOException("unable to get next value", exception);
+ }
+
+ return true;
+ }
+ }
+
+ /** A Class that does nothing, implementing DBWritable */
+ public static class NullDBWritable implements DBWritable, Writable {
+
+ public void readFields(DataInput in) throws IOException {
+ }
+
+ public void readFields(ResultSet arg0) throws SQLException {
+ }
+
+ public void write(DataOutput out) throws IOException {
+ }
+
+ public void write(PreparedStatement arg0) throws SQLException {
+ }
+ }
+
+ /** A InputSplit that spans a set of rows */
+ protected static class DBInputSplit implements InputSplit {
+ private long end = 0;
+ private long start = 0;
+ private long chunks = 0;
+
+ /** Default Constructor */
+ public DBInputSplit() {
+ }
+
+ /**
+ * Convenience Constructor
+ *
+ * @param start the index of the first row to select
+ * @param end the index of the last row to select
+ */
+ public DBInputSplit(long start, long end, long chunks) {
+ this.start = start;
+ this.end = end;
+ this.chunks = chunks;
+ LOG.info("creating DB input split with start: " + start + ", end: " + end + ", chunks: " + chunks);
+ }
+
+ /** {@inheritDoc} */
+ public String[] getLocations() throws IOException {
+ // TODO Add a layer to enable SQL "sharding" and support locality
+ return new String[]{};
+ }
+
+ /** @return The index of the first row to select */
+ public long getStart() {
+ return start;
+ }
+
+ /** @return The index of the last row to select */
+ public long getEnd() {
+ return end;
+ }
+
+ /** @return The total row count in this split */
+ public long getLength() throws IOException {
+ return end - start;
+ }
+
+ /** @return The total number of chucks accross all splits */
+ public long getChunks() {
+ return chunks;
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput input) throws IOException {
+ start = input.readLong();
+ end = input.readLong();
+ chunks = input.readLong();
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput output) throws IOException {
+ output.writeLong(start);
+ output.writeLong(end);
+ output.writeLong(chunks);
+ }
+ }
+
+ protected DBConfiguration dbConf;
+ protected Connection connection;
+
+ protected String tableName;
+ protected String[] fieldNames;
+ protected String conditions;
+ protected long limit;
+ protected int maxConcurrentReads;
+
+
+ /** {@inheritDoc} */
+ public void configure(JobConf job) {
+ dbConf = new DBConfiguration(job);
+
+ tableName = dbConf.getInputTableName();
+ fieldNames = dbConf.getInputFieldNames();
+ conditions = dbConf.getInputConditions();
+ limit = dbConf.getInputLimit();
+ maxConcurrentReads = dbConf.getMaxConcurrentReadsNum();
+
+ try {
+ connection = dbConf.getConnection();
+ } catch (IOException exception) {
+ throw new RuntimeException("unable to create connection", exception.getCause());
+ }
+
+ configureConnection(connection);
+ }
+
+ protected void configureConnection(Connection connection) {
+ setTransactionIsolationLevel(connection);
+ setAutoCommit(connection);
+ }
+
+ protected void setAutoCommit(Connection connection) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (Exception exception) {
+ throw new RuntimeException("unable to set auto commit", exception);
+ }
+ }
+
+ protected void setTransactionIsolationLevel(Connection connection) {
+ try {
+ connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ } catch (SQLException exception) {
+ throw new RuntimeException("unable to configure transaction isolation level", exception);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ public RecordReader<LongWritable, T> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ Class inputClass = dbConf.getInputClass();
+ try {
+ return new DBRecordReader((DBInputSplit) split, inputClass, job);
+ } catch (SQLException exception) {
+ throw new IOException(exception.getMessage(), exception);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
+ // use the configured value if avail
+ chunks = maxConcurrentReads == 0 ? chunks : maxConcurrentReads;
+
+ try {
+ Statement statement = connection.createStatement();
+
+ ResultSet results = statement.executeQuery(getCountQuery());
+
+ long count = 0;
+
+ while (results.next())
+ count += results.getLong(1);
+
+ if (limit != -1)
+ count = Math.min(limit, count);
+
+ long chunkSize = (count / chunks);
+
+ results.close();
+ statement.close();
+
+ InputSplit[] splits = new InputSplit[chunks];
+
+ // Split the rows into n-number of chunks and adjust the last chunk
+ // accordingly
+ for (int i = 0; i < chunks; i++) {
+ DBInputSplit split;
+
+ if (i + 1 == chunks)
+ split = new DBInputSplit(i * chunkSize, count, chunks);
+ else
+ split = new DBInputSplit(i * chunkSize, i * chunkSize + chunkSize, chunks);
+
+ splits[i] = split;
+ }
+
+ return splits;
+ } catch (SQLException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Returns the query for getting the total number of rows, subclasses can override this for
+ * custom behaviour.
+ */
+ protected String getCountQuery() {
+
+ if (dbConf.getInputCountQuery() != null) { return dbConf.getInputCountQuery(); }
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("SELECT COUNT(*) FROM " + tableName);
+
+ if (conditions != null && conditions.length() > 0)
+ query.append(" WHERE " + conditions);
+
+ return query.toString();
+ }
+
+ /**
+ * Initializes the map-part of the job with the appropriate input settings.
+ *
+ * @param job The job
+ * @param inputClass the class object implementing DBWritable, which is the Java object
+ * holding tuple fields.
+ * @param tableName The table to read data from
+ * @param conditions The condition which to select data with, eg. '(updated > 20070101 AND
+ * length > 0)'
+ * @param orderBy the fieldNames in the orderBy clause.
+ * @param limit
+ * @param fieldNames The field names in the table
+ * @param concurrentReads
+ */
+ public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
+ String tableName, String conditions, String orderBy, long limit, int concurrentReads,
+ String... fieldNames) {
+ job.setInputFormat(DBInputFormat.class);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ dbConf.setInputClass(inputClass);
+ dbConf.setInputTableName(tableName);
+ dbConf.setInputFieldNames(fieldNames);
+ dbConf.setInputConditions(conditions);
+ dbConf.setInputOrderBy(orderBy);
+
+ if (limit != -1)
+ dbConf.setInputLimit(limit);
+
+ dbConf.setMaxConcurrentReadsNum(concurrentReads);
+ }
+
+ /**
+ * Initializes the map-part of the job with the appropriate input settings.
+ *
+ * @param job The job
+ * @param inputClass the class object implementing DBWritable, which is the Java object
+ * holding tuple fields.
+ * @param selectQuery the input query to select fields. Example : "SELECT f1, f2, f3 FROM
+ * Mytable ORDER BY f1"
+ * @param countQuery the input query that returns the number of records in the table.
+ * Example : "SELECT COUNT(f1) FROM Mytable"
+ * @param concurrentReads
+ */
+ public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
+ String selectQuery, String countQuery, long limit, int concurrentReads) {
+ job.setInputFormat(DBInputFormat.class);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ dbConf.setInputClass(inputClass);
+ dbConf.setInputQuery(selectQuery);
+ dbConf.setInputCountQuery(countQuery);
+
+ if (limit != -1)
+ dbConf.setInputLimit(limit);
+
+ dbConf.setMaxConcurrentReadsNum(concurrentReads);
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
new file mode 100644
index 0000000..1166970
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
@@ -0,0 +1,391 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+import com.jcraft.jsch.Logger;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts
+ * &lt;key,value&gt; pairs, where key has a type extending DBWritable. Returned {@link RecordWriter}
+ * writes <b>only the key</b> to the database with a batch SQL query.
+ */
+public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> {
+ private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+
+ /** A RecordWriter that writes the reduce output to a SQL table */
+ protected class DBRecordWriter implements RecordWriter<K, V> {
+ private Connection connection;
+ private PreparedStatement insertStatement;
+ private PreparedStatement updateStatement;
+ private final int statementsBeforeExecute;
+
+ private long statementsAdded = 0;
+ private long insertStatementsCurrent = 0;
+ private long updateStatementsCurrent = 0;
+
+ protected DBRecordWriter(Connection connection, PreparedStatement insertStatement,
+ PreparedStatement updateStatement, int statementsBeforeExecute) {
+ this.connection = connection;
+ this.insertStatement = insertStatement;
+ this.updateStatement = updateStatement;
+ this.statementsBeforeExecute = statementsBeforeExecute;
+ }
+
+ /** {@inheritDoc} */
+ public void close(Reporter reporter) throws IOException {
+ executeBatch();
+
+ try {
+ if (insertStatement != null) { insertStatement.close(); }
+
+ if (updateStatement != null) { updateStatement.close(); }
+
+ connection.commit();
+ } catch (SQLException exception) {
+ rollBack();
+
+ createThrowMessage("unable to commit batch", 0, exception);
+ } finally {
+ try {
+ connection.close();
+ } catch (SQLException exception) {
+ throw new IOException("unable to close connection", exception);
+ }
+ }
+ }
+
+ private void executeBatch() throws IOException {
+ try {
+ if (insertStatementsCurrent != 0) {
+ LOG.info(
+ "executing insert batch " + createBatchMessage(insertStatementsCurrent));
+
+ insertStatement.executeBatch();
+ }
+
+ insertStatementsCurrent = 0;
+ } catch (SQLException exception) {
+ rollBack();
+
+ createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception);
+ }
+
+ try {
+ if (updateStatementsCurrent != 0) {
+ LOG.info(
+ "executing update batch " + createBatchMessage(updateStatementsCurrent));
+
+ int[] result = updateStatement.executeBatch();
+
+ int count = 0;
+
+ for (int value : result) { count += value; }
+
+ if (count != updateStatementsCurrent) {
+ throw new IOException(
+ "update did not update same number of statements executed in batch, batch: "
+ + updateStatementsCurrent + " updated: " + count);
+ }
+ }
+
+ updateStatementsCurrent = 0;
+ } catch (SQLException exception) {
+
+ String message = exception.getMessage();
+ if (message.indexOf("Duplicate Key") >= 0) {
+ LOG.warn("In exception block. Bypass exception becuase of Insert/Update.");
+ } else {
+ rollBack();
+
+ createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception);
+ }
+ }
+ }
+
+ private void rollBack() {
+ try {
+ connection.rollback();
+ } catch (SQLException sqlException) {
+ LOG.warn(StringUtils.stringifyException(sqlException));
+ }
+ }
+
+ private String createBatchMessage(long currentStatements) {
+ return String
+ .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute);
+ }
+
+ private void createThrowMessage(String stateMessage, long currentStatements,
+ SQLException exception) throws IOException {
+ String message = exception.getMessage();
+
+ message = message.substring(0, Math.min(75, message.length()));
+
+ int messageLength = exception.getMessage().length();
+ String batchMessage = createBatchMessage(currentStatements);
+ String template = "%s [msglength: %d]%s %s";
+ String errorMessage =
+ String.format(template, stateMessage, messageLength, batchMessage, message);
+
+ LOG.error(errorMessage, exception.getNextException());
+
+ throw new IOException(errorMessage, exception.getNextException());
+ }
+
+ /** {@inheritDoc} */
+ public synchronized void write(K key, V value) throws IOException {
+ try {
+ if (value == null) {
+ key.write(insertStatement);
+ insertStatement.addBatch();
+ insertStatementsCurrent++;
+ } else {
+ key.write(updateStatement);
+ updateStatement.addBatch();
+ updateStatementsCurrent++;
+ }
+ } catch (SQLException exception) {
+ throw new IOException("unable to add batch statement", exception);
+ }
+
+ statementsAdded++;
+
+ if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); }
+ }
+ }
+
+ /**
+ * Constructs the query used as the prepared statement to insert data.
+ *
+ * @param table the table to insert into
+ * @param fieldNames the fields to insert into. If field names are unknown, supply an array of
+ * nulls.
+ */
+ protected String constructInsertQuery(String table, String[] fieldNames) {
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("Field names may not be null");
+ }
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("INSERT INTO ").append(table);
+
+ if (fieldNames.length > 0 && fieldNames[0] != null) {
+ query.append(" (");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append(fieldNames[i]);
+
+ if (i != fieldNames.length - 1) { query.append(","); }
+ }
+
+ query.append(")");
+
+ }
+
+ query.append(" VALUES (");
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ query.append("?");
+
+ if (i != fieldNames.length - 1) { query.append(","); }
+ }
+
+ query.append(")");
+
+ boolean test = true;
+ if (test) {
+ query.append(" ON DUPLICATE KEY UPDATE ");
+
+
+ for (int i = 1; i < fieldNames.length; i++) {
+
+
+ if ( (i != 1) ) { query.append(","); }
+ //if (i != fieldNames.length - 1) { query.append(","); }
+ //&& (i != fieldNames.length - 1)
+ query.append(fieldNames[i]);
+ query.append(" = ?");
+
+
+ }
+ }
+
+ query.append(";");
+
+ LOG.info(" ===================== " + query.toString());
+ return query.toString();
+ }
+
+ protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) {
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("field names may not be null");
+ }
+
+ Set<String> updateNamesSet = new HashSet<String>();
+ Collections.addAll(updateNamesSet, updateNames);
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("UPDATE ").append(table);
+
+ query.append(" SET ");
+
+ if (fieldNames.length > 0 && fieldNames[0] != null) {
+ int count = 0;
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (updateNamesSet.contains(fieldNames[i])) { continue; }
+
+ if (count != 0) { query.append(","); }
+
+ query.append(fieldNames[i]);
+ query.append(" = ?");
+
+ count++;
+ }
+ }
+
+ query.append(" WHERE ");
+
+ if (updateNames.length > 0 && updateNames[0] != null) {
+ for (int i = 0; i < updateNames.length; i++) {
+ query.append(updateNames[i]);
+ query.append(" = ?");
+
+ if (i != updateNames.length - 1) { query.append(" and "); }
+ }
+ }
+
+ query.append(";");
+ System.out.println("Update Query => " + query.toString());
+ return query.toString();
+ }
+
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException {
+ }
+
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name,
+ Progressable progress) throws IOException {
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ String tableName = dbConf.getOutputTableName();
+ String[] fieldNames = dbConf.getOutputFieldNames();
+ String[] updateNames = dbConf.getOutputUpdateFieldNames();
+ int batchStatements = dbConf.getBatchStatementsNum();
+
+ Connection connection = dbConf.getConnection();
+
+ configureConnection(connection);
+
+ String sqlInsert = constructInsertQuery(tableName, fieldNames);
+ PreparedStatement insertPreparedStatement;
+
+ try {
+ insertPreparedStatement = connection.prepareStatement(sqlInsert);
+ insertPreparedStatement.setEscapeProcessing(true); // should be on by default
+ } catch (SQLException exception) {
+ throw new IOException("unable to create statement for: " + sqlInsert, exception);
+ }
+
+ String sqlUpdate =
+ updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null;
+ PreparedStatement updatePreparedStatement = null;
+
+ try {
+ updatePreparedStatement =
+ sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null;
+ } catch (SQLException exception) {
+ throw new IOException("unable to create statement for: " + sqlUpdate, exception);
+ }
+
+ return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements);
+ }
+
+ protected void configureConnection(Connection connection) {
+ setAutoCommit(connection);
+ }
+
+ protected void setAutoCommit(Connection connection) {
+ try {
+ connection.setAutoCommit(false);
+ } catch (Exception exception) {
+ throw new RuntimeException("unable to set auto commit", exception);
+ }
+ }
+
+ /**
+ * Initializes the reduce-part of the job with the appropriate output settings
+ *
+ * @param job The job
+ * @param dbOutputFormatClass
+ * @param tableName The table to insert data into
+ * @param fieldNames The field names in the table. If unknown, supply the appropriate
+ */
+ public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass,
+ String tableName, String[] fieldNames, String[] updateFields, int batchSize) {
+ if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else {
+ job.setOutputFormat(dbOutputFormatClass);
+ }
+
+ // writing doesn't always happen in reduce
+ job.setReduceSpeculativeExecution(false);
+ job.setMapSpeculativeExecution(false);
+
+ DBConfiguration dbConf = new DBConfiguration(job);
+
+ dbConf.setOutputTableName(tableName);
+ dbConf.setOutputFieldNames(fieldNames);
+
+ if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); }
+
+ if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); }
+ }
+}
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java
new file mode 100644
index 0000000..1369c74
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.hadoop.io.Writable;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Objects that are read from/written to a database should implement <code>DBWritable</code>.
+ * DBWritable, is similar to {@link Writable} except that the {@link #write(PreparedStatement)}
+ * method takes a {@link PreparedStatement}, and {@link #readFields(ResultSet)} takes a {@link
+ * ResultSet}. <p> Implementations are responsible for writing the fields of the object to
+ * PreparedStatement, and reading the fields of the object from the ResultSet. <p/> <p>Example:</p>
+ * If we have the following table in the database :
+ * <pre>
+ * CREATE TABLE MyTable (
+ * counter INTEGER NOT NULL,
+ * timestamp BIGINT NOT NULL,
+ * );
+ * </pre>
+ * then we can read/write the tuples from/to the table with :
+ * <p><pre>
+ * public class MyWritable implements Writable, DBWritable {
+ * // Some data
+ * private int counter;
+ * private long timestamp;
+ * <p/>
+ * //Writable#write() implementation
+ * public void write(DataOutput out) throws IOException {
+ * out.writeInt(counter);
+ * out.writeLong(timestamp);
+ * }
+ * <p/>
+ * //Writable#readFields() implementation
+ * public void readFields(DataInput in) throws IOException {
+ * counter = in.readInt();
+ * timestamp = in.readLong();
+ * }
+ * <p/>
+ * public void write(PreparedStatement statement) throws SQLException {
+ * statement.setInt(1, counter);
+ * statement.setLong(2, timestamp);
+ * }
+ * <p/>
+ * public void readFields(ResultSet resultSet) throws SQLException {
+ * counter = resultSet.getInt(1);
+ * timestamp = resultSet.getLong(2);
+ * }
+ * }
+ * </pre></p>
+ */
+public interface DBWritable {
+
+ /**
+ * Sets the fields of the object in the {@link PreparedStatement}.
+ *
+ * @param statement the statement that the fields are put into.
+ * @throws SQLException
+ */
+ public void write(PreparedStatement statement) throws SQLException;
+
+ /**
+ * Reads the fields of the object from the {@link ResultSet}.
+ *
+ * @param resultSet the {@link ResultSet} to get the fields from.
+ * @throws SQLException
+ */
+ public void readFields(ResultSet resultSet) throws SQLException;
+
+}