aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/fatcat_ingest.py58
1 files changed, 29 insertions, 29 deletions
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 36bc530b..05b7e848 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -57,37 +57,37 @@ def run_ingest_container(args):
counts['estimate'] = s.count()
print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
- # TODO: handling the scroll DELETE with the exception pass below is messy
- # because it is usually accompanied with a generator cleanup that doesn't
- # work (?)
+ # don't try to clean up scroll if we are connected to public server (behind
+ # nginx proxy that disallows DELETE)
+ if args.elasticsearch_endpoint in (
+ 'https://search.fatcat.wiki',
+ 'https://search.qa.fatcat.wiki'):
+ s = s.params(clear_scroll=False)
+
results = s.scan()
- try:
- for esr in results:
- counts['elasticsearch_release'] += 1
- release = args.api.get_release(esr.ident)
- ingest_request = release_ingest_request(
- release,
- oa_only=False,
- ingest_request_source="fatcat-ingest-container",
- )
- if not ingest_request:
- continue
- if kafka_producer != None:
- kafka_producer.produce(
- ingest_file_request_topic,
- json.dumps(ingest_request).encode('utf-8'),
- #key=None,
- on_delivery=kafka_fail_fast,
- )
- counts['kafka'] += 1
- # also printing to stdout when in kafka mode; could skip?
- print(json.dumps(ingest_request))
- counts['ingest_request'] += 1
- except elasticsearch.exceptions.AuthorizationException:
- print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr)
- finally:
+ for esr in results:
+ counts['elasticsearch_release'] += 1
+ release = args.api.get_release(esr.ident)
+ ingest_request = release_ingest_request(
+ release,
+ oa_only=False,
+ ingest_request_source="fatcat-ingest-container",
+ )
+ if not ingest_request:
+ continue
if kafka_producer != None:
- kafka_producer.flush()
+ kafka_producer.produce(
+ ingest_file_request_topic,
+ json.dumps(ingest_request).encode('utf-8'),
+ #key=None,
+ on_delivery=kafka_fail_fast,
+ )
+ counts['kafka'] += 1
+ # also printing to stdout when in kafka mode; could skip?
+ print(json.dumps(ingest_request))
+ counts['ingest_request'] += 1
+ if kafka_producer != None:
+ kafka_producer.flush()
print(counts, file=sys.stderr)
def main():