aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-03-05 00:41:14 -0800
committerBryan Newbold <bnewbold@archive.org>2020-03-05 00:41:14 -0800
commit97ffdc17941b1272a8c7f05c0d1353cd28761280 (patch)
tree07309077043139328c9be809ac5f26971a2729c9 /python
parent173e5e88de4160a63949ff6e263123c4a25b2017 (diff)
downloadsandcrawler-97ffdc17941b1272a8c7f05c0d1353cd28761280.tar.gz
sandcrawler-97ffdc17941b1272a8c7f05c0d1353cd28761280.zip
persist: ingest_request tool (with no ingest_file_result)
Diffstat (limited to 'python')
-rwxr-xr-xpython/persist_tool.py18
-rw-r--r--python/sandcrawler/__init__.py2
-rw-r--r--python/sandcrawler/persist.py29
3 files changed, 48 insertions, 1 deletions
diff --git a/python/persist_tool.py b/python/persist_tool.py
index 80b1156..f0beef8 100755
--- a/python/persist_tool.py
+++ b/python/persist_tool.py
@@ -85,6 +85,17 @@ def run_ingest_file_result(args):
)
pusher.run()
+def run_ingest_request(args):
+ worker = PersistIngestRequestWorker(
+ db_url=args.db_url,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=200,
+ )
+ pusher.run()
+
def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -149,6 +160,13 @@ def main():
help="ingest_file_result file to import from (or '-' for stdin)",
type=argparse.FileType('r'))
+ sub_ingest_request = subparsers.add_parser('ingest-request',
+ help="backfill a ingest_request JSON dump into postgresql")
+ sub_ingest_request.set_defaults(func=run_ingest_file_result)
+ sub_ingest_request.add_argument('json_file',
+ help="ingest_request to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("Tell me what to do!", file=sys.stderr)
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index b52d039..3d49096 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -5,6 +5,6 @@ from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
from .ingest import IngestFileWorker
-from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker
+from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker
from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 88ac6b5..5960756 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -198,6 +198,35 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.db.commit()
return []
+class PersistIngestRequestWorker(PersistIngestFileResultWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ if not batch:
+ return []
+
+ requests = [self.request_to_row(raw) for raw in batch]
+ requests = [r for r in requests if r]
+
+ if requests:
+ resp = self.db.insert_ingest_request(self.cur, requests)
+ self.counts['insert-requests'] += resp[0]
+ self.counts['update-requests'] += resp[1]
+
+ self.db.commit()
+ return []
class PersistGrobidWorker(SandcrawlerWorker):