"""Small client for a Wikibase (MediaWiki) API endpoint.

Provides `WikibaseServer` for talking to the API, plus plain-Python
representations of entities (`WikibaseItem`, `WikibaseProperty`), statements
(`WikibaseStatement`) and snaks (`WikibaseSnak`).
"""

import json
import time
import warnings

import dateutil.parser
import requests

from .exceptions import *
from .constants import *

__version__ = (0, 0, 0)

BOT_DEV_URL = "https://github.com/bnewbold/basebasebase"
BOT_DEV_EMAIL = "bnewbold@robocracy.org"
BOT_USER_AGENT = "basebasebase/%d.%d (%s; %s)" % (
    __version__[0], __version__[1], BOT_DEV_URL, BOT_DEV_EMAIL)

DEFAULT_LANG = "en"
DEFAULT_SITE = "enwiki"


def _is_entity_id(value, prefix):
    """Return True if *value* is a string of the form <prefix><digits>."""
    return (isinstance(value, str)
            and value[:1] == prefix
            and value[1:].isdigit())


class WikibaseServer:
    """
    This class represents a Wikibase API endpoint. It isn't called a "Site"
    because that term is ambiguous in this context.

    Parameters:
        api_url: URL of the API endpoint; every request is sent there.
        lang: default language used for entity fetches and searches.
        auth: None, or a 2-item sequence assigned to the requests session's
            ``auth`` attribute.
        is_bot: whether edits are flagged as bot edits and, after login,
            whether ``assert=bot`` (instead of ``assert=user``) is sent.
        user_agent: value for the User-Agent and Api-User-Agent headers.
        maxlag: value of the ``maxlag`` request parameter; None omits it.
        site: default ``sites`` parameter for entity fetches.
        throttle_delay: seconds to sleep before every API call (0 disables).

    Raises:
        ValueError: if *auth* is neither None nor of length 2.
    """

    def __init__(self, api_url, lang=DEFAULT_LANG, auth=None, is_bot=False,
                 user_agent=BOT_USER_AGENT, maxlag=5, site=DEFAULT_SITE,
                 throttle_delay=0.):
        # Explicit check rather than `assert`, which is stripped under -O.
        if auth is not None and len(auth) != 2:
            raise ValueError("auth must be None or a 2-item sequence")

        self.api_url = str(api_url)
        self.lang = lang
        self.throttle_delay = throttle_delay
        self.site = site
        self.is_bot = is_bot
        # Set by login(); defined here so the attribute always exists.
        self.user = None

        self.session = requests.Session()
        self.session.auth = auth
        self.session.headers.update({
            'User-Agent': user_agent,
            'Api-User-Agent': user_agent,
        })
        # Session-level params are attached to every request.
        if maxlag is not None:
            self.session.params['maxlag'] = int(maxlag)
        self.session.params['format'] = 'json'
        self.session.params['uselang'] = 'user'

    def __repr__(self):
        return "<WikibaseServer at %s>" % self.api_url

    def _check_api_err(self, action, resp):
        """Surface API warnings and errors found in a decoded response.

        Each entry under ``resp['warnings']`` is emitted through
        `warnings.warn`. If ``resp['error']`` is present, raises
        WikibaseAPIError built from its ``code`` and ``info`` keys, or
        WikibaseException carrying the raw error when either key is missing.
        """
        if 'warnings' in resp:
            for key, detail in resp['warnings'].items():
                warnings.warn(str((key, detail)), Warning)
        if 'error' in resp:
            err = resp['error']
            try:
                code, info = err['code'], err['info']
            except KeyError:
                raise WikibaseException(err)
            raise WikibaseAPIError(code, info, action)

    def _api_call(self, method, action, params):
        """Perform one API request and return the decoded JSON response.

        Parameters:
            method: "GET" or "POST" (case-insensitive).
            action: value for the ``action`` request parameter.
            params: mapping of further request parameters, or None. The
                mapping is copied, never modified.

        Raises:
            ValueError: if *method* is neither GET nor POST.
            WikibaseAPIError / WikibaseException: see `_check_api_err`.
        """
        # Copy so neither the caller's dict nor a shared default is mutated.
        call_params = dict(params or {})
        call_params['action'] = action

        verb = method.upper()
        if verb not in ("GET", "POST"):
            raise ValueError("method must be GET or POST")

        if self.throttle_delay:
            time.sleep(self.throttle_delay)

        if verb == "GET":
            resp = self.session.get(self.api_url, params=call_params)
        else:
            # Send POST parameters in the request body: this keeps passwords
            # and tokens out of the URL (and thus out of access logs).
            resp = self.session.post(self.api_url, data=call_params)

        resp_json = resp.json()
        self._check_api_err(action, resp_json)
        return resp_json

    def _post(self, action, params):
        """POST *params* to API action *action*; return the decoded JSON."""
        return self._api_call("POST", action, params)

    def _get(self, action, params=None):
        """GET API action *action* with optional *params*; return the JSON."""
        return self._api_call("GET", action, params)

    def _get_csrf_token(self):
        """Fetch a CSRF token via ``action=query&meta=tokens&type=csrf``."""
        resp = self._get('query', dict(meta="tokens", type="csrf"))
        return resp['query']['tokens']['csrftoken']

    def check(self):
        """Issue a trivial ``wbparsevalue`` call; raises if the API errors."""
        # Check that wikibase API calls work (instead of just "action=query")
        self._get("wbparsevalue",
                  dict(datatype="time", values="1999-12-31|now"))

    def login(self, user=None, passwd=None, is_bot=None, force_http=False):
        """Log in with a two-step ``action=login`` exchange.

        Parameters:
            user, passwd: credentials; both are required.
            is_bot: if not None, replaces ``self.is_bot``.
            force_http: allow logging in when api_url is not https.

        Raises:
            WikibaseException: missing credentials, or a non-https api_url
                without *force_http*.
            WikibaseAccountError: the API reported a result other than
                'Success'.
        """
        if user is None or passwd is None:
            raise WikibaseException("Need user and pass to attempt log-in")
        if not force_http and not self.api_url.lower().startswith("https:"):
            raise WikibaseException(
                "Cowardly refusing to log in without https")
        if is_bot is not None:
            self.is_bot = bool(is_bot)
        self.user = user
        # not keeping password around; don't need it

        # First partially log-in to get a token...
        self.session.params.pop('assert', None)
        resp = self._post("login", dict(lgname=self.user, lgpassword=passwd))
        token = resp['login']['token']

        # Then really log-in
        resp = self._post("login", dict(lgname=self.user, lgpassword=passwd,
                                        lgtoken=token))
        result = resp['login']['result']
        if result != 'Success':
            raise WikibaseAccountError(user, result)

        # From here on every request asserts the logged-in state.
        self.session.params['assert'] = 'bot' if self.is_bot else 'user'

        # Simple ping to check that we are actually logged in
        self._get("query")

    def logout(self):
        """Forget the user, drop the ``assert`` parameter and call logout."""
        self.user = None
        self.session.params.pop('assert', None)
        self._get("logout")

    def _get_entities(self, query, expected, site=None, lang=None,
                      redirects=True, as_titles=False):
        """
        Fetch entities through ``wbgetentities``.

        NB: doesn't handle case of multiple sites, single title

        Parameters:
            query: sequence of strings, joined with "|" and sent as ``ids``
                (or as ``titles`` when *as_titles* is true).
            expected: required value of each returned entity's ``type``.
            site, lang: override the server defaults for this call.
            redirects: sent as "yes"/"no".

        Returns:
            [] for an empty query, otherwise the values of the response's
            ``entities`` mapping.

        Raises:
            MissingEntityError: the API reported "no-such-entity", or a
                returned entity is marked missing or has the wrong type.
            WikibaseException: the response lacks a 'success' key.
        """
        if not query:
            return []
        params = {
            'sites': site or self.site,
            'languages': lang or self.lang,
            'redirects': "yes" if redirects else "no",
        }
        params['titles' if as_titles else 'ids'] = '|'.join(query)

        try:
            resp = self._get("wbgetentities", params)
        except WikibaseAPIError as wae:
            if wae.code == "no-such-entity":
                raise MissingEntityError(info=wae.info) from wae
            raise

        if 'success' not in resp:
            raise WikibaseException(
                "Expected 'success' in wbgetentities response")

        entities = resp['entities'].values()
        for e in entities:
            if 'missing' in e or e['type'] != expected:
                if 'title' in e:
                    raise MissingEntityError(title=e['title'])
                elif 'id' in e:
                    raise MissingEntityError(id=e['id'])
                else:
                    raise MissingEntityError()
        return entities

    def get_items(self, query, **kwargs):
        """Fetch items and return them as a list of WikibaseItem.

        *query* is a sequence whose first element decides how the whole
        sequence is interpreted: ints are converted to QIDs, "Q<digits>"
        strings are used as ids, anything else is treated as page titles.
        Extra keyword arguments are passed on to `_get_entities`.

        Raises:
            ValueError: the first element looks like a PID.
            MissingItemError: an entity was missing or was not an item.
        """
        if not query:
            return []
        first = query[0]
        as_titles = False
        if isinstance(first, int):
            # Convert list of ints to QIDs
            query = ["Q%d" % x for x in query]
        elif _is_entity_id(first, "P"):
            raise ValueError("query must be a list of QIDs, not PIDs")
        elif not _is_entity_id(first, "Q"):
            # Must be list of titles
            as_titles = True

        try:
            entities = self._get_entities(query, as_titles=as_titles,
                                          expected='item', **kwargs)
        except MissingEntityError as wee:
            # Cast entity error to item error
            raise MissingItemError(id=wee.id, title=wee.title)
        return [WikibaseItem.from_dict(e) for e in entities]

    def get_item(self, query, **kwargs):
        """Fetch a single item; see `get_items` for accepted *query* forms."""
        return self.get_items((query, ), **kwargs)[0]

    def get_properties(self, query, **kwargs):
        """Fetch properties and return them as a list of WikibaseProperty.

        *query* is a sequence of ints (converted to PIDs) or "P<digits>"
        strings; only the first element is inspected. Extra keyword
        arguments are passed on to `_get_entities`.

        Raises:
            ValueError: the first element is neither an int nor a PID.
            MissingPropertyError: an entity was missing or not a property.
        """
        if not query:
            return []
        first = query[0]
        if isinstance(first, int):
            # Convert list of ints to PIDs
            query = ["P%d" % x for x in query]
        elif not _is_entity_id(first, "P"):
            raise ValueError("query must be a list of PIDs")

        try:
            entities = self._get_entities(query, as_titles=False,
                                          expected='property', **kwargs)
        except MissingEntityError as wee:
            # Cast entity error to property error
            raise MissingPropertyError(id=wee.id, title=wee.title)
        return [WikibaseProperty.from_dict(e) for e in entities]

    def get_property(self, query, **kwargs):
        """Fetch a single property (the counterpart of `get_item`)."""
        return self.get_properties((query, ), **kwargs)[0]

    def _search_entities(self, query, etype, limit=7, lang=None):
        """Run ``wbsearchentities`` and return a list of result summaries.

        Each summary is a dict with the ``id``, ``label`` and ``url`` of one
        search hit.

        Raises:
            WikibaseException: the response lacks a 'success' key.
        """
        resp = self._get("wbsearchentities", dict(
            search=query,
            language=lang or self.lang,
            type=etype,
            limit=limit))
        if 'success' not in resp:
            raise WikibaseException("Expected 'success'")
        # 'description' is deliberately not copied (it was disabled in the
        # original code).
        return [dict(id=r['id'], label=r['label'], url=r['url'])
                for r in resp['search']]

    def search_items(self, query, fetch=False, lang=None, **kwargs):
        """Search for items matching *query*.

        Returns the summaries from `_search_entities`, or, when *fetch* is
        true, the full WikibaseItem objects for those hits.
        """
        lang = lang or self.lang
        results = self._search_entities(query, 'item', lang=lang, **kwargs)
        if not fetch:
            return results
        return self.get_items([short['id'] for short in results], lang=lang)

    def search_properties(self, query, fetch=False, lang=None, **kwargs):
        """Search for properties matching *query*.

        Returns the summaries from `_search_entities`, or, when *fetch* is
        true, the full WikibaseProperty objects for those hits.
        """
        lang = lang or self.lang
        results = self._search_entities(query, 'property', lang=lang,
                                        **kwargs)
        if not fetch:
            return results
        return self.get_properties([short['id'] for short in results],
                                   lang=lang)

    def create(self, thing):
        """Call ``thing.create(self)`` after checking the type of *thing*.

        Raises:
            ValueError: *thing* is not a WikibaseEntity or WikibaseStatement.
        """
        # NOTE(review): WikibaseStatement defines no create() in this file,
        # so passing one passes the type check but fails on the call below.
        if not isinstance(thing, (WikibaseEntity, WikibaseStatement)):
            raise ValueError(
                "Takes one of: WikibaseItem, WikibaseProperty, "
                "WikibaseStatement")
        thing.create(self)

    def save(self, thing):
        """Call ``thing.save(self)`` after checking the type of *thing*.

        Raises:
            ValueError: *thing* is not a WikibaseEntity or WikibaseStatement.
        """
        # NOTE(review): WikibaseStatement defines no save() in this file,
        # so passing one passes the type check but fails on the call below.
        if not isinstance(thing, (WikibaseEntity, WikibaseStatement)):
            raise ValueError(
                "Takes one of: WikibaseItem, WikibaseProperty, "
                "WikibaseStatement")
        thing.save(self)


