diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/fatcat_ingest.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 14 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 10 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 15 | ||||
-rw-r--r-- | python/fatcat_tools/importers/ingest.py | 14 | ||||
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 2 |
6 files changed, 36 insertions, 21 deletions
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py index 0715da6e..6fda74c5 100755 --- a/python/fatcat_ingest.py +++ b/python/fatcat_ingest.py @@ -86,7 +86,7 @@ def _run_search_dump(args, search): release = args.api.get_release(esr.ident) ingest_request = release_ingest_request( release, - ingest_request_source="fatcat-ingest-container", + ingest_request_source="fatcat-ingest", ) if not ingest_request: continue diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 33f44600..d2d71d3c 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -70,8 +70,8 @@ class HarvestCrossrefWorker: def fail_fast(err, msg): if err is not None: - print("Kafka producer delivery error: {}".format(err)) - print("Bailing out...") + print("Kafka producer delivery error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(err) @@ -117,7 +117,7 @@ class HarvestCrossrefWorker: if http_resp.status_code == 503: # crude backoff; now redundant with session exponential # backoff, but allows for longer backoff/downtime on remote end - print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code)) + print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), file=sys.stderr) # keep kafka producer connection alive self.producer.poll(0) time.sleep(30.0) @@ -131,7 +131,7 @@ class HarvestCrossrefWorker: items = self.extract_items(resp) count += len(items) print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count, - self.extract_total(resp), http_resp.elapsed)) + self.extract_total(resp), http_resp.elapsed), file=sys.stderr) #print(json.dumps(resp)) for work in items: self.producer.produce( @@ -156,7 +156,7 @@ class HarvestCrossrefWorker: while True: current = self.state.next(continuous) if current: - print("Fetching DOIs updated on {} (UTC)".format(current)) + print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) self.state.complete(current, kafka_topic=self.state_topic, @@ -164,11 +164,11 @@ class HarvestCrossrefWorker: continue if continuous: - print("Sleeping {} seconds...".format(self.loop_sleep)) + print("Sleeping {} seconds...".format(self.loop_sleep), file=sys.stderr) time.sleep(self.loop_sleep) else: break - print("{} DOI ingest caught up".format(self.name)) + print("{} DOI ingest caught up".format(self.name), file=sys.stderr) class HarvestDataciteWorker(HarvestCrossrefWorker): diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index 78830a1c..310366bd 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -57,6 +57,10 @@ class HarvestState: if catchup_days or start_date or end_date: self.enqueue_period(start_date, end_date, catchup_days) + def __str__(self): + return '<HarvestState to_process={}, completed={}>'.format( + len(self.to_process), len(self.completed)) + def enqueue_period(self, start_date=None, end_date=None, catchup_days=14): """ This function adds a time period to the "TODO" list, unless the dates @@ -129,7 +133,7 @@ class HarvestState: def fail_fast(err, msg): if err: raise KafkaException(err) - print("Commiting status to Kafka: {}".format(kafka_topic)) + print("Commiting status to Kafka: {}".format(kafka_topic), file=sys.stderr) producer_conf = kafka_config.copy() producer_conf.update({ 'delivery.report.only.error': True, @@ -154,7 +158,7 @@ class HarvestState: if not kafka_topic: return - print("Fetching state from kafka topic: {}".format(kafka_topic)) + print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr) def fail_fast(err, msg): if err: raise KafkaException(err) @@ -191,4 +195,4 @@ class HarvestState: # verify that we got at least to HWM assert c >= hwm[1] - print("... got {} state update messages, done".format(c)) + print("... got {} state update messages, done".format(c), file=sys.stderr) diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index f908ba83..11b5fa0a 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -49,13 +49,14 @@ class HarvestOaiPmhWorker: self.name = "unnamed" self.state = HarvestState(start_date, end_date) self.state.initialize_from_kafka(self.state_topic, self.kafka_config) + print(self.state, file=sys.stderr) def fetch_date(self, date): def fail_fast(err, msg): if err is not None: - print("Kafka producer delivery error: {}".format(err)) - print("Bailing out...") + print("Kafka producer delivery error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(err) @@ -79,14 +80,14 @@ class HarvestOaiPmhWorker: 'until': date_str, }) except sickle.oaiexceptions.NoRecordsMatch: - print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str)) + print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), file=sys.stderr) return count = 0 for item in records: count += 1 if count % 50 == 0: - print("... up to {}".format(count)) + print("... up to {}".format(count), file=sys.stderr) producer.produce( self.produce_topic, item.raw.encode('utf-8'), @@ -99,7 +100,7 @@ class HarvestOaiPmhWorker: while True: current = self.state.next(continuous) if current: - print("Fetching DOIs updated on {} (UTC)".format(current)) + print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) self.state.complete(current, kafka_topic=self.state_topic, @@ -107,11 +108,11 @@ class HarvestOaiPmhWorker: continue if continuous: - print("Sleeping {} seconds...".format(self.loop_sleep)) + print("Sleeping {} seconds...".format(self.loop_sleep), file=sys.stderr) time.sleep(self.loop_sleep) else: break - print("{} OAI-PMH ingest caught up".format(self.name)) + print("{} OAI-PMH ingest caught up".format(self.name), file=sys.stderr) class HarvestArxivWorker(HarvestOaiPmhWorker): diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index fdaba176..4772bfaa 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -32,8 +32,11 @@ class IngestFileResultImporter(EntityImporter): 'fatcat-ingest', 'arabesque', 'mag-corpus', + 'mag', 'unpaywall-corpus', + 'unpaywall', 's2-corpus', + 's2', ] if kwargs.get('skip_source_whitelist', False): self.ingest_request_source_whitelist = [] @@ -137,7 +140,12 @@ class IngestFileResultImporter(EntityImporter): if not 'terminal_dt' in terminal: terminal['terminal_dt'] = terminal['dt'] assert len(terminal['terminal_dt']) == 14 - url = make_rel_url(terminal['terminal_url'], self.default_link_rel) + + default_rel = self.default_link_rel + if request.get('link_source') == 'doi': + default_rel = 'publisher' + default_rel = request.get('rel', default_rel) + url = make_rel_url(terminal['terminal_url'], default_rel) if not url: self.counts['skip-url'] += 1 @@ -158,8 +166,8 @@ class IngestFileResultImporter(EntityImporter): release_ids=[release_ident], urls=urls, ) - if fatcat and fatcat.get('edit_extra'): - fe.edit_extra = fatcat['edit_extra'] + if request.get('edit_extra'): + fe.edit_extra = request['edit_extra'] else: fe.edit_extra = dict() if request.get('ingest_request_source'): diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 7a9a585d..745ee85a 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -105,6 +105,8 @@ class EntityUpdatesWorker(FatcatWorker): self.live_pdf_ingest_doi_prefix_acceptlist = [ # biorxiv and medrxiv "10.1101/", + # researchgate + "10.13140/", ] def want_live_ingest(self, release, ingest_request): |