diff options
| author | Martin Czygan <martin.czygan@gmail.com> | 2020-02-14 13:40:53 +0100 | 
|---|---|---|
| committer | Martin Czygan <martin.czygan@gmail.com> | 2020-02-14 13:40:53 +0100 | 
| commit | a1bbf612cef473af0410f9985d9e191a4000a0f5 (patch) | |
| tree | 533f45d7b70eceb5d5d19d37e6a71adf0d5f124b /python/fatcat_tools/harvest | |
| parent | 07fabec32aada55a75c064e5c1e01a46da30d854 (diff) | |
| download | fatcat-a1bbf612cef473af0410f9985d9e191a4000a0f5.tar.gz fatcat-a1bbf612cef473af0410f9985d9e191a4000a0f5.zip | |
harvest: log state on startup and use stderr for diagnostics
Diffstat (limited to 'python/fatcat_tools/harvest')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 14 |
| -rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 10 |
| -rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 15 |
3 files changed, 22 insertions, 17 deletions
| diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 33f44600..d2d71d3c 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -70,8 +70,8 @@ class HarvestCrossrefWorker:          def fail_fast(err, msg):              if err is not None: -                print("Kafka producer delivery error: {}".format(err)) -                print("Bailing out...") +                print("Kafka producer delivery error: {}".format(err), file=sys.stderr) +                print("Bailing out...", file=sys.stderr)                  # TODO: should it be sys.exit(-1)?                  raise KafkaException(err) @@ -117,7 +117,7 @@ class HarvestCrossrefWorker:              if http_resp.status_code == 503:                  # crude backoff; now redundant with session exponential                  # backoff, but allows for longer backoff/downtime on remote end -                print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code)) +                print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), file=sys.stderr)                  # keep kafka producer connection alive                  self.producer.poll(0)                  time.sleep(30.0) @@ -131,7 +131,7 @@ class HarvestCrossrefWorker:              items = self.extract_items(resp)              count += len(items)              print("... 
got {} ({} of {}), HTTP fetch took {}".format(len(items), count, -                self.extract_total(resp), http_resp.elapsed)) +                self.extract_total(resp), http_resp.elapsed), file=sys.stderr)              #print(json.dumps(resp))              for work in items:                  self.producer.produce( @@ -156,7 +156,7 @@ class HarvestCrossrefWorker:          while True:              current = self.state.next(continuous)              if current: -                print("Fetching DOIs updated on {} (UTC)".format(current)) +                print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)                  self.fetch_date(current)                  self.state.complete(current,                      kafka_topic=self.state_topic, @@ -164,11 +164,11 @@ class HarvestCrossrefWorker:                  continue              if continuous: -                print("Sleeping {} seconds...".format(self.loop_sleep)) +                print("Sleeping {} seconds...".format(self.loop_sleep), file=sys.stderr)                  time.sleep(self.loop_sleep)              else:                  break -        print("{} DOI ingest caught up".format(self.name)) +        print("{} DOI ingest caught up".format(self.name), file=sys.stderr)  class HarvestDataciteWorker(HarvestCrossrefWorker): diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index 78830a1c..310366bd 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -57,6 +57,10 @@ class HarvestState:          if catchup_days or start_date or end_date:              self.enqueue_period(start_date, end_date, catchup_days) +    def __str__(self): +        return '<HarvestState to_process={}, completed={}>'.format( +            len(self.to_process), len(self.completed)) +      def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):          """          This function adds a time 
period to the "TODO" list, unless the dates @@ -129,7 +133,7 @@ class HarvestState:              def fail_fast(err, msg):                  if err:                      raise KafkaException(err) -            print("Commiting status to Kafka: {}".format(kafka_topic)) +            print("Commiting status to Kafka: {}".format(kafka_topic), file=sys.stderr)              producer_conf = kafka_config.copy()              producer_conf.update({                  'delivery.report.only.error': True, @@ -154,7 +158,7 @@ class HarvestState:          if not kafka_topic:              return -        print("Fetching state from kafka topic: {}".format(kafka_topic)) +        print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)          def fail_fast(err, msg):              if err:                  raise KafkaException(err) @@ -191,4 +195,4 @@ class HarvestState:          # verify that we got at least to HWM          assert c >= hwm[1] -        print("... got {} state update messages, done".format(c)) +        print("... got {} state update messages, done".format(c), file=sys.stderr) diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index f908ba83..11b5fa0a 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -49,13 +49,14 @@ class HarvestOaiPmhWorker:          self.name = "unnamed"          self.state = HarvestState(start_date, end_date)          self.state.initialize_from_kafka(self.state_topic, self.kafka_config) +        print(self.state, file=sys.stderr)      def fetch_date(self, date):          def fail_fast(err, msg):              if err is not None: -                print("Kafka producer delivery error: {}".format(err)) -                print("Bailing out...") +                print("Kafka producer delivery error: {}".format(err), file=sys.stderr) +                print("Bailing out...", file=sys.stderr)                  # TODO: should it be sys.exit(-1)?          
        raise KafkaException(err) @@ -79,14 +80,14 @@ class HarvestOaiPmhWorker:                  'until': date_str,              })          except sickle.oaiexceptions.NoRecordsMatch: -            print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str)) +            print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), file=sys.stderr)              return          count = 0          for item in records:              count += 1              if count % 50 == 0: -                print("... up to {}".format(count)) +                print("... up to {}".format(count), file=sys.stderr)              producer.produce(                  self.produce_topic,                  item.raw.encode('utf-8'), @@ -99,7 +100,7 @@ class HarvestOaiPmhWorker:          while True:              current = self.state.next(continuous)              if current: -                print("Fetching DOIs updated on {} (UTC)".format(current)) +                print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)                  self.fetch_date(current)                  self.state.complete(current,                      kafka_topic=self.state_topic, @@ -107,11 +108,11 @@ class HarvestOaiPmhWorker:                  continue              if continuous: -                print("Sleeping {} seconds...".format(self.loop_sleep)) +                print("Sleeping {} seconds...".format(self.loop_sleep), file=sys.stderr)                  time.sleep(self.loop_sleep)              else:                  break -        print("{} OAI-PMH ingest caught up".format(self.name)) +        print("{} OAI-PMH ingest caught up".format(self.name), file=sys.stderr)  class HarvestArxivWorker(HarvestOaiPmhWorker): | 
