Diffstat (limited to 'python')
-rwxr-xr-x  python/fatcat_ingest.py | 58 +++++++++++++++++++++++++++++-----------------------------
1 file changed, 29 insertions(+), 29 deletions(-)
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 36bc530b..05b7e848 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -57,37 +57,37 @@ def run_ingest_container(args):
     counts['estimate'] = s.count()
     print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
-    # TODO: handling the scroll DELETE with the exception pass below is messy
-    # because it is usually accompanied with a generator cleanup that doesn't
-    # work (?)
+    # don't try to clean up scroll if we are connected to public server (behind
+    # nginx proxy that disallows DELETE)
+    if args.elasticsearch_endpoint in (
+            'https://search.fatcat.wiki',
+            'https://search.qa.fatcat.wiki'):
+        s = s.params(clear_scroll=False)
+
     results = s.scan()
-    try:
-        for esr in results:
-            counts['elasticsearch_release'] += 1
-            release = args.api.get_release(esr.ident)
-            ingest_request = release_ingest_request(
-                release,
-                oa_only=False,
-                ingest_request_source="fatcat-ingest-container",
-            )
-            if not ingest_request:
-                continue
-            if kafka_producer != None:
-                kafka_producer.produce(
-                    ingest_file_request_topic,
-                    json.dumps(ingest_request).encode('utf-8'),
-                    #key=None,
-                    on_delivery=kafka_fail_fast,
-                )
-                counts['kafka'] += 1
-            # also printing to stdout when in kafka mode; could skip?
-            print(json.dumps(ingest_request))
-            counts['ingest_request'] += 1
-    except elasticsearch.exceptions.AuthorizationException:
-        print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr)
-    finally:
+    for esr in results:
+        counts['elasticsearch_release'] += 1
+        release = args.api.get_release(esr.ident)
+        ingest_request = release_ingest_request(
+            release,
+            oa_only=False,
+            ingest_request_source="fatcat-ingest-container",
+        )
+        if not ingest_request:
+            continue
         if kafka_producer != None:
-            kafka_producer.flush()
+            kafka_producer.produce(
+                ingest_file_request_topic,
+                json.dumps(ingest_request).encode('utf-8'),
+                #key=None,
+                on_delivery=kafka_fail_fast,
+            )
+            counts['kafka'] += 1
+        # also printing to stdout when in kafka mode; could skip?
+        print(json.dumps(ingest_request))
+        counts['ingest_request'] += 1
+    if kafka_producer != None:
+        kafka_producer.flush()
     print(counts, file=sys.stderr)
 
 
 def main():
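The behavioral core of the commit is clear_scroll=False: elasticsearch-dsl's Search.params() forwards keyword arguments to elasticsearch.helpers.scan(), which by default issues a DELETE for the scroll context once iteration finishes. Against the public search endpoints that DELETE is rejected, which is the AuthorizationException the old code caught and ignored. A minimal sketch of the idea, assuming a placeholder index name and a match-all query (neither is from the commit):

    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Search

    def scan_releases(endpoint):
        client = Elasticsearch(endpoint)
        s = Search(using=client, index="fatcat_release")  # index name assumed
        # Search.params() kwargs are passed through to
        # elasticsearch.helpers.scan(). With clear_scroll=False the helper
        # skips the final DELETE of the scroll context and lets it expire
        # server-side instead, so a read-only proxy never sees a DELETE.
        if endpoint in ('https://search.fatcat.wiki',
                        'https://search.qa.fatcat.wiki'):
            s = s.params(clear_scroll=False)
        for hit in s.scan():
            yield hit

With the DELETE avoided up front, the try/except/finally wrapper becomes unnecessary, which is why the loop body in the new version can shift one indentation level left without other changes.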

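The produce()/flush() calls in the loop rely on confluent-kafka's delivery callbacks for error handling. A hedged sketch of that pattern follows; the real kafka_fail_fast helper is defined elsewhere in the fatcat codebase and may differ, and the broker address and topic name are placeholders:

    from confluent_kafka import KafkaException, Producer

    def kafka_fail_fast(err, msg):
        # confluent-kafka invokes delivery callbacks as (error, message);
        # raising turns a silent per-message failure into a hard error.
        if err is not None:
            raise KafkaException(err)

    producer = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder
    producer.produce(
        'ingest-file-requests',  # placeholder topic name
        b'{"example": true}',
        on_delivery=kafka_fail_fast,
    )
    # Delivery callbacks only run from poll()/flush(). A single flush() after
    # the produce loop drains the queue and surfaces any delivery errors,
    # which is what the commit's trailing flush() accomplishes now that the
    # finally: block is gone.
    producer.flush()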