class WikibaseEntity:
    """
    Base class for WikibaseItem and WikibaseProperty

    Parameters:
        labels, descriptions: either a plain string (wrapped into a
            ``{lang: {"language": lang, "value": ...}}`` mapping) or an
            already-structured value, stored as given. Default: empty list.
        aliases: a string, a list/tuple of strings (wrapped into
            ``{lang: [{"language": lang, "value": ...}, ...]}``), or an
            already-structured value, stored as given.
        dbid: entity id; None for an entity that was not fetched.
        rev, rev_timestamp: revision id and timestamp, if known.
        statements: list of WikibaseStatement; a fresh list when omitted.
        lang: language code used when wrapping plain strings.
    """

    def __init__(self, labels=None, descriptions=None, aliases=None,
                 dbid=None, rev=None, rev_timestamp=None, statements=None,
                 lang=DEFAULT_LANG):
        # None sentinels: a fresh container per instance, so that appending
        # to one entity's statements can never leak into another entity.
        if labels is None:
            labels = []
        if descriptions is None:
            descriptions = []
        if aliases is None:
            aliases = []
        if statements is None:
            statements = []

        if isinstance(labels, str):
            labels = {lang: {"language": lang, "value": labels}}
        if isinstance(descriptions, str):
            descriptions = {lang: {"language": lang, "value": descriptions}}
        if isinstance(aliases, str):
            aliases = (aliases, )
        if isinstance(aliases, (list, tuple)):
            aliases = {
                lang: [{"language": lang, "value": al} for al in aliases],
            }

        self.labels = labels
        self.descriptions = descriptions
        self.aliases = aliases
        self.dbid = dbid
        self.rev = rev
        self.rev_timestamp = rev_timestamp
        self.statements = statements
        # Overridden by the subclasses ("item" / "property").
        self.entity_type = None

    @classmethod
    def from_dict(cls, j):
        """Build an entity from a decoded entity dict.

        Reads the keys ``id``, ``lastrevid``, ``modified``, ``aliases``,
        ``descriptions`` and ``claims`` (all required) and ``labels``
        (optional).
        """
        we = cls(
            dbid=j['id'],
            rev=j['lastrevid'],
            rev_timestamp=dateutil.parser.parse(j['modified']),
            labels=j.get('labels', {}),
            aliases=j['aliases'],
            descriptions=j['descriptions'],
        )
        # 'claims' maps each property to a list of statement dicts.
        for claim_list in j['claims'].values():
            for c in claim_list:
                we.statements.append(WikibaseStatement.from_dict(c))
        return we

    def to_dict(self, new=False):
        """Serialize the entity to a plain dict.

        With ``new=True`` the ``id`` and ``lastrevid`` keys are left out.
        """
        d = dict()
        d['labels'] = self.labels
        d['descriptions'] = self.descriptions
        d['aliases'] = self.aliases
        if self.entity_type:
            d['type'] = self.entity_type
        # NOTE(review): from_dict reads this data under 'claims'; confirm
        # that the API accepts 'statements' here.
        d['statements'] = [s.to_dict() for s in self.statements]
        if not new:
            if self.dbid:
                d['id'] = self.dbid
            if self.rev:
                d['lastrevid'] = self.rev
        return d

    def add_statement(self, statement):
        raise NotImplementedError

    def add_label(self, label):
        raise NotImplementedError

    def add_alias(self, label):
        raise NotImplementedError

    def _edit_params(self, srv, summary, data):
        """Build the wbeditentity parameters shared by create() and save()."""
        params = {
            "token": srv._get_csrf_token(),
            "summary": summary,
            # The API expects the entity as a JSON string, not a mapping.
            "data": json.dumps(data),
        }
        # MediaWiki treats any present boolean parameter as true, so the
        # flag must be omitted entirely for non-bot edits.
        if srv.is_bot:
            params["bot"] = "1"
        return params

    def create(self, srv, summary=None):
        """Create this entity on *srv* via ``wbeditentity`` with ``new``."""
        # TODO: first have to traverse statements and find/sync any
        # properties?
        if summary is None:
            summary = "Created new Entity"
        params = self._edit_params(srv, summary, self.to_dict(new=True))
        params["new"] = self.entity_type
        srv._post("wbeditentity", params)

    def save(self, srv, summary=None):
        """Write this existing entity back to *srv* via ``wbeditentity``.

        Raises:
            WikibaseException: the entity has no ``dbid`` to save to.
        """
        if not self.dbid:
            raise WikibaseException(
                "Can't save an entity without an id; use create() instead")
        if summary is None:
            summary = "Changes to existing Entity"
        params = self._edit_params(srv, summary, self.to_dict(new=False))
        # Address the existing entity instead of asking for a new one.
        params["id"] = self.dbid
        srv._post("wbeditentity", params)


