author    Bryan Newbold <bnewbold@robocracy.org>  2018-11-19 23:45:33 -0800
committer Bryan Newbold <bnewbold@robocracy.org>  2018-11-19 23:45:36 -0800
commit    07f9d46ea06ccee867369b759c00c6bfe9b91b13 (patch)
tree      9e9623cb92c3c63c0f033d79802d5b95fb0a3454 /python
parent    dbcf33944dca294472e7ab42f632d8f64ef1c006 (diff)
start supporting kafka importers
A nice follow-on feature would be some kind of log output to report progress.
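A minimal sketch of what such progress output could look like, assuming the importer loops over Kafka messages the same way process_batch() loops over JSON lines; the helper name and the stderr reporting interval are illustrative, not part of this commit:

    import sys

    def consume_with_progress(messages, every=1000):
        # Pass messages (or rows) through unchanged, printing a progress line
        # to stderr every `every` records consumed.
        count = 0
        for msg in messages:
            if msg is None:
                continue
            count += 1
            if count % every == 0:
                print("consumed {} records...".format(count), file=sys.stderr)
            yield msg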
Diffstat (limited to 'python')
-rw-r--r--  python/README_import.md                    |  2
-rwxr-xr-x  python/fatcat_import.py                    | 20
-rw-r--r--  python/fatcat_tools/importers/__init__.py  |  2
-rw-r--r--  python/fatcat_tools/importers/common.py    | 17
4 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/python/README_import.md b/python/README_import.md
index 6334dbc6..9ee24f8e 100644
--- a/python/README_import.md
+++ b/python/README_import.md
@@ -50,7 +50,7 @@ Usually tens of minutes on fast production machine.
Usually 24 hours or so on fast production machine.
- time xzcat /srv/fatcat/datasets/crossref-works.2018-09-05.json.xz | time parallel -j20 --round-robin --pipe ./fatcat_import.py crossref - /srv/fatcat/datasets/20180216.ISSN-to-ISSN-L.txt /srv/fatcat/datasets/release_ids.ia_munge_20180908.sqlite3
+ time xzcat /srv/fatcat/datasets/crossref-works.2018-09-05.json.xz | time parallel -j20 --round-robin --pipe ./fatcat_import.py crossref - /srv/fatcat/datasets/20180216.ISSN-to-ISSN-L.txt --extid-map-file /srv/fatcat/datasets/release_ids.ia_munge_20180908.sqlite3
## Matched
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index cdf04db1..555d4083 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -3,13 +3,18 @@
import sys
import argparse
from fatcat_tools.importers import CrossrefImporter, OrcidImporter, \
- IssnImporter, MatchedImporter, GrobidMetadataImporter
+ IssnImporter, MatchedImporter, GrobidMetadataImporter, make_kafka_consumer
def run_crossref(args):
fci = CrossrefImporter(args.host_url, args.issn_map_file,
args.extid_map_file, create_containers=(not args.no_create_containers))
- fci.process_batch(args.json_file, size=args.batch_size)
+ if args.kafka_mode:
+ consumer = make_kafka_consumer(
+ args.kafka_hosts, args.kafka_env, "crossref", "fatcat-import")
+ fci.process_batch(consumer, size=args.batch_size, decode_kafka=True)
+ else:
+ fci.process_batch(args.json_file, size=args.batch_size)
fci.describe_run()
def run_orcid(args):
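The new decode_kafka flag given to process_batch() is implemented over in fatcat_tools/importers/common.py (that hunk is not shown in this excerpt). A plausible sketch of the decoding step it implies, assuming each Kafka message carries a single UTF-8 encoded JSON record in its value:

    import json

    def decode_row(row, decode_kafka=False):
        # In Kafka mode `row` is a pykafka message object whose payload bytes
        # live in .value; otherwise it is a raw JSON line from stdin or a file.
        if decode_kafka:
            row = row.value.decode('utf-8')
        return json.loads(row)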
@@ -41,6 +46,12 @@ def main():
parser.add_argument('--host-url',
default="http://localhost:9411/v0",
help="connect to this host/port")
+ parser.add_argument('--kafka-hosts',
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
+ parser.add_argument('--kafka-env',
+ default="qa",
+ help="Kafka topic namespace to use (eg, prod, qa)")
subparsers = parser.add_subparsers()
sub_crossref = subparsers.add_parser('crossref')
@@ -51,7 +62,7 @@ def main():
sub_crossref.add_argument('issn_map_file',
help="ISSN to ISSN-L mapping file",
default=None, type=argparse.FileType('r'))
- sub_crossref.add_argument('extid_map_file',
+ sub_crossref.add_argument('--extid-map-file',
help="DOI-to-other-identifiers sqlite3 database",
default=None, type=str)
sub_crossref.add_argument('--no-create-containers',
@@ -60,6 +71,9 @@ def main():
sub_crossref.add_argument('--batch-size',
help="size of batch to send",
default=50, type=int)
+ sub_crossref.add_argument('--kafka-mode',
+ action='store_true',
+ help="consume from kafka topic (not stdin)")
sub_orcid = subparsers.add_parser('orcid')
sub_orcid.set_defaults(func=run_orcid)
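Note how --kafka-env and the per-importer topic suffix combine into a full topic name inside make_kafka_consumer() (added further down in common.py); for example:

    # topic naming as done in make_kafka_consumer(): "fatcat-{env}.{suffix}"
    env, topic_suffix = "qa", "crossref"
    topic_name = "fatcat-{}.{}".format(env, topic_suffix)  # -> "fatcat-qa.crossref"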
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 0f5fafb6..e6f081e5 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -1,5 +1,5 @@
-from .common import FatcatImporter
+from .common import FatcatImporter, make_kafka_consumer
from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP
from .grobid_metadata import GrobidMetadataImporter
from .issn import IssnImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 18594884..65976a21 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -5,16 +5,33 @@ import csv
import json
import itertools
from collections import Counter
+import pykafka
import fatcat_client
from fatcat_client.rest import ApiException
+
# from: https://docs.python.org/3/library/itertools.html
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
args = [iter(iterable)] * n
return itertools.zip_longest(*args, fillvalue=fillvalue)
+def make_kafka_consumer(hosts, env, topic_suffix, group):
+ topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8')
+ client = pykafka.KafkaClient(hosts=hosts, broker_version="1.0.0")
+ consume_topic = client.topics[topic_name]
+ print("Consuming from kafka topic {}, group {}".format(topic_name, group))
+
+ consumer = consume_topic.get_balanced_consumer(
+ consumer_group=group.encode('utf-8'),
+ managed=True,
+ auto_commit_enable=True,
+ auto_commit_interval_ms=30000, # 30 seconds
+ compacted_topic=True,
+ )
+ return consumer
+
class FatcatImporter:
"""
Base class for fatcat importers
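A brief usage sketch for the make_kafka_consumer() helper above, with arguments mirroring the defaults and consumer group used by fatcat_import.py; the explicit stop() call and the record handling are assumptions about a typical caller, not something this commit includes:

    import json

    consumer = make_kafka_consumer("localhost:9092", "qa", "crossref", "fatcat-import")
    try:
        # consume() blocks until a message is available; runs until interrupted
        while True:
            msg = consumer.consume()
            if msg is None:
                continue
            record = json.loads(msg.value.decode('utf-8'))
            # ... hand `record` off to an importer here ...
    finally:
        consumer.stop()  # shut the balanced consumer down cleanly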