From 5054106bb84ace99602f4a461ec1ac50d6f8f03a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 18 Jan 2021 14:29:41 -0800 Subject: worker: switch to ES helper for bulk indexing This seems to resolve the problems with index workers failing after a couple hundred docs. --- fatcat_scholar/worker.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py index 4c1b9d7..78b54d4 100644 --- a/fatcat_scholar/worker.py +++ b/fatcat_scholar/worker.py @@ -7,6 +7,7 @@ from typing import List, Any import requests import elasticsearch +import elasticsearch.helpers import fatcat_openapi_client from fatcat_openapi_client import ReleaseEntity @@ -138,19 +139,20 @@ class IndexDocsWorker(KafkaWorker): if not es_doc: continue else: - bulk_actions.append(json.dumps({"index": {"_id": es_doc.key,},})) - bulk_actions.append(es_doc.json(exclude_none=True, sort_keys=True)) + bulk_actions.append( + { + "_index": self.es_index, + "_op_type": "index", + "_id": es_doc.key, + "_source": es_doc.json(exclude_none=True, sort_keys=True), + } + ) self.counts["docs-indexed"] += 1 if not bulk_actions: return - resp = self.es_client.bulk( - "\n".join(bulk_actions), self.es_index, timeout="30s", - ) - if resp.get("errors"): - print(resp["errors"], file=sys.stderr) - raise Exception("elasticsearch index response errors") + elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="30s") self.counts["batches-indexed"] += 1 -- cgit v1.2.3