aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/jdbc/db/DBConfiguration.java
blob: 1bb0786099b26cd29f239d8c557c5463b76cb9ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
/*
 * Copyright (c) 2009 Concurrent, Inc.
 *
 * This work has been released into the public domain
 * by the copyright holder. This applies worldwide.
 *
 * In case this is not legally possible:
 * The copyright holder grants any entity the right
 * to use this work for any purpose, without any
 * conditions, unless such conditions are required by law.
 */
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package parallelai.spyglass.jdbc.db;

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * A container for configuration property names for jobs with DB input/output. <br> The job can be
 * configured using the static methods in this class, {@link DBInputFormat}, and {@link
 * DBOutputFormat}. <p/> Alternatively, the properties can be set in the configuration with proper
 * values.
 */
public class DBConfiguration {

    /** The JDBC Driver class name */
    public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class";

    /** JDBC Database access URL */
    public static final String URL_PROPERTY = "mapred.jdbc.url";

    /** User name to access the database */
    public static final String USERNAME_PROPERTY = "mapred.jdbc.username";

    /** Password to access the database */
    public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";

    /** Input table name */
    public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name";

    /** Field names in the Input table */
    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names";

    /** WHERE clause in the input SELECT statement */
    public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions";

    /** ORDER BY clause in the input SELECT statement */
    public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby";

    /** Whole input query, exluding LIMIT...OFFSET */
    public static final String INPUT_QUERY = "mapred.jdbc.input.query";

    /** The number of records to LIMIT, useful for testing */
    public static final String INPUT_LIMIT = "mapred.jdbc.input.limit";

    /** Input query to get the count of records */
    public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query";

    /** Class name implementing DBWritable which will hold input tuples */
    public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";

    /** Output table name */
    public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name";

    /** Field names in the Output table */
    public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";

    /** Field names in the Output table */
    public static final String OUTPUT_UPDATE_FIELD_NAMES_PROPERTY =
        "mapred.jdbc.output.update.field.names";

    /** The number of statements to batch before executing */
    public static final String BATCH_STATEMENTS_PROPERTY = "mapred.jdbc.batch.statements.num";

    /** The number of splits allowed, becomes max concurrent reads. */
    public static final String CONCURRENT_READS_PROPERTY = "mapred.jdbc.concurrent.reads.num";

    /**
     * Sets the DB access related fields in the Configuration.
     *
     * @param job         the job
     * @param driverClass JDBC Driver class name
     * @param dbUrl       JDBC DB access URL.
     * @param userName    DB access username
     * @param passwd      DB access passwd
     */
    public static void configureDB(Configuration job, String driverClass, String dbUrl,
        String userName, String passwd) {
        job.set(DRIVER_CLASS_PROPERTY, driverClass);
        job.set(URL_PROPERTY, dbUrl);

        if (userName != null) { job.set(USERNAME_PROPERTY, userName); }

        if (passwd != null) { job.set(PASSWORD_PROPERTY, passwd); }
    }

    /**
     * Sets the DB access related fields in the Configuration.
     *
     * @param job         the job
     * @param driverClass JDBC Driver class name
     * @param dbUrl       JDBC DB access URL.
     */
    public static void configureDB(Configuration job, String driverClass, String dbUrl) {
        configureDB(job, driverClass, dbUrl, null, null);
    }

    private Configuration job;

    DBConfiguration(Configuration job) {
        this.job = job;
    }

    /**
     * Returns a connection object to the DB
     *
     * @throws ClassNotFoundException
     * @throws SQLException
     */
    Connection getConnection() throws IOException {
        try {
            Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
        } catch (ClassNotFoundException exception) {
            throw new IOException("unable to load conection driver", exception);
        }

        try {
            if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
                return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
            } else {
                return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job
                    .get(DBConfiguration.USERNAME_PROPERTY), job
                    .get(DBConfiguration.PASSWORD_PROPERTY));
            }
        } catch (SQLException exception) {
            throw new IOException("unable to create connection", exception);
        }
    }

    String getInputTableName() {
        return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
    }

    void setInputTableName(String tableName) {
        job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
    }

    String[] getInputFieldNames() {
        return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
    }

    void setInputFieldNames(String... fieldNames) {
        job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    String getInputConditions() {
        return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
    }

    void setInputConditions(String conditions) {
        if (conditions != null && conditions.length() > 0) {
            job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
        }
    }

    String getInputOrderBy() {
        return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
    }

    void setInputOrderBy(String orderby) {
        if (orderby != null && orderby.length() > 0) {
            job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
        }
    }

    String getInputQuery() {
        return job.get(DBConfiguration.INPUT_QUERY);
    }

    void setInputQuery(String query) {
        if (query != null && query.length() > 0) { job.set(DBConfiguration.INPUT_QUERY, query); }
    }

    long getInputLimit() {
        return job.getLong(DBConfiguration.INPUT_LIMIT, -1);
    }

    void setInputLimit(long limit) {
        job.setLong(DBConfiguration.INPUT_LIMIT, limit);
    }

    String getInputCountQuery() {
        return job.get(DBConfiguration.INPUT_COUNT_QUERY);
    }

    void setInputCountQuery(String query) {
        if (query != null && query.length() > 0) {
            job.set(DBConfiguration.INPUT_COUNT_QUERY, query);
        }
    }

    Class<?> getInputClass() {
        return job
            .getClass(DBConfiguration.INPUT_CLASS_PROPERTY, DBInputFormat.NullDBWritable.class);
    }

    void setInputClass(Class<? extends DBWritable> inputClass) {
        job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
    }

    String getOutputTableName() {
        return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
    }

    void setOutputTableName(String tableName) {
        job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
    }

    String[] getOutputFieldNames() {
        return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
    }

    void setOutputFieldNames(String... fieldNames) {
        job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    String[] getOutputUpdateFieldNames() {
        return job.getStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY);
    }

    void setOutputUpdateFieldNames(String... fieldNames) {
        job.setStrings(DBConfiguration.OUTPUT_UPDATE_FIELD_NAMES_PROPERTY, fieldNames);
    }

    int getBatchStatementsNum() {
        return job.getInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, 1000);
    }

    void setBatchStatementsNum(int batchStatementsNum) {
        job.setInt(DBConfiguration.BATCH_STATEMENTS_PROPERTY, batchStatementsNum);
    }

    int getMaxConcurrentReadsNum() {
        return job.getInt(DBConfiguration.CONCURRENT_READS_PROPERTY, 0);
    }

    void setMaxConcurrentReadsNum(int maxConcurrentReads) {
        if (maxConcurrentReads < 0) {
            throw new IllegalArgumentException("maxConcurrentReads must be a positive value");
        }

        job.setInt(DBConfiguration.CONCURRENT_READS_PROPERTY, maxConcurrentReads);
    }

}