Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r-- | python/fatcat_tools/importers/__init__.py |  2
-rw-r--r-- | python/fatcat_tools/importers/common.py   | 65
-rw-r--r-- | python/fatcat_tools/importers/crossref.py |  8
-rw-r--r-- | python/fatcat_tools/importers/datacite.py | 10
-rw-r--r-- | python/fatcat_tools/importers/jalc.py     | 12
-rw-r--r-- | python/fatcat_tools/importers/pubmed.py   | 27
6 files changed, 117 insertions(+), 7 deletions(-)
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 10557ef8..c26446fd 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,7 +12,7 @@ To run an import you combine two classes; one each of:
 """
 
-from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
+from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
 from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug
 from .datacite import DataciteImporter
 from .jalc import JalcImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 694ef359..da611ecb 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -730,6 +730,71 @@ class Bs4XmlFileListPusher(RecordPusher):
         print(counts)
         return counts
 
+
+class KafkaBs4XmlPusher(RecordPusher):
+    """
+    Fetch XML for an article from Kafka, parse via Bs4.
+    """
+    def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+        self.importer = importer
+        self.consumer = make_kafka_consumer(
+            kafka_hosts,
+            kafka_env,
+            topic_suffix,
+            group,
+            kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+        )
+        self.poll_interval = kwargs.get('poll_interval', 5.0)
+        self.consume_batch_size = kwargs.get('consume_batch_size', 25)
+
+    def run(self):
+        count = 0
+        last_push = datetime.datetime.now()
+        while True:
+            # Note: this is batch-oriented, because the underlying importer is
+            # often batch-oriented, but this doesn't confirm that the entire
+            # batch has been pushed to fatcat before committing the offset.
+            # Eg, consider the case where there is one update and thousands
+            # of creates; the update would be lingering in the importer, and
+            # if the importer crashed it would never be created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still that
+            # window when editgroups might be hanging (unsubmitted).
+            batch = self.consumer.consume(
+                num_messages=self.consume_batch_size,
+                timeout=self.poll_interval)
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                len(batch), self.poll_interval))
+            if not batch:
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
+                continue
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            for msg in batch:
+                soup = BeautifulSoup(msg.value().decode('utf-8'), "xml")
+                self.importer.push_record(soup)
+                soup.decompose()
+                count += 1
+                if count % 500 == 0:
+                    print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
+            for msg in batch:
+                # locally store offsets of processed messages; will be
+                # auto-committed by librdkafka from this "stored" value
+                self.consumer.store_offsets(message=msg)
+
+        # TODO: should catch UNIX signals (HUP?) to shut down cleanly, and/or
+        # commit the current batch if it has been lingering
+        counts = self.importer.finish()
+        print(counts)
+        self.consumer.close()
+        return counts
 
 
 class KafkaJsonPusher(RecordPusher):
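The offset handling in KafkaBs4XmlPusher.run() follows librdkafka's split between storing and committing offsets: store_offsets() only marks a message as processed locally, and the consumer's background auto-commit later persists those stored values, so a crash re-delivers anything not yet pushed to the importer (at-least-once semantics). A minimal standalone sketch of the same pattern, assuming confluent-kafka with placeholder broker/group/topic names, and process() standing in for importer.push_record():

    from confluent_kafka import Consumer, KafkaException

    def process(payload: bytes) -> None:
        # stand-in for parsing and pushing a record into an importer
        print(len(payload))

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',   # placeholder broker
        'group.id': 'example-group',             # placeholder group
        'enable.auto.commit': True,              # background commit of *stored* offsets
        'enable.auto.offset.store': False,       # we store offsets manually, post-processing
    })
    consumer.subscribe(['example.topic'])        # placeholder topic

    while True:
        batch = consumer.consume(num_messages=25, timeout=5.0)
        # fail fast on any broker-level error in the batch...
        for msg in batch:
            if msg.error():
                raise KafkaException(msg.error())
        # ... then process, storing each offset only after the work is done
        for msg in batch:
            process(msg.value())
            consumer.store_offsets(message=msg)

If the process dies mid-batch, unstored messages are redelivered on restart, which is why the downstream importer has to tolerate duplicates.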
diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py
index 18703a1a..9617299c 100644
--- a/python/fatcat_tools/importers/crossref.py
+++ b/python/fatcat_tools/importers/crossref.py
@@ -163,6 +163,14 @@ class CrossrefImporter(EntityImporter):
             self.counts['skip-blank-title'] += 1
             return False
 
+        # these are pre-registered DOIs before the actual record is ready
+        # title is a list of titles
+        if obj.get('title')[0].strip().lower() in [
+            "OUP accepted manuscript".lower(),
+        ]:
+            self.counts['skip-stub-title'] += 1
+            return False
+
         # do most of these checks in-line below
         return True
 
diff --git a/python/fatcat_tools/importers/datacite.py b/python/fatcat_tools/importers/datacite.py
index 9250fc5e..81f00876 100644
--- a/python/fatcat_tools/importers/datacite.py
+++ b/python/fatcat_tools/importers/datacite.py
@@ -222,6 +222,7 @@ class DataciteImporter(EntityImporter):
         self.read_issn_map_file(issn_map_file)
         self.debug = debug
         self.insert_log_file = insert_log_file
+        self.this_year = datetime.datetime.now().year
 
         print('datacite with debug={}'.format(self.debug), file=sys.stderr)
 
@@ -311,6 +312,12 @@ class DataciteImporter(EntityImporter):
         release_date, release_month, release_year = parse_datacite_dates(
             attributes.get('dates', []))
 
+        # block bogus far-future years/dates
+        if release_year is not None and (release_year > (self.this_year + 5) or release_year < 1000):
+            release_date = None
+            release_month = None
+            release_year = None
+
         # Some records do not use the "dates" field (e.g. micropub), but:
         # "attributes.published" or "attributes.publicationYear"
         if not any((release_date, release_month, release_year)):
@@ -714,7 +721,8 @@ class DataciteImporter(EntityImporter):
                 name_scheme = nid.get('nameIdentifierScheme', '') or ''
                 if not name_scheme.lower() == "orcid":
                     continue
-                orcid = nid.get('nameIdentifier', '').replace('https://orcid.org/', '')
+                orcid = nid.get('nameIdentifier') or ''
+                orcid = orcid.replace('https://orcid.org/', '')
                 if not orcid:
                     continue
                 creator_id = self.lookup_orcid(orcid)
diff --git a/python/fatcat_tools/importers/jalc.py b/python/fatcat_tools/importers/jalc.py
index a0e0086b..351a20a3 100644
--- a/python/fatcat_tools/importers/jalc.py
+++ b/python/fatcat_tools/importers/jalc.py
@@ -209,10 +209,14 @@ class JalcImporter(EntityImporter):
                 release_year = int(date)
 
         pages = None
-        if record.startingPage:
-            pages = record.startingPage.string
-            if record.endingPage:
-                pages = "{}-{}".format(pages, record.endingPage.string)
+        if record.startingPage and record.startingPage.string.strip():
+            pages = record.startingPage.string.strip()
+            if record.endingPage and record.endingPage.string.strip():
+                pages = "{}-{}".format(pages, record.endingPage.string.strip())
+        # double check to prevent "-" as pages
+        if pages and pages.strip() == '-':
+            pages = None
+
         volume = None
         if record.volume:
             volume = record.volume.string
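The two-line ORCID change in datacite.py works around a dict.get() subtlety: the default argument only applies when the key is absent, so a record carrying an explicit "nameIdentifier": null made the old one-liner call .replace() on None and raise AttributeError. A small illustration, using a hypothetical record:

    nid = {'nameIdentifier': None, 'nameIdentifierScheme': 'ORCID'}

    # the default is returned only when the key is *missing*:
    print(nid.get('nameIdentifier', ''))    # -> None, not '' (key exists, value is None)
    # so the old one-liner would crash:
    #   nid.get('nameIdentifier', '').replace('https://orcid.org/', '')  # AttributeError

    # the fixed version normalizes None to an empty string first:
    orcid = nid.get('nameIdentifier') or ''
    orcid = orcid.replace('https://orcid.org/', '')
    print(repr(orcid))                       # -> ''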
diff --git a/python/fatcat_tools/importers/pubmed.py b/python/fatcat_tools/importers/pubmed.py
index c32ce34a..3ecf5ef4 100644
--- a/python/fatcat_tools/importers/pubmed.py
+++ b/python/fatcat_tools/importers/pubmed.py
@@ -616,7 +616,10 @@ class PubmedImporter(EntityImporter):
         ### References
         refs = []
         if pubmed.ReferenceList:
-            for ref in pubmed.ReferenceList.find_all('Reference'):
+            # note that Reference always exists within a ReferenceList, but
+            # there may be multiple ReferenceLists (eg, sometimes one per
+            # Reference)
+            for ref in pubmed.find_all('Reference'):
                 ref_extra = dict()
                 ref_doi = ref.find("ArticleId", IdType="doi")
                 if ref_doi:
@@ -729,8 +732,29 @@ class PubmedImporter(EntityImporter):
             existing.ext_ids.doi = existing.ext_ids.doi or re.ext_ids.doi
             existing.ext_ids.pmid = existing.ext_ids.pmid or re.ext_ids.pmid
             existing.ext_ids.pmcid = existing.ext_ids.pmcid or re.ext_ids.pmcid
+
+            existing.container_id = existing.container_id or re.container_id
             existing.refs = existing.refs or re.refs
+            existing.abstracts = existing.abstracts or re.abstracts
             existing.extra['pubmed'] = re.extra['pubmed']
+
+            # fix stub titles
+            if existing.title in [
+                "OUP accepted manuscript",
+            ]:
+                existing.title = re.title
+
+            existing.original_title = existing.original_title or re.original_title
+            existing.release_type = existing.release_type or re.release_type
+            existing.release_stage = existing.release_stage or re.release_stage
+            existing.release_date = existing.release_date or re.release_date
+            existing.release_year = existing.release_year or re.release_year
+            existing.withdrawn_status = existing.withdrawn_status or re.withdrawn_status
+            existing.volume = existing.volume or re.volume
+            existing.issue = existing.issue or re.issue
+            existing.pages = existing.pages or re.pages
+            existing.language = existing.language or re.language
+
             # update subtitle in-place first
             if not existing.subtitle and existing.extra.get('subtitle'):
                 subtitle = existing.extra.pop('subtitle')
@@ -740,6 +764,7 @@ class PubmedImporter(EntityImporter):
                 existing.subtitle = subtitle
             if not existing.subtitle:
                 existing.subtitle = re.subtitle
+
             try:
                 self.api.update_release(self.get_editgroup_id(), existing.ident, existing)
                 self.counts['update'] += 1
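The block of existing.field = existing.field or re.field lines in pubmed.py implements a fill-only merge: values parsed from the incoming PubMed record are copied in only where the existing fatcat release has none, so data already in fatcat is never overwritten (with the usual caveat that `or` also treats falsy values like 0 or "" as missing). The same merge could be written as a loop; a sketch, with a made-up helper name:

    FILL_IN_FIELDS = [
        'container_id', 'refs', 'abstracts', 'original_title', 'release_type',
        'release_stage', 'release_date', 'release_year', 'withdrawn_status',
        'volume', 'issue', 'pages', 'language',
    ]

    def fill_missing_fields(existing, re):
        # copy a field from the freshly-parsed release only when the existing
        # release has no value; never overwrite data already present
        for field in FILL_IN_FIELDS:
            if not getattr(existing, field):
                setattr(existing, field, getattr(re, field))
        return existing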