path: root/python/sandcrawler
Diffstat (limited to 'python/sandcrawler')
-rw-r--r--  python/sandcrawler/__init__.py   2
-rw-r--r--  python/sandcrawler/persist.py   29
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index b52d039..3d49096 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -5,6 +5,6 @@ from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
from .ingest import IngestFileWorker
-from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker
+from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker
from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 88ac6b5..5960756 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -198,6 +198,35 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.db.commit()
return []
+class PersistIngestRequestWorker(PersistIngestFileResultWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ if not batch:
+ return []
+
+ requests = [self.request_to_row(raw) for raw in batch]
+ requests = [r for r in requests if r]
+
+ if requests:
+ resp = self.db.insert_ingest_request(self.cur, requests)
+ self.counts['insert-requests'] += resp[0]
+ self.counts['update-requests'] += resp[1]
+
+ self.db.commit()
+ return []
class PersistGrobidWorker(SandcrawlerWorker):
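
For context, a minimal usage sketch of the new worker (not part of this commit): it assumes the usual sandcrawler pattern of driving a Persist* worker with a JsonLinePusher over newline-delimited JSON ingest requests; the database URL and batch size here are placeholder values.

```python
# Hypothetical wiring sketch, not from this commit: drive the new
# PersistIngestRequestWorker with a JsonLinePusher, the same pattern used
# for the other Persist* workers. db_url and batch_size are placeholders.
import sys

from sandcrawler import JsonLinePusher, PersistIngestRequestWorker

def main():
    db_url = "postgres:///sandcrawler"  # placeholder connection string
    worker = PersistIngestRequestWorker(db_url=db_url)
    # JsonLinePusher parses one JSON object per line and hands batches to
    # worker.push_batch(), which inserts rows via insert_ingest_request()
    # and commits once per batch.
    pusher = JsonLinePusher(worker, sys.stdin, batch_size=100)
    counts = pusher.run()
    print(counts, file=sys.stderr)

if __name__ == "__main__":
    main()
```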