diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:45:33 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:45:36 -0800 | 
| commit | 07f9d46ea06ccee867369b759c00c6bfe9b91b13 (patch) | |
| tree | 9e9623cb92c3c63c0f033d79802d5b95fb0a3454 | |
| parent | dbcf33944dca294472e7ab42f632d8f64ef1c006 (diff) | |
| download | fatcat-07f9d46ea06ccee867369b759c00c6bfe9b91b13.tar.gz fatcat-07f9d46ea06ccee867369b759c00c6bfe9b91b13.zip | |
start supporting kafka importers
A nice feature would be some/any log output as to progress.
| -rw-r--r-- | python/README_import.md | 2 | ||||
| -rwxr-xr-x | python/fatcat_import.py | 20 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/__init__.py | 2 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/common.py | 17 | 
4 files changed, 36 insertions, 5 deletions
| diff --git a/python/README_import.md b/python/README_import.md index 6334dbc6..9ee24f8e 100644 --- a/python/README_import.md +++ b/python/README_import.md @@ -50,7 +50,7 @@ Usually tens of minutes on fast production machine.  Usually 24 hours or so on fast production machine. -    time xzcat /srv/fatcat/datasets/crossref-works.2018-09-05.json.xz | time parallel -j20 --round-robin --pipe ./fatcat_import.py crossref - /srv/fatcat/datasets/20180216.ISSN-to-ISSN-L.txt /srv/fatcat/datasets/release_ids.ia_munge_20180908.sqlite3 +    time xzcat /srv/fatcat/datasets/crossref-works.2018-09-05.json.xz | time parallel -j20 --round-robin --pipe ./fatcat_import.py crossref - /srv/fatcat/datasets/20180216.ISSN-to-ISSN-L.txt --extid-map-file /srv/fatcat/datasets/release_ids.ia_munge_20180908.sqlite3  ## Matched diff --git a/python/fatcat_import.py b/python/fatcat_import.py index cdf04db1..555d4083 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -3,13 +3,18 @@  import sys  import argparse  from fatcat_tools.importers import CrossrefImporter, OrcidImporter, \ -    IssnImporter, MatchedImporter, GrobidMetadataImporter +    IssnImporter, MatchedImporter, GrobidMetadataImporter, make_kafka_consumer  def run_crossref(args):      fci = CrossrefImporter(args.host_url, args.issn_map_file,          args.extid_map_file, create_containers=(not args.no_create_containers)) -    fci.process_batch(args.json_file, size=args.batch_size) +    if args.kafka_mode: +        consumer = make_kafka_consumer( +            args.kafka_hosts, args.kafka_env, "crossref", "fatcat-import") +        fci.process_batch(consumer, size=args.batch_size, decode_kafka=True) +    else: +        fci.process_batch(args.json_file, size=args.batch_size)      fci.describe_run()  def run_orcid(args): @@ -41,6 +46,12 @@ def main():      parser.add_argument('--host-url',          default="http://localhost:9411/v0",          help="connect to this host/port") +    parser.add_argument('--kafka-hosts', +        default="localhost:9092", +        help="list of Kafka brokers (host/port) to use") +    parser.add_argument('--kafka-env', +        default="qa", +        help="Kafka topic namespace to use (eg, prod, qa)")      subparsers = parser.add_subparsers()      sub_crossref = subparsers.add_parser('crossref') @@ -51,7 +62,7 @@ def main():      sub_crossref.add_argument('issn_map_file',          help="ISSN to ISSN-L mapping file",          default=None, type=argparse.FileType('r')) -    sub_crossref.add_argument('extid_map_file', +    sub_crossref.add_argument('--extid-map-file',          help="DOI-to-other-identifiers sqlite3 database",          default=None, type=str)      sub_crossref.add_argument('--no-create-containers', @@ -60,6 +71,9 @@ def main():      sub_crossref.add_argument('--batch-size',          help="size of batch to send",          default=50, type=int) +    sub_crossref.add_argument('--kafka-mode', +        action='store_true', +        help="consume from kafka topic (not stdin)")      sub_orcid = subparsers.add_parser('orcid')      sub_orcid.set_defaults(func=run_orcid) diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 0f5fafb6..e6f081e5 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -1,5 +1,5 @@ -from .common import FatcatImporter +from .common import FatcatImporter, make_kafka_consumer  from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP  from .grobid_metadata import GrobidMetadataImporter  from .issn import IssnImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 18594884..65976a21 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -5,16 +5,33 @@ import csv  import json  import itertools  from collections import Counter +import pykafka  import fatcat_client  from fatcat_client.rest import ApiException +  # from: https://docs.python.org/3/library/itertools.html  def grouper(iterable, n, fillvalue=None):      "Collect data into fixed-length chunks or blocks"      args = [iter(iterable)] * n      return itertools.zip_longest(*args, fillvalue=fillvalue) +def make_kafka_consumer(hosts, env, topic_suffix, group): +    topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8') +    client = pykafka.KafkaClient(hosts=hosts, broker_version="1.0.0") +    consume_topic = client.topics[topic_name] +    print("Consuming from kafka topic {}, group {}".format(topic_name, group)) + +    consumer = consume_topic.get_balanced_consumer( +        consumer_group=group.encode('utf-8'), +        managed=True, +        auto_commit_enable=True, +        auto_commit_interval_ms=30000, # 30 seconds +        compacted_topic=True, +    ) +    return consumer +  class FatcatImporter:      """      Base class for fatcat importers | 