class WikibaseItem(WikibaseEntity):
    """A Wikibase item: an entity that additionally carries sitelinks."""

    def __init__(self, *args, **kwargs):
        self.sitelinks = kwargs.pop('sitelinks', [])
        super().__init__(*args, **kwargs)
        self.entity_type = "item"

    def __repr__(self):
        return "<WikibaseItem %s>" % self.qid()

    @classmethod
    def from_dict(cls, d):
        """Build an item from an entity dict, including ``sitelinks``."""
        wi = super().from_dict(d)
        wi.sitelinks = d['sitelinks']
        return wi

    def to_dict(self, new=False):
        """Serialize like `WikibaseEntity.to_dict`, plus ``sitelinks``."""
        d = super().to_dict(new=new)
        d['sitelinks'] = self.sitelinks
        return d

    def qid(self):
        """Return the item's id ("Q..."), or None if it has none."""
        if self.dbid:
            assert self.dbid.startswith("Q")
        return self.dbid or None


class WikibaseProperty(WikibaseEntity):
    """A Wikibase property: an entity that additionally has a datatype."""

    def __init__(self, *args, **kwargs):
        self.datatype = kwargs.pop('datatype', None)
        super().__init__(*args, **kwargs)
        self.entity_type = "property"

    def __repr__(self):
        return "<WikibaseProperty %s>" % self.pid()

    @classmethod
    def from_dict(cls, d):
        """Build a property from an entity dict, including ``datatype``."""
        wp = super().from_dict(d)
        wp.datatype = d['datatype']
        return wp

    def to_dict(self, new=False):
        """Serialize like `WikibaseEntity.to_dict`, plus ``datatype``."""
        d = super().to_dict(new=new)
        d['datatype'] = self.datatype
        return d

    def pid(self):
        """Return the property's id ("P..."), or None if it has none."""
        if self.dbid:
            assert self.dbid.startswith("P")
        return self.dbid or None


