diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java new file mode 100644 index 0000000..1bb0786 --- /dev/null +++ b/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java @@ -0,0 +1,276 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.jdbc.db; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * A container for configuration property names for jobs with DB input/output. <br> The job can be + * configured using the static methods in this class, {@link DBInputFormat}, and {@link + * DBOutputFormat}. <p/> Alternatively, the properties can be set in the configuration with proper + * values. + */ +public class DBConfiguration { + + /** The JDBC Driver class name */ + public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class"; + + /** JDBC Database access URL */ + public static final String URL_PROPERTY = "mapred.jdbc.url"; + + /** User name to access the database */ + public static final String USERNAME_PROPERTY = "mapred.jdbc.username"; + + /** Password to access the database */ + public static final String PASSWORD_PROPERTY = "mapred.jdbc.password"; + + /** Input table name */ + public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name"; + + /** Field names in the Input table */ + public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names"; + + /** WHERE clause in the input SELECT statement */ + public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions"; + + /** ORDER BY clause in the input SELECT statement */ + public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby"; + + /** Whole input query, exluding LIMIT...OFFSET */ + public static final String INPUT_QUERY = "mapred.jdbc.input.query"; + + /** The number of records to LIMIT, useful for testing */ + public static final String INPUT_LIMIT = "mapred.jdbc.input.limit"; + + /** Input query to get the count of records */ + public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query"; + + /** Class name implementing DBWritable which will hold input tuples */ + public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class"; + + /** Output table name */ + public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name"; + + /** Field names in the Output table */ + public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names"; + + /** Field names in the Output table */ + public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY = + "mapred.jdbc.output.update.field.names"; + + /** The number of statements to batch before executing */ + public static final String BATCH_STATEMENTS_PROPERTY = "mapred.jdbc.batch.statements.num"; + + /** The number of splits allowed, becomes max concurrent reads. */ + public static final String CONCURRENT_READS_PROPERTY = "mapred.jdbc.concurrent.reads.num"; + + /** + * Sets the DB access related fields in the Configuration. + * + * @param job the job + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL. + * @param userName DB access username + * @param passwd DB access passwd + */ + public static void configureDB(Configuration job, String driverClass, String dbUrl, + String userName, String passwd) { + job.set(DRIVER_CLASS_PROPERTY, driverClass); + job.set(URL_PROPERTY, dbUrl); + + if (userName != null) { job.set(USERNAME_PROPERTY, userName); } + + if (passwd != null) { job.set(PASSWORD_PROPERTY, passwd); } + } + + /** + * Sets the DB access related fields in the Configuration. + * + * @param job the job + * @param driverClass JDBC Driver class name + * @param dbUrl JDBC DB access URL. + */ + public static void configureDB(Configuration job, String driverClass, String dbUrl) { + configureDB(job, driverClass, dbUrl, null, null); + } + + private Configuration job; + + DBConfiguration(Configuration job) { + this.job = job; + } + + /** + * Returns a connection object to the DB + * + * @throws ClassNotFoundException + * @throws SQLException + */ + Connection getConnection() throws IOException { + try { + Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY)); + } catch (ClassNotFoundException exception) { + throw new IOException("unable to load conection driver", exception); + } + + try { + if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) { + return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY)); + } else { + return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job + .get(DBConfiguration.USERNAME_PROPERTY), job + .get(DBConfiguration.PASSWORD_PROPERTY)); + } + } catch (SQLException exception) { + throw new IOException("unable to create connection", exception); + } + } + + String getInputTableName() { + return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY); + } + + void setInputTableName(String tableName) { + job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName); + } + + String[] getInputFieldNames() { + return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY); + } + + void setInputFieldNames(String... fieldNames) { + job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames); + } + + String getInputConditions() { + return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY); + } + + void setInputConditions(String conditions) { + if (conditions != null && conditions.length() > 0) { + job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions); + } + } + + String getInputOrderBy() { + return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY); + } + + void setInputOrderBy(String orderby) { + if (orderby != null && orderby.length() > 0) { + job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby); + } + } + + String getInputQuery() { + return job.get(DBConfiguration.INPUT_QUERY); + } + + void setInputQuery(String query) { + if (query != null && query.length() > 0) { job.set(DBConfiguration.INPUT_QUERY, query); } + } + + long getInputLimit() { + return job.getLong(DBConfiguration.INPUT_LIMIT, -1); + } + + void setInputLimit(long limit) { + job.setLong(DBConfiguration.INPUT_LIMIT, limit); + } + + String getInputCountQuery() { + return job.get(DBConfiguration.INPUT_COUNT_QUERY); + } + + void setInputCountQuery(String query) { + if (query != null && query.length() > 0) { + job.set(DBConfiguration.INPUT_COUNT_QUERY, query); + } + } + + Class<?> getInputClass() { + return job + .getClass(DBConfiguration.INPUT_CLASS_PROPERTY, DBInputFormat.NullDBWritable.class); + } + + void setInputClass(Class<? extends DBWritable> inputClass) { + job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class); + } + + String getOutputTableName() { + return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY); + } + + void setOutputTableName(String tableName) { + job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName); + } + + String[] getOutputFieldNames() { + return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY); + } + + void setOutputFieldNames(String... fieldNames) { + job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames); + } + + String[] getOutputUpdateFieldNames() { + return job.getStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY); + } + + void setOutputUpdateFieldNames(String... fieldNames) { + job.setStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY, fieldNames); + } + + int getBatchStatementsNum() { + return job.getInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, 1000); + } + + void setBatchStatementsNum(int batchStatementsNum) { + job.setInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, batchStatementsNum); + } + + int getMaxConcurrentReadsNum() { + return job.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 0); + } + + void setMaxConcurrentReadsNum(int maxConcurrentReads) { + if (maxConcurrentReads < 0) { + throw new IllegalArgumentException("maxConcurrentReads must be a positive value"); + } + + job.setInt(DBConfiguration.CONCURRENT_READS_PROPERTY, maxConcurrentReads); + } + +} |