Diffstat (limited to 'python/fatcat_tools/importers')
 python/fatcat_tools/importers/__init__.py |  2
 python/fatcat_tools/importers/common.py   | 65
 python/fatcat_tools/importers/crossref.py |  8
 python/fatcat_tools/importers/datacite.py | 10
 python/fatcat_tools/importers/jalc.py     | 12
 python/fatcat_tools/importers/pubmed.py   | 27
 6 files changed, 117 insertions(+), 7 deletions(-)
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 10557ef8..c26446fd 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,7 +12,7 @@ To run an import you combine two classes; one each of:
"""
-from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
+from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug
from .datacite import DataciteImporter
from .jalc import JalcImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 694ef359..da611ecb 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -730,6 +730,71 @@ class Bs4XmlFileListPusher(RecordPusher):
         print(counts)
         return counts
 
+class KafkaBs4XmlPusher(RecordPusher):
+    """
+    Fetch XML for an article from Kafka, parse via Bs4.
+    """
+    def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+        self.importer = importer
+        self.consumer = make_kafka_consumer(
+            kafka_hosts,
+            kafka_env,
+            topic_suffix,
+            group,
+            kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+        )
+        self.poll_interval = kwargs.get('poll_interval', 5.0)
+        self.consume_batch_size = kwargs.get('consume_batch_size', 25)
+
+    def run(self):
+        count = 0
+        last_push = datetime.datetime.now()
+        while True:
+            # Note: this is batch-oriented because the underlying importer is
+            # often batch-oriented, but this doesn't confirm that the entire
+            # batch has been pushed to fatcat before committing offsets. Eg,
+            # consider the case where there is one update and thousands of
+            # creates; the update would linger in the importer, and if the
+            # importer crashed it would never be created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still a
+            # window when editgroups might be hanging (unsubmitted).
+            batch = self.consumer.consume(
+                num_messages=self.consume_batch_size,
+                timeout=self.poll_interval)
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                len(batch), self.poll_interval))
+            if not batch:
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
+                continue
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            for msg in batch:
+                soup = BeautifulSoup(msg.value().decode('utf-8'), "xml")
+                self.importer.push_record(soup)
+                soup.decompose()
+                count += 1
+                if count % 500 == 0:
+                    print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
+            for msg in batch:
+                # locally store offsets of processed messages; these will be
+                # auto-committed by librdkafka from this "stored" value
+                self.consumer.store_offsets(message=msg)
+
+        # TODO: should catch UNIX signals (HUP?) to shut down cleanly, and/or
+        # commit the current batch if it has been lingering
+        counts = self.importer.finish()
+        print(counts)
+        self.consumer.close()
+        return counts
 
 
 class KafkaJsonPusher(RecordPusher):
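A pusher like this gets wired to an importer and a Kafka topic by the calling worker or CLI. Below is a minimal sketch of how KafkaBs4XmlPusher might be invoked; the host list, topic suffix, and consumer group are illustrative assumptions, not values from this diff:

    from fatcat_tools.importers import PubmedImporter, KafkaBs4XmlPusher

    importer = PubmedImporter(api, issn_map_file)  # assumed importer setup
    pusher = KafkaBs4XmlPusher(
        importer,
        kafka_hosts="localhost:9092",  # assumption
        kafka_env="dev",               # assumption
        topic_suffix="ftp-pubmed",     # assumption
        group="pubmed-import",         # assumption
        poll_interval=5.0,
        consume_batch_size=25,
    )
    counts = pusher.run()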
diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py
index 18703a1a..9617299c 100644
--- a/python/fatcat_tools/importers/crossref.py
+++ b/python/fatcat_tools/importers/crossref.py
@@ -163,6 +163,14 @@ class CrossrefImporter(EntityImporter):
             self.counts['skip-blank-title'] += 1
             return False
 
+        # these are pre-registered DOIs before the actual record is ready
+        # title is a list of titles
+        if obj.get('title')[0].strip().lower() in [
+            "OUP accepted manuscript".lower(),
+        ]:
+            self.counts['skip-stub-title'] += 1
+            return False
+
         # do most of these checks in-line below
         return True
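The comparison lowercases both sides, so matching is case-insensitive. If more stub titles turn up, one way to keep the check readable is a module-level blocklist; this is a hypothetical refactor for illustration, not part of this commit:

    STUB_TITLES = frozenset([
        "oup accepted manuscript",
    ])

    title_list = obj.get('title')
    if title_list and title_list[0].strip().lower() in STUB_TITLES:
        self.counts['skip-stub-title'] += 1
        return False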
diff --git a/python/fatcat_tools/importers/datacite.py b/python/fatcat_tools/importers/datacite.py
index 9250fc5e..81f00876 100644
--- a/python/fatcat_tools/importers/datacite.py
+++ b/python/fatcat_tools/importers/datacite.py
@@ -222,6 +222,7 @@ class DataciteImporter(EntityImporter):
         self.read_issn_map_file(issn_map_file)
         self.debug = debug
         self.insert_log_file = insert_log_file
+        self.this_year = datetime.datetime.now().year
 
         print('datacite with debug={}'.format(self.debug), file=sys.stderr)
@@ -311,6 +312,12 @@ class DataciteImporter(EntityImporter):
         release_date, release_month, release_year = parse_datacite_dates(
             attributes.get('dates', []))
 
+        # block bogus far-future years/dates
+        if release_year is not None and (release_year > (self.this_year + 5) or release_year < 1000):
+            release_date = None
+            release_month = None
+            release_year = None
+
         # Some records do not use the "dates" field (e.g. micropub), but:
         # "attributes.published" or "attributes.publicationYear"
         if not any((release_date, release_month, release_year)):
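The guard treats years between 1000 and five years past the current year as plausible and drops the whole date otherwise. The same rule as a standalone sketch (the helper name is hypothetical):

    import datetime

    def plausible_release_year(year, max_future_years=5, min_year=1000):
        # mirrors the guard above: reject far-future and pre-1000 years
        if year is None:
            return False
        this_year = datetime.datetime.now().year
        return min_year <= year <= this_year + max_future_years

    assert plausible_release_year(2019)
    assert not plausible_release_year(20190)  # garbled/far-future year
    assert not plausible_release_year(650)    # below the min_year cutoff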
@@ -714,7 +721,8 @@ class DataciteImporter(EntityImporter):
                 name_scheme = nid.get('nameIdentifierScheme', '') or ''
                 if not name_scheme.lower() == "orcid":
                     continue
-                orcid = nid.get('nameIdentifier', '').replace('https://orcid.org/', '')
+                orcid = nid.get('nameIdentifier') or ''
+                orcid = orcid.replace('https://orcid.org/', '')
                 if not orcid:
                     continue
                 creator_id = self.lookup_orcid(orcid)
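The two-step rewrite matters because dict.get(key, default) only falls back to the default when the key is absent; a key present with an explicit None still returns None, and calling .replace() on that raises AttributeError. A quick illustration:

    nid = {'nameIdentifierScheme': 'ORCID', 'nameIdentifier': None}

    nid.get('nameIdentifier', '')  # None: the key exists, default unused
    # nid.get('nameIdentifier', '').replace(...)  # AttributeError on None

    orcid = nid.get('nameIdentifier') or ''  # '' for both missing and None
    orcid = orcid.replace('https://orcid.org/', '')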
diff --git a/python/fatcat_tools/importers/jalc.py b/python/fatcat_tools/importers/jalc.py
index a0e0086b..351a20a3 100644
--- a/python/fatcat_tools/importers/jalc.py
+++ b/python/fatcat_tools/importers/jalc.py
@@ -209,10 +209,14 @@ class JalcImporter(EntityImporter):
                 release_year = int(date)
 
         pages = None
-        if record.startingPage:
-            pages = record.startingPage.string
-            if record.endingPage:
-                pages = "{}-{}".format(pages, record.endingPage.string)
+        if record.startingPage and record.startingPage.string.strip():
+            pages = record.startingPage.string.strip()
+            if record.endingPage and record.endingPage.string.strip():
+                pages = "{}-{}".format(pages, record.endingPage.string.strip())
+        # double check to prevent "-" as pages
+        if pages and pages.strip() == '-':
+            pages = None
+
         volume = None
         if record.volume:
             volume = record.volume.string
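The same page-range logic as a standalone sketch; the helper name and test values are illustrative, not from this commit:

    def build_pages(start, end):
        # mirror of the JALC logic above, operating on plain strings
        pages = None
        if start and start.strip():
            pages = start.strip()
            if end and end.strip():
                pages = "{}-{}".format(pages, end.strip())
        # double check to prevent "-" as pages
        if pages and pages.strip() == '-':
            pages = None
        return pages

    assert build_pages("12", "34") == "12-34"
    assert build_pages("12", None) == "12"
    assert build_pages("  ", "34") is None  # no range without a start page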
diff --git a/python/fatcat_tools/importers/pubmed.py b/python/fatcat_tools/importers/pubmed.py
index c32ce34a..3ecf5ef4 100644
--- a/python/fatcat_tools/importers/pubmed.py
+++ b/python/fatcat_tools/importers/pubmed.py
@@ -616,7 +616,10 @@ class PubmedImporter(EntityImporter):
         ### References
         refs = []
         if pubmed.ReferenceList:
-            for ref in pubmed.ReferenceList.find_all('Reference'):
+            # note that a Reference always exists within a ReferenceList, but
+            # there may be multiple ReferenceList elements (eg, sometimes one
+            # per Reference)
+            for ref in pubmed.find_all('Reference'):
                 ref_extra = dict()
                 ref_doi = ref.find("ArticleId", IdType="doi")
                 if ref_doi:
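In BeautifulSoup, tag.find_all() searches only that tag's subtree, so the old pubmed.ReferenceList.find_all('Reference') navigated to the first ReferenceList and missed references in any later lists; searching from the document root picks them all up. A small demonstration with made-up XML:

    from bs4 import BeautifulSoup

    doc = BeautifulSoup("""
    <PubmedData>
      <ReferenceList><Reference>a</Reference></ReferenceList>
      <ReferenceList><Reference>b</Reference></ReferenceList>
    </PubmedData>""", "xml")

    len(doc.ReferenceList.find_all('Reference'))  # 1: first list only
    len(doc.find_all('Reference'))                # 2: all lists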
@@ -729,8 +732,29 @@ class PubmedImporter(EntityImporter):
         existing.ext_ids.doi = existing.ext_ids.doi or re.ext_ids.doi
         existing.ext_ids.pmid = existing.ext_ids.pmid or re.ext_ids.pmid
         existing.ext_ids.pmcid = existing.ext_ids.pmcid or re.ext_ids.pmcid
+
+        existing.container_id = existing.container_id or re.container_id
         existing.refs = existing.refs or re.refs
+        existing.abstracts = existing.abstracts or re.abstracts
         existing.extra['pubmed'] = re.extra['pubmed']
+
+        # fix stub titles
+        if existing.title in [
+            "OUP accepted manuscript",
+        ]:
+            existing.title = re.title
+
+        existing.original_title = existing.original_title or re.original_title
+        existing.release_type = existing.release_type or re.release_type
+        existing.release_stage = existing.release_stage or re.release_stage
+        existing.release_date = existing.release_date or re.release_date
+        existing.release_year = existing.release_year or re.release_year
+        existing.withdrawn_status = existing.withdrawn_status or re.withdrawn_status
+        existing.volume = existing.volume or re.volume
+        existing.issue = existing.issue or re.issue
+        existing.pages = existing.pages or re.pages
+        existing.language = existing.language or re.language
+
         # update subtitle in-place first
         if not existing.subtitle and existing.extra.get('subtitle'):
             subtitle = existing.extra.pop('subtitle')
@@ -740,6 +764,7 @@ class PubmedImporter(EntityImporter):
                 existing.subtitle = subtitle
         if not existing.subtitle:
             existing.subtitle = re.subtitle
+
         try:
             self.api.update_release(self.get_editgroup_id(), existing.ident, existing)
             self.counts['update'] += 1
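The run of "existing.X = existing.X or re.X" lines is a fill-forward merge: fields already set on the existing release win, and only empty ones are populated from the freshly parsed record. The same pattern expressed generically, as a sketch with an assumed field list rather than the importer's actual code:

    FILL_FORWARD_FIELDS = [
        'original_title', 'release_type', 'release_stage', 'release_date',
        'release_year', 'withdrawn_status', 'volume', 'issue', 'pages',
        'language',
    ]

    for field in FILL_FORWARD_FIELDS:
        if not getattr(existing, field):
            setattr(existing, field, getattr(re, field))

One caveat of the falsy check: values like 0 or "" would also be overwritten, which is acceptable here since these fields are strings, dates, or None.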