diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2019-12-11 16:54:57 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-12-11 16:54:59 -0800 | 
| commit | 4c716f9e39046fde3e98a3686a5f086f3d53315a (patch) | |
| tree | 28e09a2a701198939d6bf87e12594db657a1e39d /python | |
| parent | 7838a3c15a82281eec435ef16aad63e97015bdfc (diff) | |
| download | fatcat-4c716f9e39046fde3e98a3686a5f086f3d53315a.tar.gz fatcat-4c716f9e39046fde3e98a3686a5f086f3d53315a.zip | |
simplify ES scroll deletion using param()
This gets rid of some messy error handling code by properly configuring
the elasticsearch client to just not clean up scroll iterators when
accessing the public (prod or qa) search interfaces.
Leaving the scroll state around isn't ideal, so we still delete it when
possible (e.g., when connecting directly to elasticsearch).
Thanks to Martin for pointing out this solution in review.
Diffstat (limited to 'python')
| -rwxr-xr-x | python/fatcat_ingest.py | 58 | 
1 files changed, 29 insertions, 29 deletions
| diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py index 36bc530b..05b7e848 100755 --- a/python/fatcat_ingest.py +++ b/python/fatcat_ingest.py @@ -57,37 +57,37 @@ def run_ingest_container(args):      counts['estimate'] = s.count()      print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr) -    # TODO: handling the scroll DELETE with the exception pass below is messy -    # because it is usually accompanied with a generator cleanup that doesn't -    # work (?) +    # don't try to clean up scroll if we are connected to public server (behind +    # nginx proxy that disallows DELETE) +    if args.elasticsearch_endpoint in ( +            'https://search.fatcat.wiki', +            'https://search.qa.fatcat.wiki'): +        s = s.params(clear_scroll=False) +      results = s.scan() -    try: -        for esr in results: -            counts['elasticsearch_release'] += 1 -            release = args.api.get_release(esr.ident) -            ingest_request = release_ingest_request( -                release, -                oa_only=False, -                ingest_request_source="fatcat-ingest-container", -            ) -            if not ingest_request: -                continue -            if kafka_producer != None: -                kafka_producer.produce( -                    ingest_file_request_topic, -                    json.dumps(ingest_request).encode('utf-8'), -                    #key=None, -                    on_delivery=kafka_fail_fast, -                ) -                counts['kafka'] += 1 -            # also printing to stdout when in kafka mode; could skip? 
-            print(json.dumps(ingest_request)) -            counts['ingest_request'] += 1 -    except elasticsearch.exceptions.AuthorizationException: -        print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr) -    finally: +    for esr in results: +        counts['elasticsearch_release'] += 1 +        release = args.api.get_release(esr.ident) +        ingest_request = release_ingest_request( +            release, +            oa_only=False, +            ingest_request_source="fatcat-ingest-container", +        ) +        if not ingest_request: +            continue          if kafka_producer != None: -            kafka_producer.flush() +            kafka_producer.produce( +                ingest_file_request_topic, +                json.dumps(ingest_request).encode('utf-8'), +                #key=None, +                on_delivery=kafka_fail_fast, +            ) +            counts['kafka'] += 1 +        # also printing to stdout when in kafka mode; could skip? +        print(json.dumps(ingest_request)) +        counts['ingest_request'] += 1 +    if kafka_producer != None: +        kafka_producer.flush()      print(counts, file=sys.stderr)  def main(): | 
