import re import sys import csv import json import sqlite3 import datetime import subprocess from collections import Counter from typing import Optional, Tuple import lxml import xml.etree.ElementTree as ET import elasticsearch from bs4 import BeautifulSoup from confluent_kafka import Consumer, KafkaException import fatcat_openapi_client from fatcat_openapi_client import ReleaseEntity from fatcat_openapi_client.rest import ApiException from fuzzycat.matching import match_release_fuzzy import fuzzycat.common import fuzzycat.verify # TODO: refactor so remove need for this (re-imports for backwards compatibility) from fatcat_tools.normal import (clean_str as clean, is_cjk, b32_hex, LANG_MAP_MARC) # noqa: F401 from fatcat_tools.transforms import entity_to_dict DATE_FMT = "%Y-%m-%d" SANE_MAX_RELEASES = 200 SANE_MAX_URLS = 100 DOMAIN_REL_MAP = { "archive.org": "archive", # LOCKSS, Portico, DuraSpace, etc would also be "archive" "arxiv.org": "repository", "babel.hathitrust.org": "repository", "cds.cern.ch": "repository", "deepblue.lib.umich.edu": "repository", "europepmc.org": "repository", "hal.inria.fr": "repository", "scielo.isciii.es": "repository", "www.dtic.mil": "repository", "www.jstage.jst.go.jp": "repository", "www.jstor.org": "repository", "www.ncbi.nlm.nih.gov": "repository", "ftp.ncbi.nlm.nih.gov": "repository", "www.scielo.br": "repository", "www.scielo.cl": "repository", "www.scielo.org.mx": "repository", "zenodo.org": "repository", "www.biorxiv.org": "repository", "www.medrxiv.org": "repository", "citeseerx.ist.psu.edu": "aggregator", "publisher-connector.core.ac.uk": "aggregator", "core.ac.uk": "aggregator", "static.aminer.org": "aggregator", "aminer.org": "aggregator", "pdfs.semanticscholar.org": "aggregator", "semanticscholar.org": "aggregator", "www.semanticscholar.org": "aggregator", "academic.oup.com": "publisher", "cdn.elifesciences.org": "publisher", "cell.com": "publisher", "dl.acm.org": "publisher", "downloads.hindawi.com": "publisher", "elifesciences.org": "publisher", "iopscience.iop.org": "publisher", "journals.plos.org": "publisher", "link.springer.com": "publisher", "onlinelibrary.wiley.com": "publisher", "works.bepress.com": "publisher", "www.biomedcentral.com": "publisher", "www.cell.com": "publisher", "www.nature.com": "publisher", "www.pnas.org": "publisher", "www.tandfonline.com": "publisher", "www.frontiersin.org": "publisher", "www.degruyter.com": "publisher", "www.mdpi.com": "publisher", "www.ahajournals.org": "publisher", "ehp.niehs.nih.gov": "publisher", "journals.tsu.ru": "publisher", "www.cogentoa.com": "publisher", "www.researchgate.net": "academicsocial", "academia.edu": "academicsocial", "wayback.archive-it.org": "webarchive", "web.archive.org": "webarchive", "archive.is": "webarchive", } def make_rel_url(raw_url, default_link_rel="web"): # this is where we map specific domains to rel types, and also filter out # bad domains, invalid URLs, etc rel = default_link_rel for domain, domain_rel in DOMAIN_REL_MAP.items(): if "//{}/".format(domain) in raw_url: rel = domain_rel break return (rel, raw_url) def test_make_rel_url(): assert make_rel_url("http://example.com/thing.pdf")[0] == "web" assert make_rel_url("http://example.com/thing.pdf", default_link_rel="jeans")[0] == "jeans" assert make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")[0] == "webarchive" assert make_rel_url("http://cell.com/thing.pdf")[0] == "publisher" class EntityImporter: """ Base class for fatcat entity importers. The API exposed to record iterator is: push_record(raw_record) finish() The API that implementations are expected to fill in are: want(raw_record) -> boolean parse_record(raw_record) -> entity try_update(entity) -> boolean insert_batch([entity]) -> None This class exposes helpers for implementations: self.api self.get_editgroup_id() self.create_(entity) -> EntityEdit for related entity types self.push_entity(entity) self.counts['exists'] += 1 if didn't update or insert because of existing) self.counts['update'] += 1 if updated an entity Parameters: submit_mode: instead of accepting editgroups, only submits them. implementors must write insert_batch appropriately """ def __init__(self, api, **kwargs): eg_extra = kwargs.get('editgroup_extra', dict()) eg_extra['git_rev'] = eg_extra.get('git_rev', subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter') self.api = api self.do_updates = bool(kwargs.get('do_updates', True)) self.do_fuzzy_match = kwargs.get('do_fuzzy_match', True) self.bezerk_mode = kwargs.get('bezerk_mode', False) self.submit_mode = kwargs.get('submit_mode', False) self.edit_batch_size = kwargs.get('edit_batch_size', 100) self.editgroup_description = kwargs.get('editgroup_description') self.editgroup_extra = eg_extra self.es_client = kwargs.get('es_client') if not self.es_client: self.es_client = elasticsearch.Elasticsearch("https://search.fatcat.wiki", timeout=120) self._issnl_id_map = dict() self._orcid_id_map = dict() self._orcid_regex = re.compile(r"^\d{4}-\d{4}-\d{4}-\d{3}[\dX]$") self._doi_id_map = dict() self._pmid_id_map = dict() self.reset() def reset(self): self.counts = Counter({'total': 0, 'skip': 0, 'insert': 0, 'update': 0, 'exists': 0}) self._edit_count = 0 self._editgroup_id = None self._entity_queue = [] self._edits_inflight = [] def push_record(self, raw_record): """ Returns nothing. """ self.counts['total'] += 1 if (not raw_record) or (not self.want(raw_record)): self.counts['skip'] += 1 return entity = self.parse_record(raw_record) if not entity: self.counts['skip'] += 1 return if self.bezerk_mode: self.push_entity(entity) return if self.try_update(entity): self.push_entity(entity) return def parse_record(self, raw_record): """ Returns an entity class type, or None if we should skip this one. May have side-effects (eg, create related entities), but shouldn't update/mutate the actual entity. """ # implementations should fill this in raise NotImplementedError def finish(self): """ Gets called as cleanup at the end of imports, but can also be called at any time to "snip off" current editgroup progress. In other words, safe to call this but then continue pushing records. For example, in a persistent worker could call this if there have been no new entities fed in for more than some time period, to ensure that entities actually get created within a reasonable time frame. """ if self._edit_count > 0: if self.submit_mode: self.api.submit_editgroup(self._editgroup_id) else: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None self._edit_count = 0 self._edits_inflight = [] if self._entity_queue: self.insert_batch(self._entity_queue) self.counts['insert'] += len(self._entity_queue) self._entity_queue = [] return self.counts def get_editgroup_id(self, edits=1): if self._edit_count >= self.edit_batch_size: if self.submit_mode: self.api.submit_editgroup(self._editgroup_id) else: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None self._edit_count = 0 self._edits_inflight = [] if not self._editgroup_id: eg = self.api.create_editgroup( fatcat_openapi_client.Editgroup( description=self.editgroup_description, extra=self.editgroup_extra)) self._editgroup_id = eg.editgroup_id self._edit_count += edits return self._editgroup_id def create_container(self, entity): eg_id = self.get_editgroup_id() self.counts['inserted.container'] += 1 return self.api.create_container(eg_id, entity) def create_release(self, entity): eg_id = self.get_editgroup_id() self.counts['inserted.release'] += 1 return self.api.create_release(eg_id, entity) def create_file(self, entity): eg_id = self.get_editgroup_id() self.counts['inserted.file'] += 1 return self.api.create_file(eg_id, entity) def updated(self): """ Implementations should call this from try_update() if the update was successful """ self.counts['update'] += 1 def push_entity(self, entity): self._entity_queue.append(entity) if len(self._entity_queue) >= self.edit_batch_size: self.insert_batch(self._entity_queue) self.counts['insert'] += len(self._entity_queue) self._entity_queue = [] def want(self, raw_record): """ Implementations can override for optional fast-path to drop a record. Must have no side-effects; returns bool. """ return True def try_update(self, raw_record): """ Passed the output of parse_record(). Should try to find an existing entity and update it (PUT), decide we should do nothing (based on the existing record), or create a new one. Implementations must update the exists/updated/skip counts appropriately in this method. Returns boolean: True if the entity should still be inserted, False otherwise """ raise NotImplementedError def insert_batch(self, raw_record): raise NotImplementedError def is_orcid(self, orcid): # TODO: replace with clean_orcid() from fatcat_tools.normal return self._orcid_regex.match(orcid) is not None def lookup_orcid(self, orcid): """Caches calls to the Orcid lookup API endpoint in a local dict""" if not self.is_orcid(orcid): return None if orcid in self._orcid_id_map: return self._orcid_id_map[orcid] creator_id = None try: rv = self.api.lookup_creator(orcid=orcid) creator_id = rv.ident except ApiException as ae: # If anything other than a 404 (not found), something is wrong if ae.status != 404: raise ae self._orcid_id_map[orcid] = creator_id # might be None return creator_id def is_doi(self, doi): # TODO: replace with clean_doi() from fatcat_tools.normal return doi.startswith("10.") and doi.count("/") >= 1 def lookup_doi(self, doi): """Caches calls to the doi lookup API endpoint in a local dict For identifier lookups only (not full object fetches)""" assert self.is_doi(doi) doi = doi.lower() if doi in self._doi_id_map: return self._doi_id_map[doi] release_id = None try: rv = self.api.lookup_release(doi=doi, hide="abstracts,refs,contribs") release_id = rv.ident except ApiException as ae: # If anything other than a 404 (not found), something is wrong if ae.status != 404: raise ae self._doi_id_map[doi] = release_id # might be None return release_id def lookup_pmid(self, pmid): """Caches calls to the pmid lookup API endpoint in a local dict For identifier lookups only (not full object fetches)""" if pmid in self._pmid_id_map: return self._pmid_id_map[pmid] release_id = None try: rv = self.api.lookup_release(pmid=pmid, hide="abstracts,refs,contribs") release_id = rv.ident except ApiException as ae: # If anything other than a 404 (not found), something is wrong if ae.status != 404: raise ae self._pmid_id_map[pmid] = release_id # might be None return release_id def is_issnl(self, issnl): return len(issnl) == 9 and issnl[4] == '-' def lookup_issnl(self, issnl): """Caches calls to the ISSN-L lookup API endpoint in a local dict""" if issnl in self._issnl_id_map: return self._issnl_id_map[issnl] container_id = None try: rv = self.api.lookup_container(issnl=issnl) container_id = rv.ident except ApiException as ae: # If anything other than a 404 (not found), something is wrong if ae.status != 404: raise ae self._issnl_id_map[issnl] = container_id # might be None return container_id def read_issn_map_file(self, issn_map_file): print("Loading ISSN map file...", file=sys.stderr) self._issn_issnl_map = dict() for line in issn_map_file: if line.startswith("ISSN") or len(line) == 0: continue (issn, issnl) = line.split()[0:2] self._issn_issnl_map[issn] = issnl # double mapping makes lookups easy self._issn_issnl_map[issnl] = issnl print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map)), file=sys.stderr) def issn2issnl(self, issn): if issn is None: return None return self._issn_issnl_map.get(issn) @staticmethod def generic_file_cleanups(existing): """ Conservative cleanup of existing file entities. Intended to be used in most bulk cleanups and other file entity updates, to reduce edit volume for catalog size/churn efficiency. Note: the former check for 'None' as a wayback datetime has been completely cleaned up """ # update old/deprecated 'rel' on URLs for i in range(len(existing.urls)): u = existing.urls[i] if u.rel == 'repository' and '://archive.org/download/' in u.url: existing.urls[i].rel = 'archive' if u.rel == 'social': u.rel = 'academicsocial' # remove URLs which are near-duplicates redundant_urls = [] all_urls = [u.url for u in existing.urls] all_wayback_urls = [u.url for u in existing.urls if '://web.archive.org/web/' in u.url] for url in all_urls: # https/http redundancy if url.startswith('http://') and url.replace('http://', 'https://', 1) in all_urls: redundant_urls.append(url) continue # default HTTP port included and not included if ':80/' in url and url.replace(':80', '', 1) in all_urls: redundant_urls.append(url) continue # partial and complete wayback timestamps if '://web.archive.org/web/2017/' in url: original_url = "/".join(url.split("/")[5:]) assert len(original_url) > 5 for wb_url in all_wayback_urls: alt_timestamp = wb_url.split("/")[4] if len(alt_timestamp) >= 10 and original_url in wb_url: redundant_urls.append(url) break existing.urls = [u for u in existing.urls if u.url not in redundant_urls] return existing def match_existing_release_fuzzy(self, release: ReleaseEntity) -> Optional[Tuple[str, str, ReleaseEntity]]: """ This helper function uses fuzzycat (and elasticsearch) to look for existing release entities with similar metadata. Returns None if there was no match of any kind, or a single tuple (status: str, reason: str, existing: ReleaseEntity) if there was a match. Status string is one of the fuzzycat.common.Status, with "strongest match" in this sorted order: - EXACT - STRONG - WEAK - AMBIGUOUS Eg, if there is any EXACT match that is always returned; an AMBIGIOUS result is only returned if all the candidate matches were ambiguous. """ # this map used to establish priority order of verified matches STATUS_SORT = { fuzzycat.common.Status.TODO: 0, fuzzycat.common.Status.EXACT: 10, fuzzycat.common.Status.STRONG: 20, fuzzycat.common.Status.WEAK: 30, fuzzycat.common.Status.AMBIGUOUS: 40, fuzzycat.common.Status.DIFFERENT: 60, } # TODO: the size here is a first guess; what should it really be? candidates = match_release_fuzzy(release, size=10, es=self.es_client) if not candidates: return None release_dict = entity_to_dict(release, api_client=self.api.api_client) verified = [(fuzzycat.verify.verify(release_dict, entity_to_dict(c, api_client=self.api.api_client)), c) for c in candidates] # chose the "closest" match closest = sorted(verified, key=lambda v: STATUS_SORT[v[0].status])[0] if closest[0].status == fuzzycat.common.Status.DIFFERENT: return None elif closest[0].status == fuzzycat.common.Status.TODO: raise NotImplementedError("fuzzycat verify hit a Status.TODO") else: return (closest[0].status.name, closest[0].reason.value, closest[1]) class RecordPusher: """ Base class for different importer sources. Pretty trivial interface, just wraps an importer and pushes records in to it. """ def __init__(self, importer, **kwargs): self.importer = importer def run(self): """ This will look something like: for line in sys.stdin: record = json.loads(line) self.importer.push_record(record) print(self.importer.finish()) """ raise NotImplementedError class JsonLinePusher(RecordPusher): def __init__(self, importer, json_file, **kwargs): self.importer = importer self.json_file = json_file def run(self): for line in self.json_file: if not line: continue record = json.loads(line) self.importer.push_record(record) counts = self.importer.finish() print(counts, file=sys.stderr) return counts class CsvPusher(RecordPusher): def __init__(self, importer, csv_file, **kwargs): self.importer = importer self.reader = csv.DictReader(csv_file, delimiter=kwargs.get('delimiter', ',')) def run(self): for line in self.reader: if not line: continue self.importer.push_record(line) counts = self.importer.finish() print(counts, file=sys.stderr) return counts class LinePusher(RecordPusher): def __init__(self, importer, text_file, **kwargs): self.importer = importer self.text_file = text_file def run(self): for line in self.text_file: if not line: continue self.importer.push_record(line) counts = self.importer.finish() print(counts, file=sys.stderr) return counts class SqlitePusher(RecordPusher): def __init__(self, importer, db_file, table_name, where_clause="", **kwargs): self.importer = importer self.db = sqlite3.connect(db_file, isolation_level='EXCLUSIVE') self.db.row_factory = sqlite3.Row self.table_name = table_name self.where_clause = where_clause def run(self): cur = self.db.execute("SELECT * FROM {} {};".format( self.table_name, self.where_clause)) for row in cur: self.importer.push_record(row) counts = self.importer.finish() print(counts, file=sys.stderr) return counts class Bs4XmlLinesPusher(RecordPusher): def __init__(self, importer, xml_file, prefix_filter=None, **kwargs): self.importer = importer self.xml_file = xml_file self.prefix_filter = prefix_filter def run(self): for line in self.xml_file: if not line: continue if self.prefix_filter and not line.startswith(self.prefix_filter): continue soup = BeautifulSoup(line, "xml") self.importer.push_record(soup) soup.decompose() counts = self.importer.finish() print(counts, file=sys.stderr) return counts class Bs4XmlFilePusher(RecordPusher): def __init__(self, importer, xml_file, record_tag, **kwargs): self.importer = importer self.xml_file = xml_file self.record_tag = record_tag def run(self): soup = BeautifulSoup(self.xml_file, "xml") for record in soup.find_all(self.record_tag): self.importer.push_record(record) record.decompose() counts = self.importer.finish() soup.decompose() print(counts, file=sys.stderr) return counts class Bs4XmlLargeFilePusher(RecordPusher): """ This is a variant of Bs4XmlFilePusher which parses large files incrementally, instead of loading the whole thing in RAM first. The dominant source of RAM utilization at start-up is the large ISSN/ISSN-L map. This can be confirmed in local development by using the small map in ./tests/files/. Current implementation is weird/inefficient in that it re-parses with BeautifulSoup (lxml) every article, but I didn't want to mangle or re-write with a different BS back-end. Did at least casual testing and all of: record.decompose(), soup.decompose(), element.clear(), root.clear() helped with memory usage. With all of these, memory growth is very slow and can probably be explained by inner container/release API lookup caches. """ def __init__(self, importer, xml_file, record_tags, use_lxml=False, **kwargs): self.importer = importer self.xml_file = xml_file self.record_tags = record_tags self.use_lxml = use_lxml def run(self): if self.use_lxml: elem_iter = lxml.etree.iterparse(self.xml_file, ["start", "end"], load_dtd=True) else: elem_iter = ET.iterparse(self.xml_file, ["start", "end"]) root = None for (event, element) in elem_iter: if (root is not None) and event == "start": root = element continue if not (element.tag in self.record_tags and event == "end"): continue if self.use_lxml: soup = BeautifulSoup(lxml.etree.tostring(element), "xml") else: soup = BeautifulSoup(ET.tostring(element), "xml") for record in soup.find_all(): if record.name not in self.record_tags: continue self.importer.push_record(record) record.decompose() soup.decompose() element.clear() if root is not None: root.clear() counts = self.importer.finish() print(counts, file=sys.stderr) return counts class Bs4XmlFileListPusher(RecordPusher): def __init__(self, importer, list_file, record_tag, **kwargs): self.importer = importer self.list_file = list_file self.record_tag = record_tag def run(self): for xml_path in self.list_file: xml_path = xml_path.strip() if not xml_path or xml_path.startswith("#"): continue with open(xml_path, 'r') as xml_file: soup = BeautifulSoup(xml_file, "xml") for record in soup.find_all(self.record_tag): self.importer.push_record(record) record.decompose() soup.decompose() counts = self.importer.finish() print(counts) return counts class KafkaBs4XmlPusher(RecordPusher): """ Fetch XML for an article from Kafka, parse via Bs4. """ def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): self.importer = importer self.consumer = make_kafka_consumer( kafka_hosts, kafka_env, topic_suffix, group, kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') ) self.poll_interval = kwargs.get('poll_interval', 5.0) self.consume_batch_size = kwargs.get('consume_batch_size', 25) def run(self): count = 0 last_push = datetime.datetime.now() while True: # Note: this is batch-oriented, because underlying importer is # often batch-oriented, but this doesn't confirm that entire batch # has been pushed to fatcat before commiting offset. Eg, consider # case where there there is one update and thousands of creates; # update would be lingering in importer, and if importer crashed # never created. # This is partially mitigated for the worker case by flushing any # outstanding editgroups every 5 minutes, but there is still that # window when editgroups might be hanging (unsubmitted). batch = self.consumer.consume( num_messages=self.consume_batch_size, timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval)".format( len(batch), self.poll_interval)) if not batch: if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): # it has been some time, so flush any current editgroup self.importer.finish() last_push = datetime.datetime.now() #print("Flushed any partial import batch: {}".format(self.importer.counts)) continue # first check errors on entire batch... for msg in batch: if msg.error(): raise KafkaException(msg.error()) # ... then process for msg in batch: soup = BeautifulSoup(msg.value().decode('utf-8'), "xml") self.importer.push_record(soup) soup.decompose() count += 1 if count % 500 == 0: print("Import counts: {}".format(self.importer.counts)) last_push = datetime.datetime.now() for msg in batch: # locally store offsets of processed messages; will be # auto-commited by librdkafka from this "stored" value self.consumer.store_offsets(message=msg) # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering counts = self.importer.finish() print(counts) self.consumer.close() return counts class KafkaJsonPusher(RecordPusher): def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): self.importer = importer self.consumer = make_kafka_consumer( kafka_hosts, kafka_env, topic_suffix, group, kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') ) self.poll_interval = kwargs.get('poll_interval', 5.0) self.consume_batch_size = kwargs.get('consume_batch_size', 100) def run(self): count = 0 last_push = datetime.datetime.now() while True: # Note: this is batch-oriented, because underlying importer is # often batch-oriented, but this doesn't confirm that entire batch # has been pushed to fatcat before committing offset. Eg, consider # case where there there is one update and thousands of creates; # update would be lingering in importer, and if importer crashed # never created. # This is partially mitigated for the worker case by flushing any # outstanding editgroups every 5 minutes, but there is still that # window when editgroups might be hanging (unsubmitted). batch = self.consumer.consume( num_messages=self.consume_batch_size, timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval)".format( len(batch), self.poll_interval)) if not batch: if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): # it has been some time, so flush any current editgroup self.importer.finish() last_push = datetime.datetime.now() #print("Flushed any partial import batch: {}".format(self.importer.counts)) continue # first check errors on entire batch... for msg in batch: if msg.error(): raise KafkaException(msg.error()) # ... then process for msg in batch: record = json.loads(msg.value().decode('utf-8')) self.importer.push_record(record) count += 1 if count % 500 == 0: print("Import counts: {}".format(self.importer.counts)) last_push = datetime.datetime.now() for msg in batch: # locally store offsets of processed messages; will be # auto-commited by librdkafka from this "stored" value self.consumer.store_offsets(message=msg) # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering counts = self.importer.finish() print(counts) self.consumer.close() return counts def make_kafka_consumer(hosts, env, topic_suffix, group, kafka_namespace="fatcat"): topic_name = "{}-{}.{}".format(kafka_namespace, env, topic_suffix) def fail_fast(err, partitions): if err is not None: print("Kafka consumer commit error: {}".format(err)) print("Bailing out...") # TODO: should it be sys.exit(-1)? raise KafkaException(err) for p in partitions: # check for partition-specific commit errors if p.error: print("Kafka consumer commit error: {}".format(p.error)) print("Bailing out...") # TODO: should it be sys.exit(-1)? raise KafkaException(p.error) #print("Kafka consumer commit successful") pass # previously, using pykafka #auto_commit_enable=True, #auto_commit_interval_ms=30000, # 30 seconds conf = { 'bootstrap.servers': hosts, 'group.id': group, 'on_commit': fail_fast, # messages don't have offset marked as stored until pushed to # elastic, but we do auto-commit stored offsets to broker 'enable.auto.offset.store': False, 'enable.auto.commit': True, # user code timeout; if no poll after this long, assume user code # hung and rebalance (default: 5min) 'max.poll.interval.ms': 120000, 'default.topic.config': { 'auto.offset.reset': 'latest', }, } def on_rebalance(consumer, partitions): for p in partitions: if p.error: raise KafkaException(p.error) print("Kafka partitions rebalanced: {} / {}".format( consumer, partitions)) consumer = Consumer(conf) # NOTE: it's actually important that topic_name *not* be bytes (UTF-8 # encoded) consumer.subscribe([topic_name], on_assign=on_rebalance, on_revoke=on_rebalance, ) print("Consuming from kafka topic {}, group {}".format(topic_name, group)) return consumer