import sys import csv import datetime from typing import Iterable, Optional, Dict, Any, List from collections import Counter from dataclasses import dataclass, field import ftfy from chocula.util import clean_str, clean_issn, merge_spans from chocula.config import ChoculaConfig from chocula.database import DirectoryInfo, IssnDatabase, HomepageUrl # Portico files have weirdly large field sizes csv.field_size_limit(1310720) THIS_YEAR = datetime.date.today().year class DirectoryLoader(): source_slug: str = "GENERIC" def __init__(self, config: ChoculaConfig): self.config = config def open_file(self) -> Iterable: raise NotImplementedError() def parse_record(self, record) -> Optional[DirectoryInfo]: raise NotImplementedError() def index_file(self, db) -> Counter: print(f"##### Loading {self.source_slug}...", file=sys.stderr) counts: Counter = Counter() cur = db.db.cursor() for record in self.open_file(): counts['total'] += 1 info = self.parse_record(record) if info: status = db.insert_directory(info, cur=cur) counts[status] += 1 cur.close() db.db.commit() return counts @dataclass class KbartRecord: issnl: Optional[str] issne: Optional[str] issnp: Optional[str] title: Optional[str] publisher: Optional[str] start_year: Optional[int] end_year: Optional[int] start_volume: Optional[str] end_volume: Optional[str] url: Optional[HomepageUrl] embargo: Optional[str] year_spans: List[Any] class KbartLoader(): source_slug: str = "GENERIC" def __init__(self, config: ChoculaConfig): self.config = config def file_path(self) -> str: #return self.config.TEMPLATE.filepath) raise NotImplementedError() def open_file(self) -> Iterable: raw_file = open(self.file_path(), 'rb').read().decode(errors='replace') fixed_file = ftfy.fix_text(raw_file) reader = csv.DictReader(fixed_file.split('\n'), delimiter='\t') return reader def parse_record(self, row: dict, issn_db: IssnDatabase) -> Optional[KbartRecord]: issne: Optional[str] = clean_issn(row['online_identifier'] or "") issnp: Optional[str] = clean_issn(row['print_identifier'] or "") issnl: Optional[str] = None if issne: issnl = issn_db.issn2issnl(issne) if issnp and not issnl: issnl = issn_db.issn2issnl(issnp) start_year: Optional[int] = None end_year: Optional[int] = None if row['date_first_issue_online']: start_year = int(row['date_first_issue_online'][:4]) if row['date_last_issue_online']: end_year = int(row['date_last_issue_online'][:4]) end_volume = row['num_last_vol_online'] # hack to handle open-ended preservation if end_year is None and end_volume and '(present)' in end_volume: end_year = THIS_YEAR record = KbartRecord( issnl=issnl, issnp=issnp, issne=issne, title=clean_str(row['publication_title']), publisher=clean_str(row['publisher_name']), url=HomepageUrl.from_url(row['title_url']), embargo=clean_str(row['embargo_info']), start_year=start_year, end_year=end_year, start_volume=clean_str(row['num_first_vol_online']), end_volume=clean_str(row['num_last_vol_online']), year_spans=[], ) if record.start_volume == 'null': record.start_volume = None if record.end_volume == 'null': record.end_volume = None return record def index_file(self, db) -> Counter: """ Transforms a KBART file into a dict of dicts; but basically a list of JSON objects, one per journal. KBART files can have multiple rows per journal (eg, different year spans), which is why this pass is needed. """ print(f"##### Loading {self.source_slug} KBART...", file=sys.stderr) counts: Counter = Counter() kbart_dict: Dict[str, KbartRecord] = dict() for row in self.open_file(): counts['total'] += 1 record = self.parse_record(row, db.issn_db) if record is None: counts['skip-parse'] += 1 continue elif not record.issnl: counts['skip-issnl'] += 1 continue elif record.start_year is None or record.end_year is None: counts['partial-missing-years'] += 1 counts['parsed'] += 1 existing = kbart_dict.get(record.issnl, record) if record.start_year and record.end_year: old_spans = existing.year_spans or [] if not record.start_year <= record.end_year: new_spans = [[record.end_year, record.start_year]] else: new_spans = [[record.start_year, record.end_year]] record.year_spans = merge_spans(old_spans, new_spans) kbart_dict[record.issnl] = record counts['unique-issnl'] = len(kbart_dict) cur = db.db.cursor() for issnl, record in kbart_dict.items(): info = DirectoryInfo( directory_slug=self.source_slug, issnl=record.issnl, issne=record.issne, issnp=record.issnp, name=record.title, publisher=record.publisher, homepage_urls=[], extra=dict(year_spans=record.year_spans), ) if record.url: info.homepage_urls.append(record.url) status = db.insert_directory(info, cur=cur) counts[status] += 1 cur.close() db.db.commit() return counts