From 3e4a74de169104679f2459947c27d5cfb8cc430b Mon Sep 17 00:00:00 2001
From: Saad Rashid
Date: Mon, 24 Jun 2013 17:13:19 +0100
Subject: Added JDBCTap support.

---
 .../spyglass/jdbc/db/DBOutputFormat.java | 391 +++++++++++++++++++++
 1 file changed, 391 insertions(+)
 create mode 100644 src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java

(limited to 'src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java')

diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
new file mode 100644
index 0000000..1166970
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java
@@ -0,0 +1,391 @@
+/*
+ * Copyright (c) 2009 Concurrent, Inc.
+ *
+ * This work has been released into the public domain
+ * by the copyright holder. This applies worldwide.
+ *
+ * In case this is not legally possible:
+ * The copyright holder grants any entity the right
+ * to use this work for any purpose, without any
+ * conditions, unless such conditions are required by law.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parallelai.spyglass.jdbc.db;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An OutputFormat that sends the reduce output to a SQL table.
+ * <p/>
+ * {@link DBOutputFormat} accepts <key,value> pairs, where key has a type extending DBWritable.
+ * The returned {@link RecordWriter} writes only the key to the database with a batch SQL query.
+ */
+public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> {
+    private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
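+
+    // Keys are expected to implement DBWritable, i.e. to bind their fields to a
+    // PreparedStatement. A hypothetical key type might implement:
+    //
+    //   public void write(PreparedStatement statement) throws SQLException {
+    //     statement.setLong(1, id);
+    //     statement.setString(2, name);
+    //   }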
+
+    /** A RecordWriter that writes the reduce output to a SQL table */
+    protected class DBRecordWriter implements RecordWriter<K, V> {
+        private Connection connection;
+        private PreparedStatement insertStatement;
+        private PreparedStatement updateStatement;
+        private final int statementsBeforeExecute;
+
+        private long statementsAdded = 0;
+        private long insertStatementsCurrent = 0;
+        private long updateStatementsCurrent = 0;
+
+        protected DBRecordWriter(Connection connection, PreparedStatement insertStatement,
+            PreparedStatement updateStatement, int statementsBeforeExecute) {
+            this.connection = connection;
+            this.insertStatement = insertStatement;
+            this.updateStatement = updateStatement;
+            this.statementsBeforeExecute = statementsBeforeExecute;
+        }
+
+        /** {@inheritDoc} */
+        public void close(Reporter reporter) throws IOException {
+            executeBatch();
+
+            try {
+                if (insertStatement != null) { insertStatement.close(); }
+
+                if (updateStatement != null) { updateStatement.close(); }
+
+                connection.commit();
+            } catch (SQLException exception) {
+                rollBack();
+
+                createThrowMessage("unable to commit batch", 0, exception);
+            } finally {
+                try {
+                    connection.close();
+                } catch (SQLException exception) {
+                    throw new IOException("unable to close connection", exception);
+                }
+            }
+        }
+
+        private void executeBatch() throws IOException {
+            try {
+                if (insertStatementsCurrent != 0) {
+                    LOG.info("executing insert batch " + createBatchMessage(insertStatementsCurrent));
+
+                    insertStatement.executeBatch();
+                }
+
+                insertStatementsCurrent = 0;
+            } catch (SQLException exception) {
+                rollBack();
+
+                createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception);
+            }
+
+            try {
+                if (updateStatementsCurrent != 0) {
+                    LOG.info("executing update batch " + createBatchMessage(updateStatementsCurrent));
+
+                    int[] result = updateStatement.executeBatch();
+
+                    int count = 0;
+
+                    for (int value : result) { count += value; }
+
+                    if (count != updateStatementsCurrent) {
+                        throw new IOException(
+                            "update count does not match the number of statements executed in batch, batch: "
+                                + updateStatementsCurrent + " updated: " + count);
+                    }
+                }
+
+                updateStatementsCurrent = 0;
+            } catch (SQLException exception) {
+                String message = exception.getMessage();
+
+                if (message != null && message.indexOf("Duplicate Key") >= 0) {
+                    LOG.warn("Bypassing duplicate-key exception raised by the insert/update batch.");
+                } else {
+                    rollBack();
+
+                    createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception);
+                }
+            }
+        }
+
+        private void rollBack() {
+            try {
+                connection.rollback();
+            } catch (SQLException sqlException) {
+                LOG.warn(StringUtils.stringifyException(sqlException));
+            }
+        }
+
+        private String createBatchMessage(long currentStatements) {
+            return String.format("[totstmts: %d][crntstmts: %d][batch: %d]",
+                statementsAdded, currentStatements, statementsBeforeExecute);
+        }
+
+        private void createThrowMessage(String stateMessage, long currentStatements,
+            SQLException exception) throws IOException {
+            String message = exception.getMessage();
+
+            message = message.substring(0, Math.min(75, message.length()));
+
+            int messageLength = exception.getMessage().length();
+            String batchMessage = createBatchMessage(currentStatements);
+            String template = "%s [msglength: %d]%s %s";
+            String errorMessage = String.format(template, stateMessage, messageLength, batchMessage, message);
+
+            LOG.error(errorMessage, exception.getNextException());
+
+            throw new IOException(errorMessage, exception.getNextException());
+        }
+
+        /** {@inheritDoc} */
+        public synchronized void write(K key, V value) throws IOException {
+            try {
+                if (value == null) {
+                    key.write(insertStatement);
+                    insertStatement.addBatch();
+                    insertStatementsCurrent++;
+                } else {
+                    key.write(updateStatement);
+                    updateStatement.addBatch();
+                    updateStatementsCurrent++;
+                }
+            } catch (SQLException exception) {
+                throw new IOException("unable to add batch statement", exception);
+            }
+
+            statementsAdded++;
+
+            if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); }
+        }
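+
+        // Routing note: write(key, null) batches the key as an INSERT, while a
+        // non-null value batches it as an UPDATE. Batches are flushed every
+        // statementsBeforeExecute statements and again on close().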
+    }
+
+    /**
+     * Constructs the query used as the prepared statement to insert data.
+     *
+     * @param table the table to insert into
+     * @param fieldNames the fields to insert into. If field names are unknown, supply an array
+     * of nulls.
+     */
+    protected String constructInsertQuery(String table, String[] fieldNames) {
+        if (fieldNames == null) {
+            throw new IllegalArgumentException("Field names may not be null");
+        }
+
+        StringBuilder query = new StringBuilder();
+
+        query.append("INSERT INTO ").append(table);
+
+        if (fieldNames.length > 0 && fieldNames[0] != null) {
+            query.append(" (");
+
+            for (int i = 0; i < fieldNames.length; i++) {
+                query.append(fieldNames[i]);
+
+                if (i != fieldNames.length - 1) { query.append(","); }
+            }
+
+            query.append(")");
+        }
+
+        query.append(" VALUES (");
+
+        for (int i = 0; i < fieldNames.length; i++) {
+            query.append("?");
+
+            if (i != fieldNames.length - 1) { query.append(","); }
+        }
+
+        query.append(")");
+
+        // Append a MySQL-style upsert clause so inserts of existing keys fall through
+        // to updates; fieldNames[0] is assumed to be the key column and is skipped.
+        query.append(" ON DUPLICATE KEY UPDATE ");
+
+        for (int i = 1; i < fieldNames.length; i++) {
+            if (i != 1) { query.append(","); }
+
+            query.append(fieldNames[i]);
+            query.append(" = ?");
+        }
+
+        query.append(";");
+
+        LOG.info("insert query: " + query.toString());
+
+        return query.toString();
+    }
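+
+    // For example (hypothetical names): constructInsertQuery("user",
+    // new String[] { "id", "name", "email" }) produces:
+    //
+    //   INSERT INTO user (id,name,email) VALUES (?,?,?) ON DUPLICATE KEY UPDATE name = ?,email = ?;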
+
+    protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) {
+        if (fieldNames == null) {
+            throw new IllegalArgumentException("field names may not be null");
+        }
+
+        Set<String> updateNamesSet = new HashSet<String>();
+        Collections.addAll(updateNamesSet, updateNames);
+
+        StringBuilder query = new StringBuilder();
+
+        query.append("UPDATE ").append(table);
+
+        query.append(" SET ");
+
+        if (fieldNames.length > 0 && fieldNames[0] != null) {
+            int count = 0;
+
+            for (int i = 0; i < fieldNames.length; i++) {
+                if (updateNamesSet.contains(fieldNames[i])) { continue; }
+
+                if (count != 0) { query.append(","); }
+
+                query.append(fieldNames[i]);
+                query.append(" = ?");
+
+                count++;
+            }
+        }
+
+        query.append(" WHERE ");
+
+        if (updateNames.length > 0 && updateNames[0] != null) {
+            for (int i = 0; i < updateNames.length; i++) {
+                query.append(updateNames[i]);
+                query.append(" = ?");
+
+                if (i != updateNames.length - 1) { query.append(" and "); }
+            }
+        }
+
+        query.append(";");
+
+        LOG.info("update query: " + query.toString());
+
+        return query.toString();
+    }
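+
+    // For example (hypothetical names): constructUpdateQuery("user",
+    // new String[] { "id", "name", "email" }, new String[] { "id" }) produces:
+    //
+    //   UPDATE user SET name = ?,email = ? WHERE id = ?;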
+
+    /** {@inheritDoc} */
+    public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException {
+    }
+
+    /** {@inheritDoc} */
+    public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name,
+        Progressable progress) throws IOException {
+        DBConfiguration dbConf = new DBConfiguration(job);
+
+        String tableName = dbConf.getOutputTableName();
+        String[] fieldNames = dbConf.getOutputFieldNames();
+        String[] updateNames = dbConf.getOutputUpdateFieldNames();
+        int batchStatements = dbConf.getBatchStatementsNum();
+
+        Connection connection = dbConf.getConnection();
+
+        configureConnection(connection);
+
+        String sqlInsert = constructInsertQuery(tableName, fieldNames);
+        PreparedStatement insertPreparedStatement;
+
+        try {
+            insertPreparedStatement = connection.prepareStatement(sqlInsert);
+            insertPreparedStatement.setEscapeProcessing(true); // should be on by default
+        } catch (SQLException exception) {
+            throw new IOException("unable to create statement for: " + sqlInsert, exception);
+        }
+
+        String sqlUpdate =
+            updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null;
+        PreparedStatement updatePreparedStatement = null;
+
+        try {
+            updatePreparedStatement =
+                sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null;
+        } catch (SQLException exception) {
+            throw new IOException("unable to create statement for: " + sqlUpdate, exception);
+        }
+
+        return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement,
+            batchStatements);
+    }
+
+    protected void configureConnection(Connection connection) {
+        setAutoCommit(connection);
+    }
+
+    protected void setAutoCommit(Connection connection) {
+        try {
+            connection.setAutoCommit(false);
+        } catch (Exception exception) {
+            throw new RuntimeException("unable to set auto commit", exception);
+        }
+    }
+
+    /**
+     * Initializes the reduce-part of the job with the appropriate output settings.
+     *
+     * @param job The job
+     * @param dbOutputFormatClass The OutputFormat to use; defaults to {@link DBOutputFormat} when null
+     * @param tableName The table to insert data into
+     * @param fieldNames The field names in the table. If unknown, supply the appropriate number of nulls
+     * @param updateFields The fields used to match existing rows for updates, or null for insert-only output
+     * @param batchSize The number of statements to batch before executing, or -1 to keep the default
+     */
+    public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass,
+        String tableName, String[] fieldNames, String[] updateFields, int batchSize) {
+        if (dbOutputFormatClass == null) {
+            job.setOutputFormat(DBOutputFormat.class);
+        } else {
+            job.setOutputFormat(dbOutputFormatClass);
+        }
+
+        // writing doesn't always happen in reduce
+        job.setReduceSpeculativeExecution(false);
+        job.setMapSpeculativeExecution(false);
+
+        DBConfiguration dbConf = new DBConfiguration(job);
+
+        dbConf.setOutputTableName(tableName);
+        dbConf.setOutputFieldNames(fieldNames);
+
+        if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); }
+
+        if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); }
+    }
+}
--
cgit v1.2.3
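
For reference, a minimal sketch of wiring this output format into a job; the table,
field names, and batch size below are hypothetical, and the JDBC connection itself
is assumed to be configured separately through this package's DBConfiguration:

    JobConf job = new JobConf();

    // Write reducer keys (a DBWritable type) into table "user", buffering 1000
    // statements per batch; existing rows are matched for update on "id".
    DBOutputFormat.setOutput(job, DBOutputFormat.class, "user",
        new String[] { "id", "name", "email" },
        new String[] { "id" }, 1000);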