summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest
diff options
context:
space:
mode:
authorMartin Czygan <martin.czygan@gmail.com>2020-02-14 13:40:53 +0100
committerMartin Czygan <martin.czygan@gmail.com>2020-02-14 13:40:53 +0100
commita1bbf612cef473af0410f9985d9e191a4000a0f5 (patch)
tree533f45d7b70eceb5d5d19d37e6a71adf0d5f124b /python/fatcat_tools/harvest
parent07fabec32aada55a75c064e5c1e01a46da30d854 (diff)
downloadfatcat-a1bbf612cef473af0410f9985d9e191a4000a0f5.tar.gz
fatcat-a1bbf612cef473af0410f9985d9e191a4000a0f5.zip
harvest: log state on startup and use stderr for diagnostics
Diffstat (limited to 'python/fatcat_tools/harvest')
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py14
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py10
-rw-r--r--python/fatcat_tools/harvest/oaipmh.py15
3 files changed, 22 insertions, 17 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index 33f44600..d2d71d3c 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -70,8 +70,8 @@ class HarvestCrossrefWorker:
def fail_fast(err, msg):
if err is not None:
- print("Kafka producer delivery error: {}".format(err))
- print("Bailing out...")
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
@@ -117,7 +117,7 @@ class HarvestCrossrefWorker:
if http_resp.status_code == 503:
# crude backoff; now redundant with session exponential
# backoff, but allows for longer backoff/downtime on remote end
- print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code))
+ print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), file=sys.stderr)
# keep kafka producer connection alive
self.producer.poll(0)
time.sleep(30.0)
@@ -131,7 +131,7 @@ class HarvestCrossrefWorker:
items = self.extract_items(resp)
count += len(items)
print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
- self.extract_total(resp), http_resp.elapsed))
+ self.extract_total(resp), http_resp.elapsed), file=sys.stderr)
#print(json.dumps(resp))
for work in items:
self.producer.produce(
@@ -156,7 +156,7 @@ class HarvestCrossrefWorker:
while True:
current = self.state.next(continuous)
if current:
- print("Fetching DOIs updated on {} (UTC)".format(current))
+ print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
self.state.complete(current,
kafka_topic=self.state_topic,
@@ -164,11 +164,11 @@ class HarvestCrossrefWorker:
continue
if continuous:
- print("Sleeping {} seconds...".format(self.loop_sleep))
+ print("Sleeping {} seconds...".format(self.loop_sleep), file=sys.stderr)
time.sleep(self.loop_sleep)
else:
break
- print("{} DOI ingest caught up".format(self.name))
+ print("{} DOI ingest caught up".format(self.name), file=sys.stderr)
class HarvestDataciteWorker(HarvestCrossrefWorker):
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index 78830a1c..310366bd 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -57,6 +57,10 @@ class HarvestState:
if catchup_days or start_date or end_date:
self.enqueue_period(start_date, end_date, catchup_days)
+ def __str__(self):
+ return '<HarvestState to_process={}, completed={}>'.format(
+ len(self.to_process), len(self.completed))
+
def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
"""
This function adds a time period to the "TODO" list, unless the dates
@@ -129,7 +133,7 @@ class HarvestState:
def fail_fast(err, msg):
if err:
raise KafkaException(err)
- print("Commiting status to Kafka: {}".format(kafka_topic))
+ print("Commiting status to Kafka: {}".format(kafka_topic), file=sys.stderr)
producer_conf = kafka_config.copy()
producer_conf.update({
'delivery.report.only.error': True,
@@ -154,7 +158,7 @@ class HarvestState:
if not kafka_topic:
return
- print("Fetching state from kafka topic: {}".format(kafka_topic))
+ print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)
def fail_fast(err, msg):
if err:
raise KafkaException(err)
@@ -191,4 +195,4 @@ class HarvestState:
# verify that we got at least to HWM
assert c >= hwm[1]
- print("... got {} state update messages, done".format(c))
+ print("... got {} state update messages, done".format(c), file=sys.stderr)
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index f908ba83..11b5fa0a 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -49,13 +49,14 @@ class HarvestOaiPmhWorker:
self.name = "unnamed"
self.state = HarvestState(start_date, end_date)
self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
+ print(self.state, file=sys.stderr)
def fetch_date(self, date):
def fail_fast(err, msg):
if err is not None:
- print("Kafka producer delivery error: {}".format(err))
- print("Bailing out...")
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
@@ -79,14 +80,14 @@ class HarvestOaiPmhWorker:
'until': date_str,
})
except sickle.oaiexceptions.NoRecordsMatch:
- print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str))
+ print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), file=sys.stderr)
return
count = 0
for item in records:
count += 1
if count % 50 == 0:
- print("... up to {}".format(count))
+ print("... up to {}".format(count), file=sys.stderr)
producer.produce(
self.produce_topic,
item.raw.encode('utf-8'),
@@ -99,7 +100,7 @@ class HarvestOaiPmhWorker:
while True:
current = self.state.next(continuous)
if current:
- print("Fetching DOIs updated on {} (UTC)".format(current))
+ print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
self.state.complete(current,
kafka_topic=self.state_topic,
@@ -107,11 +108,11 @@ class HarvestOaiPmhWorker:
continue
if continuous:
- print("Sleeping {} seconds...".format(self.loop_sleep))
+ print("Sleeping {} seconds...".format(self.loop_sleep), file=sys.stderr)
time.sleep(self.loop_sleep)
else:
break
- print("{} OAI-PMH ingest caught up".format(self.name))
+ print("{} OAI-PMH ingest caught up".format(self.name), file=sys.stderr)
class HarvestArxivWorker(HarvestOaiPmhWorker):