Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--  python/fatcat_tools/harvest/doi_registrars.py   71
-rw-r--r--  python/fatcat_tools/harvest/harvest_common.py   40
-rw-r--r--  python/fatcat_tools/harvest/oaipmh.py            41
3 files changed, 104 insertions, 48 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index 802e0e22..1483266c 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -7,7 +7,7 @@ import time
 import itertools
 import datetime
 import requests
-from pykafka import KafkaClient
+from confluent_kafka import Producer

 from fatcat_tools.workers import most_recent_message
 from .harvest_common import HarvestState, requests_retry_session
@@ -53,11 +53,16 @@ class HarvestCrossrefWorker:
         self.produce_topic = produce_topic
         self.state_topic = state_topic
         self.contact_email = contact_email
-        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
         self.is_update_filter = is_update_filter
+        self.kafka_config = {
+            'bootstrap.servers': kafka_hosts,
+            'delivery.report.only.error': True,
+            'default.topic.config':
+                {'request.required.acks': 'all'},
+        }

         self.state = HarvestState(start_date, end_date)
-        self.state.initialize_from_kafka(self.kafka.topics[self.state_topic])
+        self.state.initialize_from_kafka(self.state_topic, self.kafka_config)

         self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
         self.api_batch_size = 50
@@ -81,9 +86,16 @@ class HarvestCrossrefWorker:
     def extract_key(self, obj):
         return obj['DOI'].encode('utf-8')

+    def kafka_produce_delivery_callback(err, msg):
+        if err is not None:
+            print("Kafka producer delivery error: {}".format(err))
+            print("Bailing out...")
+            # TODO: should it be sys.exit(-1)?
+            raise KafkaException(err)
+
     def fetch_date(self, date):
-        produce_topic = self.kafka.topics[self.produce_topic]
+        producer = Producer(self.kafka_config)

         date_str = date.isoformat()
         params = self.params(date_str)
@@ -93,27 +105,32 @@ class HarvestCrossrefWorker:
                 self.contact_email),
         })
         count = 0
-        with produce_topic.get_producer() as producer:
-            while True:
-                http_resp = http_session.get(self.api_host_url, params=params)
-                if http_resp.status_code == 503:
-                    # crude backoff; now redundant with session exponential
-                    # backoff, but allows for longer backoff/downtime on remote end
-                    print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code))
-                    time.sleep(30.0)
-                    continue
-                http_resp.raise_for_status()
-                resp = http_resp.json()
-                items = self.extract_items(resp)
-                count += len(items)
-                print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
-                    self.extract_total(resp), http_resp.elapsed))
-                #print(json.dumps(resp))
-                for work in items:
-                    producer.produce(json.dumps(work).encode('utf-8'), partition_key=self.extract_key(work))
-                if len(items) < self.api_batch_size:
-                    break
-                params = self.update_params(params, resp)
+        while True:
+            http_resp = http_session.get(self.api_host_url, params=params)
+            if http_resp.status_code == 503:
+                # crude backoff; now redundant with session exponential
+                # backoff, but allows for longer backoff/downtime on remote end
+                print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code))
+                time.sleep(30.0)
+                continue
+            http_resp.raise_for_status()
+            resp = http_resp.json()
+            items = self.extract_items(resp)
+            count += len(items)
+            print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
+                self.extract_total(resp), http_resp.elapsed))
+            #print(json.dumps(resp))
+            for work in items:
+                producer.produce(
+                    self.produce_topic,
+                    json.dumps(work).encode('utf-8'),
+                    key=self.extract_key(work),
+                    on_delivery=self.kafka_produce_delivery_callback)
+            producer.poll(0)
+            if len(items) < self.api_batch_size:
+                break
+            params = self.update_params(params, resp)
+        producer.flush()

     def extract_items(self, resp):
         return resp['message']['items']
@@ -128,7 +145,9 @@ class HarvestCrossrefWorker:
             if current:
                 print("Fetching DOIs updated on {} (UTC)".format(current))
                 self.fetch_date(current)
-                self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic])
+                self.state.complete(current,
+                    kafka_topic=self.state_topic,
+                    kafka_config=self.kafka_config)
                 continue

             if continuous:
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index da06275f..90f499da 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -8,6 +8,8 @@ from requests.adapters import HTTPAdapter
 # unclear why pylint chokes on this import. Recent 'requests' and 'urllib3' are
 # in Pipenv.lock, and there are no errors in QA
 from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error
+from confluent_kafka import Producer, Consumer, TopicPartition, KafkaException, \
+    OFFSET_BEGINNING


 # Used for parsing ISO date format (YYYY-MM-DD)
@@ -104,14 +106,14 @@ class HarvestState:
             date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date()
             self.complete(date)

-    def complete(self, date, kafka_topic=None):
+    def complete(self, date, kafka_topic=None, kafka_config=None):
         """
         Records that a date has been processed successfully.

         Updates internal state and returns a JSON representation to be
         serialized. Will publish to a kafka topic if passed as an argument.

-        kafka_topic should have type pykafka.Topic (not str)
+        kafka_topic should be a string. A producer will be created and destroyed.
         """
         try:
             self.to_process.remove(date)
@@ -123,25 +125,41 @@ class HarvestState:
             'completed-date': str(date),
         }).encode('utf-8')
         if kafka_topic:
-            with kafka_topic.get_sync_producer() as producer:
-                producer.produce(state_json)
+            assert(kafka_config)
+            def fail_fast(err, msg):
+                if err:
+                    raise KafkaException(err)
+            print("Commiting status to Kafka: {}".format(kafka_topic))
+            producer = Producer(kafka_config)
+            producer.produce(kafka_topic, state_json, on_delivery=fail_fast)
+            producer.flush()
         return state_json

-    def initialize_from_kafka(self, kafka_topic):
+    def initialize_from_kafka(self, kafka_topic, kafka_config):
         """
-        kafka_topic should have type pykafka.Topic (not str)
+        kafka_topic should have type str
         """
         if not kafka_topic:
             return

-        print("Fetching state from kafka topic: {}".format(kafka_topic.name))
-        consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000)
+        print("Fetching state from kafka topic: {}".format(kafka_topic))
+        conf = kafka_config.copy()
+        conf.update({
+            'auto.offset.reset': 'earliest',
+            'session.timeout.ms': 10000,
+            'group.id': kafka_topic + "-init",
+        })
+        consumer = Consumer(conf)
+        consumer.assign([TopicPartition(kafka_topic, 0, OFFSET_BEGINNING)])
         c = 0
         while True:
-            msg = consumer.consume(block=True)
+            msg = consumer.poll(timeout=1.0)
             if not msg:
                 break
-            #sys.stdout.write('.')
-            self.update(msg.value.decode('utf-8'))
+            if msg.error():
+                raise KafkaException(msg.error())
+            sys.stdout.write('.') # XXX:
+            self.update(msg.value().decode('utf-8'))
             c += 1
+        consumer.close()
         print("... got {} state update messages, done".format(c))
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 0b482924..ab424482 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -7,8 +7,8 @@ import time
 import itertools
 import datetime
 import requests
-from pykafka import KafkaClient
 import sickle
+from confluent_kafka import Producer

 from fatcat_tools.workers import most_recent_message
 from .harvest_common import HarvestState
@@ -37,7 +37,12 @@ class HarvestOaiPmhWorker:
         self.produce_topic = produce_topic
         self.state_topic = state_topic

-        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+        self.kafka_config = {
+            'bootstrap.servers': kafka_hosts,
+            'delivery.report.only.error': True,
+            'default.topic.config':
+                {'request.required.acks': 'all'},
+        }

         self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
@@ -45,14 +50,21 @@ class HarvestOaiPmhWorker:
         self.metadata_prefix = None  # needs override
         self.name = "unnamed"
         self.state = HarvestState(start_date, end_date)
-        self.state.initialize_from_kafka(self.kafka.topics[self.state_topic])
+        self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
+
+    def kafka_produce_delivery_callback(err, msg):
+        if err is not None:
+            print("Kafka producer delivery error: {}".format(err))
+            print("Bailing out...")
+            # TODO: should it be sys.exit(-1)?
+            raise KafkaException(err)

     def fetch_date(self, date):
+        producer = Producer(self.kafka_config)
+
         api = sickle.Sickle(self.endpoint_url)
         date_str = date.isoformat()
-        produce_topic = self.kafka.topics[self.produce_topic]
         # this dict kwargs hack is to work around 'from' as a reserved python keyword
         # recommended by sickle docs
         try:
@@ -66,12 +78,17 @@ class HarvestOaiPmhWorker:
             return

         count = 0
-        with produce_topic.get_producer() as producer:
-            for item in records:
-                count += 1
-                if count % 50 == 0:
-                    print("... up to {}".format(count))
-                producer.produce(item.raw.encode('utf-8'), partition_key=item.header.identifier.encode('utf-8'))
+        for item in records:
+            count += 1
+            if count % 50 == 0:
+                print("... up to {}".format(count))
+            producer.produce(
+                self.produce_topic,
+                item.raw.encode('utf-8'),
+                key=item.header.identifier.encode('utf-8'),
+                on_delivery=self.kafka_produce_delivery_callback)
+            producer.poll(0)
+        producer.flush()

     def run(self, continuous=False):

@@ -80,7 +97,9 @@ class HarvestOaiPmhWorker:
             if current:
                 print("Fetching DOIs updated on {} (UTC)".format(current))
                 self.fetch_date(current)
-                self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic])
+                self.state.complete(current,
+                    kafka_topic=self.state_topic,
+                    kafka_config=self.kafka_config)
                 continue

             if continuous:
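For reference, both harvesters now follow the same confluent_kafka produce/poll/flush pattern in fetch_date(). A minimal standalone sketch of that pattern, with the broker address, topic name, and sample payload as placeholders (not values from this repo):

import json
from confluent_kafka import Producer, KafkaException

def delivery_callback(err, msg):
    # confluent_kafka invokes on_delivery callbacks as callback(err, msg);
    # err is None when the broker acknowledged the message
    if err is not None:
        raise KafkaException(err)

producer = Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'delivery.report.only.error': True,
    'default.topic.config': {'request.required.acks': 'all'},
})

for work in [{'DOI': '10.123/abc'}]:  # placeholder stand-in for an API batch
    producer.produce(
        'example-topic',                # placeholder topic name
        json.dumps(work).encode('utf-8'),
        key=work['DOI'].encode('utf-8'),
        on_delivery=delivery_callback)
    producer.poll(0)  # non-blocking; services any pending delivery callbacks
producer.flush()      # blocks until the queue drains, so errors surface here

The poll(0) inside the loop keeps delivery callbacks serviced during long harvests; without it, delivery errors would only surface at the final flush().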
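One wrinkle worth noting: the diff defines kafka_produce_delivery_callback(err, msg) without self but passes it as self.kafka_produce_delivery_callback, so the bound instance would arrive as err. A sketch of one way to keep librdkafka's two-argument callback signature on a class (the class name here is illustrative):

from confluent_kafka import KafkaException

class HarvestWorkerSketch:
    # hypothetical class, only to illustrate the callback binding

    @staticmethod
    def kafka_produce_delivery_callback(err, msg):
        # @staticmethod avoids the implicit 'self', so (err, msg) line up
        # with the two arguments confluent_kafka actually passes
        if err is not None:
            print("Kafka producer delivery error: {}".format(err))
            raise KafkaException(err)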
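HarvestState.complete() now makes each checkpoint write effectively synchronous: a short-lived Producer publishes one JSON state message, and flush() blocks until the fail_fast callback has either passed or raised. A sketch under placeholder names:

import json
from confluent_kafka import Producer, KafkaException

def fail_fast(err, msg):
    # runs inside poll()/flush() on the calling thread; raising here
    # aborts the checkpoint write loudly instead of losing it silently
    if err is not None:
        raise KafkaException(err)

state_json = json.dumps({'completed-date': '2019-01-01'}).encode('utf-8')
producer = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder
producer.produce('example-state-topic', state_json, on_delivery=fail_fast)
producer.flush()  # returns only after delivery reports have been processed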
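On startup, initialize_from_kafka() rebuilds state by replaying the whole topic: a throwaway consumer group is manually assigned partition 0 at OFFSET_BEGINNING, and a poll() timeout stands in for pykafka's consumer_timeout_ms. A standalone sketch of that replay loop, again with placeholder broker/topic names:

from confluent_kafka import Consumer, TopicPartition, KafkaException, OFFSET_BEGINNING

conf = {
    'bootstrap.servers': 'localhost:9092',   # placeholder broker
    'auto.offset.reset': 'earliest',
    'session.timeout.ms': 10000,
    'group.id': 'example-state-topic-init',  # throwaway group per run
}
consumer = Consumer(conf)
# assign() (not subscribe()) pins partition 0 explicitly, so the replay
# starts from the beginning regardless of any committed group offsets;
# this assumes a single-partition state topic, as the code above does
consumer.assign([TopicPartition('example-state-topic', 0, OFFSET_BEGINNING)])

count = 0
while True:
    msg = consumer.poll(timeout=1.0)  # returns None once the backlog is drained
    if msg is None:
        break
    if msg.error():
        raise KafkaException(msg.error())
    state = msg.value().decode('utf-8')  # JSON checkpoint to apply in order
    count += 1
consumer.close()
print("... got {} state update messages, done".format(count))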
