From c63b663e2fcfb0c4544653f7f30f7a548103ef2b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 12 Apr 2019 22:09:28 -0700 Subject: add SqlitePusher importer option --- python/fatcat_tools/importers/__init__.py | 2 +- python/fatcat_tools/importers/common.py | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 2112785b..94802915 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -12,7 +12,7 @@ To run an import you combine two classes; one each of: """ -from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, KafkaJsonPusher, make_kafka_consumer, clean +from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, KafkaJsonPusher, make_kafka_consumer, clean from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP from .grobid_metadata import GrobidMetadataImporter from .journal_metadata import JournalMetadataImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index b89c3828..23cf2cc7 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -4,6 +4,7 @@ import sys import csv import json import ftfy +import sqlite3 import itertools import subprocess from collections import Counter @@ -419,6 +420,25 @@ class LinePusher(RecordPusher): return counts +class SqlitePusher(RecordPusher): + + def __init__(self, importer, db_file, table_name, where_clause="", **kwargs): + self.importer = importer + self.db = sqlite3.connect(db_file, isolation_level='EXCLUSIVE') + self.db.row_factory = sqlite3.Row + self.table_name = table_name + self.where_clause = where_clause + + def run(self): + cur = self.db.execute("SELECT * FROM {} {};".format( + self.table_name, self.where_clause)) + for row in cur: + self.importer.push_record(row) + counts = self.importer.finish() + print(counts) + return counts + + class KafkaJsonPusher(RecordPusher): def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): -- cgit v1.2.3