diff options
-rwxr-xr-x | python/fatcat_ingest.py | 58 |
1 files changed, 29 insertions, 29 deletions
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py index 36bc530b..05b7e848 100755 --- a/python/fatcat_ingest.py +++ b/python/fatcat_ingest.py @@ -57,37 +57,37 @@ def run_ingest_container(args): counts['estimate'] = s.count() print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr) - # TODO: handling the scroll DELETE with the exception pass below is messy - # because it is usually accompanied with a generator cleanup that doesn't - # work (?) + # don't try to clean up scroll if we are connected to public server (behind + # nginx proxy that disallows DELETE) + if args.elasticsearch_endpoint in ( + 'https://search.fatcat.wiki', + 'https://search.qa.fatcat.wiki'): + s = s.params(clear_scroll=False) + results = s.scan() - try: - for esr in results: - counts['elasticsearch_release'] += 1 - release = args.api.get_release(esr.ident) - ingest_request = release_ingest_request( - release, - oa_only=False, - ingest_request_source="fatcat-ingest-container", - ) - if not ingest_request: - continue - if kafka_producer != None: - kafka_producer.produce( - ingest_file_request_topic, - json.dumps(ingest_request).encode('utf-8'), - #key=None, - on_delivery=kafka_fail_fast, - ) - counts['kafka'] += 1 - # also printing to stdout when in kafka mode; could skip? - print(json.dumps(ingest_request)) - counts['ingest_request'] += 1 - except elasticsearch.exceptions.AuthorizationException: - print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr) - finally: + for esr in results: + counts['elasticsearch_release'] += 1 + release = args.api.get_release(esr.ident) + ingest_request = release_ingest_request( + release, + oa_only=False, + ingest_request_source="fatcat-ingest-container", + ) + if not ingest_request: + continue if kafka_producer != None: - kafka_producer.flush() + kafka_producer.produce( + ingest_file_request_topic, + json.dumps(ingest_request).encode('utf-8'), + #key=None, + on_delivery=kafka_fail_fast, + ) + counts['kafka'] += 1 + # also printing to stdout when in kafka mode; could skip? + print(json.dumps(ingest_request)) + counts['ingest_request'] += 1 + if kafka_producer != None: + kafka_producer.flush() print(counts, file=sys.stderr) def main(): |