From 293d4b176855d400324559c814abd2e404cdf31e Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 26 Dec 2019 21:16:59 -0800
Subject: flush out minio helper, add to grobid persist

---
 python/sandcrawler/persist.py | 38 +++++++++++++++++++++++++++++---------
 1 file changed, 29 insertions(+), 9 deletions(-)

(limited to 'python/sandcrawler/persist.py')

diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 86a1c22..b017a82 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -19,6 +19,8 @@ grobid
 - file_meta SQL push batch (on conflict update)
 """
 
+import os
+
 from sandcrawler.workers import SandcrawlerWorker
 from sandcrawler.db import SandcrawlerPostgresClient
 from sandcrawler.minio import SandcrawlerMinioClient
@@ -187,6 +189,13 @@ class PersistGrobidWorker(SandcrawlerWorker):
         self.db = SandcrawlerPostgresClient(db_url)
         self.cur = self.db.conn.cursor()
         self.grobid = GrobidClient()
+        self.s3 = SandcrawlerMinioClient(
+            host_url=kwargs.get('s3_url', 'localhost:9000'),
+            access_key=kwargs['s3_access_key'],
+            secret_key=kwargs['s3_secret_key'],
+            default_bucket=kwargs['s3_bucket'],
+        )
+        self.s3_only = kwargs.get('s3_only', False)
 
     def process(self, record):
         """
@@ -200,6 +209,7 @@ class PersistGrobidWorker(SandcrawlerWorker):
         # enhance with teixml2json metadata, if available
         for r in batch:
             if r['status_code'] != 200 or not r.get('tei_xml'):
+                self.counts['s3-skip'] += 1
                 continue
             metadata = self.grobid.metadata(r)
             if not metadata:
@@ -212,15 +222,25 @@ class PersistGrobidWorker(SandcrawlerWorker):
             r['updated'] = metadata['grobid_timestamp']
             r['metadata'] = metadata
 
-        grobid_batch = [r['grobid'] for r in batch if r.get('grobid')]
-        resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
-        self.counts['insert-grobid'] += resp
-
-        file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
-        resp = self.db.insert_file_meta(self.cur, file_meta_batch)
-        self.counts['insert-file-meta'] += resp
-
-        # TODO: minio
+            assert len(r['key']) == 40
+            resp = self.s3.put_blob(
+                folder="grobid",
+                blob=r['tei_xml'],
+                sha1hex=r['key'],
+                extension=".tei.xml",
+            )
+            self.counts['s3-put'] += 1
+
+        if not self.s3_only:
+            grobid_batch = [r['grobid'] for r in batch if r.get('grobid')]
+            resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
+            self.counts['insert-grobid'] += resp[0]
+            self.counts['update-grobid'] += resp[1]
+
+            file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+            resp = self.db.insert_file_meta(self.cur, file_meta_batch)
+            self.counts['insert-file-meta'] += resp[0]
+            self.counts['update-file-meta'] += resp[1]
 
         self.db.commit()
         return []
--
cgit v1.2.3
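
A minimal construction sketch of the reworked worker follows, showing the new S3/minio keyword arguments this patch introduces. The db_url argument is inferred from the SandcrawlerPostgresClient(db_url) call visible in the hunk; the full __init__ signature is not shown in this patch, and the database URL, credentials, and bucket name below are placeholders, not values from the source.

    from sandcrawler.persist import PersistGrobidWorker

    # Placeholder values throughout. Per the diff, s3_access_key, s3_secret_key,
    # and s3_bucket are required (plain kwargs[...] lookups), while s3_url and
    # s3_only fall back to 'localhost:9000' and False respectively.
    worker = PersistGrobidWorker(
        db_url="postgres:///sandcrawler",   # hypothetical DB URL
        s3_url="localhost:9000",
        s3_access_key="minioadmin",          # placeholder credentials
        s3_secret_key="minioadmin",
        s3_bucket="sandcrawler-dev",         # hypothetical bucket name
        s3_only=False,                       # True would skip the grobid/file_meta SQL inserts
    )

With s3_only left False, each batch is written both to minio (one .tei.xml blob per SHA-1 key under the "grobid" folder) and to the grobid and file_meta SQL tables; with s3_only=True, only the blob uploads happen.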