From 86107e39b761e5b799562af662219fda04ade1be Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 30 Sep 2021 15:09:42 -0700 Subject: refactoring; progress on filesets --- python/ingest_tool.py | 28 ++++++++++++++++++++-------- python/sandcrawler/__init__.py | 3 ++- python/sandcrawler/ingest_file.py | 5 +++++ 3 files changed, 27 insertions(+), 9 deletions(-) (limited to 'python') diff --git a/python/ingest_tool.py b/python/ingest_tool.py index 20b6d67..2a7a1a2 100755 --- a/python/ingest_tool.py +++ b/python/ingest_tool.py @@ -5,7 +5,8 @@ import json import argparse from http.server import HTTPServer -from sandcrawler.ingest import IngestFileRequestHandler, IngestFileWorker +from sandcrawler.ingest_file import IngestFileRequestHandler, IngestFileWorker +from sandcrawler.ingest_fileset import IngestFilesetWorker def run_single_ingest(args): @@ -17,23 +18,34 @@ def run_single_ingest(args): ) if args.force_recrawl: request['force_recrawl'] = True - ingester = IngestFileWorker( - try_spn2=not args.no_spn2, - html_quick_mode=args.html_quick_mode, - ) + if request['ingest_type'] in ['dataset',]: + ingester = IngestFilesetWorker( + try_spn2=not args.no_spn2, + ) + else: + ingester = IngestFileWorker( + try_spn2=not args.no_spn2, + html_quick_mode=args.html_quick_mode, + ) result = ingester.process(request) print(json.dumps(result, sort_keys=True)) return result def run_requests(args): # TODO: switch to using JsonLinePusher - ingester = IngestFileWorker( + file_worker = IngestFileWorker( try_spn2=not args.no_spn2, html_quick_mode=args.html_quick_mode, ) + fileset_worker = IngestFilesetWorker( + try_spn2=not args.no_spn2, + ) for l in args.json_file: request = json.loads(l.strip()) - result = ingester.process(request) + if request['ingest_type'] in ['dataset',]: + result = fileset_worker.process(request) + else: + result = file_worker.process(request) print(json.dumps(result, sort_keys=True)) def run_api(args): @@ -48,7 +60,7 @@ def main(): subparsers = parser.add_subparsers() sub_single= subparsers.add_parser('single', - help="ingests a single file URL") + help="ingests a single base URL") sub_single.set_defaults(func=run_single_ingest) sub_single.add_argument('--release-id', help="(optional) existing release ident to match to") diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py index e461462..724a39c 100644 --- a/python/sandcrawler/__init__.py +++ b/python/sandcrawler/__init__.py @@ -4,7 +4,8 @@ from .pdftrio import PdfTrioClient, PdfTrioWorker, PdfTrioBlobWorker from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime, clean_url from .workers import KafkaSink, KafkaCompressSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper from .ia import WaybackClient, WaybackError, WaybackContentError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow -from .ingest import IngestFileWorker +from .ingest_file import IngestFileWorker +from .ingest_fileset import IngestFilesetWorker from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker, PersistPdfTextWorker, PersistThumbnailWorker from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient from .pdfextract import PdfExtractWorker, PdfExtractBlobWorker diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py index b852c69..a02e923 100644 --- a/python/sandcrawler/ingest_file.py +++ b/python/sandcrawler/ingest_file.py @@ -25,6 +25,8 @@ from sandcrawler.workers import SandcrawlerWorker from sandcrawler.db import SandcrawlerPostgrestClient from sandcrawler.xml import xml_reserialize +from sandcrawler.platforms.generic import DirectFileHelper + MAX_BODY_SIZE_BYTES = 128*1024*1024 @@ -520,6 +522,9 @@ class IngestFileWorker(SandcrawlerWorker): return True def process(self, request: dict, key: Any = None) -> dict: + return self.process_file(request, key=key) + + def process_file(self, request: dict, key: Any = None) -> dict: # old backwards compatibility if request.get('ingest_type') == 'file': -- cgit v1.2.3