From 97ffdc17941b1272a8c7f05c0d1353cd28761280 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 5 Mar 2020 00:41:14 -0800 Subject: persist: ingest_request tool (with no ingest_file_result) --- python/sandcrawler/__init__.py | 2 +- python/sandcrawler/persist.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) (limited to 'python/sandcrawler') diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py index b52d039..3d49096 100644 --- a/python/sandcrawler/__init__.py +++ b/python/sandcrawler/__init__.py @@ -5,6 +5,6 @@ from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow from .ingest import IngestFileWorker -from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker +from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 88ac6b5..5960756 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -198,6 +198,35 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.db.commit() return [] +class PersistIngestRequestWorker(PersistIngestFileResultWorker): + + def __init__(self, db_url, **kwargs): + super().__init__() + self.db = SandcrawlerPostgresClient(db_url) + self.cur = self.db.conn.cursor() + + def process(self, record): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + + def push_batch(self, batch): + self.counts['total'] += len(batch) + + if not batch: + return [] + + requests = [self.request_to_row(raw) for raw in batch] + requests = [r for r in requests if r] + + if requests: + resp = self.db.insert_ingest_request(self.cur, requests) + self.counts['insert-requests'] += resp[0] + self.counts['update-requests'] += resp[1] + + self.db.commit() + return [] class PersistGrobidWorker(SandcrawlerWorker): -- cgit v1.2.3