diff options
author | Bryan Newbold <bnewbold@archive.org> | 2021-01-18 14:29:41 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2021-01-18 14:29:45 -0800 |
commit | 5054106bb84ace99602f4a461ec1ac50d6f8f03a (patch) | |
tree | c36fb2ad05bcef4c6b379458342bbd10b75237dc /fatcat_scholar | |
parent | ead6d0cbbea5101ce9565b773938242348287eec (diff) | |
download | fatcat-scholar-5054106bb84ace99602f4a461ec1ac50d6f8f03a.tar.gz fatcat-scholar-5054106bb84ace99602f4a461ec1ac50d6f8f03a.zip |
worker: switch to ES helper for bulk indexing
This seems to resolve the problem of index workers failing after a
few hundred docs.
Diffstat (limited to 'fatcat_scholar')
-rw-r--r-- | fatcat_scholar/worker.py | 18 |
1 file changed, 10 insertions, 8 deletions
diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py index 4c1b9d7..78b54d4 100644 --- a/fatcat_scholar/worker.py +++ b/fatcat_scholar/worker.py @@ -7,6 +7,7 @@ from typing import List, Any import requests import elasticsearch +import elasticsearch.helpers import fatcat_openapi_client from fatcat_openapi_client import ReleaseEntity @@ -138,19 +139,20 @@ class IndexDocsWorker(KafkaWorker): if not es_doc: continue else: - bulk_actions.append(json.dumps({"index": {"_id": es_doc.key,},})) - bulk_actions.append(es_doc.json(exclude_none=True, sort_keys=True)) + bulk_actions.append( + { + "_index": self.es_index, + "_op_type": "index", + "_id": es_doc.key, + "_source": es_doc.json(exclude_none=True, sort_keys=True), + } + ) self.counts["docs-indexed"] += 1 if not bulk_actions: return - resp = self.es_client.bulk( - "\n".join(bulk_actions), self.es_index, timeout="30s", - ) - if resp.get("errors"): - print(resp["errors"], file=sys.stderr) - raise Exception("elasticsearch index response errors") + elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="30s") self.counts["batches-indexed"] += 1 |