Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r--   python/fatcat_tools/importers/common.py   198
1 file changed, 106 insertions, 92 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index e33a2012..2639c85a 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -1,4 +1,3 @@
-
 import csv
 import datetime
 import json
@@ -34,7 +33,6 @@ SANE_MAX_URLS: int = 100
 DOMAIN_REL_MAP: Dict[str, str] = {
     "archive.org": "archive",
     # LOCKSS, Portico, DuraSpace, etc would also be "archive"
-
     "arxiv.org": "repository",
     "babel.hathitrust.org": "repository",
     "cds.cern.ch": "repository",
@@ -53,7 +51,6 @@ DOMAIN_REL_MAP: Dict[str, str] = {
     "zenodo.org": "repository",
     "www.biorxiv.org": "repository",
     "www.medrxiv.org": "repository",
-
     "citeseerx.ist.psu.edu": "aggregator",
     "publisher-connector.core.ac.uk": "aggregator",
     "core.ac.uk": "aggregator",
@@ -62,7 +59,6 @@ DOMAIN_REL_MAP: Dict[str, str] = {
     "pdfs.semanticscholar.org": "aggregator",
     "semanticscholar.org": "aggregator",
     "www.semanticscholar.org": "aggregator",
-
     "academic.oup.com": "publisher",
     "cdn.elifesciences.org": "publisher",
     "cell.com": "publisher",
@@ -86,15 +82,14 @@ DOMAIN_REL_MAP: Dict[str, str] = {
     "ehp.niehs.nih.gov": "publisher",
     "journals.tsu.ru": "publisher",
     "www.cogentoa.com": "publisher",
-
     "www.researchgate.net": "academicsocial",
     "academia.edu": "academicsocial",
-
     "wayback.archive-it.org": "webarchive",
     "web.archive.org": "webarchive",
     "archive.is": "webarchive",
 }

+
 def make_rel_url(raw_url: str, default_link_rel: str = "web"):
     # this is where we map specific domains to rel types, and also filter out
     # bad domains, invalid URLs, etc
@@ -105,12 +100,17 @@ def make_rel_url(raw_url: str, default_link_rel: str = "web"):
             break
     return (rel, raw_url)

+
 def test_make_rel_url():
     assert make_rel_url("http://example.com/thing.pdf")[0] == "web"
     assert make_rel_url("http://example.com/thing.pdf", default_link_rel="jeans")[0] == "jeans"
-    assert make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")[0] == "webarchive"
+    assert (
+        make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")[0]
+        == "webarchive"
+    )
     assert make_rel_url("http://cell.com/thing.pdf")[0] == "publisher"

+
 class EntityImporter:
     """
     Base class for fatcat entity importers.
@@ -147,23 +147,26 @@ class EntityImporter:

     def __init__(self, api, **kwargs):
-        eg_extra = kwargs.get('editgroup_extra', dict())
-        eg_extra['git_rev'] = eg_extra.get('git_rev',
-            subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8')
-        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter')
+        eg_extra = kwargs.get("editgroup_extra", dict())
+        eg_extra["git_rev"] = eg_extra.get(
+            "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip()
+        ).decode("utf-8")
+        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityImporter")

         self.api = api
-        self.do_updates = bool(kwargs.get('do_updates', True))
-        self.do_fuzzy_match: bool = kwargs.get('do_fuzzy_match', True)
-        self.bezerk_mode: bool = kwargs.get('bezerk_mode', False)
-        self.submit_mode: bool = kwargs.get('submit_mode', False)
-        self.edit_batch_size: int = kwargs.get('edit_batch_size', 100)
-        self.editgroup_description: Optional[str] = kwargs.get('editgroup_description')
+        self.do_updates = bool(kwargs.get("do_updates", True))
+        self.do_fuzzy_match: bool = kwargs.get("do_fuzzy_match", True)
+        self.bezerk_mode: bool = kwargs.get("bezerk_mode", False)
+        self.submit_mode: bool = kwargs.get("submit_mode", False)
+        self.edit_batch_size: int = kwargs.get("edit_batch_size", 100)
+        self.editgroup_description: Optional[str] = kwargs.get("editgroup_description")
         self.editgroup_extra: Optional[Any] = eg_extra
-        self.es_client = kwargs.get('es_client')
+        self.es_client = kwargs.get("es_client")
         if not self.es_client:
-            self.es_client = elasticsearch.Elasticsearch("https://search.fatcat.wiki", timeout=120)
+            self.es_client = elasticsearch.Elasticsearch(
+                "https://search.fatcat.wiki", timeout=120
+            )

         self._issnl_id_map: Dict[str, Any] = dict()
         self._orcid_id_map: Dict[str, Any] = dict()
@@ -174,7 +177,7 @@ class EntityImporter:
         self.reset()

     def reset(self) -> None:
-        self.counts = Counter({'total': 0, 'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
+        self.counts = Counter({"total": 0, "skip": 0, "insert": 0, "update": 0, "exists": 0})
         self._edit_count: int = 0
         self._editgroup_id: Optional[str] = None
         self._entity_queue: List[Any] = []
@@ -184,13 +187,13 @@
         """
         Returns nothing.
         """
-        self.counts['total'] += 1
+        self.counts["total"] += 1
         if (not raw_record) or (not self.want(raw_record)):
-            self.counts['skip'] += 1
+            self.counts["skip"] += 1
             return
         entity = self.parse_record(raw_record)
         if not entity:
-            self.counts['skip'] += 1
+            self.counts["skip"] += 1
             return
         if self.bezerk_mode:
             self.push_entity(entity)
@@ -230,7 +233,7 @@

         if self._entity_queue:
             self.insert_batch(self._entity_queue)
-            self.counts['insert'] += len(self._entity_queue)
+            self.counts["insert"] += len(self._entity_queue)
             self._entity_queue = []

         return self.counts
@@ -248,8 +251,9 @@
         if not self._editgroup_id:
             eg = self.api.create_editgroup(
                 fatcat_openapi_client.Editgroup(
-                    description=self.editgroup_description,
-                    extra=self.editgroup_extra))
+                    description=self.editgroup_description, extra=self.editgroup_extra
+                )
+            )
             self._editgroup_id = eg.editgroup_id

         self._edit_count += edits
@@ -257,30 +261,30 @@
     def create_container(self, entity):
         eg_id = self.get_editgroup_id()
-        self.counts['inserted.container'] += 1
+        self.counts["inserted.container"] += 1
         return self.api.create_container(eg_id, entity)

     def create_release(self, entity):
         eg_id = self.get_editgroup_id()
-        self.counts['inserted.release'] += 1
+        self.counts["inserted.release"] += 1
         return self.api.create_release(eg_id, entity)

     def create_file(self, entity):
         eg_id = self.get_editgroup_id()
-        self.counts['inserted.file'] += 1
+        self.counts["inserted.file"] += 1
         return self.api.create_file(eg_id, entity)

     def updated(self):
         """
         Implementations should call this from try_update() if the update was successful
         """
-        self.counts['update'] += 1
+        self.counts["update"] += 1

     def push_entity(self, entity):
         self._entity_queue.append(entity)

         if len(self._entity_queue) >= self.edit_batch_size:
             self.insert_batch(self._entity_queue)
-            self.counts['insert'] += len(self._entity_queue)
+            self.counts["insert"] += len(self._entity_queue)
             self._entity_queue = []

     def want(self, raw_record: Any) -> bool:
@@ -324,7 +328,7 @@
             # If anything other than a 404 (not found), something is wrong
             if ae.status != 404:
                 raise ae
-        self._orcid_id_map[orcid] = creator_id # might be None
+        self._orcid_id_map[orcid] = creator_id  # might be None
         return creator_id

     def is_doi(self, doi: str) -> bool:
@@ -347,7 +351,7 @@
             # If anything other than a 404 (not found), something is wrong
             if ae.status != 404:
                 raise ae
-        self._doi_id_map[doi] = release_id # might be None
+        self._doi_id_map[doi] = release_id  # might be None
         return release_id

     def lookup_pmid(self, pmid: str):
@@ -364,11 +368,11 @@
             # If anything other than a 404 (not found), something is wrong
             if ae.status != 404:
                 raise ae
-        self._pmid_id_map[pmid] = release_id # might be None
+        self._pmid_id_map[pmid] = release_id  # might be None
         return release_id

     def is_issnl(self, issnl: str) -> bool:
-        return len(issnl) == 9 and issnl[4] == '-'
+        return len(issnl) == 9 and issnl[4] == "-"

     def lookup_issnl(self, issnl: str):
         """Caches calls to the ISSN-L lookup API endpoint in a local dict"""
@@ -382,7 +386,7 @@
             # If anything other than a 404 (not found), something is wrong
             if ae.status != 404:
                 raise ae
-        self._issnl_id_map[issnl] = container_id # might be None
+        self._issnl_id_map[issnl] = container_id  # might be None
         return container_id

     def read_issn_map_file(self, issn_map_file):
@@ -417,26 +421,26 @@
         # update old/deprecated 'rel' on URLs
         for i in range(len(existing.urls)):
             u = existing.urls[i]
-            if u.rel == 'repository' and '://archive.org/download/' in u.url:
-                existing.urls[i].rel = 'archive'
-            if u.rel == 'social':
-                u.rel = 'academicsocial'
+            if u.rel == "repository" and "://archive.org/download/" in u.url:
+                existing.urls[i].rel = "archive"
+            if u.rel == "social":
+                u.rel = "academicsocial"

         # remove URLs which are near-duplicates
         redundant_urls = []
         all_urls = [u.url for u in existing.urls]
-        all_wayback_urls = [u.url for u in existing.urls if '://web.archive.org/web/' in u.url]
+        all_wayback_urls = [u.url for u in existing.urls if "://web.archive.org/web/" in u.url]
         for url in all_urls:
             # https/http redundancy
-            if url.startswith('http://') and url.replace('http://', 'https://', 1) in all_urls:
+            if url.startswith("http://") and url.replace("http://", "https://", 1) in all_urls:
                 redundant_urls.append(url)
                 continue
             # default HTTP port included and not included
-            if ':80/' in url and url.replace(':80', '', 1) in all_urls:
+            if ":80/" in url and url.replace(":80", "", 1) in all_urls:
                 redundant_urls.append(url)
                 continue
             # partial and complete wayback timestamps
-            if '://web.archive.org/web/2017/' in url:
+            if "://web.archive.org/web/2017/" in url:
                 original_url = "/".join(url.split("/")[5:])
                 assert len(original_url) > 5
                 for wb_url in all_wayback_urls:
@@ -452,7 +456,9 @@
     def generic_fileset_cleanups(existing):
         return existing

-    def match_existing_release_fuzzy(self, release: ReleaseEntity) -> Optional[Tuple[str, str, ReleaseEntity]]:
+    def match_existing_release_fuzzy(
+        self, release: ReleaseEntity
+    ) -> Optional[Tuple[str, str, ReleaseEntity]]:
         """
         This helper function uses fuzzycat (and elasticsearch) to look for
         existing release entities with similar metadata.
@@ -488,7 +494,15 @@ class EntityImporter:
             return None

         release_dict = entity_to_dict(release, api_client=self.api.api_client)
-        verified = [(fuzzycat.verify.verify(release_dict, entity_to_dict(c, api_client=self.api.api_client)), c) for c in candidates]
+        verified = [
+            (
+                fuzzycat.verify.verify(
+                    release_dict, entity_to_dict(c, api_client=self.api.api_client)
+                ),
+                c,
+            )
+            for c in candidates
+        ]

         # chose the "closest" match
         closest = sorted(verified, key=lambda v: STATUS_SORT[v[0].status])[0]
@@ -522,7 +536,6 @@ class RecordPusher:


 class JsonLinePusher(RecordPusher):
-
     def __init__(self, importer, json_file, **kwargs):
         self.importer = importer
         self.json_file = json_file
@@ -539,10 +552,9 @@ class JsonLinePusher(RecordPusher):


 class CsvPusher(RecordPusher):
-
     def __init__(self, importer, csv_file, **kwargs):
         self.importer = importer
-        self.reader = csv.DictReader(csv_file, delimiter=kwargs.get('delimiter', ','))
+        self.reader = csv.DictReader(csv_file, delimiter=kwargs.get("delimiter", ","))

     def run(self):
         for line in self.reader:
@@ -555,7 +567,6 @@


 class LinePusher(RecordPusher):
-
     def __init__(self, importer, text_file, **kwargs):
         self.importer = importer
         self.text_file = text_file
@@ -571,17 +582,15 @@


 class SqlitePusher(RecordPusher):
-
     def __init__(self, importer, db_file, table_name, where_clause="", **kwargs):
         self.importer = importer
-        self.db = sqlite3.connect(db_file, isolation_level='EXCLUSIVE')
+        self.db = sqlite3.connect(db_file, isolation_level="EXCLUSIVE")
         self.db.row_factory = sqlite3.Row
         self.table_name = table_name
         self.where_clause = where_clause

     def run(self):
-        cur = self.db.execute("SELECT * FROM {} {};".format(
-            self.table_name, self.where_clause))
+        cur = self.db.execute("SELECT * FROM {} {};".format(self.table_name, self.where_clause))
         for row in cur:
             self.importer.push_record(row)
         counts = self.importer.finish()
@@ -590,7 +599,6 @@


 class Bs4XmlLinesPusher(RecordPusher):
-
     def __init__(self, importer, xml_file, prefix_filter=None, **kwargs):
         self.importer = importer
         self.xml_file = xml_file
@@ -611,7 +619,6 @@


 class Bs4XmlFilePusher(RecordPusher):
-
     def __init__(self, importer, xml_file, record_tag, **kwargs):
         self.importer = importer
         self.xml_file = xml_file
@@ -684,7 +691,6 @@ class Bs4XmlLargeFilePusher(RecordPusher):


 class Bs4XmlFileListPusher(RecordPusher):
-
     def __init__(self, importer, list_file, record_tag, **kwargs):
         self.importer = importer
         self.list_file = list_file
@@ -695,7 +701,7 @@
             xml_path = xml_path.strip()
             if not xml_path or xml_path.startswith("#"):
                 continue
-            with open(xml_path, 'r') as xml_file:
+            with open(xml_path, "r") as xml_file:
                 soup = BeautifulSoup(xml_file, "xml")
                 for record in soup.find_all(self.record_tag):
                     self.importer.push_record(record)
@@ -705,10 +711,12 @@ class Bs4XmlFileListPusher(RecordPusher):
         print(counts)
         return counts

+
 class KafkaBs4XmlPusher(RecordPusher):
     """
     Fetch XML for an article from Kafka, parse via Bs4.
     """
+
     def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
         self.importer = importer
         self.consumer = make_kafka_consumer(
@@ -716,10 +724,10 @@ class KafkaBs4XmlPusher(RecordPusher):
             kafka_env,
             topic_suffix,
             group,
-            kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+            kafka_namespace=kwargs.get("kafka_namespace", "fatcat"),
         )
-        self.poll_interval = kwargs.get('poll_interval', 5.0)
-        self.consume_batch_size = kwargs.get('consume_batch_size', 25)
+        self.poll_interval = kwargs.get("poll_interval", 5.0)
+        self.consume_batch_size = kwargs.get("consume_batch_size", 25)

     def run(self):
         count = 0
@@ -735,16 +743,19 @@
             # outstanding editgroups every 5 minutes, but there is still that
             # window when editgroups might be hanging (unsubmitted).
             batch = self.consumer.consume(
-                num_messages=self.consume_batch_size,
-                timeout=self.poll_interval)
-            print("... got {} kafka messages ({}sec poll interval) {}".format(
-                len(batch), self.poll_interval, self.importer.counts))
+                num_messages=self.consume_batch_size, timeout=self.poll_interval
+            )
+            print(
+                "... got {} kafka messages ({}sec poll interval) {}".format(
+                    len(batch), self.poll_interval, self.importer.counts
+                )
+            )
             if not batch:
                 if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
                     # it has been some time, so flush any current editgroup
                     self.importer.finish()
                     last_push = datetime.datetime.now()
-                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
+                    # print("Flushed any partial import batch: {}".format(self.importer.counts))
                 continue
             # first check errors on entire batch...
             for msg in batch:
@@ -752,7 +763,7 @@
                     raise KafkaException(msg.error())
             # ... then process
             for msg in batch:
-                soup = BeautifulSoup(msg.value().decode('utf-8'), "xml")
+                soup = BeautifulSoup(msg.value().decode("utf-8"), "xml")
                 self.importer.push_record(soup)
                 soup.decompose()
                 count += 1
@@ -771,8 +782,8 @@
         self.consumer.close()
         return counts

-class KafkaJsonPusher(RecordPusher):

+class KafkaJsonPusher(RecordPusher):
     def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
         self.importer = importer
         self.consumer = make_kafka_consumer(
@@ -780,11 +791,11 @@
             kafka_env,
             topic_suffix,
             group,
-            kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+            kafka_namespace=kwargs.get("kafka_namespace", "fatcat"),
         )
-        self.poll_interval = kwargs.get('poll_interval', 5.0)
-        self.consume_batch_size = kwargs.get('consume_batch_size', 100)
-        self.force_flush = kwargs.get('force_flush', False)
+        self.poll_interval = kwargs.get("poll_interval", 5.0)
+        self.consume_batch_size = kwargs.get("consume_batch_size", 100)
+        self.force_flush = kwargs.get("force_flush", False)

     def run(self):
         count = 0
@@ -801,10 +812,13 @@
             # outstanding editgroups every 5 minutes, but there is still that
             # window when editgroups might be hanging (unsubmitted).
             batch = self.consumer.consume(
-                num_messages=self.consume_batch_size,
-                timeout=self.poll_interval)
-            print("... got {} kafka messages ({}sec poll interval) {}".format(
-                len(batch), self.poll_interval, self.importer.counts))
+                num_messages=self.consume_batch_size, timeout=self.poll_interval
+            )
+            print(
+                "... got {} kafka messages ({}sec poll interval) {}".format(
+                    len(batch), self.poll_interval, self.importer.counts
+                )
+            )
             if self.force_flush:
                 # this flushing happens even if there have been 'push' events
                 # more recently. it is intended for, eg, importers off the
@@ -821,7 +835,7 @@
                     self.importer.finish()
                     last_push = datetime.datetime.now()
                     last_force_flush = datetime.datetime.now()
-                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
+                    # print("Flushed any partial import batch: {}".format(self.importer.counts))
                 continue
             # first check errors on entire batch...
             for msg in batch:
@@ -829,7 +843,7 @@
                     raise KafkaException(msg.error())
             # ... then process
             for msg in batch:
-                record = json.loads(msg.value().decode('utf-8'))
+                record = json.loads(msg.value().decode("utf-8"))
                 self.importer.push_record(record)
                 count += 1
                 if count % 500 == 0:
@@ -864,25 +878,25 @@ def make_kafka_consumer(hosts, env, topic_suffix, group, kafka_namespace="fatcat
                 print("Bailing out...")
                 # TODO: should it be sys.exit(-1)?
                 raise KafkaException(p.error)
-        #print("Kafka consumer commit successful")
+        # print("Kafka consumer commit successful")
         pass

     # previously, using pykafka
-    #auto_commit_enable=True,
-    #auto_commit_interval_ms=30000, # 30 seconds
+    # auto_commit_enable=True,
+    # auto_commit_interval_ms=30000, # 30 seconds
     conf = {
-        'bootstrap.servers': hosts,
-        'group.id': group,
-        'on_commit': fail_fast,
+        "bootstrap.servers": hosts,
+        "group.id": group,
+        "on_commit": fail_fast,
         # messages don't have offset marked as stored until pushed to
         # elastic, but we do auto-commit stored offsets to broker
-        'enable.auto.offset.store': False,
-        'enable.auto.commit': True,
+        "enable.auto.offset.store": False,
+        "enable.auto.commit": True,
         # user code timeout; if no poll after this long, assume user code
         # hung and rebalance (default: 5min)
-        'max.poll.interval.ms': 120000,
-        'default.topic.config': {
-            'auto.offset.reset': 'latest',
+        "max.poll.interval.ms": 120000,
+        "default.topic.config": {
+            "auto.offset.reset": "latest",
         },
     }
@@ -890,13 +904,13 @@ def make_kafka_consumer(hosts, env, topic_suffix, group, kafka_namespace="fatcat
         for p in partitions:
             if p.error:
                 raise KafkaException(p.error)
-        print("Kafka partitions rebalanced: {} / {}".format(
-            consumer, partitions))
+        print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions))

     consumer = Consumer(conf)
     # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
     # encoded)
-    consumer.subscribe([topic_name],
+    consumer.subscribe(
+        [topic_name],
         on_assign=on_rebalance,
         on_revoke=on_rebalance,
     )
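For orientation: the hunks above are quote-style and line-wrapping normalization (consistent with running an autoformatter such as black); the importer/pusher contract itself is unchanged. Below is a minimal sketch of how the classes touched here are wired together. The subclass name, its record handling, and the commented-out client setup are illustrative assumptions rather than code from this commit; the insert_batch body follows the batching pattern used by the existing fatcat importers.

    import fatcat_openapi_client
    from fatcat_tools.importers.common import EntityImporter, JsonLinePusher

    class MinimalReleaseImporter(EntityImporter):
        # hypothetical subclass implementing the hooks EntityImporter expects
        def want(self, raw_record):
            # cheap filter; rejected records increment counts["skip"]
            return bool(raw_record.get("title"))

        def parse_record(self, raw_record):
            # map the raw record to a fatcat entity (returning None also skips it)
            return fatcat_openapi_client.ReleaseEntity(
                title=raw_record["title"],
                ext_ids=fatcat_openapi_client.ReleaseExtIds(),
            )

        def try_update(self, entity):
            # returning True signals the entity should be queued for insert_batch()
            # (the non-bezerk_mode path of push_record())
            return True

        def insert_batch(self, batch):
            # batched creation under a single editgroup, as push_entity()/finish() expect
            self.api.create_release_auto_batch(
                fatcat_openapi_client.ReleaseAutoBatch(
                    editgroup=fatcat_openapi_client.Editgroup(
                        description=self.editgroup_description, extra=self.editgroup_extra
                    ),
                    entity_list=batch,
                )
            )

    # wiring a pusher to the importer:
    # api = ...  # an authenticated fatcat_openapi_client.DefaultApi instance (assumption)
    # importer = MinimalReleaseImporter(api, editgroup_description="example import")
    # with open("records.json") as f:
    #     counts = JsonLinePusher(importer, f).run()  # prints and returns importer.counts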
