diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:45:33 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:45:36 -0800 |
commit | 07f9d46ea06ccee867369b759c00c6bfe9b91b13 (patch) | |
tree | 9e9623cb92c3c63c0f033d79802d5b95fb0a3454 /python/fatcat_tools/importers | |
parent | dbcf33944dca294472e7ab42f632d8f64ef1c006 (diff) | |
download | fatcat-07f9d46ea06ccee867369b759c00c6bfe9b91b13.tar.gz fatcat-07f9d46ea06ccee867369b759c00c6bfe9b91b13.zip |
start supporting kafka importers
A nice feature would be some/any log output as to progress.
Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r-- | python/fatcat_tools/importers/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/importers/common.py | 17 |
2 files changed, 18 insertions, 1 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 0f5fafb6..e6f081e5 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -1,5 +1,5 @@ -from .common import FatcatImporter +from .common import FatcatImporter, make_kafka_consumer from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP from .grobid_metadata import GrobidMetadataImporter from .issn import IssnImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 18594884..65976a21 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -5,16 +5,33 @@ import csv import json import itertools from collections import Counter +import pykafka import fatcat_client from fatcat_client.rest import ApiException + # from: https://docs.python.org/3/library/itertools.html def grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" args = [iter(iterable)] * n return itertools.zip_longest(*args, fillvalue=fillvalue) +def make_kafka_consumer(hosts, env, topic_suffix, group): + topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8') + client = pykafka.KafkaClient(hosts=hosts, broker_version="1.0.0") + consume_topic = client.topics[topic_name] + print("Consuming from kafka topic {}, group {}".format(topic_name, group)) + + consumer = consume_topic.get_balanced_consumer( + consumer_group=group.encode('utf-8'), + managed=True, + auto_commit_enable=True, + auto_commit_interval_ms=30000, # 30 seconds + compacted_topic=True, + ) + return consumer + class FatcatImporter: """ Base class for fatcat importers |