From 8e9106885bc736648c0bf0151a29d4bea9b72650 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 29 Oct 2020 15:22:47 -0700 Subject: better default CLI output (show usage) --- python/sandcrawler_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'python/sandcrawler_worker.py') diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 77c0704..a653771 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -304,7 +304,7 @@ def main(): args = parser.parse_args() if not args.__dict__.get("func"): - print("tell me what to do!") + parser.print_help(file=sys.stderr) sys.exit(-1) args.func(args) -- cgit v1.2.3 From 3adcaf9802928346dda597cefd4b66b2e62fa942 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 3 Nov 2020 19:12:14 -0800 Subject: refactor 'minio' to 'seaweedfs'; and BLOB env vars This goes along with changes to ansible deployment to use the correct key names and values. --- python/example.env | 4 ++-- python/persist_tool.py | 18 +++++++++--------- python/sandcrawler/minio.py | 4 ++-- python/sandcrawler/persist.py | 6 ++++-- python/sandcrawler_worker.py | 20 ++++++++++---------- 5 files changed, 27 insertions(+), 25 deletions(-) (limited to 'python/sandcrawler_worker.py') diff --git a/python/example.env b/python/example.env index 4d3baa0..5064c96 100644 --- a/python/example.env +++ b/python/example.env @@ -1,5 +1,5 @@ -MINIO_ACCESS_KEY="minioadmin" -MINIO_SECRET_KEY="minioadmin" +SANDCRAWLER_BLOB_ACCESS_KEY="minioadmin" +SANDCRAWLER_BLOB_SECRET_KEY="minioadmin" IA_ACCESS_KEY="dummy" IA_SECRET_KEY="dummy" CDX_AUTH_TOKEN="dummy" diff --git a/python/persist_tool.py b/python/persist_tool.py index 66e02aa..69e9374 100755 --- a/python/persist_tool.py +++ b/python/persist_tool.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -Commands for backfilling content from bulk files into postgresql and s3 (minio). +Commands for backfilling content from bulk files into postgresql and s3 (seaweedfs). Normally this is done by workers (in sandcrawler_worker.py) consuming from Kafka feeds, but sometimes we have bulk processing output we want to backfill. 
@@ -120,16 +120,16 @@ def main(): help="postgresql database connection string", default="postgres:///sandcrawler") parser.add_argument('--s3-url', - help="S3 (minio) backend URL", + help="S3 (seaweedfs) backend URL", default="localhost:9000") parser.add_argument('--s3-access-key', - help="S3 (minio) credential", - default=os.environ.get('MINIO_ACCESS_KEY')) + help="S3 (seaweedfs) credential", + default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY')) parser.add_argument('--s3-secret-key', - help="S3 (minio) credential", - default=os.environ.get('MINIO_SECRET_KEY')) + help="S3 (seaweedfs) credential", + default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') or os.environ.get('MINIO_SECRET_KEY')) parser.add_argument('--s3-bucket', - help="S3 (minio) bucket to persist into", + help="S3 (seaweedfs) bucket to persist into", default="sandcrawler-dev") subparsers = parser.add_subparsers() @@ -144,7 +144,7 @@ def main(): help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)") sub_grobid = subparsers.add_parser('grobid', - help="backfill a grobid JSON ('pg') dump into postgresql and s3 (minio)") + help="backfill a grobid JSON ('pg') dump into postgresql and s3 (seaweedfs)") sub_grobid.set_defaults(func=run_grobid) sub_grobid.add_argument('json_file', help="grobid file to import from (or '-' for stdin)", @@ -180,7 +180,7 @@ def main(): type=str) sub_pdftrio = subparsers.add_parser('pdftrio', - help="backfill a pdftrio JSON ('pg') dump into postgresql and s3 (seaweedfs)") sub_pdftrio.set_defaults(func=run_pdftrio) sub_pdftrio.add_argument('json_file', help="pdftrio file to import from (or '-' for stdin)", diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py index 8b02211..c7deea1 100644 --- a/python/sandcrawler/minio.py +++ b/python/sandcrawler/minio.py @@ -17,8 +17,8 @@ class SandcrawlerMinioClient(object): Example config: host="localhost:9000", - access_key=os.environ['MINIO_ACCESS_KEY'], - secret_key=os.environ['MINIO_SECRET_KEY'], + access_key=os.environ['SANDCRAWLER_BLOB_ACCESS_KEY'], + secret_key=os.environ['SANDCRAWLER_BLOB_SECRET_KEY'], """ self.mc = minio.Minio( host_url, diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index fbc5273..aa05195 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -452,9 +452,11 @@ class PersistPdfTextWorker(SandcrawlerWorker): class PersistThumbnailWorker(SandcrawlerWorker): """ - Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table. + Pushes thumbnail image to blob store (S3/seaweed/minio). - This worker *must* be used with raw kakfa mode. + This worker *must* be used with raw kafka mode; thumbnails are *not* + wrapped in JSON like most sandcrawler kafka messages. """ def __init__(self, **kwargs): diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index a653771..537398e 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -3,7 +3,7 @@ """ These are generally for continuously running workers that consume from Kafka. Outputs might either be pushed back into Kafka, or directly into sandcrawler-db -or minio. +or S3 (SeaweedFS). 
""" import os @@ -242,16 +242,16 @@ def main(): help="postgresql database connection string", default="postgres:///sandcrawler") parser.add_argument('--s3-url', - help="S3 (minio) backend URL", + help="S3 (seaweedfs) backend URL", default="localhost:9000") parser.add_argument('--s3-access-key', - help="S3 (minio) credential", - default=os.environ.get('MINIO_ACCESS_KEY')) + help="S3 (seaweedfs) credential", + default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY')) parser.add_argument('--s3-secret-key', - help="S3 (minio) credential", - default=os.environ.get('MINIO_SECRET_KEY')) + help="S3 (seaweedfs) credential", + default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') or os.environ.get('MINIO_SECRET_KEY')) parser.add_argument('--s3-bucket', - help="S3 (minio) bucket to persist into", + help="S3 (seaweedfs) bucket to persist into", default="sandcrawler-dev") subparsers = parser.add_subparsers() @@ -264,7 +264,7 @@ def main(): sub_pdf_extract.set_defaults(func=run_pdf_extract) sub_persist_grobid = subparsers.add_parser('persist-grobid', - help="daemon that consumes GROBID output from Kafka and pushes to minio and postgres") + help="daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres") sub_persist_grobid.add_argument('--s3-only', action='store_true', help="only upload TEI-XML to S3 (don't write to database)") @@ -274,7 +274,7 @@ def main(): sub_persist_grobid.set_defaults(func=run_persist_grobid) sub_persist_pdftext = subparsers.add_parser('persist-pdftext', - help="daemon that consumes pdftext output from Kafka and pushes to minio and postgres") + help="daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres") sub_persist_pdftext.add_argument('--s3-only', action='store_true', help="only upload TEI-XML to S3 (don't write to database)") @@ -284,7 +284,7 @@ def main(): sub_persist_pdftext.set_defaults(func=run_persist_pdftext) sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail', - help="daemon that consumes thumbnail output from Kafka and pushes to minio and postgres") + help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres") sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail) sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio', -- cgit v1.2.3 From 2885b34ab3e4c862f9e895a237108d42793efb1d Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 3 Nov 2020 19:27:03 -0800 Subject: ingest: handle publishing XML docs to kafka --- python/sandcrawler/ingest.py | 24 +++++++++++++++++++++--- python/sandcrawler_worker.py | 6 ++++++ 2 files changed, 27 insertions(+), 3 deletions(-) (limited to 'python/sandcrawler_worker.py') diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index 7ad0124..1a42b6a 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -66,6 +66,7 @@ class IngestFileWorker(SandcrawlerWorker): self.grobid_sink = kwargs.get('grobid_sink') self.thumbnail_sink = kwargs.get('thumbnail_sink') self.pdftext_sink = kwargs.get('pdftext_sink') + self.xmldoc_sink = kwargs.get('xmldoc_sink') self.max_hops = 6 self.try_existing_ingest = kwargs.get('try_existing_ingest', False) @@ -242,8 +243,9 @@ class IngestFileWorker(SandcrawlerWorker): 'pdf_meta': self.process_pdfextract(resource, file_meta), } elif ingest_type == "xml": - # TODO - return {} + return { + 'xml_meta': self.process_xml(resource, file_meta), + } else: raise NotImplementedError(f"process {ingest_type} hit") @@ 
-300,12 +302,28 @@ class IngestFileWorker(SandcrawlerWorker): if self.thumbnail_sink and result.page0_thumbnail is not None: self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) if self.pdftext_sink: - self.pdftext_sink.push_record(result.to_pdftext_dict()) + self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex) result.page0_thumbnail = None result.text = None result.file_meta = None return result.to_pdftext_dict() + def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Simply publishes to Kafka topic. + + In the future, could extract other metadata here (like body word + count), or attempt to fetch sub-resources. + """ + if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml": + msg = dict( + sha1hex=file_meta["sha1hex"], + status="success", + jats_xml=resource.body.decode('utf-8'), + ) + self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex']) + return dict(status="success") + def timeout_response(self, task: dict) -> dict: print("[TIMEOUT]", file=sys.stderr) return dict( diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 537398e..b62fa80 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -174,6 +174,7 @@ def run_ingest_file(args): grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env) thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) + xmldoc_topic = "sandcrawler-{}.xml-doc".format(args.env) sink = KafkaSink( kafka_hosts=args.kafka_hosts, produce_topic=produce_topic, @@ -193,12 +194,17 @@ def run_ingest_file(args): kafka_hosts=args.kafka_hosts, produce_topic=thumbnail_topic, ) + xmldoc_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=xmldoc_topic, + ) worker = IngestFileWorker( grobid_client=grobid_client, sink=sink, grobid_sink=grobid_sink, thumbnail_sink=thumbnail_sink, pdftext_sink=pdftext_sink, + xmldoc_sink=xmldoc_sink, # don't SPNv2 for --bulk backfill try_spn2=not args.bulk, ) -- cgit v1.2.3 From 7dc382519187e32856f494af76248fa93aef7770 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 3 Nov 2020 19:28:21 -0800 Subject: persist: XML and HTML persist workers --- python/sandcrawler/persist.py | 77 +++++++++++++++++++++++++++++++++++++++++-- python/sandcrawler_worker.py | 47 ++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 3 deletions(-) (limited to 'python/sandcrawler_worker.py') diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index aa05195..81cf664 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -95,7 +95,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if not k in raw: self.counts['skip-request-fields'] += 1 return None - if raw['ingest_type'] not in ('pdf', 'xml'): + if raw['ingest_type'] not in ('pdf', 'xml', 'html'): print(raw['ingest_type']) self.counts['skip-ingest-type'] += 1 return None @@ -121,7 +121,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): return request - def file_result_to_row(self, raw): + def file_result_to_row(self, raw: dict) -> Optional[dict]: """ Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema @@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': ingest_type = 'pdf' - if ingest_type not in ('pdf', 'xml'): + if ingest_type not in ('pdf', 'xml', 'html'): 
self.counts['skip-ingest-type'] += 1 return None if raw['status'] in ("existing", ): @@ -159,6 +159,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') return result + def result_to_html_meta(self, record: dict) -> Optional[dict]: + raise NotImplementedError() + def push_batch(self, batch): self.counts['total'] += len(batch) @@ -197,6 +200,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.counts['insert-file_meta'] += resp[0] self.counts['update-file_meta'] += resp[1] + html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_meta')] + if html_meta_batch: + resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="nothing") + self.counts['insert-html_meta'] += resp[0] + self.counts['update-html_meta'] += resp[1] + self.db.commit() return [] @@ -489,3 +498,65 @@ class PersistThumbnailWorker(SandcrawlerWorker): ) self.counts['s3-put'] += 1 + +class GenericPersistDocWorker(SandcrawlerWorker): + """ + Pushes blobs from Kafka to S3. + + Objects are assumed to be JSON-wrapped strings. + """ + + def __init__(self, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_extension = kwargs.get('s3_extension', ".unknown") + self.s3_folder = kwargs.get('s3_folder', "unknown") + self.doc_key = "unknown" + + def process(self, record: dict, key: Optional[str] = None) -> None: + + if record.get('status') != 'success' or not record.get(self.doc_key): + return + + assert key is not None and len(key) == 40 and isinstance(key, str) + if 'sha1hex' in record: + assert key == record['sha1hex'] + + resp = self.s3.put_blob( + folder=self.s3_folder, + blob=record[self.doc_key].encode('utf-8'), + sha1hex=key, + extension=self.s3_extension, + ) + self.counts['s3-put'] += 1 + + +class PersistXmlDocWorker(GenericPersistDocWorker): + """ + Pushes JATS XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".jats.xml") + self.s3_folder = kwargs.get('s3_folder', "xml_doc") + self.doc_key = "jats_xml" + + +class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): + """
    Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".tei.xml") + self.s3_folder = kwargs.get('s3_folder', "html_body") + self.doc_key = "tei_xml" diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index b62fa80..24dbdd0 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -13,6 +13,7 @@ import datetime import raven from sandcrawler import * +from sandcrawler.persist import PersistXmlDocWorker, PersistHtmlTeiXmlWorker # Yep, a global. 
Gets DSN from `SENTRY_DSN` environment variable try: @@ -148,6 +149,44 @@ def run_persist_thumbnail(args): ) pusher.run() +def run_persist_xml_doc(args: argparse.Namespace) -> None: + consume_topic = f"sandcrawler-{args.env}.xml-doc" + worker = PersistXmlDocWorker( + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-xml-doc", + push_batches=False, + raw_records=True, + batch_size=25, + ) + pusher.run() + +def run_persist_html_teixml(args: argparse.Namespace) -> None: + consume_topic = f"sandcrawler-{args.env}.html-teixml" + worker = PersistHtmlTeiXmlWorker( + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-html-teixml", + push_batches=False, + raw_records=True, + batch_size=25, + ) + pusher.run() + def run_persist_pdftrio(args): consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env) worker = PersistPdfTrioWorker( @@ -293,6 +332,14 @@ def main(): help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres") sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail) + sub_persist_xml_doc = subparsers.add_parser('persist-xml-doc', + help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket") + sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc) + + sub_persist_html_teixml = subparsers.add_parser('persist-html-teixml', + help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket") + sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml) + sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio', help="daemon that consumes pdftrio output from Kafka and pushes to postgres") sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio) -- cgit v1.2.3 From 8f964b9b48572ac71f27ba64207816dfd3a6dc36 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 3 Nov 2020 22:40:57 -0800 Subject: small fixes from local testing for XML ingest --- python/sandcrawler/ingest.py | 2 +- python/sandcrawler/persist.py | 11 ++++++++--- python/sandcrawler_worker.py | 2 -- 3 files changed, 9 insertions(+), 6 deletions(-) (limited to 'python/sandcrawler_worker.py') diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index 363485e..2e227bf 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -336,7 +336,7 @@ class IngestFileWorker(SandcrawlerWorker): ) def want(self, request: dict) -> bool: - if not request.get('ingest_type') in ('file', 'pdf'): + if not request.get('ingest_type') in ('file', 'pdf', 'xml'): return False return True diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 81cf664..c225d5a 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -20,7 +20,7 @@ grobid """ import os -from typing import Optional +from typing import Optional, AnyStr import xml.etree.ElementTree from sandcrawler.workers import SandcrawlerWorker @@ -518,12 +518,17 @@ class GenericPersistDocWorker(SandcrawlerWorker): self.s3_folder = kwargs.get('s3_folder', "unknown") self.doc_key = "unknown" - def process(self, record: dict, key: Optional[str] = None) -> None: + def process(self, record: dict, raw_key: 
Optional[AnyStr] = None) -> None: if record.get('status') != 'success' or not record.get(self.doc_key): return - assert key is not None and len(key) == 40 and isinstance(key, str) + assert raw_key is not None + if isinstance(raw_key, bytes): + key = raw_key.decode('utf-8') + elif isinstance(raw_key, str): + key = raw_key + assert len(key) == 40 if 'sha1hex' in record: assert key == record['sha1hex'] diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 24dbdd0..3681d7f 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -163,7 +163,6 @@ def run_persist_xml_doc(args: argparse.Namespace) -> None: consume_topic=consume_topic, group="persist-xml-doc", push_batches=False, - raw_records=True, batch_size=25, ) pusher.run() @@ -182,7 +181,6 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None: consume_topic=consume_topic, group="persist-html-teixml", push_batches=False, - raw_records=True, batch_size=25, ) pusher.run() -- cgit v1.2.3 From de71aa92d4c7c9d14dfccc0188032d4e7b10090f Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 4 Nov 2020 18:10:00 -0800 Subject: html: actually publish HTML TEI-XML to body; fix dataflow through ingest a bit --- python/sandcrawler/ingest.py | 30 +++++++++++++++++++++++++----- python/sandcrawler_worker.py | 6 ++++++ 2 files changed, 31 insertions(+), 5 deletions(-) (limited to 'python/sandcrawler_worker.py') diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index cc64fa5..e0778d2 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -73,6 +73,7 @@ class IngestFileWorker(SandcrawlerWorker): self.thumbnail_sink = kwargs.get('thumbnail_sink') self.pdftext_sink = kwargs.get('pdftext_sink') self.xmldoc_sink = kwargs.get('xmldoc_sink') + self.htmlteixml_sink = kwargs.get('htmlteixml_sink') self.max_hops = 6 self.try_existing_ingest = kwargs.get('try_existing_ingest', False) @@ -82,6 +83,7 @@ class IngestFileWorker(SandcrawlerWorker): self.try_spn2 = kwargs.get('try_spn2', True) self.html_quick_mode = False self.adblock_rules = load_adblock_rules() + self.max_html_resources = 200 self.base_url_blocklist = [ # robot blocking @@ -339,13 +341,26 @@ class IngestFileWorker(SandcrawlerWorker): html_doc = HTMLParser(resource.body) html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + assert html_biblio html_body = html_extract_body_teixml(resource.body) html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('tei_xml')) - assert html_biblio + if html_scope not in ('article-fulltext', 'unknown'): + html_body.pop("tei_xml", None) + return dict( + status="html-body-wrong-scope", + html_biblio=html_biblio, + html_scope=html_scope, + ) raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules) - assert len(raw_resources) <= 200 + if len(raw_resources) > self.max_html_resources: + html_body.pop("tei_xml", None) + return dict( + status="too-many-resources", + html_biblio=html_biblio, + html_scope=html_scope, + ) when = parse_cdx_datetime(resource.cdx.datetime) @@ -355,6 +370,11 @@ else: full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) + if self.htmlteixml_sink and html_body['status'] == "success": + self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex']) + + html_body.pop("tei_xml", None) + return dict( html_body=html_body, html_biblio=json.loads(html_biblio.json(exclude_none=True)), @@ -587,9 +587,9 @@
class IngestFileWorker(SandcrawlerWorker): info = self.process_hit(ingest_type, resource, file_meta) result.update(info) - # scope is getting calculated in process_hit() - if result.get('scope') and result['scope'] not in ('article-fulltext', 'unknown'): - result['status'] = "wrong-scope" + # check if processing turned up an error + if info.get('status') not in ('success', None): + result['status'] = info['status'] return result result['status'] = "success" diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 3681d7f..6be8bac 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -212,6 +212,7 @@ def run_ingest_file(args): pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env) thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) xmldoc_topic = "sandcrawler-{}.xml-doc".format(args.env) + htmlteixml_topic = "sandcrawler-{}.html-teixml".format(args.env) sink = KafkaSink( kafka_hosts=args.kafka_hosts, produce_topic=produce_topic, @@ -235,6 +236,10 @@ def run_ingest_file(args): kafka_hosts=args.kafka_hosts, produce_topic=xmldoc_topic, ) + htmlteixml_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=htmlteixml_topic, + ) worker = IngestFileWorker( grobid_client=grobid_client, sink=sink, @@ -242,6 +247,7 @@ def run_ingest_file(args): thumbnail_sink=thumbnail_sink, pdftext_sink=pdftext_sink, xmldoc_sink=xmldoc_sink, + htmlteixml_sink=htmlteixml_sink, # don't SPNv2 for --bulk backfill try_spn2=not args.bulk, ) -- cgit v1.2.3
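
Taken together, these commits establish a pattern for persisting any new document type: a thin GenericPersistDocWorker subclass that sets s3_extension, s3_folder, and doc_key; a KafkaSink for it in run_ingest_file(); and a KafkaJsonPusher daemon consuming the matching topic. A minimal sketch of a hypothetical further subclass follows; the 'example-doc' topic, class name, and record key are invented for illustration and are not part of these commits, and the KafkaJsonPusher import path is assumed (sandcrawler_worker.py gets it via 'from sandcrawler import *').

    #!/usr/bin/env python3
    import os

    from sandcrawler import KafkaJsonPusher  # import path assumed
    from sandcrawler.persist import GenericPersistDocWorker


    class PersistExampleDocWorker(GenericPersistDocWorker):
        """
        Hypothetical: pushes an 'example_doc' payload from Kafka to the blob
        store, alongside the xml_doc and html_body folders.
        """

        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.s3_extension = kwargs.get('s3_extension', ".example.xml")
            self.s3_folder = kwargs.get('s3_folder', "example_doc")
            self.doc_key = "example_doc"  # field of the JSON record holding the document


    def main() -> None:
        # same S3/Kafka wiring as run_persist_xml_doc() above
        worker = PersistExampleDocWorker(
            s3_url="localhost:9000",
            s3_bucket="sandcrawler-dev",
            s3_access_key=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY'),
            s3_secret_key=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY'),
        )
        pusher = KafkaJsonPusher(
            worker=worker,
            kafka_hosts="localhost:9092",
            consume_topic="sandcrawler-dev.example-doc",
            group="persist-example-doc",
            push_batches=False,
            batch_size=25,
        )
        pusher.run()


    if __name__ == '__main__':
        main()

Because records are keyed by sha1hex and process() asserts the key matches the record, blobs written this way land under the same path scheme as the xml_doc and html_body folders introduced above.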