diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java | 734 |
1 files changed, 387 insertions, 347 deletions
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java index 1166970..3f10a04 100644 --- a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java @@ -29,6 +29,14 @@ package parallelai.spyglass.jdbc.db; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -39,353 +47,385 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; -import com.jcraft.jsch.Logger; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - /** - * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts - * <key,value> pairs, where key has a type extending DBWritable. Returned {@link RecordWriter} - * writes <b>only the key</b> to the database with a batch SQL query. + * A OutputFormat that sends the reduce output to a SQL table. + * <p/> + * {@link DBOutputFormat} accepts <key,value> pairs, where key has a type + * extending DBWritable. Returned {@link RecordWriter} writes <b>only the + * key</b> to the database with a batch SQL query. */ -public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> { - private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); - - /** A RecordWriter that writes the reduce output to a SQL table */ - protected class DBRecordWriter implements RecordWriter<K, V> { - private Connection connection; - private PreparedStatement insertStatement; - private PreparedStatement updateStatement; - private final int statementsBeforeExecute; - - private long statementsAdded = 0; - private long insertStatementsCurrent = 0; - private long updateStatementsCurrent = 0; - - protected DBRecordWriter(Connection connection, PreparedStatement insertStatement, - PreparedStatement updateStatement, int statementsBeforeExecute) { - this.connection = connection; - this.insertStatement = insertStatement; - this.updateStatement = updateStatement; - this.statementsBeforeExecute = statementsBeforeExecute; - } - - /** {@inheritDoc} */ - public void close(Reporter reporter) throws IOException { - executeBatch(); - - try { - if (insertStatement != null) { insertStatement.close(); } - - if (updateStatement != null) { updateStatement.close(); } - - connection.commit(); - } catch (SQLException exception) { - rollBack(); - - createThrowMessage("unable to commit batch", 0, exception); - } finally { - try { - connection.close(); - } catch (SQLException exception) { - throw new IOException("unable to close connection", exception); - } - } - } - - private void executeBatch() throws IOException { - try { - if (insertStatementsCurrent != 0) { - LOG.info( - "executing insert batch " + createBatchMessage(insertStatementsCurrent)); - - insertStatement.executeBatch(); - } - - insertStatementsCurrent = 0; - } catch (SQLException exception) { - rollBack(); - - createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception); - } - - try { - if (updateStatementsCurrent != 0) { - LOG.info( - "executing update batch " + createBatchMessage(updateStatementsCurrent)); - - int[] result = updateStatement.executeBatch(); - - int count = 0; - - for (int value : result) { count += value; } - - if (count != updateStatementsCurrent) { - throw new IOException( - "update did not update same number of statements executed in batch, batch: " - + updateStatementsCurrent + " updated: " + count); - } - } - - updateStatementsCurrent = 0; - } catch (SQLException exception) { - - String message = exception.getMessage(); - if (message.indexOf("Duplicate Key") >= 0) { - LOG.warn("In exception block. Bypass exception becuase of Insert/Update."); - } else { - rollBack(); - - createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception); - } - } - } - - private void rollBack() { - try { - connection.rollback(); - } catch (SQLException sqlException) { - LOG.warn(StringUtils.stringifyException(sqlException)); - } - } - - private String createBatchMessage(long currentStatements) { - return String - .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute); - } - - private void createThrowMessage(String stateMessage, long currentStatements, - SQLException exception) throws IOException { - String message = exception.getMessage(); - - message = message.substring(0, Math.min(75, message.length())); - - int messageLength = exception.getMessage().length(); - String batchMessage = createBatchMessage(currentStatements); - String template = "%s [msglength: %d]%s %s"; - String errorMessage = - String.format(template, stateMessage, messageLength, batchMessage, message); - - LOG.error(errorMessage, exception.getNextException()); - - throw new IOException(errorMessage, exception.getNextException()); - } - - /** {@inheritDoc} */ - public synchronized void write(K key, V value) throws IOException { - try { - if (value == null) { - key.write(insertStatement); - insertStatement.addBatch(); - insertStatementsCurrent++; - } else { - key.write(updateStatement); - updateStatement.addBatch(); - updateStatementsCurrent++; - } - } catch (SQLException exception) { - throw new IOException("unable to add batch statement", exception); - } - - statementsAdded++; - - if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); } - } - } - - /** - * Constructs the query used as the prepared statement to insert data. - * - * @param table the table to insert into - * @param fieldNames the fields to insert into. If field names are unknown, supply an array of - * nulls. - */ - protected String constructInsertQuery(String table, String[] fieldNames) { - if (fieldNames == null) { - throw new IllegalArgumentException("Field names may not be null"); - } - - StringBuilder query = new StringBuilder(); - - query.append("INSERT INTO ").append(table); - - if (fieldNames.length > 0 && fieldNames[0] != null) { - query.append(" ("); - - for (int i = 0; i < fieldNames.length; i++) { - query.append(fieldNames[i]); - - if (i != fieldNames.length - 1) { query.append(","); } - } - - query.append(")"); - - } - - query.append(" VALUES ("); - - for (int i = 0; i < fieldNames.length; i++) { - query.append("?"); - - if (i != fieldNames.length - 1) { query.append(","); } - } - - query.append(")"); - - boolean test = true; - if (test) { - query.append(" ON DUPLICATE KEY UPDATE "); - - - for (int i = 1; i < fieldNames.length; i++) { - - - if ( (i != 1) ) { query.append(","); } - //if (i != fieldNames.length - 1) { query.append(","); } - //&& (i != fieldNames.length - 1) - query.append(fieldNames[i]); - query.append(" = ?"); - - - } - } - - query.append(";"); - - LOG.info(" ===================== " + query.toString()); - return query.toString(); - } - - protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) { - if (fieldNames == null) { - throw new IllegalArgumentException("field names may not be null"); - } - - Set<String> updateNamesSet = new HashSet<String>(); - Collections.addAll(updateNamesSet, updateNames); - - StringBuilder query = new StringBuilder(); - - query.append("UPDATE ").append(table); - - query.append(" SET "); - - if (fieldNames.length > 0 && fieldNames[0] != null) { - int count = 0; - - for (int i = 0; i < fieldNames.length; i++) { - if (updateNamesSet.contains(fieldNames[i])) { continue; } - - if (count != 0) { query.append(","); } - - query.append(fieldNames[i]); - query.append(" = ?"); - - count++; - } - } - - query.append(" WHERE "); - - if (updateNames.length > 0 && updateNames[0] != null) { - for (int i = 0; i < updateNames.length; i++) { - query.append(updateNames[i]); - query.append(" = ?"); - - if (i != updateNames.length - 1) { query.append(" and "); } - } - } - - query.append(";"); - System.out.println("Update Query => " + query.toString()); - return query.toString(); - } - - /** {@inheritDoc} */ - public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException { - } - - /** {@inheritDoc} */ - public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name, - Progressable progress) throws IOException { - DBConfiguration dbConf = new DBConfiguration(job); - - String tableName = dbConf.getOutputTableName(); - String[] fieldNames = dbConf.getOutputFieldNames(); - String[] updateNames = dbConf.getOutputUpdateFieldNames(); - int batchStatements = dbConf.getBatchStatementsNum(); - - Connection connection = dbConf.getConnection(); - - configureConnection(connection); - - String sqlInsert = constructInsertQuery(tableName, fieldNames); - PreparedStatement insertPreparedStatement; - - try { - insertPreparedStatement = connection.prepareStatement(sqlInsert); - insertPreparedStatement.setEscapeProcessing(true); // should be on by default - } catch (SQLException exception) { - throw new IOException("unable to create statement for: " + sqlInsert, exception); - } - - String sqlUpdate = - updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null; - PreparedStatement updatePreparedStatement = null; - - try { - updatePreparedStatement = - sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null; - } catch (SQLException exception) { - throw new IOException("unable to create statement for: " + sqlUpdate, exception); - } - - return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements); - } - - protected void configureConnection(Connection connection) { - setAutoCommit(connection); - } - - protected void setAutoCommit(Connection connection) { - try { - connection.setAutoCommit(false); - } catch (Exception exception) { - throw new RuntimeException("unable to set auto commit", exception); - } - } - - /** - * Initializes the reduce-part of the job with the appropriate output settings - * - * @param job The job - * @param dbOutputFormatClass - * @param tableName The table to insert data into - * @param fieldNames The field names in the table. If unknown, supply the appropriate - */ - public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass, - String tableName, String[] fieldNames, String[] updateFields, int batchSize) { - if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else { - job.setOutputFormat(dbOutputFormatClass); - } - - // writing doesn't always happen in reduce - job.setReduceSpeculativeExecution(false); - job.setMapSpeculativeExecution(false); - - DBConfiguration dbConf = new DBConfiguration(job); - - dbConf.setOutputTableName(tableName); - dbConf.setOutputFieldNames(fieldNames); - - if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); } - - if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); } - } +public class DBOutputFormat<K extends DBWritable, V> implements + OutputFormat<K, V> { + private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); + + /** A RecordWriter that writes the reduce output to a SQL table */ + protected class DBRecordWriter implements RecordWriter<K, V> { + private Connection connection; + private PreparedStatement insertStatement; + private PreparedStatement updateStatement; + private final int statementsBeforeExecute; + + private long statementsAdded = 0; + private long insertStatementsCurrent = 0; + private long updateStatementsCurrent = 0; + + protected DBRecordWriter(Connection connection, + PreparedStatement insertStatement, + PreparedStatement updateStatement, int statementsBeforeExecute) { + this.connection = connection; + this.insertStatement = insertStatement; + this.updateStatement = updateStatement; + this.statementsBeforeExecute = statementsBeforeExecute; + } + + /** {@inheritDoc} */ + public void close(Reporter reporter) throws IOException { + executeBatch(); + + try { + if (insertStatement != null) { + insertStatement.close(); + } + + if (updateStatement != null) { + updateStatement.close(); + } + + connection.commit(); + } catch (SQLException exception) { + rollBack(); + + createThrowMessage("unable to commit batch", 0, exception); + } finally { + try { + connection.close(); + } catch (SQLException exception) { + throw new IOException("unable to close connection", exception); + } + } + } + + private void executeBatch() throws IOException { + try { + if (insertStatementsCurrent != 0) { + LOG.info("executing insert batch " + + createBatchMessage(insertStatementsCurrent)); + + insertStatement.executeBatch(); + } + + insertStatementsCurrent = 0; + } catch (SQLException exception) { + rollBack(); + + createThrowMessage("unable to execute insert batch", + insertStatementsCurrent, exception); + } + + try { + if (updateStatementsCurrent != 0) { + LOG.info("executing update batch " + + createBatchMessage(updateStatementsCurrent)); + + int[] result = updateStatement.executeBatch(); + + int count = 0; + + for (int value : result) { + count += value; + } + + if (count != updateStatementsCurrent) { + throw new IOException( + "update did not update same number of statements executed in batch, batch: " + + updateStatementsCurrent + " updated: " + count); + } + } + + updateStatementsCurrent = 0; + } catch (SQLException exception) { + + String message = exception.getMessage(); + if (message.indexOf("Duplicate Key") >= 0) { + LOG.warn("In exception block. Bypass exception becuase of Insert/Update."); + } else { + rollBack(); + + createThrowMessage("unable to execute update batch", + updateStatementsCurrent, exception); + } + } + } + + private void rollBack() { + try { + connection.rollback(); + } catch (SQLException sqlException) { + LOG.warn(StringUtils.stringifyException(sqlException)); + } + } + + private String createBatchMessage(long currentStatements) { + return String.format("[totstmts: %d][crntstmts: %d][batch: %d]", + statementsAdded, currentStatements, statementsBeforeExecute); + } + + private void createThrowMessage(String stateMessage, + long currentStatements, SQLException exception) throws IOException { + String message = exception.getMessage(); + + // message = message.substring(0, Math.min(75, message.length())); + + int messageLength = exception.getMessage().length(); + String batchMessage = createBatchMessage(currentStatements); + String template = "%s [msglength: %d]%s %s"; + String errorMessage = String.format(template, stateMessage, + messageLength, batchMessage, message); + + LOG.error(errorMessage, exception.getNextException()); + + throw new IOException(errorMessage, exception.getNextException()); + } + + /** {@inheritDoc} */ + public synchronized void write(K key, V value) throws IOException { + try { + if (value == null) { + key.write(insertStatement); + insertStatement.addBatch(); + insertStatementsCurrent++; + } else { + key.write(updateStatement); + updateStatement.addBatch(); + updateStatementsCurrent++; + } + } catch (SQLException exception) { + throw new IOException("unable to add batch statement", exception); + } + + statementsAdded++; + + if (statementsAdded % statementsBeforeExecute == 0) { + executeBatch(); + } + } + } + + /** + * Constructs the query used as the prepared statement to insert data. + * + * @param table + * the table to insert into + * @param fieldNames + * the fields to insert into. If field names are unknown, supply an + * array of nulls. + */ + protected String constructInsertQuery(String table, String[] fieldNames) { + if (fieldNames == null) { + throw new IllegalArgumentException("Field names may not be null"); + } + + StringBuilder query = new StringBuilder(); + + query.append("INSERT INTO ").append(table); + + if (fieldNames.length > 0 && fieldNames[0] != null) { + query.append(" ("); + + for (int i = 0; i < fieldNames.length; i++) { + query.append(fieldNames[i]); + + if (i != fieldNames.length - 1) { + query.append(","); + } + } + + query.append(")"); + + } + + query.append(" VALUES ("); + + for (int i = 0; i < fieldNames.length; i++) { + query.append("?"); + + if (i != fieldNames.length - 1) { + query.append(","); + } + } + + query.append(")"); + + boolean test = true; + if (test) { + query.append(" ON DUPLICATE KEY UPDATE "); + + for (int i = 1; i < fieldNames.length; i++) { + + if ((i != 1)) { + query.append(","); + } + // if (i != fieldNames.length - 1) { query.append(","); } + // && (i != fieldNames.length - 1) + query.append(fieldNames[i]); + query.append(" = ?"); + + } + } + + query.append(";"); + + LOG.info(" ===================== " + query.toString()); + return query.toString(); + } + + protected String constructUpdateQuery(String table, String[] fieldNames, + String[] updateNames) { + if (fieldNames == null) { + throw new IllegalArgumentException("field names may not be null"); + } + + Set<String> updateNamesSet = new HashSet<String>(); + Collections.addAll(updateNamesSet, updateNames); + + StringBuilder query = new StringBuilder(); + + query.append("UPDATE ").append(table); + + query.append(" SET "); + + if (fieldNames.length > 0 && fieldNames[0] != null) { + int count = 0; + + for (int i = 0; i < fieldNames.length; i++) { + if (updateNamesSet.contains(fieldNames[i])) { + continue; + } + + if (count != 0) { + query.append(","); + } + + query.append(fieldNames[i]); + query.append(" = ?"); + + count++; + } + } + + query.append(" WHERE "); + + if (updateNames.length > 0 && updateNames[0] != null) { + for (int i = 0; i < updateNames.length; i++) { + query.append(updateNames[i]); + query.append(" = ?"); + + if (i != updateNames.length - 1) { + query.append(" and "); + } + } + } + + query.append(";"); + System.out.println("Update Query => " + query.toString()); + return query.toString(); + } + + /** {@inheritDoc} */ + public void checkOutputSpecs(FileSystem filesystem, JobConf job) + throws IOException { + } + + /** {@inheritDoc} */ + public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, + JobConf job, String name, Progressable progress) throws IOException { + DBConfiguration dbConf = new DBConfiguration(job); + + String tableName = dbConf.getOutputTableName(); + String[] fieldNames = dbConf.getOutputFieldNames(); + String[] updateNames = dbConf.getOutputUpdateFieldNames(); + int batchStatements = dbConf.getBatchStatementsNum(); + + Connection connection = dbConf.getConnection(); + + configureConnection(connection); + + String sqlInsert = constructInsertQuery(tableName, fieldNames); + PreparedStatement insertPreparedStatement; + + try { + insertPreparedStatement = connection.prepareStatement(sqlInsert); + insertPreparedStatement.setEscapeProcessing(true); // should be on by + // default + } catch (SQLException exception) { + throw new IOException("unable to create statement for: " + sqlInsert, + exception); + } + + String sqlUpdate = updateNames != null ? constructUpdateQuery(tableName, + fieldNames, updateNames) : null; + PreparedStatement updatePreparedStatement = null; + + try { + updatePreparedStatement = sqlUpdate != null ? connection + .prepareStatement(sqlUpdate) : null; + } catch (SQLException exception) { + throw new IOException("unable to create statement for: " + sqlUpdate, + exception); + } + + return new DBRecordWriter(connection, insertPreparedStatement, + updatePreparedStatement, batchStatements); + } + + protected void configureConnection(Connection connection) { + setAutoCommit(connection); + } + + protected void setAutoCommit(Connection connection) { + try { + connection.setAutoCommit(false); + } catch (Exception exception) { + throw new RuntimeException("unable to set auto commit", exception); + } + } + + /** + * Initializes the reduce-part of the job with the appropriate output + * settings + * + * @param job + * The job + * @param dbOutputFormatClass + * @param tableName + * The table to insert data into + * @param fieldNames + * The field names in the table. If unknown, supply the appropriate + */ + public static void setOutput(JobConf job, + Class<? extends DBOutputFormat> dbOutputFormatClass, String tableName, + String[] fieldNames, String[] updateFields, int batchSize) { + if (dbOutputFormatClass == null) { + job.setOutputFormat(DBOutputFormat.class); + } else { + job.setOutputFormat(dbOutputFormatClass); + } + + // writing doesn't always happen in reduce + job.setReduceSpeculativeExecution(false); + job.setMapSpeculativeExecution(false); + + DBConfiguration dbConf = new DBConfiguration(job); + + dbConf.setOutputTableName(tableName); + dbConf.setOutputFieldNames(fieldNames); + + if (updateFields != null) { + dbConf.setOutputUpdateFieldNames(updateFields); + } + + if (batchSize != -1) { + dbConf.setBatchStatementsNum(batchSize); + } + } } |