aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_ingest.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-12-11 16:54:57 -0800
committerBryan Newbold <bnewbold@robocracy.org>2019-12-11 16:54:59 -0800
commit4c716f9e39046fde3e98a3686a5f086f3d53315a (patch)
tree28e09a2a701198939d6bf87e12594db657a1e39d /python/fatcat_ingest.py
parent7838a3c15a82281eec435ef16aad63e97015bdfc (diff)
downloadfatcat-4c716f9e39046fde3e98a3686a5f086f3d53315a.tar.gz
fatcat-4c716f9e39046fde3e98a3686a5f086f3d53315a.zip
simplify ES scroll deletion using param()
This gets rid of some mess error handling code by properly configuring the elasticsearch client to just not clean up scroll iterators when accessing the public (prod or qa) search interfaces. Leaving the scroll state around isn't ideal, so we still delete them if possible (eg, connecting directly to elasticsearch). Thanks to Martin for pointing out this solution in review.
Diffstat (limited to 'python/fatcat_ingest.py')
-rwxr-xr-xpython/fatcat_ingest.py58
1 files changed, 29 insertions, 29 deletions
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 36bc530b..05b7e848 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -57,37 +57,37 @@ def run_ingest_container(args):
counts['estimate'] = s.count()
print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
- # TODO: handling the scroll DELETE with the exception pass below is messy
- # because it is usually accompanied with a generator cleanup that doesn't
- # work (?)
+ # don't try to clean up scroll if we are connected to public server (behind
+ # nginx proxy that disallows DELETE)
+ if args.elasticsearch_endpoint in (
+ 'https://search.fatcat.wiki',
+ 'https://search.qa.fatcat.wiki'):
+ s = s.params(clear_scroll=False)
+
results = s.scan()
- try:
- for esr in results:
- counts['elasticsearch_release'] += 1
- release = args.api.get_release(esr.ident)
- ingest_request = release_ingest_request(
- release,
- oa_only=False,
- ingest_request_source="fatcat-ingest-container",
- )
- if not ingest_request:
- continue
- if kafka_producer != None:
- kafka_producer.produce(
- ingest_file_request_topic,
- json.dumps(ingest_request).encode('utf-8'),
- #key=None,
- on_delivery=kafka_fail_fast,
- )
- counts['kafka'] += 1
- # also printing to stdout when in kafka mode; could skip?
- print(json.dumps(ingest_request))
- counts['ingest_request'] += 1
- except elasticsearch.exceptions.AuthorizationException:
- print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr)
- finally:
+ for esr in results:
+ counts['elasticsearch_release'] += 1
+ release = args.api.get_release(esr.ident)
+ ingest_request = release_ingest_request(
+ release,
+ oa_only=False,
+ ingest_request_source="fatcat-ingest-container",
+ )
+ if not ingest_request:
+ continue
if kafka_producer != None:
- kafka_producer.flush()
+ kafka_producer.produce(
+ ingest_file_request_topic,
+ json.dumps(ingest_request).encode('utf-8'),
+ #key=None,
+ on_delivery=kafka_fail_fast,
+ )
+ counts['kafka'] += 1
+ # also printing to stdout when in kafka mode; could skip?
+ print(json.dumps(ingest_request))
+ counts['ingest_request'] += 1
+ if kafka_producer != None:
+ kafka_producer.flush()
print(counts, file=sys.stderr)
def main():