Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r--  python/fatcat_tools/importers/common.py  198
1 file changed, 106 insertions(+), 92 deletions(-)
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index e33a2012..2639c85a 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -1,4 +1,3 @@
-
import csv
import datetime
import json
@@ -34,7 +33,6 @@ SANE_MAX_URLS: int = 100
DOMAIN_REL_MAP: Dict[str, str] = {
"archive.org": "archive",
# LOCKSS, Portico, DuraSpace, etc would also be "archive"
-
"arxiv.org": "repository",
"babel.hathitrust.org": "repository",
"cds.cern.ch": "repository",
@@ -53,7 +51,6 @@ DOMAIN_REL_MAP: Dict[str, str] = {
"zenodo.org": "repository",
"www.biorxiv.org": "repository",
"www.medrxiv.org": "repository",
-
"citeseerx.ist.psu.edu": "aggregator",
"publisher-connector.core.ac.uk": "aggregator",
"core.ac.uk": "aggregator",
@@ -62,7 +59,6 @@ DOMAIN_REL_MAP: Dict[str, str] = {
"pdfs.semanticscholar.org": "aggregator",
"semanticscholar.org": "aggregator",
"www.semanticscholar.org": "aggregator",
-
"academic.oup.com": "publisher",
"cdn.elifesciences.org": "publisher",
"cell.com": "publisher",
@@ -86,15 +82,14 @@ DOMAIN_REL_MAP: Dict[str, str] = {
"ehp.niehs.nih.gov": "publisher",
"journals.tsu.ru": "publisher",
"www.cogentoa.com": "publisher",
-
"www.researchgate.net": "academicsocial",
"academia.edu": "academicsocial",
-
"wayback.archive-it.org": "webarchive",
"web.archive.org": "webarchive",
"archive.is": "webarchive",
}
+
def make_rel_url(raw_url: str, default_link_rel: str = "web"):
# this is where we map specific domains to rel types, and also filter out
# bad domains, invalid URLs, etc
@@ -105,12 +100,17 @@ def make_rel_url(raw_url: str, default_link_rel: str = "web"):
break
return (rel, raw_url)
+
def test_make_rel_url():
assert make_rel_url("http://example.com/thing.pdf")[0] == "web"
assert make_rel_url("http://example.com/thing.pdf", default_link_rel="jeans")[0] == "jeans"
- assert make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")[0] == "webarchive"
+ assert (
+ make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")[0]
+ == "webarchive"
+ )
assert make_rel_url("http://cell.com/thing.pdf")[0] == "publisher"
+
class EntityImporter:
"""
Base class for fatcat entity importers.
@@ -147,23 +147,26 @@ class EntityImporter:
def __init__(self, api, **kwargs):
- eg_extra = kwargs.get('editgroup_extra', dict())
- eg_extra['git_rev'] = eg_extra.get('git_rev',
- subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8')
- eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter')
+ eg_extra = kwargs.get("editgroup_extra", dict())
+ eg_extra["git_rev"] = eg_extra.get(
+ "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip()
+ ).decode("utf-8")
+ eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityImporter")
self.api = api
- self.do_updates = bool(kwargs.get('do_updates', True))
- self.do_fuzzy_match: bool = kwargs.get('do_fuzzy_match', True)
- self.bezerk_mode: bool = kwargs.get('bezerk_mode', False)
- self.submit_mode: bool = kwargs.get('submit_mode', False)
- self.edit_batch_size: int = kwargs.get('edit_batch_size', 100)
- self.editgroup_description: Optional[str] = kwargs.get('editgroup_description')
+ self.do_updates = bool(kwargs.get("do_updates", True))
+ self.do_fuzzy_match: bool = kwargs.get("do_fuzzy_match", True)
+ self.bezerk_mode: bool = kwargs.get("bezerk_mode", False)
+ self.submit_mode: bool = kwargs.get("submit_mode", False)
+ self.edit_batch_size: int = kwargs.get("edit_batch_size", 100)
+ self.editgroup_description: Optional[str] = kwargs.get("editgroup_description")
self.editgroup_extra: Optional[Any] = eg_extra
- self.es_client = kwargs.get('es_client')
+ self.es_client = kwargs.get("es_client")
if not self.es_client:
- self.es_client = elasticsearch.Elasticsearch("https://search.fatcat.wiki", timeout=120)
+ self.es_client = elasticsearch.Elasticsearch(
+ "https://search.fatcat.wiki", timeout=120
+ )
self._issnl_id_map: Dict[str, Any] = dict()
self._orcid_id_map: Dict[str, Any] = dict()
@@ -174,7 +177,7 @@ class EntityImporter:
self.reset()
def reset(self) -> None:
- self.counts = Counter({'total': 0, 'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
+ self.counts = Counter({"total": 0, "skip": 0, "insert": 0, "update": 0, "exists": 0})
self._edit_count: int = 0
self._editgroup_id: Optional[str] = None
self._entity_queue: List[Any] = []
@@ -184,13 +187,13 @@ class EntityImporter:
"""
Returns nothing.
"""
- self.counts['total'] += 1
+ self.counts["total"] += 1
if (not raw_record) or (not self.want(raw_record)):
- self.counts['skip'] += 1
+ self.counts["skip"] += 1
return
entity = self.parse_record(raw_record)
if not entity:
- self.counts['skip'] += 1
+ self.counts["skip"] += 1
return
if self.bezerk_mode:
self.push_entity(entity)
@@ -230,7 +233,7 @@ class EntityImporter:
if self._entity_queue:
self.insert_batch(self._entity_queue)
- self.counts['insert'] += len(self._entity_queue)
+ self.counts["insert"] += len(self._entity_queue)
self._entity_queue = []
return self.counts
@@ -248,8 +251,9 @@ class EntityImporter:
if not self._editgroup_id:
eg = self.api.create_editgroup(
fatcat_openapi_client.Editgroup(
- description=self.editgroup_description,
- extra=self.editgroup_extra))
+ description=self.editgroup_description, extra=self.editgroup_extra
+ )
+ )
self._editgroup_id = eg.editgroup_id
self._edit_count += edits
@@ -257,30 +261,30 @@ class EntityImporter:
def create_container(self, entity):
eg_id = self.get_editgroup_id()
- self.counts['inserted.container'] += 1
+ self.counts["inserted.container"] += 1
return self.api.create_container(eg_id, entity)
def create_release(self, entity):
eg_id = self.get_editgroup_id()
- self.counts['inserted.release'] += 1
+ self.counts["inserted.release"] += 1
return self.api.create_release(eg_id, entity)
def create_file(self, entity):
eg_id = self.get_editgroup_id()
- self.counts['inserted.file'] += 1
+ self.counts["inserted.file"] += 1
return self.api.create_file(eg_id, entity)
def updated(self):
"""
Implementations should call this from try_update() if the update was successful
"""
- self.counts['update'] += 1
+ self.counts["update"] += 1
def push_entity(self, entity):
self._entity_queue.append(entity)
if len(self._entity_queue) >= self.edit_batch_size:
self.insert_batch(self._entity_queue)
- self.counts['insert'] += len(self._entity_queue)
+ self.counts["insert"] += len(self._entity_queue)
self._entity_queue = []
def want(self, raw_record: Any) -> bool:
@@ -324,7 +328,7 @@ class EntityImporter:
# If anything other than a 404 (not found), something is wrong
if ae.status != 404:
raise ae
- self._orcid_id_map[orcid] = creator_id # might be None
+ self._orcid_id_map[orcid] = creator_id # might be None
return creator_id
def is_doi(self, doi: str) -> bool:
@@ -347,7 +351,7 @@ class EntityImporter:
# If anything other than a 404 (not found), something is wrong
if ae.status != 404:
raise ae
- self._doi_id_map[doi] = release_id # might be None
+ self._doi_id_map[doi] = release_id # might be None
return release_id
def lookup_pmid(self, pmid: str):
@@ -364,11 +368,11 @@ class EntityImporter:
# If anything other than a 404 (not found), something is wrong
if ae.status != 404:
raise ae
- self._pmid_id_map[pmid] = release_id # might be None
+ self._pmid_id_map[pmid] = release_id # might be None
return release_id
def is_issnl(self, issnl: str) -> bool:
- return len(issnl) == 9 and issnl[4] == '-'
+ return len(issnl) == 9 and issnl[4] == "-"
def lookup_issnl(self, issnl: str):
"""Caches calls to the ISSN-L lookup API endpoint in a local dict"""
@@ -382,7 +386,7 @@ class EntityImporter:
# If anything other than a 404 (not found), something is wrong
if ae.status != 404:
raise ae
- self._issnl_id_map[issnl] = container_id # might be None
+ self._issnl_id_map[issnl] = container_id # might be None
return container_id
def read_issn_map_file(self, issn_map_file):
@@ -417,26 +421,26 @@ class EntityImporter:
# update old/deprecated 'rel' on URLs
for i in range(len(existing.urls)):
u = existing.urls[i]
- if u.rel == 'repository' and '://archive.org/download/' in u.url:
- existing.urls[i].rel = 'archive'
- if u.rel == 'social':
- u.rel = 'academicsocial'
+ if u.rel == "repository" and "://archive.org/download/" in u.url:
+ existing.urls[i].rel = "archive"
+ if u.rel == "social":
+ u.rel = "academicsocial"
# remove URLs which are near-duplicates
redundant_urls = []
all_urls = [u.url for u in existing.urls]
- all_wayback_urls = [u.url for u in existing.urls if '://web.archive.org/web/' in u.url]
+ all_wayback_urls = [u.url for u in existing.urls if "://web.archive.org/web/" in u.url]
for url in all_urls:
# https/http redundancy
- if url.startswith('http://') and url.replace('http://', 'https://', 1) in all_urls:
+ if url.startswith("http://") and url.replace("http://", "https://", 1) in all_urls:
redundant_urls.append(url)
continue
# default HTTP port included and not included
- if ':80/' in url and url.replace(':80', '', 1) in all_urls:
+ if ":80/" in url and url.replace(":80", "", 1) in all_urls:
redundant_urls.append(url)
continue
# partial and complete wayback timestamps
- if '://web.archive.org/web/2017/' in url:
+ if "://web.archive.org/web/2017/" in url:
original_url = "/".join(url.split("/")[5:])
assert len(original_url) > 5
for wb_url in all_wayback_urls:
@@ -452,7 +456,9 @@ class EntityImporter:
def generic_fileset_cleanups(existing):
return existing
- def match_existing_release_fuzzy(self, release: ReleaseEntity) -> Optional[Tuple[str, str, ReleaseEntity]]:
+ def match_existing_release_fuzzy(
+ self, release: ReleaseEntity
+ ) -> Optional[Tuple[str, str, ReleaseEntity]]:
"""
This helper function uses fuzzycat (and elasticsearch) to look for
existing release entities with similar metadata.
@@ -488,7 +494,15 @@ class EntityImporter:
return None
release_dict = entity_to_dict(release, api_client=self.api.api_client)
- verified = [(fuzzycat.verify.verify(release_dict, entity_to_dict(c, api_client=self.api.api_client)), c) for c in candidates]
+ verified = [
+ (
+ fuzzycat.verify.verify(
+ release_dict, entity_to_dict(c, api_client=self.api.api_client)
+ ),
+ c,
+ )
+ for c in candidates
+ ]
# chose the "closest" match
closest = sorted(verified, key=lambda v: STATUS_SORT[v[0].status])[0]
@@ -522,7 +536,6 @@ class RecordPusher:
class JsonLinePusher(RecordPusher):
-
def __init__(self, importer, json_file, **kwargs):
self.importer = importer
self.json_file = json_file
@@ -539,10 +552,9 @@ class JsonLinePusher(RecordPusher):
class CsvPusher(RecordPusher):
-
def __init__(self, importer, csv_file, **kwargs):
self.importer = importer
- self.reader = csv.DictReader(csv_file, delimiter=kwargs.get('delimiter', ','))
+ self.reader = csv.DictReader(csv_file, delimiter=kwargs.get("delimiter", ","))
def run(self):
for line in self.reader:
@@ -555,7 +567,6 @@ class CsvPusher(RecordPusher):
class LinePusher(RecordPusher):
-
def __init__(self, importer, text_file, **kwargs):
self.importer = importer
self.text_file = text_file
@@ -571,17 +582,15 @@ class LinePusher(RecordPusher):
class SqlitePusher(RecordPusher):
-
def __init__(self, importer, db_file, table_name, where_clause="", **kwargs):
self.importer = importer
- self.db = sqlite3.connect(db_file, isolation_level='EXCLUSIVE')
+ self.db = sqlite3.connect(db_file, isolation_level="EXCLUSIVE")
self.db.row_factory = sqlite3.Row
self.table_name = table_name
self.where_clause = where_clause
def run(self):
- cur = self.db.execute("SELECT * FROM {} {};".format(
- self.table_name, self.where_clause))
+ cur = self.db.execute("SELECT * FROM {} {};".format(self.table_name, self.where_clause))
for row in cur:
self.importer.push_record(row)
counts = self.importer.finish()
@@ -590,7 +599,6 @@ class SqlitePusher(RecordPusher):
class Bs4XmlLinesPusher(RecordPusher):
-
def __init__(self, importer, xml_file, prefix_filter=None, **kwargs):
self.importer = importer
self.xml_file = xml_file
@@ -611,7 +619,6 @@ class Bs4XmlLinesPusher(RecordPusher):
class Bs4XmlFilePusher(RecordPusher):
-
def __init__(self, importer, xml_file, record_tag, **kwargs):
self.importer = importer
self.xml_file = xml_file
@@ -684,7 +691,6 @@ class Bs4XmlLargeFilePusher(RecordPusher):
class Bs4XmlFileListPusher(RecordPusher):
-
def __init__(self, importer, list_file, record_tag, **kwargs):
self.importer = importer
self.list_file = list_file
@@ -695,7 +701,7 @@ class Bs4XmlFileListPusher(RecordPusher):
xml_path = xml_path.strip()
if not xml_path or xml_path.startswith("#"):
continue
- with open(xml_path, 'r') as xml_file:
+ with open(xml_path, "r") as xml_file:
soup = BeautifulSoup(xml_file, "xml")
for record in soup.find_all(self.record_tag):
self.importer.push_record(record)
@@ -705,10 +711,12 @@ class Bs4XmlFileListPusher(RecordPusher):
print(counts)
return counts
+
class KafkaBs4XmlPusher(RecordPusher):
"""
Fetch XML for an article from Kafka, parse via Bs4.
"""
+
def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
self.importer = importer
self.consumer = make_kafka_consumer(
@@ -716,10 +724,10 @@ class KafkaBs4XmlPusher(RecordPusher):
kafka_env,
topic_suffix,
group,
- kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+ kafka_namespace=kwargs.get("kafka_namespace", "fatcat"),
)
- self.poll_interval = kwargs.get('poll_interval', 5.0)
- self.consume_batch_size = kwargs.get('consume_batch_size', 25)
+ self.poll_interval = kwargs.get("poll_interval", 5.0)
+ self.consume_batch_size = kwargs.get("consume_batch_size", 25)
def run(self):
count = 0
@@ -735,16 +743,19 @@ class KafkaBs4XmlPusher(RecordPusher):
# outstanding editgroups every 5 minutes, but there is still that
# window when editgroups might be hanging (unsubmitted).
batch = self.consumer.consume(
- num_messages=self.consume_batch_size,
- timeout=self.poll_interval)
- print("... got {} kafka messages ({}sec poll interval) {}".format(
- len(batch), self.poll_interval, self.importer.counts))
+ num_messages=self.consume_batch_size, timeout=self.poll_interval
+ )
+ print(
+ "... got {} kafka messages ({}sec poll interval) {}".format(
+ len(batch), self.poll_interval, self.importer.counts
+ )
+ )
if not batch:
if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
# it has been some time, so flush any current editgroup
self.importer.finish()
last_push = datetime.datetime.now()
- #print("Flushed any partial import batch: {}".format(self.importer.counts))
+ # print("Flushed any partial import batch: {}".format(self.importer.counts))
continue
# first check errors on entire batch...
for msg in batch:
@@ -752,7 +763,7 @@ class KafkaBs4XmlPusher(RecordPusher):
raise KafkaException(msg.error())
# ... then process
for msg in batch:
- soup = BeautifulSoup(msg.value().decode('utf-8'), "xml")
+ soup = BeautifulSoup(msg.value().decode("utf-8"), "xml")
self.importer.push_record(soup)
soup.decompose()
count += 1
@@ -771,8 +782,8 @@ class KafkaBs4XmlPusher(RecordPusher):
self.consumer.close()
return counts
-class KafkaJsonPusher(RecordPusher):
+class KafkaJsonPusher(RecordPusher):
def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
self.importer = importer
self.consumer = make_kafka_consumer(
@@ -780,11 +791,11 @@ class KafkaJsonPusher(RecordPusher):
kafka_env,
topic_suffix,
group,
- kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+ kafka_namespace=kwargs.get("kafka_namespace", "fatcat"),
)
- self.poll_interval = kwargs.get('poll_interval', 5.0)
- self.consume_batch_size = kwargs.get('consume_batch_size', 100)
- self.force_flush = kwargs.get('force_flush', False)
+ self.poll_interval = kwargs.get("poll_interval", 5.0)
+ self.consume_batch_size = kwargs.get("consume_batch_size", 100)
+ self.force_flush = kwargs.get("force_flush", False)
def run(self):
count = 0
@@ -801,10 +812,13 @@ class KafkaJsonPusher(RecordPusher):
# outstanding editgroups every 5 minutes, but there is still that
# window when editgroups might be hanging (unsubmitted).
batch = self.consumer.consume(
- num_messages=self.consume_batch_size,
- timeout=self.poll_interval)
- print("... got {} kafka messages ({}sec poll interval) {}".format(
- len(batch), self.poll_interval, self.importer.counts))
+ num_messages=self.consume_batch_size, timeout=self.poll_interval
+ )
+ print(
+ "... got {} kafka messages ({}sec poll interval) {}".format(
+ len(batch), self.poll_interval, self.importer.counts
+ )
+ )
if self.force_flush:
# this flushing happens even if there have been 'push' events
# more recently. it is intended for, eg, importers off the
@@ -821,7 +835,7 @@ class KafkaJsonPusher(RecordPusher):
self.importer.finish()
last_push = datetime.datetime.now()
last_force_flush = datetime.datetime.now()
- #print("Flushed any partial import batch: {}".format(self.importer.counts))
+ # print("Flushed any partial import batch: {}".format(self.importer.counts))
continue
# first check errors on entire batch...
for msg in batch:
@@ -829,7 +843,7 @@ class KafkaJsonPusher(RecordPusher):
raise KafkaException(msg.error())
# ... then process
for msg in batch:
- record = json.loads(msg.value().decode('utf-8'))
+ record = json.loads(msg.value().decode("utf-8"))
self.importer.push_record(record)
count += 1
if count % 500 == 0:
@@ -864,25 +878,25 @@ def make_kafka_consumer(hosts, env, topic_suffix, group, kafka_namespace="fatcat
print("Bailing out...")
# TODO: should it be sys.exit(-1)?
raise KafkaException(p.error)
- #print("Kafka consumer commit successful")
+ # print("Kafka consumer commit successful")
pass
# previously, using pykafka
- #auto_commit_enable=True,
- #auto_commit_interval_ms=30000, # 30 seconds
+ # auto_commit_enable=True,
+ # auto_commit_interval_ms=30000, # 30 seconds
conf = {
- 'bootstrap.servers': hosts,
- 'group.id': group,
- 'on_commit': fail_fast,
+ "bootstrap.servers": hosts,
+ "group.id": group,
+ "on_commit": fail_fast,
# messages don't have offset marked as stored until pushed to
# elastic, but we do auto-commit stored offsets to broker
- 'enable.auto.offset.store': False,
- 'enable.auto.commit': True,
+ "enable.auto.offset.store": False,
+ "enable.auto.commit": True,
# user code timeout; if no poll after this long, assume user code
# hung and rebalance (default: 5min)
- 'max.poll.interval.ms': 120000,
- 'default.topic.config': {
- 'auto.offset.reset': 'latest',
+ "max.poll.interval.ms": 120000,
+ "default.topic.config": {
+ "auto.offset.reset": "latest",
},
}
@@ -890,13 +904,13 @@ def make_kafka_consumer(hosts, env, topic_suffix, group, kafka_namespace="fatcat
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions))
+ print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions))
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8
# encoded)
- consumer.subscribe([topic_name],
+ consumer.subscribe(
+ [topic_name],
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
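
For orientation, a minimal usage sketch (not part of the commit) of how the classes reformatted above fit together: an EntityImporter subclass driven by a JsonLinePusher. MyReleaseImporter, the releases.json path, and the bare unauthenticated API client are illustrative assumptions; want()/parse_record()/try_update()/insert_batch(), the edit_batch_size kwarg, and the push_record()/finish() flow are the interfaces visible in the diff.

    import fatcat_openapi_client

    from fatcat_tools.importers.common import EntityImporter, JsonLinePusher


    class MyReleaseImporter(EntityImporter):
        """Hypothetical importer: one JSON object per line -> one release entity."""

        def want(self, raw_record):
            # cheap pre-filter; records rejected here are counted as "skip"
            return bool(raw_record.get("title"))

        def parse_record(self, raw_record):
            # full field mapping elided; ext_ids left empty for the sketch
            return fatcat_openapi_client.ReleaseEntity(
                title=raw_record["title"],
                ext_ids=fatcat_openapi_client.ReleaseExtIds(),
            )

        def try_update(self, entity):
            # returning True queues the entity for batch insert; a real importer
            # would first check for an existing release (eg, via lookup_doi())
            return True

        def insert_batch(self, batch):
            # simplest grounded option: one create_release() call per entity,
            # reusing the editgroup helper defined in this file
            for entity in batch:
                self.create_release(entity)


    # API client construction shown without auth/config details (assumption)
    api = fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient())
    importer = MyReleaseImporter(api, edit_batch_size=50)
    with open("releases.json") as json_file:
        counts = JsonLinePusher(importer, json_file).run()
    print(counts)

The pusher calls push_record() once per JSON line; entities accumulate in the importer's queue and are flushed through insert_batch() every edit_batch_size records, with finish() flushing the remainder and returning the counts Counter.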