aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-04-12 10:35:53 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-04-12 10:35:53 -0700
commitb61e2e020a9249ab04be7484f4159a9dc1ca4a83 (patch)
tree0d6224a9374b1f6c4fffa1717788950f30998a79 /python/fatcat_tools/workers
parent7dd548bff79cdb90bf04f5fbb431f9122ff297d6 (diff)
downloadfatcat-b61e2e020a9249ab04be7484f4159a9dc1ca4a83.tar.gz
fatcat-b61e2e020a9249ab04be7484f4159a9dc1ca4a83.zip
ES indexing: skip 'wip' entities with a warning
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py27
1 files changed, 16 insertions, 11 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 13409eb5..3e4753fb 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,4 +1,5 @@
+import sys
import json
import requests
@@ -48,15 +49,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def fail_fast(err, partitions):
if err is not None:
- print("Kafka consumer commit error: {}".format(err))
- print("Bailing out...")
+ print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
for p in partitions:
# check for partition-specific commit errors
if p.error:
- print("Kafka consumer commit error: {}".format(p.error))
- print("Bailing out...")
+ print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(p.error)
#print("Kafka consumer commit successful")
@@ -67,7 +68,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
if p.error:
raise KafkaException(p.error)
print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions))
+ consumer, partitions), file=sys.stderr)
consumer_conf = self.kafka_config.copy()
consumer_conf.update({
@@ -96,10 +97,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):
timeout=self.poll_interval)
if not batch:
if not consumer.assignment():
- print("... no Kafka consumer partitions assigned yet")
- print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval))
+ print("... no Kafka consumer partitions assigned yet", file=sys.stderr)
+ print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval), file=sys.stderr)
continue
- print("... got {} kafka messages".format(len(batch)))
+ print("... got {} kafka messages".format(len(batch)), file=sys.stderr)
# first check errors on entire batch...
for msg in batch:
if msg.error():
@@ -117,6 +118,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):
else:
key = entity.ident
+ if entity.state == 'wip':
+ print(f"WARNING: skipping state=wip entity: {self.entity_type.__name__} {entity.ident}", file=sys.stderr)
+ continue
+
if self.entity_type == ContainerEntity and self.query_stats:
stats = get_elastic_container_stats(
entity.ident,
@@ -134,7 +139,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
}))
bulk_actions.append(json.dumps(doc_dict))
- print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__))
+ print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__), file=sys.stderr)
elasticsearch_endpoint = "{}/{}/_bulk".format(
self.elasticsearch_backend,
self.elasticsearch_index)
@@ -144,8 +149,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):
resp.raise_for_status()
if resp.json()['errors']:
desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint)
- print(desc)
- print(resp.content)
+ print(desc, file=sys.stderr)
+ print(resp.content, file=sys.stderr)
raise Exception(desc)
for msg in batch:
# offsets are *committed* (to brokers) automatically, but need