From 0c4b686cf6c536087683d7982558bec3c5696c7f Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 15 Jan 2020 13:54:29 -0800 Subject: persist worker: implement updated ingest result semantics --- python/sandcrawler/db.py | 2 +- python/sandcrawler/persist.py | 27 ++++++++++++++++----------- 2 files changed, 17 insertions(+), 12 deletions(-) (limited to 'python/sandcrawler') diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 1a47b0b..a2407b5 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -156,7 +156,7 @@ class SandcrawlerPostgresClient: d.get('grobid_version') or None, d['status_code'], d['status'], - d.get('fatcat_release') or None, + d.get('fatcat_release') or d.get('metadata', {}).get('fatcat_release') or None, d.get('updated') or datetime.datetime.now(), d.get('metadata') or None , ) diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 7cb4f8d..2463afa 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -43,7 +43,11 @@ class PersistCdxWorker(SandcrawlerWorker): def push_batch(self, batch): self.counts['total'] += len(batch) - resp = self.db.insert_cdx(self.cur, batch) + # filter to full CDX lines, no liveweb + cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])] + resp = self.db.insert_cdx(self.cur, cdx_batch) + if len(cdx_batch) < len(batch): + self.counts['skip'] += len(batch) - len(cdx_batch) self.counts['insert-cdx'] += resp[0] self.counts['update-cdx'] += resp[1] self.db.commit() @@ -144,14 +148,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): } terminal = raw.get('terminal') if terminal: - result['terminal_url'] = terminal['url'] - if terminal.get('status_code') == None and terminal.get('http_status'): - terminal['status_code'] = terminal['http_status'] - result['terminal_status_code'] = int(terminal['status_code']) - if raw.get('file_meta'): - result['terminal_sha1hex'] = raw['file_meta']['sha1hex'] - if raw.get('cdx') and raw['cdx']['url'] == terminal['url']: - result['terminal_dt'] = raw['cdx']['datetime'] + result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url') + result['terminal_dt'] = terminal.get('terminal_dt') + result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code') + if result['terminal_status_code']: + result['terminal_status_code'] = int(result['terminal_status_code']) + result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') return result def push_batch(self, batch): @@ -176,8 +178,11 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.counts['update-results'] += resp[1] # these schemas match, so can just pass through - # TODO: need to include warc_path etc in ingest worker, when possible - cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')] + cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')] + revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')] + cdx_batch.extend(revisit_cdx_batch) + # filter to full CDX lines, with full warc_paths (not liveweb) + cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])] if cdx_batch: resp = self.db.insert_cdx(self.cur, cdx_batch) self.counts['insert-cdx'] += resp[0] -- cgit v1.2.3