class WikibaseStatement:
    """A statement: a main snak plus qualifiers, references and a rank.

    Parameters:
        qualifiers: list of WikibaseSnak; a fresh list when omitted.
        references: list of lists of WikibaseSnak (one inner list per
            reference); a fresh list when omitted.
        rank: rank string, 'normal' by default.
        mainsnak: the WikibaseSnak holding the claim itself, or None.
        guid: statement identifier, or None.
    """

    def __init__(self, qualifiers=None, references=None, rank='normal',
                 mainsnak=None, guid=None):
        self.guid = guid
        self.mainsnak = mainsnak
        # None sentinels avoid sharing one default list between instances.
        self.qualifiers = [] if qualifiers is None else qualifiers
        self.references = [] if references is None else references
        self.rank = rank

    def __repr__(self):
        if self.mainsnak is not None:
            return "<WikibaseStatement %s: %s>" % (
                self.mainsnak.property, self.mainsnak.value)
        return "<WikibaseStatement (empty)>"

    @classmethod
    def from_dict(cls, d):
        """Build a statement from a decoded claim dict.

        Raises:
            WikibaseException: ``d['type']`` is not 'statement'.
        """
        if d['type'] != 'statement':
            raise WikibaseException("Tried to parse a non-statement claim?")

        # 'qualifiers' maps each property to a list of snak dicts; flatten.
        quals = [WikibaseSnak.from_dict(q)
                 for prop_set in d.get('qualifiers', {}).values()
                 for q in prop_set]

        # TODO: handle references hashes
        refs = []
        for ref in d.get("references", []):
            # We get a list of lists; deep! Flatten the per-property snak
            # lists so that each reference becomes one list of snaks.
            refs.append([WikibaseSnak.from_dict(q)
                         for snak_list in ref.get("snaks", {}).values()
                         for q in snak_list])

        return cls(
            mainsnak=WikibaseSnak.from_dict(d['mainsnak']),
            guid=d['id'],
            rank=d['rank'],
            qualifiers=quals,
            references=refs,
        )

    def to_dict(self):
        """Serialize the statement; unset (None) fields are left out."""
        d = {
            "type": "statement",
            "references": [[snak.to_dict() for snak in snaklist]
                           for snaklist in self.references],
            "qualifiers": [snak.to_dict() for snak in self.qualifiers],
        }
        if self.mainsnak is not None:
            d['mainsnak'] = self.mainsnak.to_dict()
        if self.rank is not None:
            d['rank'] = self.rank
        if self.guid is not None:
            # NOTE(review): from_dict reads the identifier from 'id';
            # confirm which key the API expects on write.
            d['guid'] = self.guid
        return d


