Diffstat (limited to 'src/main/java/parallelai/spyglass/jdbc/db')
4 files changed, 1202 insertions, 0 deletions
| diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java new file mode 100644 index 0000000..1bb0786 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * A container for configuration property names for jobs with DB input/output. <br> The job can be + * configured using the static methods in this class, {@link DBInputFormat}, and {@link + * DBOutputFormat}. <p/> Alternatively, the properties can be set in the configuration with proper + * values. 
+ */ +public class DBConfiguration { + +    /** The JDBC Driver class name */ +    public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class"; + +    /** JDBC Database access URL */ +    public static final String URL_PROPERTY = "mapred.jdbc.url"; + +    /** User name to access the database */ +    public static final String USERNAME_PROPERTY = "mapred.jdbc.username"; + +    /** Password to access the database */ +    public static final String PASSWORD_PROPERTY = "mapred.jdbc.password"; + +    /** Input table name */ +    public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name"; + +    /** Field names in the Input table */ +    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names"; + +    /** WHERE clause in the input SELECT statement */ +    public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions"; + +    /** ORDER BY clause in the input SELECT statement */ +    public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby"; + +    /** Whole input query, exluding LIMIT...OFFSET */ +    public static final String INPUT_QUERY = "mapred.jdbc.input.query"; + +    /** The number of records to LIMIT, useful for testing */ +    public static final String INPUT_LIMIT = "mapred.jdbc.input.limit"; + +    /** Input query to get the count of records */ +    public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query"; + +    /** Class name implementing DBWritable which will hold input tuples */ +    public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class"; + +    /** Output table name */ +    public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name"; + +    /** Field names in the Output table */ +    public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names"; + +    /** Field names in the Output table */ +    public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY = +        "mapred.jdbc.output.update.field.names"; + +    /** The number of statements to batch before executing */ +    public static final String BATCH_STATEMENTS_PROPERTY = "mapred.jdbc.batch.statements.num"; + +    /** The number of splits allowed, becomes max concurrent reads. */ +    public static final String CONCURRENT_READS_PROPERTY = "mapred.jdbc.concurrent.reads.num"; + +    /** +     * Sets the DB access related fields in the Configuration. +     * +     * @param job         the job +     * @param driverClass JDBC Driver class name +     * @param dbUrl       JDBC DB access URL. +     * @param userName    DB access username +     * @param passwd      DB access passwd +     */ +    public static void configureDB(Configuration job, String driverClass, String dbUrl, +        String userName, String passwd) { +        job.set(DRIVER_CLASS_PROPERTY, driverClass); +        job.set(URL_PROPERTY, dbUrl); + +        if (userName != null) { job.set(USERNAME_PROPERTY, userName); } + +        if (passwd != null) { job.set(PASSWORD_PROPERTY, passwd); } +    } + +    /** +     * Sets the DB access related fields in the Configuration. +     * +     * @param job         the job +     * @param driverClass JDBC Driver class name +     * @param dbUrl       JDBC DB access URL. 
+     */ +    public static void configureDB(Configuration job, String driverClass, String dbUrl) { +        configureDB(job, driverClass, dbUrl, null, null); +    } + +    private Configuration job; + +    DBConfiguration(Configuration job) { +        this.job = job; +    } + +    /** +     * Returns a connection object to the DB +     * +     * @throws ClassNotFoundException +     * @throws SQLException +     */ +    Connection getConnection() throws IOException { +        try { +            Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY)); +        } catch (ClassNotFoundException exception) { +            throw new IOException("unable to load conection driver", exception); +        } + +        try { +            if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) { +                return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY)); +            } else { +                return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job +                    .get(DBConfiguration.USERNAME_PROPERTY), job +                    .get(DBConfiguration.PASSWORD_PROPERTY)); +            } +        } catch (SQLException exception) { +            throw new IOException("unable to create connection", exception); +        } +    } + +    String getInputTableName() { +        return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY); +    } + +    void setInputTableName(String tableName) { +        job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName); +    } + +    String[] getInputFieldNames() { +        return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY); +    } + +    void setInputFieldNames(String... fieldNames) { +        job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    String getInputConditions() { +        return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY); +    } + +    void setInputConditions(String conditions) { +        if (conditions != null && conditions.length() > 0) { +            job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions); +        } +    } + +    String getInputOrderBy() { +        return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY); +    } + +    void setInputOrderBy(String orderby) { +        if (orderby != null && orderby.length() > 0) { +            job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby); +        } +    } + +    String getInputQuery() { +        return job.get(DBConfiguration.INPUT_QUERY); +    } + +    void setInputQuery(String query) { +        if (query != null && query.length() > 0) { job.set(DBConfiguration.INPUT_QUERY, query); } +    } + +    long getInputLimit() { +        return job.getLong(DBConfiguration.INPUT_LIMIT, -1); +    } + +    void setInputLimit(long limit) { +        job.setLong(DBConfiguration.INPUT_LIMIT, limit); +    } + +    String getInputCountQuery() { +        return job.get(DBConfiguration.INPUT_COUNT_QUERY); +    } + +    void setInputCountQuery(String query) { +        if (query != null && query.length() > 0) { +            job.set(DBConfiguration.INPUT_COUNT_QUERY, query); +        } +    } + +    Class<?> getInputClass() { +        return job +            .getClass(DBConfiguration.INPUT_CLASS_PROPERTY, DBInputFormat.NullDBWritable.class); +    } + +    void setInputClass(Class<? 
extends DBWritable> inputClass) { +        job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class); +    } + +    String getOutputTableName() { +        return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY); +    } + +    void setOutputTableName(String tableName) { +        job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName); +    } + +    String[] getOutputFieldNames() { +        return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY); +    } + +    void setOutputFieldNames(String... fieldNames) { +        job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    String[] getOutputUpdateFieldNames() { +        return job.getStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY); +    } + +    void setOutputUpdateFieldNames(String... fieldNames) { +        job.setStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY, fieldNames); +    } + +    int getBatchStatementsNum() { +        return job.getInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, 1000); +    } + +    void setBatchStatementsNum(int batchStatementsNum) { +        job.setInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, batchStatementsNum); +    } + +    int getMaxConcurrentReadsNum() { +        return job.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 0); +    } + +    void setMaxConcurrentReadsNum(int maxConcurrentReads) { +        if (maxConcurrentReads < 0) { +            throw new IllegalArgumentException("maxConcurrentReads must be a positive value"); +        } + +        job.setInt(DBConfiguration.CONCURRENT_READS_PROPERTY, maxConcurrentReads); +    } + +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java new file mode 100644 index 0000000..115d9bd --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBInputFormat.java @@ -0,0 +1,452 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.*; + +/** + * A InputFormat that reads input data from an SQL table. 
<p/> DBInputFormat emits LongWritables + * containing the record number as key and DBWritables as value. <p/> The SQL query, and input class + * can be using one of the two setInput methods. + */ +public class DBInputFormat<T extends DBWritable> +    implements InputFormat<LongWritable, T>, JobConfigurable { +    /** Field LOG */ +    private static final Logger LOG = LoggerFactory.getLogger(DBInputFormat.class); + +    /** +     * A RecordReader that reads records from a SQL table. Emits LongWritables containing the record +     * number as key and DBWritables as value. +     */ +    protected class DBRecordReader implements RecordReader<LongWritable, T> { +        private ResultSet results; +        private Statement statement; +        private Class<T> inputClass; +        private JobConf job; +        private DBInputSplit split; +        private long pos = 0; + +        /** +         * @param split The InputSplit to read data for +         * @throws SQLException +         */ +        protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) +            throws SQLException, IOException { +            this.inputClass = inputClass; +            this.split = split; +            this.job = job; + +            statement = +                connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + +            //statement.setFetchSize(Integer.MIN_VALUE); +            String query = getSelectQuery(); +            try { +                LOG.info(query); +                results = statement.executeQuery(query); +                LOG.info("done executing select query"); +            } catch (SQLException exception) { +                LOG.error("unable to execute select query: " + query, exception); +                throw new IOException("unable to execute select query: " + query, exception); +            } +        } + +        /** +         * Returns the query for selecting the records, subclasses can override this for custom +         * behaviour. 
+         */ +        protected String getSelectQuery() { +            LOG.info("Executing select query"); +            StringBuilder query = new StringBuilder(); + +            if (dbConf.getInputQuery() == null) { +                query.append("SELECT "); + +                for (int i = 0; i < fieldNames.length; i++) { +                    query.append(fieldNames[i]); + +                    if (i != fieldNames.length - 1) +                        query.append(", "); +                } + +                query.append(" FROM ").append(tableName); +                query.append(" AS ").append(tableName); //in hsqldb this is necessary + +                if (conditions != null && conditions.length() > 0) +                    query.append(" WHERE (").append(conditions).append(")"); + +                String orderBy = dbConf.getInputOrderBy(); + +                if (orderBy != null && orderBy.length() > 0) +                    query.append(" ORDER BY ").append(orderBy); + +            } +            else +                query.append(dbConf.getInputQuery()); + +            try { +                // Only add limit and offset if you have multiple chunks +                if(split.getChunks() > 1) { +                    query.append(" LIMIT ").append(split.getLength()); +                    query.append(" OFFSET ").append(split.getStart()); +                } +            } catch (IOException ex) { +                //ignore, will not throw +            } + +            return query.toString(); +        } + +        /** {@inheritDoc} */ +        public void close() throws IOException { +            try { +                connection.commit(); +                results.close(); +                statement.close(); +            } catch (SQLException exception) { +                throw new IOException("unable to commit and close", exception); +            } +        } + +        /** {@inheritDoc} */ +        public LongWritable createKey() { +            return new LongWritable(); +        } + +        /** {@inheritDoc} */ +        public T createValue() { +            return ReflectionUtils.newInstance(inputClass, job); +        } + +        /** {@inheritDoc} */ +        public long getPos() throws IOException { +            return pos; +        } + +        /** {@inheritDoc} */ +        public float getProgress() throws IOException { +            return pos / (float) split.getLength(); +        } + +        /** {@inheritDoc} */ +        public boolean next(LongWritable key, T value) throws IOException { +            try { +                if (!results.next()) +                    return false; + +                // Set the key field value as the output key value +                key.set(pos + split.getStart()); + +                value.readFields(results); + +                pos++; +            } catch (SQLException exception) { +                throw new IOException("unable to get next value", exception); +            } + +            return true; +        } +    } + +    /** A Class that does nothing, implementing DBWritable */ +    public static class NullDBWritable implements DBWritable, Writable { + +        public void readFields(DataInput in) throws IOException { +        } + +        public void readFields(ResultSet arg0) throws SQLException { +        } + +        public void write(DataOutput out) throws IOException { +        } + +        public void write(PreparedStatement arg0) throws SQLException { +        } +    } + +    /** A InputSplit that spans a set of rows */ +    protected static 
class DBInputSplit implements InputSplit { +        private long end = 0; +        private long start = 0; +        private long chunks = 0; + +        /** Default Constructor */ +        public DBInputSplit() { +        } + +        /** +         * Convenience Constructor +         * +         * @param start the index of the first row to select +         * @param end   the index of the last row to select +         */ +        public DBInputSplit(long start, long end, long chunks) { +            this.start = start; +            this.end = end; +            this.chunks = chunks; +            LOG.info("creating DB input split with start: " + start + ", end: " + end + ", chunks: " + chunks); +        } + +        /** {@inheritDoc} */ +        public String[] getLocations() throws IOException { +            // TODO Add a layer to enable SQL "sharding" and support locality +            return new String[]{}; +        } + +        /** @return The index of the first row to select */ +        public long getStart() { +            return start; +        } + +        /** @return The index of the last row to select */ +        public long getEnd() { +            return end; +        } + +        /** @return The total row count in this split */ +        public long getLength() throws IOException { +            return end - start; +        } + +        /** @return The total number of chucks accross all splits */ +        public long getChunks() { +            return chunks; +        } + +        /** {@inheritDoc} */ +        public void readFields(DataInput input) throws IOException { +            start = input.readLong(); +            end = input.readLong(); +            chunks = input.readLong(); +        } + +        /** {@inheritDoc} */ +        public void write(DataOutput output) throws IOException { +            output.writeLong(start); +            output.writeLong(end); +            output.writeLong(chunks); +        } +    } + +    protected DBConfiguration dbConf; +    protected Connection connection; + +    protected String tableName; +    protected String[] fieldNames; +    protected String conditions; +    protected long limit; +    protected int maxConcurrentReads; + + +    /** {@inheritDoc} */ +    public void configure(JobConf job) { +        dbConf = new DBConfiguration(job); + +        tableName = dbConf.getInputTableName(); +        fieldNames = dbConf.getInputFieldNames(); +        conditions = dbConf.getInputConditions(); +        limit = dbConf.getInputLimit(); +        maxConcurrentReads = dbConf.getMaxConcurrentReadsNum(); + +        try { +            connection = dbConf.getConnection(); +        } catch (IOException exception) { +            throw new RuntimeException("unable to create connection", exception.getCause()); +        } + +        configureConnection(connection); +    } + +    protected void configureConnection(Connection connection) { +        setTransactionIsolationLevel(connection); +        setAutoCommit(connection); +    } + +    protected void setAutoCommit(Connection connection) { +        try { +            connection.setAutoCommit(false); +        } catch (Exception exception) { +            throw new RuntimeException("unable to set auto commit", exception); +        } +    } + +    protected void setTransactionIsolationLevel(Connection connection) { +        try { +            connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); +        } catch (SQLException exception) { +            throw new RuntimeException("unable to configure 
transaction isolation level", exception); +        } +    } + +    /** {@inheritDoc} */ +    @SuppressWarnings("unchecked") +    public RecordReader<LongWritable, T> getRecordReader(InputSplit split, JobConf job, +        Reporter reporter) throws IOException { +        Class inputClass = dbConf.getInputClass(); +        try { +            return new DBRecordReader((DBInputSplit) split, inputClass, job); +        } catch (SQLException exception) { +            throw new IOException(exception.getMessage(), exception); +        } +    } + +    /** {@inheritDoc} */ +    public InputSplit[] getSplits(JobConf job, int chunks) throws IOException { +        // use the configured value if avail +        chunks = maxConcurrentReads == 0 ? chunks : maxConcurrentReads; + +        try { +            Statement statement = connection.createStatement(); + +            ResultSet results = statement.executeQuery(getCountQuery()); + +            long count = 0; + +            while (results.next()) +                count += results.getLong(1); + +            if (limit != -1) +                count = Math.min(limit, count); + +            long chunkSize = (count / chunks); + +            results.close(); +            statement.close(); + +            InputSplit[] splits = new InputSplit[chunks]; + +            // Split the rows into n-number of chunks and adjust the last chunk +            // accordingly +            for (int i = 0; i < chunks; i++) { +                DBInputSplit split; + +                if (i + 1 == chunks) +                    split = new DBInputSplit(i * chunkSize, count, chunks); +                else +                    split = new DBInputSplit(i * chunkSize, i * chunkSize + chunkSize, chunks); + +                splits[i] = split; +            } + +            return splits; +        } catch (SQLException e) { +            throw new IOException(e.getMessage()); +        } +    } + +    /** +     * Returns the query for getting the total number of rows, subclasses can override this for +     * custom behaviour. +     */ +    protected String getCountQuery() { + +        if (dbConf.getInputCountQuery() != null) { return dbConf.getInputCountQuery(); } + +        StringBuilder query = new StringBuilder(); + +        query.append("SELECT COUNT(*) FROM " + tableName); + +        if (conditions != null && conditions.length() > 0) +            query.append(" WHERE " + conditions); + +        return query.toString(); +    } + +    /** +     * Initializes the map-part of the job with the appropriate input settings. +     * +     * @param job             The job +     * @param inputClass      the class object implementing DBWritable, which is the Java object +     *                        holding tuple fields. +     * @param tableName       The table to read data from +     * @param conditions      The condition which to select data with, eg. '(updated > 20070101 AND +     *                        length > 0)' +     * @param orderBy         the fieldNames in the orderBy clause. +     * @param limit +     * @param fieldNames      The field names in the table +     * @param concurrentReads +     */ +    public static void setInput(JobConf job, Class<? extends DBWritable> inputClass, +        String tableName, String conditions, String orderBy, long limit, int concurrentReads, +        String... 
fieldNames) { +        job.setInputFormat(DBInputFormat.class); + +        DBConfiguration dbConf = new DBConfiguration(job); + +        dbConf.setInputClass(inputClass); +        dbConf.setInputTableName(tableName); +        dbConf.setInputFieldNames(fieldNames); +        dbConf.setInputConditions(conditions); +        dbConf.setInputOrderBy(orderBy); + +        if (limit != -1) +            dbConf.setInputLimit(limit); + +        dbConf.setMaxConcurrentReadsNum(concurrentReads); +    } + +    /** +     * Initializes the map-part of the job with the appropriate input settings. +     * +     * @param job             The job +     * @param inputClass      the class object implementing DBWritable, which is the Java object +     *                        holding tuple fields. +     * @param selectQuery     the input query to select fields. Example : "SELECT f1, f2, f3 FROM +     *                        Mytable ORDER BY f1" +     * @param countQuery      the input query that returns the number of records in the table. +     *                        Example : "SELECT COUNT(f1) FROM Mytable" +     * @param concurrentReads +     */ +    public static void setInput(JobConf job, Class<? extends DBWritable> inputClass, +        String selectQuery, String countQuery, long limit, int concurrentReads) { +        job.setInputFormat(DBInputFormat.class); + +        DBConfiguration dbConf = new DBConfiguration(job); + +        dbConf.setInputClass(inputClass); +        dbConf.setInputQuery(selectQuery); +        dbConf.setInputCountQuery(countQuery); + +        if (limit != -1) +            dbConf.setInputLimit(limit); + +        dbConf.setMaxConcurrentReadsNum(concurrentReads); +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java new file mode 100644 index 0000000..1166970 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBOutputFormat.java @@ -0,0 +1,391 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; + +import com.jcraft.jsch.Logger; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A OutputFormat that sends the reduce output to a SQL table. <p/> {@link DBOutputFormat} accepts + * <key,value> pairs, where key has a type extending DBWritable. Returned {@link RecordWriter} + * writes <b>only the key</b> to the database with a batch SQL query. + */ +public class DBOutputFormat<K extends DBWritable, V> implements OutputFormat<K, V> { +    private static final Log LOG = LogFactory.getLog(DBOutputFormat.class); + +    /** A RecordWriter that writes the reduce output to a SQL table */ +    protected class DBRecordWriter implements RecordWriter<K, V> { +        private Connection connection; +        private PreparedStatement insertStatement; +        private PreparedStatement updateStatement; +        private final int statementsBeforeExecute; + +        private long statementsAdded = 0; +        private long insertStatementsCurrent = 0; +        private long updateStatementsCurrent = 0; + +        protected DBRecordWriter(Connection connection, PreparedStatement insertStatement, +            PreparedStatement updateStatement, int statementsBeforeExecute) { +            this.connection = connection; +            this.insertStatement = insertStatement; +            this.updateStatement = updateStatement; +            this.statementsBeforeExecute = statementsBeforeExecute; +        } + +        /** {@inheritDoc} */ +        public void close(Reporter reporter) throws IOException { +            executeBatch(); + +            try { +                if (insertStatement != null) { insertStatement.close(); } + +                if (updateStatement != null) { updateStatement.close(); } + +                connection.commit(); +            } catch (SQLException exception) { +                rollBack(); + +                createThrowMessage("unable to commit batch", 0, exception); +            } finally { +                try { +                    connection.close(); +                } catch (SQLException exception) { +                    throw new IOException("unable to close connection", exception); +                } +            } +        } + +        private void executeBatch() throws IOException { +            try { +                if (insertStatementsCurrent != 0) { +                    LOG.info( +                        "executing insert batch " + createBatchMessage(insertStatementsCurrent)); + +                    insertStatement.executeBatch(); +                } + +                insertStatementsCurrent = 0; +            } catch (SQLException exception) { +                rollBack(); + +                createThrowMessage("unable to execute insert batch", insertStatementsCurrent, exception); +            } + +            try { +                if (updateStatementsCurrent != 0) { +                    LOG.info( +                        "executing update batch " + 
createBatchMessage(updateStatementsCurrent)); + +                    int[] result = updateStatement.executeBatch(); + +                    int count = 0; + +                    for (int value : result) { count += value; } + +                    if (count != updateStatementsCurrent) { +                        throw new IOException( +                            "update did not update same number of statements executed in batch, batch: " +                            + updateStatementsCurrent + " updated: " + count); +                    } +                } + +                updateStatementsCurrent = 0; +            } catch (SQLException exception) { +            	 +            	String message = exception.getMessage(); +            	if (message.indexOf("Duplicate Key") >= 0) { +            		LOG.warn("In exception block. Bypass exception becuase of Insert/Update."); +            	} else { +                    rollBack(); + +                    createThrowMessage("unable to execute update batch", updateStatementsCurrent, exception); +            	} +            } +        } + +        private void rollBack() { +            try { +                connection.rollback(); +            } catch (SQLException sqlException) { +                LOG.warn(StringUtils.stringifyException(sqlException)); +            } +        } + +        private String createBatchMessage(long currentStatements) { +            return String +                .format("[totstmts: %d][crntstmts: %d][batch: %d]", statementsAdded, currentStatements, statementsBeforeExecute); +        } + +        private void createThrowMessage(String stateMessage, long currentStatements, +            SQLException exception) throws IOException { +            String message = exception.getMessage(); + +            message = message.substring(0, Math.min(75, message.length())); + +            int messageLength = exception.getMessage().length(); +            String batchMessage = createBatchMessage(currentStatements); +            String template = "%s [msglength: %d]%s %s"; +            String errorMessage = +                String.format(template, stateMessage, messageLength, batchMessage, message); + +            LOG.error(errorMessage, exception.getNextException()); + +            throw new IOException(errorMessage, exception.getNextException()); +        } + +        /** {@inheritDoc} */ +        public synchronized void write(K key, V value) throws IOException { +            try { +                if (value == null) { +                    key.write(insertStatement); +                    insertStatement.addBatch(); +                    insertStatementsCurrent++; +                } else { +                    key.write(updateStatement); +                    updateStatement.addBatch(); +                    updateStatementsCurrent++; +                } +            } catch (SQLException exception) { +                throw new IOException("unable to add batch statement", exception); +            } + +            statementsAdded++; + +            if (statementsAdded % statementsBeforeExecute == 0) { executeBatch(); } +        } +    } + +    /** +     * Constructs the query used as the prepared statement to insert data. +     * +     * @param table      the table to insert into +     * @param fieldNames the fields to insert into. If field names are unknown, supply an array of +     *                   nulls. 
+     */ +    protected String constructInsertQuery(String table, String[] fieldNames) { +        if (fieldNames == null) { +            throw new IllegalArgumentException("Field names may not be null"); +        } + +        StringBuilder query = new StringBuilder(); + +        query.append("INSERT INTO ").append(table); + +        if (fieldNames.length > 0 && fieldNames[0] != null) { +            query.append(" ("); + +            for (int i = 0; i < fieldNames.length; i++) { +                query.append(fieldNames[i]); + +                if (i != fieldNames.length - 1) { query.append(","); } +            } + +            query.append(")"); + +        } + +        query.append(" VALUES ("); + +        for (int i = 0; i < fieldNames.length; i++) { +            query.append("?"); + +            if (i != fieldNames.length - 1) { query.append(","); } +        } + +        query.append(")"); +         +        boolean test = true; +        if (test) { +        	query.append(" ON DUPLICATE KEY UPDATE "); +        	 +        	 +            for (int i = 1; i < fieldNames.length; i++) { + +                 +                if ( (i != 1) ) { query.append(","); } +                //if (i != fieldNames.length - 1) { query.append(","); } +                //&& (i != fieldNames.length - 1) +                query.append(fieldNames[i]); +                query.append(" = ?"); +                 +                 +            } +        } +         +        query.append(";"); +         +        LOG.info(" ===================== " + query.toString()); +        return query.toString(); +    } + +    protected String constructUpdateQuery(String table, String[] fieldNames, String[] updateNames) { +        if (fieldNames == null) { +            throw new IllegalArgumentException("field names may not be null"); +        } + +        Set<String> updateNamesSet = new HashSet<String>(); +        Collections.addAll(updateNamesSet, updateNames); + +        StringBuilder query = new StringBuilder(); + +        query.append("UPDATE ").append(table); + +        query.append(" SET "); + +        if (fieldNames.length > 0 && fieldNames[0] != null) { +            int count = 0; + +            for (int i = 0; i < fieldNames.length; i++) { +                if (updateNamesSet.contains(fieldNames[i])) { continue; } + +                if (count != 0) { query.append(","); } + +                query.append(fieldNames[i]); +                query.append(" = ?"); + +                count++; +            } +        } + +        query.append(" WHERE "); + +        if (updateNames.length > 0 && updateNames[0] != null) { +            for (int i = 0; i < updateNames.length; i++) { +                query.append(updateNames[i]); +                query.append(" = ?"); + +                if (i != updateNames.length - 1) { query.append(" and "); } +            } +        } + +        query.append(";"); +        System.out.println("Update Query => " + query.toString()); +        return query.toString(); +    } + +    /** {@inheritDoc} */ +    public void checkOutputSpecs(FileSystem filesystem, JobConf job) throws IOException { +    } + +    /** {@inheritDoc} */ +    public RecordWriter<K, V> getRecordWriter(FileSystem filesystem, JobConf job, String name, +        Progressable progress) throws IOException { +        DBConfiguration dbConf = new DBConfiguration(job); + +        String tableName = dbConf.getOutputTableName(); +        String[] fieldNames = dbConf.getOutputFieldNames(); +        String[] updateNames = 
dbConf.getOutputUpdateFieldNames(); +        int batchStatements = dbConf.getBatchStatementsNum(); + +        Connection connection = dbConf.getConnection(); + +        configureConnection(connection); + +        String sqlInsert = constructInsertQuery(tableName, fieldNames); +        PreparedStatement insertPreparedStatement; + +        try { +            insertPreparedStatement = connection.prepareStatement(sqlInsert); +            insertPreparedStatement.setEscapeProcessing(true); // should be on by default +        } catch (SQLException exception) { +            throw new IOException("unable to create statement for: " + sqlInsert, exception); +        } + +        String sqlUpdate = +            updateNames != null ? constructUpdateQuery(tableName, fieldNames, updateNames) : null; +        PreparedStatement updatePreparedStatement = null; + +        try { +            updatePreparedStatement = +                sqlUpdate != null ? connection.prepareStatement(sqlUpdate) : null; +        } catch (SQLException exception) { +            throw new IOException("unable to create statement for: " + sqlUpdate, exception); +        } + +        return new DBRecordWriter(connection, insertPreparedStatement, updatePreparedStatement, batchStatements); +    } + +    protected void configureConnection(Connection connection) { +        setAutoCommit(connection); +    } + +    protected void setAutoCommit(Connection connection) { +        try { +            connection.setAutoCommit(false); +        } catch (Exception exception) { +            throw new RuntimeException("unable to set auto commit", exception); +        } +    } + +    /** +     * Initializes the reduce-part of the job with the appropriate output settings +     * +     * @param job                 The job +     * @param dbOutputFormatClass +     * @param tableName           The table to insert data into +     * @param fieldNames          The field names in the table. If unknown, supply the appropriate +     */ +    public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass, +        String tableName, String[] fieldNames, String[] updateFields, int batchSize) { +        if (dbOutputFormatClass == null) { job.setOutputFormat(DBOutputFormat.class); } else { +            job.setOutputFormat(dbOutputFormatClass); +        } + +        // writing doesn't always happen in reduce +        job.setReduceSpeculativeExecution(false); +        job.setMapSpeculativeExecution(false); + +        DBConfiguration dbConf = new DBConfiguration(job); + +        dbConf.setOutputTableName(tableName); +        dbConf.setOutputFieldNames(fieldNames); + +        if (updateFields != null) { dbConf.setOutputUpdateFieldNames(updateFields); } + +        if (batchSize != -1) { dbConf.setBatchStatementsNum(batchSize); } +    } +} diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java new file mode 100644 index 0000000..1369c74 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBWritable.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. 
+ */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.io.Writable; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * Objects that are read from/written to a database should implement <code>DBWritable</code>. + * DBWritable, is similar to {@link Writable} except that the {@link #write(PreparedStatement)} + * method takes a {@link PreparedStatement}, and {@link #readFields(ResultSet)} takes a {@link + * ResultSet}. <p> Implementations are responsible for writing the fields of the object to + * PreparedStatement, and reading the fields of the object from the ResultSet. <p/> <p>Example:</p> + * If we have the following table in the database : + * <pre> + * CREATE TABLE MyTable ( + *   counter        INTEGER NOT NULL, + *   timestamp      BIGINT  NOT NULL, + * ); + * </pre> + * then we can read/write the tuples from/to the table with : + * <p><pre> + * public class MyWritable implements Writable, DBWritable { + *   // Some data + *   private int counter; + *   private long timestamp; + * <p/> + *   //Writable#write() implementation + *   public void write(DataOutput out) throws IOException { + *     out.writeInt(counter); + *     out.writeLong(timestamp); + *   } + * <p/> + *   //Writable#readFields() implementation + *   public void readFields(DataInput in) throws IOException { + *     counter = in.readInt(); + *     timestamp = in.readLong(); + *   } + * <p/> + *   public void write(PreparedStatement statement) throws SQLException { + *     statement.setInt(1, counter); + *     statement.setLong(2, timestamp); + *   } + * <p/> + *   public void readFields(ResultSet resultSet) throws SQLException { + *     counter = resultSet.getInt(1); + *     timestamp = resultSet.getLong(2); + *   } + * } + * </pre></p> + */ +public interface DBWritable { + +    /** +     * Sets the fields of the object in the {@link PreparedStatement}. +     * +     * @param statement the statement that the fields are put into. +     * @throws SQLException +     */ +    public void write(PreparedStatement statement) throws SQLException; + +    /** +     * Reads the fields of the object from the {@link ResultSet}. +     * +     * @param resultSet the {@link ResultSet} to get the fields from. +     * @throws SQLException +     */ +    public void readFields(ResultSet resultSet) throws SQLException; + +} | 
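For reference, a minimal sketch of a record class usable with these formats, closely following the example given in the DBWritable javadoc above. The class name MyRecord and the two columns (counter, timestamp) are hypothetical. As with NullDBWritable in DBInputFormat, a value class generally implements both Writable (Hadoop serialization) and DBWritable (JDBC binding):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;

import parallelai.spyglass.jdbc.db.DBWritable;

/** Hypothetical record mapping the (counter, timestamp) columns of a table. */
public class MyRecord implements Writable, DBWritable {

    private int counter;
    private long timestamp;

    // Writable: lets Hadoop serialize the value between tasks.
    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    // DBWritable: DBOutputFormat binds these values into its prepared statement.
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, counter);
        statement.setLong(2, timestamp);
    }

    // DBWritable: DBInputFormat populates the value from each ResultSet row.
    public void readFields(ResultSet resultSet) throws SQLException {
        counter = resultSet.getInt(1);
        timestamp = resultSet.getLong(2);
    }
}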

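And a sketch of how a job driver might wire the input and output sides together through the static helpers added in this commit (DBConfiguration.configureDB, DBInputFormat.setInput, DBOutputFormat.setOutput). This is only an illustration of the API surface shown in the diff: the driver class name, JDBC driver, URL, credentials, table and field names are placeholders, not part of the commit.

import org.apache.hadoop.mapred.JobConf;

import parallelai.spyglass.jdbc.db.DBConfiguration;
import parallelai.spyglass.jdbc.db.DBInputFormat;
import parallelai.spyglass.jdbc.db.DBOutputFormat;

public class JdbcJobSketch {  // hypothetical driver class

    public static void main(String[] args) {
        JobConf job = new JobConf(JdbcJobSketch.class);

        // JDBC driver class, connection URL and credentials (all placeholders).
        // The generated insert uses ON DUPLICATE KEY UPDATE, i.e. MySQL syntax.
        DBConfiguration.configureDB(job,
            "com.mysql.jdbc.Driver",
            "jdbc:mysql://dbhost:3306/mydb",
            "user", "secret");

        // Input: read MyTable as (record number, MyRecord) pairs.
        // conditions and orderBy are null, limit is -1 (no LIMIT clause),
        // concurrentReads is 0 (use the framework-suggested split count).
        // setInput also registers DBInputFormat as the job's InputFormat.
        DBInputFormat.setInput(job, MyRecord.class,
            "MyTable", null, null, -1L, 0, "counter", "timestamp");

        // Output: DBOutputFormat writes only the key (a DBWritable) to the
        // table, batching 1000 statements per execute; null update fields
        // means no separate UPDATE statement is prepared.
        DBOutputFormat.setOutput(job, null,
            "MyOutputTable", new String[] { "counter", "timestamp" }, null, 1000);

        // Mapper/reducer wiring omitted; JobClient.runJob(job) would submit it.
    }
}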