diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:45:33 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:45:36 -0800 |
commit | 07f9d46ea06ccee867369b759c00c6bfe9b91b13 (patch) | |
tree | 9e9623cb92c3c63c0f033d79802d5b95fb0a3454 /python/fatcat_import.py | |
parent | dbcf33944dca294472e7ab42f632d8f64ef1c006 (diff) | |
download | fatcat-07f9d46ea06ccee867369b759c00c6bfe9b91b13.tar.gz fatcat-07f9d46ea06ccee867369b759c00c6bfe9b91b13.zip |
start supporting kafka importers
A nice feature would be some/any log output as to progress.
Diffstat (limited to 'python/fatcat_import.py')
-rwxr-xr-x | python/fatcat_import.py | 20 |
1 files changed, 17 insertions, 3 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py index cdf04db1..555d4083 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -3,13 +3,18 @@ import sys import argparse from fatcat_tools.importers import CrossrefImporter, OrcidImporter, \ - IssnImporter, MatchedImporter, GrobidMetadataImporter + IssnImporter, MatchedImporter, GrobidMetadataImporter, make_kafka_consumer def run_crossref(args): fci = CrossrefImporter(args.host_url, args.issn_map_file, args.extid_map_file, create_containers=(not args.no_create_containers)) - fci.process_batch(args.json_file, size=args.batch_size) + if args.kafka_mode: + consumer = make_kafka_consumer( + args.kafka_hosts, args.kafka_env, "crossref", "fatcat-import") + fci.process_batch(consumer, size=args.batch_size, decode_kafka=True) + else: + fci.process_batch(args.json_file, size=args.batch_size) fci.describe_run() def run_orcid(args): @@ -41,6 +46,12 @@ def main(): parser.add_argument('--host-url', default="http://localhost:9411/v0", help="connect to this host/port") + parser.add_argument('--kafka-hosts', + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") + parser.add_argument('--kafka-env', + default="qa", + help="Kafka topic namespace to use (eg, prod, qa)") subparsers = parser.add_subparsers() sub_crossref = subparsers.add_parser('crossref') @@ -51,7 +62,7 @@ def main(): sub_crossref.add_argument('issn_map_file', help="ISSN to ISSN-L mapping file", default=None, type=argparse.FileType('r')) - sub_crossref.add_argument('extid_map_file', + sub_crossref.add_argument('--extid-map-file', help="DOI-to-other-identifiers sqlite3 database", default=None, type=str) sub_crossref.add_argument('--no-create-containers', @@ -60,6 +71,9 @@ def main(): sub_crossref.add_argument('--batch-size', help="size of batch to send", default=50, type=int) + sub_crossref.add_argument('--kafka-mode', + action='store_true', + help="consume from kafka topic (not stdin)") sub_orcid = subparsers.add_parser('orcid') sub_orcid.set_defaults(func=run_orcid) |