author     Bryan Newbold <bnewbold@archive.org>  2021-01-18 14:29:41 -0800
committer  Bryan Newbold <bnewbold@archive.org>  2021-01-18 14:29:45 -0800
commit     5054106bb84ace99602f4a461ec1ac50d6f8f03a (patch)
tree       c36fb2ad05bcef4c6b379458342bbd10b75237dc /fatcat_scholar
parent     ead6d0cbbea5101ce9565b773938242348287eec (diff)
worker: switch to ES helper for bulk indexing
This seems to resolve the problems with index workers failing after a couple hundred docs.
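
For reference, a minimal sketch of the pattern this change adopts, assuming a hypothetical client, index name, and documents (none of these names come from the repo). elasticsearch.helpers.bulk() consumes an iterable of action dicts and handles chunking (500 actions per request by default) and serialization itself, which plausibly avoids the failures seen with hand-assembled newline-delimited bulk bodies:

    import elasticsearch
    import elasticsearch.helpers

    # Hypothetical client and index name, for illustration only.
    es_client = elasticsearch.Elasticsearch("http://localhost:9200")
    es_index = "scholar_fulltext"

    docs = [
        {"key": "work_abc123", "title": "first example"},
        {"key": "work_def456", "title": "second example"},
    ]

    actions = [
        {
            "_index": es_index,   # target index for this action
            "_op_type": "index",  # create-or-overwrite semantics
            "_id": doc["key"],    # explicit document id
            "_source": doc,       # body: a dict, or a pre-serialized JSON string
        }
        for doc in docs
    ]

    # The helper splits actions into bulk requests and returns a
    # (success count, errors) tuple; extra kwargs pass through to the bulk API.
    n_ok, _ = elasticsearch.helpers.bulk(es_client, actions, timeout="30s")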
Diffstat (limited to 'fatcat_scholar')
-rw-r--r--  fatcat_scholar/worker.py  |  18
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py
index 4c1b9d7..78b54d4 100644
--- a/fatcat_scholar/worker.py
+++ b/fatcat_scholar/worker.py
@@ -7,6 +7,7 @@ from typing import List, Any
 import requests
 import elasticsearch
+import elasticsearch.helpers
 import fatcat_openapi_client
 from fatcat_openapi_client import ReleaseEntity
@@ -138,19 +139,20 @@ class IndexDocsWorker(KafkaWorker):
             if not es_doc:
                 continue
             else:
-                bulk_actions.append(json.dumps({"index": {"_id": es_doc.key,},}))
-                bulk_actions.append(es_doc.json(exclude_none=True, sort_keys=True))
+                bulk_actions.append(
+                    {
+                        "_index": self.es_index,
+                        "_op_type": "index",
+                        "_id": es_doc.key,
+                        "_source": es_doc.json(exclude_none=True, sort_keys=True),
+                    }
+                )
                 self.counts["docs-indexed"] += 1
         if not bulk_actions:
             return
-        resp = self.es_client.bulk(
-            "\n".join(bulk_actions), self.es_index, timeout="30s",
-        )
-        if resp.get("errors"):
-            print(resp["errors"], file=sys.stderr)
-            raise Exception("elasticsearch index response errors")
+        elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="30s")
         self.counts["batches-indexed"] += 1