-rw-r--r--  proposals/2020_sql_size_reduction.md           | 16 ++++++++++++++++
-rwxr-xr-x  python/fatcat_ingest.py                        |  2 +-
-rw-r--r--  python/fatcat_tools/harvest/doi_registrars.py  | 14 +++++++-------
-rw-r--r--  python/fatcat_tools/harvest/harvest_common.py  | 10 +++++++---
-rw-r--r--  python/fatcat_tools/harvest/oaipmh.py          | 15 ++++++++-------
-rw-r--r--  python/fatcat_tools/importers/ingest.py        | 14 +++++++++++---
-rw-r--r--  python/fatcat_tools/workers/changelog.py       |  2 ++
7 files changed, 52 insertions(+), 21 deletions(-)
diff --git a/proposals/2020_sql_size_reduction.md b/proposals/2020_sql_size_reduction.md
index f421e455..2fa39873 100644
--- a/proposals/2020_sql_size_reduction.md
+++ b/proposals/2020_sql_size_reduction.md
@@ -52,6 +52,8 @@ Other growth is expected to be much smaller, let's say a few GB of disk.
This works out to a bit over 600 GByte total disk size.
+NOTE: the math above was wrong: 470 + 80 + 100 -> 650 GByte, so call it 700 GByte
+
## Idea: finish `ext_id` migration and drop columns+index from `release_rev`
@@ -172,3 +174,17 @@ would drop ~20% of data size and ~20% of index size.
Would it make more sense to use {ident, editgroup} as the primary key and UNIQ,
then have a separate index on `editgroup`? On the assumption that `editgroup`
cardinality is much smaller, thus the index disk usage would be smaller.
+
+## Idea: use binary for hashes
+
+We currently store file hashes (SHA-1, SHA-256, MD5) and abstracts/`ref_blobs`
+keys as TEXT in lower-case hex encoding. Using binary instead could be as much
+as a 50% size savings for both column and index storage. The difference becomes
+more apparent when all files have all hashes populated.
+
+base32-encoded strings would give a smaller (but still non-negligible) savings.
+
+This change has a reasonable migration path, is entirely internal to postgres
+and fatcatd, and would require no change to the API schema. Postgres can also
+emit `hex`-encoded `bytea` values, which keeps reading/debugging reasonable.
+
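As a quick sanity check of the encoding-size claims above, a small standard-library sketch (editorial, not part of the proposal itself):

```python
import base64
import hashlib

# a SHA-1 digest is 20 raw bytes; lower-case hex doubles that to 40 chars
sha1 = hashlib.sha1(b"example").digest()
assert len(sha1) == 20
assert len(sha1.hex()) == 40  # current TEXT storage: 2x the binary size

# base32 lands in between: 160 bits / 5 bits per char = 32 chars
assert len(base64.b32encode(sha1)) == 32

# same ratios for SHA-256: 32 bytes binary, 64 hex, 52 base32 (plus padding)
sha256 = hashlib.sha256(b"example").digest()
assert (len(sha256), len(sha256.hex())) == (32, 64)
assert len(base64.b32encode(sha256).rstrip(b"=")) == 52
```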
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 0715da6e..6fda74c5 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -86,7 +86,7 @@ def _run_search_dump(args, search):
release = args.api.get_release(esr.ident)
ingest_request = release_ingest_request(
release,
- ingest_request_source="fatcat-ingest-container",
+ ingest_request_source="fatcat-ingest",
)
if not ingest_request:
continue
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index 33f44600..d2d71d3c 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -70,8 +70,8 @@ class HarvestCrossrefWorker:
def fail_fast(err, msg):
if err is not None:
- print("Kafka producer delivery error: {}".format(err))
- print("Bailing out...")
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
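For orientation: `fail_fast` here is a confluent-kafka delivery callback, invoked from `poll()`/`flush()` rather than at `produce()` time. A minimal sketch of the wiring (broker and topic are placeholders):

```python
import sys
from confluent_kafka import KafkaException, Producer

def fail_fast(err, msg):
    # called once per message, from poll()/flush(), with the delivery result
    if err is not None:
        print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
        raise KafkaException(err)

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('example-topic', b'payload', on_delivery=fail_fast)
producer.flush()  # blocks until all outstanding delivery reports have fired
```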
@@ -117,7 +117,7 @@ class HarvestCrossrefWorker:
if http_resp.status_code == 503:
# crude backoff; now redundant with session exponential
# backoff, but allows for longer backoff/downtime on remote end
- print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code))
+ print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), file=sys.stderr)
# keep kafka producer connection alive
self.producer.poll(0)
time.sleep(30.0)
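The "session exponential backoff" mentioned in the comment is presumably a urllib3 `Retry` policy mounted on the requests session; a sketch of that pattern (parameter values are illustrative):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# retry transient 5xx responses, with exponentially growing sleeps between tries
retry_policy = Retry(total=5, backoff_factor=1.0, status_forcelist=[500, 502, 503])
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=retry_policy))
http_resp = session.get('https://api.crossref.org/works', params={'rows': 0})
```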
@@ -131,7 +131,7 @@ class HarvestCrossrefWorker:
items = self.extract_items(resp)
count += len(items)
print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
- self.extract_total(resp), http_resp.elapsed))
+ self.extract_total(resp), http_resp.elapsed), file=sys.stderr)
#print(json.dumps(resp))
for work in items:
self.producer.produce(
@@ -156,7 +156,7 @@ class HarvestCrossrefWorker:
while True:
current = self.state.next(continuous)
if current:
- print("Fetching DOIs updated on {} (UTC)".format(current))
+ print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
self.state.complete(current,
kafka_topic=self.state_topic,
@@ -164,11 +164,11 @@ class HarvestCrossrefWorker:
continue
if continuous:
- print("Sleeping {} seconds...".format(self.loop_sleep))
+ print("Sleeping {} seconds...".format(self.loop_sleep), file=sys.stderr)
time.sleep(self.loop_sleep)
else:
break
- print("{} DOI ingest caught up".format(self.name))
+ print("{} DOI ingest caught up".format(self.name), file=sys.stderr)
class HarvestDataciteWorker(HarvestCrossrefWorker):
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index 78830a1c..310366bd 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -57,6 +57,10 @@ class HarvestState:
if catchup_days or start_date or end_date:
self.enqueue_period(start_date, end_date, catchup_days)
+ def __str__(self):
+ return '<HarvestState to_process={}, completed={}>'.format(
+ len(self.to_process), len(self.completed))
+
def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
"""
This function adds a time period to the "TODO" list, unless the dates
@@ -129,7 +133,7 @@ class HarvestState:
def fail_fast(err, msg):
if err:
raise KafkaException(err)
- print("Commiting status to Kafka: {}".format(kafka_topic))
+ print("Commiting status to Kafka: {}".format(kafka_topic), file=sys.stderr)
producer_conf = kafka_config.copy()
producer_conf.update({
'delivery.report.only.error': True,
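The rest of this method is cut off by the hunk, but it presumably serializes the completed date and produces it to the compacted state topic using this producer config. A sketch under that assumption (the message schema here is illustrative, not the actual wire format; `date` stands in for the day just completed):

```python
import json
from confluent_kafka import Producer

producer = Producer(producer_conf)
state_msg = json.dumps({'completed-date': str(date)}).encode('utf-8')
# keying by date lets a log-compacted topic keep only the latest status per day
producer.produce(kafka_topic, state_msg, key=str(date).encode('utf-8'),
                 on_delivery=fail_fast)
producer.flush()
```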
@@ -154,7 +158,7 @@ class HarvestState:
if not kafka_topic:
return
- print("Fetching state from kafka topic: {}".format(kafka_topic))
+ print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)
def fail_fast(err, msg):
if err:
raise KafkaException(err)
@@ -191,4 +195,4 @@ class HarvestState:
# verify that we got at least to HWM
assert c >= hwm[1]
- print("... got {} state update messages, done".format(c))
+ print("... got {} state update messages, done".format(c), file=sys.stderr)
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index f908ba83..11b5fa0a 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -49,13 +49,14 @@ class HarvestOaiPmhWorker:
self.name = "unnamed"
self.state = HarvestState(start_date, end_date)
self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
+ print(self.state, file=sys.stderr)
def fetch_date(self, date):
def fail_fast(err, msg):
if err is not None:
- print("Kafka producer delivery error: {}".format(err))
- print("Bailing out...")
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
@@ -79,14 +80,14 @@ class HarvestOaiPmhWorker:
'until': date_str,
})
except sickle.oaiexceptions.NoRecordsMatch:
- print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str))
+ print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), file=sys.stderr)
return
count = 0
for item in records:
count += 1
if count % 50 == 0:
- print("... up to {}".format(count))
+ print("... up to {}".format(count), file=sys.stderr)
producer.produce(
self.produce_topic,
item.raw.encode('utf-8'),
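`records` here comes from the sickle OAI-PMH client; the fetch a few lines up (outside this hunk) presumably looks something like this sketch (endpoint and metadata prefix are illustrative):

```python
from sickle import Sickle
from sickle.oaiexceptions import NoRecordsMatch

client = Sickle('https://export.arxiv.org/oai2')
try:
    # 'from'/'until' must be passed via a dict, since 'from' is a Python keyword
    records = client.ListRecords(metadataPrefix='oai_dc',
                                 **{'from': '2020-01-01', 'until': '2020-01-01'})
    for item in records:
        raw_xml = item.raw  # the full source XML for the record
except NoRecordsMatch:
    records = []
```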
@@ -99,7 +100,7 @@ class HarvestOaiPmhWorker:
while True:
current = self.state.next(continuous)
if current:
- print("Fetching DOIs updated on {} (UTC)".format(current))
+ print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
self.state.complete(current,
kafka_topic=self.state_topic,
@@ -107,11 +108,11 @@ class HarvestOaiPmhWorker:
continue
if continuous:
- print("Sleeping {} seconds...".format(self.loop_sleep))
+ print("Sleeping {} seconds...".format(self.loop_sleep), file=sys.stderr)
time.sleep(self.loop_sleep)
else:
break
- print("{} OAI-PMH ingest caught up".format(self.name))
+ print("{} OAI-PMH ingest caught up".format(self.name), file=sys.stderr)
class HarvestArxivWorker(HarvestOaiPmhWorker):
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index fdaba176..4772bfaa 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -32,8 +32,11 @@ class IngestFileResultImporter(EntityImporter):
'fatcat-ingest',
'arabesque',
'mag-corpus',
+ 'mag',
'unpaywall-corpus',
+ 'unpaywall',
's2-corpus',
+ 's2',
]
if kwargs.get('skip_source_whitelist', False):
self.ingest_request_source_whitelist = []
@@ -137,7 +140,12 @@ class IngestFileResultImporter(EntityImporter):
if not 'terminal_dt' in terminal:
terminal['terminal_dt'] = terminal['dt']
assert len(terminal['terminal_dt']) == 14
- url = make_rel_url(terminal['terminal_url'], self.default_link_rel)
+
+ default_rel = self.default_link_rel
+ if request.get('link_source') == 'doi':
+ default_rel = 'publisher'
+ default_rel = request.get('rel', default_rel)
+ url = make_rel_url(terminal['terminal_url'], default_rel)
if not url:
self.counts['skip-url'] += 1
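The net precedence for the URL's link rel is: explicit `rel` on the request, then a `link_source`-derived default, then the importer-wide `default_link_rel`. A small sketch mirroring the logic above (the `'web'` default is an assumption, not confirmed by this hunk):

```python
def effective_rel(request, default_link_rel='web'):
    # mirrors the importer logic in the hunk above
    default_rel = default_link_rel
    if request.get('link_source') == 'doi':
        default_rel = 'publisher'
    return request.get('rel', default_rel)

assert effective_rel({'link_source': 'doi'}) == 'publisher'
assert effective_rel({'link_source': 'doi', 'rel': 'repository'}) == 'repository'
assert effective_rel({'link_source': 'unpaywall'}) == 'web'
```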
@@ -158,8 +166,8 @@ class IngestFileResultImporter(EntityImporter):
release_ids=[release_ident],
urls=urls,
)
- if fatcat and fatcat.get('edit_extra'):
- fe.edit_extra = fatcat['edit_extra']
+ if request.get('edit_extra'):
+ fe.edit_extra = request['edit_extra']
else:
fe.edit_extra = dict()
if request.get('ingest_request_source'):
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 7a9a585d..745ee85a 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -105,6 +105,8 @@ class EntityUpdatesWorker(FatcatWorker):
self.live_pdf_ingest_doi_prefix_acceptlist = [
# biorxiv and medrxiv
"10.1101/",
+ # researchgate
+ "10.13140/",
]
def want_live_ingest(self, release, ingest_request):
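`want_live_ingest` is cut off here, but the acceptlist is presumably consulted with a simple prefix match on the release DOI; a sketch under that assumption:

```python
def doi_prefix_accepted(doi, acceptlist):
    # DOI registrant prefixes end with '/', e.g. "10.1101/2020.03.01.12345"
    return any(doi.startswith(prefix) for prefix in acceptlist)

assert doi_prefix_accepted("10.13140/rg.2.2.12345.67890", ["10.1101/", "10.13140/"])
assert not doi_prefix_accepted("10.1000/xyz123", ["10.1101/", "10.13140/"])
```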