class WikibaseSnak:
    """A snak: a property paired with a value (or the absence of one).

    Parameters:
        property: property id the snak is about.
        value: the snak's value, or None.
        snaktype: when None it is derived from *value*: "value" if a value
            was given, "novalue" otherwise.
        hash: snak hash, if known.
        datatype: datatype name of the value, if known.
    """

    def __init__(self, property=None, value=None, snaktype=None, hash=None,
                 datatype=None):
        if snaktype is None:
            # "somevalue" would mean "unknown value", which contradicts
            # having been given one; a concrete value has type "value".
            snaktype = "novalue" if value is None else "value"
        self.property = property
        self.snaktype = snaktype
        self.datatype = datatype
        self.value = value
        self.hash = hash

    def __repr__(self):
        return "<WikibaseSnak %s (%s)>" % (self.property, self.snaktype)

    @classmethod
    def from_dict(cls, d):
        """Build a snak from a decoded snak dict.

        For snaktype 'value', item and property values are normalized to
        "Q<n>" / "P<n>" strings; every other datatype keeps the raw
        ``datavalue['value']``.
        """
        ws = cls(
            property=d['property'],
            snaktype=d['snaktype'],
            hash=d.get('hash', None),
        )
        if ws.snaktype == 'value':
            ws.datatype = d['datatype']
            raw = d['datavalue']['value']
            if ws.datatype == "wikibase-item":
                ws.value = "Q%d" % int(raw['numeric-id'])
            elif ws.datatype == "wikibase-property":
                ws.value = "P%d" % int(raw['numeric-id'])
            else:
                ws.value = raw
        return ws

    def to_dict(self):
        """Serialize the snak; unset fields are left out."""
        d = {'snaktype': self.snaktype}
        if self.hash:
            d['hash'] = self.hash
        if self.value is not None:
            d['value'] = self.value
        if self.datatype is not None:
            d['datatype'] = self.datatype
        if self.property is not None:
            d['property'] = self.property
        return d