import re import sys import csv import json import ftfy import base64 import sqlite3 import datetime import subprocess import unicodedata from collections import Counter from confluent_kafka import Consumer, KafkaException import xml.etree.ElementTree as ET from bs4 import BeautifulSoup import fatcat_openapi_client from fatcat_openapi_client.rest import ApiException DATE_FMT = "%Y-%m-%d" SANE_MAX_RELEASES = 200 SANE_MAX_URLS = 100 # These are very close, but maybe not exactly 1-to-1 with 639-2? Some mix of # 2/T and 2/B? # PubMed/MEDLINE and JSTOR use these MARC codes # https://www.loc.gov/marc/languages/language_name.html LANG_MAP_MARC = { 'afr': 'af', 'alb': 'sq', 'amh': 'am', 'ara': 'ar', 'arm': 'hy', 'aze': 'az', 'ben': 'bn', 'bos': 'bs', 'bul': 'bg', 'cat': 'ca', 'chi': 'zh', 'cze': 'cs', 'dan': 'da', 'dut': 'nl', 'eng': 'en', 'epo': 'eo', 'est': 'et', 'fin': 'fi', 'fre': 'fr', 'geo': 'ka', 'ger': 'de', 'gla': 'gd', 'gre': 'el', 'heb': 'he', 'hin': 'hi', 'hrv': 'hr', 'hun': 'hu', 'ice': 'is', 'ind': 'id', 'ita': 'it', 'jpn': 'ja', 'kin': 'rw', 'kor': 'ko', 'lat': 'la', 'lav': 'lv', 'lit': 'lt', 'mac': 'mk', 'mal': 'ml', 'mao': 'mi', 'may': 'ms', 'nor': 'no', 'per': 'fa', 'per': 'fa', 'pol': 'pl', 'por': 'pt', 'pus': 'ps', 'rum': 'ro', 'rus': 'ru', 'san': 'sa', 'slo': 'sk', 'slv': 'sl', 'spa': 'es', 'srp': 'sr', 'swe': 'sv', 'tha': 'th', 'tur': 'tr', 'ukr': 'uk', 'urd': 'ur', 'vie': 'vi', 'wel': 'cy', # additions 'gle': 'ga', # "Irish" (Gaelic) 'jav': 'jv', # Javanese 'welsh': 'cy', # Welsh 'oci': 'oc', # Occitan # Don't have ISO 639-1 codes 'grc': 'el', # Ancient Greek; map to modern greek 'map': None, # Austronesian (collection) 'syr': None, # Syriac, Modern 'gem': None, # Old Saxon 'non': None, # Old Norse 'emg': None, # Eastern Meohang 'neg': None, # Negidal 'mul': None, # Multiple languages 'und': None, # Undetermined } def clean(thing, force_xml=False): """ This function is appropriate to be called on any random, non-markup string, such as author names, titles, etc. It will try to clean up common unicode mangles, HTML characters, etc. This will detect XML/HTML and "do the right thing" (aka, not remove entities like '&' if there are tags in the string), unless you pass the 'force_xml' parameter, which might be appropriate for, eg, names and titles, which generally should be projected down to plain text. Also strips extra whitespace. """ if not thing: return None fix_entities = 'auto' if force_xml: fix_entities = True fixed = ftfy.fix_text(thing, fix_entities=fix_entities).strip() if not fixed or len(fixed) <= 1: # wasn't zero-length before, but is now; return None return None return fixed def test_clean(): assert clean(None) == None assert clean('') == None assert clean('1') == None assert clean('123') == '123' assert clean('a&b') == 'a&b' assert clean('a&b') == 'a&b' assert clean('a&b', force_xml=True) == 'a&b' def b32_hex(s): s = s.strip().split()[0].lower() if s.startswith("sha1:"): s = s[5:] if len(s) != 32: return s return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') def is_cjk(s): if not s: return False for c in s: if c.isalpha(): lang_prefix = unicodedata.name(c).split()[0] return lang_prefix in ('CJK', 'HIRAGANA', 'KATAKANA', 'HANGUL') return False def test_is_cjk(): assert is_cjk(None) == False assert is_cjk('') == False assert is_cjk('blah') == False assert is_cjk('岡, 鹿, 梨, 阜, 埼') == True assert is_cjk('[岡, 鹿, 梨, 阜, 埼]') == True assert is_cjk('菊') == True assert is_cjk('岡, 鹿, 梨, 阜, 埼 with eng after') == True assert is_cjk('水道') == True assert is_cjk('オウ, イク') == True # kanji assert is_cjk('ひヒ') == True assert is_cjk('き゚ゅ') == True assert is_cjk('ㄴ, ㄹ, ㅁ, ㅂ, ㅅ') == True DOMAIN_REL_MAP = { "archive.org": "archive", # LOCKSS, Portico, DuraSpace, etc would also be "archive" "arxiv.org": "repository", "babel.hathitrust.org": "repository", "cds.cern.ch": "repository", "deepblue.lib.umich.edu": "repository", "europepmc.org": "repository", "hal.inria.fr": "repository", "scielo.isciii.es": "repository", "www.dtic.mil": "repository", "www.jstage.jst.go.jp": "repository", "www.jstor.org": "repository", "www.ncbi.nlm.nih.gov": "repository", "ftp.ncbi.nlm.nih.gov": "repository", "www.scielo.br": "repository", "www.scielo.cl": "repository", "www.scielo.org.mx": "repository", "zenodo.org": "repository", "www.biorxiv.org": "repository", "www.medrxiv.org": "repository", "citeseerx.ist.psu.edu": "aggregator", "publisher-connector.core.ac.uk": "aggregator", "core.ac.uk": "aggregator", "static.aminer.org": "aggregator", "aminer.org": "aggregator", "pdfs.semanticscholar.org": "aggregator", "semanticscholar.org": "aggregator", "www.semanticscholar.org": "aggregator", "academic.oup.com": "publisher", "cdn.elifesciences.org": "publisher", "cell.com": "publisher", "dl.acm.org": "publisher", "downloads.hindawi.com": "publisher", "elifesciences.org": "publisher", "iopscience.iop.org": "publisher", "journals.plos.org": "publisher", "link.springer.com": "publisher", "onlinelibrary.wiley.com": "publisher", "works.bepress.com": "publisher", "www.biomedcentral.com": "publisher", "www.cell.com": "publisher", "www.nature.com": "publisher", "www.pnas.org": "publisher", "www.tandfonline.com": "publisher", "www.frontiersin.org": "publisher", "www.degruyter.com": "publisher", "www.mdpi.com": "publisher", "www.ahajournals.org": "publisher", "ehp.niehs.nih.gov": "publisher", "journals.tsu.ru": "publisher", "www.cogentoa.com": "publisher", "www.researchgate.net": "academicsocial", "academia.edu": "academicsocial", "wayback.archive-it.org": "webarchive", "web.archive.org": "webarchive", "archive.is": "webarchive", } def make_rel_url(raw_url, default_link_rel="web"): # this is where we map specific domains to rel types, and also filter out # bad domains, invalid URLs, etc rel = default_link_rel for domain, domain_rel in DOMAIN_REL_MAP.items(): if "//{}/".format(domain) in raw_url: rel = domain_rel break return (rel, raw_url) def test_make_rel_url(): assert make_rel_url("http://example.com/thing.pdf")[0] == "web" assert make_rel_url("http://example.com/thing.pdf", default_link_rel="jeans")[0] == "jeans" assert make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")[0] == "webarchive" assert make_rel_url("http://cell.com/thing.pdf")[0] == "publisher" class EntityImporter: """ Base class for fatcat entity importers. The API exposed to record iterator is: push_record(raw_record) finish() The API that implementations are expected to fill in are: want(raw_record) -> boolean parse_record(raw_record) -> entity try_update(entity) -> boolean insert_batch([entity]) -> None This class exposes helpers for implementations: self.api self.get_editgroup_id() self.create_(entity) -> EntityEdit for related entity types self.push_entity(entity) self.counts['exists'] += 1 if didn't update or insert because of existing) self.counts['update'] += 1 if updated an entity Parameters: submit_mode: instead of accepting editgroups, only submits them. implementors must write insert_batch appropriately """ def __init__(self, api, **kwargs): eg_extra = kwargs.get('editgroup_extra', dict()) eg_extra['git_rev'] = eg_extra.get('git_rev', subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter') self.api = api self.do_updates = bool(kwargs.get('do_updates', True)) self.bezerk_mode = kwargs.get('bezerk_mode', False) self.submit_mode = kwargs.get('submit_mode', False) self.edit_batch_size = kwargs.get('edit_batch_size', 100) self.editgroup_description = kwargs.get('editgroup_description') self.editgroup_extra = eg_extra self.reset() self._issnl_id_map = dict() self._orcid_id_map = dict() self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$") self._doi_id_map = dict() self._pmid_id_map = dict() def reset(self): self.counts = Counter({'total': 0, 'skip': 0, 'insert': 0, 'update': 0, 'exists': 0}) self._edit_count = 0 self._editgroup_id = None self._entity_queue = [] self._edits_inflight = [] def push_record(self, raw_record): """ Returns nothing. """ self.counts['total'] += 1 if (not raw_record) or (not self.want(raw_record)): self.counts['skip'] += 1 return entity = self.parse_record(raw_record) if not entity: self.counts['skip'] += 1 return if self.bezerk_mode: self.push_entity(entity) return if self.try_update(entity): self.push_entity(entity) return def parse_record(self, raw_record): """ Returns an entity class type, or None if we should skip this one. May have side-effects (eg, create related entities), but shouldn't update/mutate the actual entity. """ # implementations should fill this in raise NotImplementedError def finish(self): """ Gets called as cleanup at the end of imports, but can also be called at any time to "snip off" current editgroup progress. In other words, safe to call this but then continue pushing records. For example, in a persistent worker could call this if there have been no new entities fed in for more than some time period, to ensure that entities actually get created within a reasonable time frame. """ if self._edit_count > 0: if self.submit_mode: self.api.submit_editgroup(self._editgroup_id) else: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None self._edit_count = 0 self._edits_inflight = [] if self._entity_queue: self.insert_batch(self._entity_queue) self.counts['insert'] += len(self._entity_queue) self._entity_queue = [] return self.counts def get_editgroup_id(self, edits=1): if self._edit_count >= self.edit_batch_size: if self.submit_mode: self.api.submit_editgroup(self._editgroup_id) else: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None self._edit_count = 0 self._edits_inflight = [] if not self._editgroup_id: eg = self.api.create_editgroup( fatcat_openapi_client.Editgroup( description=self.editgroup_description, extra=self.editgroup_extra)) self._editgroup_id = eg.editgroup_id self._edit_count += edits return self._editgroup_id def create_container(self, entity): eg_id = self.get_editgroup_id() self.counts['inserted.container'] += 1 return self.api.create_container(eg_id, entity) def create_release(self, entity): eg_id = self.get_editgroup_id() self.counts['inserted.release'] += 1 return self.api.create_release(eg_id, entity) def create_file(self, entity): eg_id = self.get_editgroup_id() self.counts['inserted.file'] += 1 return self.api.create_file(eg_id, entity) def updated(self): """ Implementations should call this from try_update() if the update was successful """ self.counts['update'] += 1 def push_entity(self, entity): self._entity_queue.append(entity) if len(self._entity_queue) >= self.edit_batch_size: self.insert_batch(self._entity_queue) self.counts['insert'] += len(self._entity_queue) self._entity_queue = [] def want(self, raw_record): """ Implementations can override for optional fast-path to drop a record. Must have no side-effects; returns bool. """ return True def try_update(self, raw_record): """ Passed the output of parse_record(). Should try to find an existing entity and update it (PUT), decide we should do nothing (based on the existing record), or create a new one. Implementations must update the exists/updated/skip counts appropriately in this method. Returns boolean: True if the entity should still be inserted, False otherwise """ raise NotImplementedError def insert_batch(self, raw_record): raise NotImplementedError def is_orcid(self, orcid): return self._orcid_regex.match(orcid) is not None def lookup_orcid(self, orcid): """Caches calls to the Orcid lookup API endpoint in a local dict""" if not self.is_orcid(orcid): return None if orcid in self._orcid_id_map: return self._orcid_id_map[orcid] creator_id = None try: rv = self.api.lookup_creator(orcid=orcid) creator_id = rv.ident except ApiException as ae: # If anything other than a 404 (not found), something is wrong assert ae.status == 404 self._orcid_id_map[orcid] = creator_id # might be None return creator_id def is_doi(self, doi): return doi.startswith("10.") and doi.count("/") >= 1 def lookup_doi(self, doi): """Caches calls to the doi lookup API endpoint in a local dict For identifier lookups only (not full object fetches)""" assert self.is_doi(doi) doi = doi.lower() if doi in self._doi_id_map: return self._doi_id_map[doi] release_id = None try: rv = self.api.lookup_release(doi=doi, hide="abstracts,refs,contribs") release_id = rv.ident except ApiException as ae: # If anything other than a 404 (not found), something is wrong assert ae.status == 404 self._doi_id_map[doi] = release_id # might be None return release_id def lookup_pmid(self, pmid): """Caches calls to the pmid lookup API endpoint in a local dict For identifier lookups only (not full object fetches)""" if pmid in self._pmid_id_map: return self._pmid_id_map[pmid] release_id = None try: rv = self.api.lookup_release(pmid=pmid, hide="abstracts,refs,contribs") release_id = rv.ident except ApiException as ae: # If anything other than a 404 (not found), something is wrong assert ae.status == 404 self._pmid_id_map[pmid] = release_id # might be None return release_id def is_issnl(self, issnl): return len(issnl) == 9 and issnl[4] == '-' def lookup_issnl(self, issnl): """Caches calls to the ISSN-L lookup API endpoint in a local dict""" if issnl in self._issnl_id_map: return self._issnl_id_map[issnl] container_id = None try: rv = self.api.lookup_container(issnl=issnl) container_id = rv.ident except ApiException as ae: # If anything other than a 404 (not found), something is wrong assert ae.status == 404 self._issnl_id_map[issnl] = container_id # might be None return container_id def read_issn_map_file(self, issn_map_file): print("Loading ISSN map file...", file=sys.stderr) self._issn_issnl_map = dict() for line in issn_map_file: if line.startswith("ISSN") or len(line) == 0: continue (issn, issnl) = line.split()[0:2] self._issn_issnl_map[issn] = issnl # double mapping makes lookups easy self._issn_issnl_map[issnl] = issnl print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map)), file=sys.stderr) def issn2issnl(self, issn): if issn is None: return None return self._issn_issnl_map.get(issn) class RecordPusher: """ Base class for different importer sources. Pretty trivial interface, just wraps an importer and pushes records in to it. """ def __init__(self, importer, **kwargs): self.importer = importer def run(self): """ This will look something like: for line in sys.stdin: record = json.loads(line) self.importer.push_record(record) print(self.importer.finish()) """ raise NotImplementedError class JsonLinePusher(RecordPusher): def __init__(self, importer, json_file, **kwargs): self.importer = importer self.json_file = json_file def run(self): for line in self.json_file: if not line: continue record = json.loads(line) self.importer.push_record(record) counts = self.importer.finish() print(counts) return counts class CsvPusher(RecordPusher): def __init__(self, importer, csv_file, **kwargs): self.importer = importer self.reader = csv.DictReader(csv_file, delimiter=kwargs.get('delimiter', ',')) def run(self): for line in self.reader: if not line: continue self.importer.push_record(line) counts = self.importer.finish() print(counts) return counts class LinePusher(RecordPusher): def __init__(self, importer, text_file, **kwargs): self.importer = importer self.text_file = text_file def run(self): for line in self.text_file: if not line: continue self.importer.push_record(line) counts = self.importer.finish() print(counts) return counts class SqlitePusher(RecordPusher): def __init__(self, importer, db_file, table_name, where_clause="", **kwargs): self.importer = importer self.db = sqlite3.connect(db_file, isolation_level='EXCLUSIVE') self.db.row_factory = sqlite3.Row self.table_name = table_name self.where_clause = where_clause def run(self): cur = self.db.execute("SELECT * FROM {} {};".format( self.table_name, self.where_clause)) for row in cur: self.importer.push_record(row) counts = self.importer.finish() print(counts) return counts class Bs4XmlLinesPusher(RecordPusher): def __init__(self, importer, xml_file, prefix_filter=None, **kwargs): self.importer = importer self.xml_file = xml_file self.prefix_filter = prefix_filter def run(self): for line in self.xml_file: if not line: continue if self.prefix_filter and not line.startswith(self.prefix_filter): continue soup = BeautifulSoup(line, "xml") self.importer.push_record(soup) soup.decompose() counts = self.importer.finish() print(counts) return counts class Bs4XmlFilePusher(RecordPusher): def __init__(self, importer, xml_file, record_tag, **kwargs): self.importer = importer self.xml_file = xml_file self.record_tag = record_tag def run(self): soup = BeautifulSoup(self.xml_file, "xml") for record in soup.find_all(self.record_tag): self.importer.push_record(record) record.decompose() counts = self.importer.finish() soup.decompose() print(counts) return counts class Bs4XmlLargeFilePusher(RecordPusher): """ This is a variant of Bs4XmlFilePusher which parses large files incrementally, instead of loading the whole thing in RAM first. The dominant source of RAM utilization at start-up is the large ISSN/ISSN-L map. This can be confirmed in local development by using the small map in ./tests/files/. Current implementation is weird/inefficient in that it re-parses with BeautifulSoup (lxml) every article, but I didn't want to mangle or re-write with a different BS back-end. Did at least casual testing and all of: record.decompose(), soup.decompose(), element.clear(), root.clear() helped with memory usage. With all of these, memory growth is very slow and can probably be explained by inner container/release API lookup caches. """ def __init__(self, importer, xml_file, record_tag, **kwargs): self.importer = importer self.xml_file = xml_file self.record_tag = record_tag def run(self): elem_iter = ET.iterparse(self.xml_file, ["start", "end"]) i = 0 root = None for (event, element) in elem_iter: if not root and event == "start": root = element continue if not (element.tag == self.record_tag and event == "end"): continue soup = BeautifulSoup(ET.tostring(element), "xml") for record in soup.find_all(self.record_tag): self.importer.push_record(record) record.decompose() soup.decompose() element.clear() root.clear() counts = self.importer.finish() print(counts) return counts class Bs4XmlFileListPusher(RecordPusher): def __init__(self, importer, list_file, record_tag, **kwargs): self.importer = importer self.list_file = list_file self.record_tag = record_tag def run(self): for xml_path in self.list_file: xml_path = xml_path.strip() if not xml_path or xml_path.startswith("#"): continue with open(xml_path, 'r') as xml_file: soup = BeautifulSoup(xml_file, "xml") for record in soup.find_all(self.record_tag): self.importer.push_record(record) record.decompose() soup.decompose() counts = self.importer.finish() print(counts) return counts class KafkaBs4XmlPusher(RecordPusher): """ Fetch XML for an article from Kafka, parse via Bs4. """ def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): self.importer = importer self.consumer = make_kafka_consumer( kafka_hosts, kafka_env, topic_suffix, group, kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') ) self.poll_interval = kwargs.get('poll_interval', 5.0) self.consume_batch_size = kwargs.get('consume_batch_size', 25) def run(self): count = 0 last_push = datetime.datetime.now() while True: # Note: this is batch-oriented, because underlying importer is # often batch-oriented, but this doesn't confirm that entire batch # has been pushed to fatcat before commiting offset. Eg, consider # case where there there is one update and thousands of creates; # update would be lingering in importer, and if importer crashed # never created. # This is partially mitigated for the worker case by flushing any # outstanding editgroups every 5 minutes, but there is still that # window when editgroups might be hanging (unsubmitted). batch = self.consumer.consume( num_messages=self.consume_batch_size, timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval)".format( len(batch), self.poll_interval)) if not batch: if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): # it has been some time, so flush any current editgroup self.importer.finish() last_push = datetime.datetime.now() #print("Flushed any partial import batch: {}".format(self.importer.counts)) continue # first check errors on entire batch... for msg in batch: if msg.error(): raise KafkaException(msg.error()) # ... then process for msg in batch: soup = BeautifulSoup(msg.value().decode('utf-8'), "xml") self.importer.push_record(soup) soup.decompose() count += 1 if count % 500 == 0: print("Import counts: {}".format(self.importer.counts)) last_push = datetime.datetime.now() for msg in batch: # locally store offsets of processed messages; will be # auto-commited by librdkafka from this "stored" value self.consumer.store_offsets(message=msg) # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering counts = self.importer.finish() print(counts) self.consumer.close() return counts class KafkaJsonPusher(RecordPusher): def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): self.importer = importer self.consumer = make_kafka_consumer( kafka_hosts, kafka_env, topic_suffix, group, kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') ) self.poll_interval = kwargs.get('poll_interval', 5.0) self.consume_batch_size = kwargs.get('consume_batch_size', 100) def run(self): count = 0 last_push = datetime.datetime.now() while True: # Note: this is batch-oriented, because underlying importer is # often batch-oriented, but this doesn't confirm that entire batch # has been pushed to fatcat before commiting offset. Eg, consider # case where there there is one update and thousands of creates; # update would be lingering in importer, and if importer crashed # never created. # This is partially mitigated for the worker case by flushing any # outstanding editgroups every 5 minutes, but there is still that # window when editgroups might be hanging (unsubmitted). batch = self.consumer.consume( num_messages=self.consume_batch_size, timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval)".format( len(batch), self.poll_interval)) if not batch: if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): # it has been some time, so flush any current editgroup self.importer.finish() last_push = datetime.datetime.now() #print("Flushed any partial import batch: {}".format(self.importer.counts)) continue # first check errors on entire batch... for msg in batch: if msg.error(): raise KafkaException(msg.error()) # ... then process for msg in batch: record = json.loads(msg.value().decode('utf-8')) self.importer.push_record(record) count += 1 if count % 500 == 0: print("Import counts: {}".format(self.importer.counts)) last_push = datetime.datetime.now() for msg in batch: # locally store offsets of processed messages; will be # auto-commited by librdkafka from this "stored" value self.consumer.store_offsets(message=msg) # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering counts = self.importer.finish() print(counts) self.consumer.close() return counts def make_kafka_consumer(hosts, env, topic_suffix, group, kafka_namespace="fatcat"): topic_name = "{}-{}.{}".format(kafka_namespace, env, topic_suffix) def fail_fast(err, partitions): if err is not None: print("Kafka consumer commit error: {}".format(err)) print("Bailing out...") # TODO: should it be sys.exit(-1)? raise KafkaException(err) for p in partitions: # check for partition-specific commit errors if p.error: print("Kafka consumer commit error: {}".format(p.error)) print("Bailing out...") # TODO: should it be sys.exit(-1)? raise KafkaException(p.error) #print("Kafka consumer commit successful") pass # previously, using pykafka #auto_commit_enable=True, #auto_commit_interval_ms=30000, # 30 seconds conf = { 'bootstrap.servers': hosts, 'group.id': group, 'on_commit': fail_fast, # messages don't have offset marked as stored until pushed to # elastic, but we do auto-commit stored offsets to broker 'enable.auto.offset.store': False, 'enable.auto.commit': True, # user code timeout; if no poll after this long, assume user code # hung and rebalance (default: 5min) 'max.poll.interval.ms': 120000, 'default.topic.config': { 'auto.offset.reset': 'latest', }, } def on_rebalance(consumer, partitions): for p in partitions: if p.error: raise KafkaException(p.error) print("Kafka partitions rebalanced: {} / {}".format( consumer, partitions)) consumer = Consumer(conf) # NOTE: it's actually important that topic_name *not* be bytes (UTF-8 # encoded) consumer.subscribe([topic_name], on_assign=on_rebalance, on_revoke=on_rebalance, ) print("Consuming from kafka topic {}, group {}".format(topic_name, group)) return consumer