aboutsummaryrefslogtreecommitdiffstats
path: root/fatcat_scholar/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'fatcat_scholar/worker.py')
-rw-r--r--fatcat_scholar/worker.py18
1 files changed, 10 insertions, 8 deletions
diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py
index 4c1b9d7..78b54d4 100644
--- a/fatcat_scholar/worker.py
+++ b/fatcat_scholar/worker.py
@@ -7,6 +7,7 @@ from typing import List, Any
import requests
import elasticsearch
+import elasticsearch.helpers
import fatcat_openapi_client
from fatcat_openapi_client import ReleaseEntity
@@ -138,19 +139,20 @@ class IndexDocsWorker(KafkaWorker):
if not es_doc:
continue
else:
- bulk_actions.append(json.dumps({"index": {"_id": es_doc.key,},}))
- bulk_actions.append(es_doc.json(exclude_none=True, sort_keys=True))
+ bulk_actions.append(
+ {
+ "_index": self.es_index,
+ "_op_type": "index",
+ "_id": es_doc.key,
+ "_source": es_doc.json(exclude_none=True, sort_keys=True),
+ }
+ )
self.counts["docs-indexed"] += 1
if not bulk_actions:
return
- resp = self.es_client.bulk(
- "\n".join(bulk_actions), self.es_index, timeout="30s",
- )
- if resp.get("errors"):
- print(resp["errors"], file=sys.stderr)
- raise Exception("elasticsearch index response errors")
+ elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="30s")
self.counts["batches-indexed"] += 1