diff options
Diffstat (limited to 'fatcat_scholar/worker.py')
-rw-r--r-- | fatcat_scholar/worker.py | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py index 4c1b9d7..78b54d4 100644 --- a/fatcat_scholar/worker.py +++ b/fatcat_scholar/worker.py @@ -7,6 +7,7 @@ from typing import List, Any import requests import elasticsearch +import elasticsearch.helpers import fatcat_openapi_client from fatcat_openapi_client import ReleaseEntity @@ -138,19 +139,20 @@ class IndexDocsWorker(KafkaWorker): if not es_doc: continue else: - bulk_actions.append(json.dumps({"index": {"_id": es_doc.key,},})) - bulk_actions.append(es_doc.json(exclude_none=True, sort_keys=True)) + bulk_actions.append( + { + "_index": self.es_index, + "_op_type": "index", + "_id": es_doc.key, + "_source": es_doc.json(exclude_none=True, sort_keys=True), + } + ) self.counts["docs-indexed"] += 1 if not bulk_actions: return - resp = self.es_client.bulk( - "\n".join(bulk_actions), self.es_index, timeout="30s", - ) - if resp.get("errors"): - print(resp["errors"], file=sys.stderr) - raise Exception("elasticsearch index response errors") + elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="30s") self.counts["batches-indexed"] += 1 |