import elasticsearch
import fatcat_openapi_client
import pytest
from dotenv import load_dotenv
from fatcat_openapi_client import *

import fatcat_web
from fatcat_tools import authenticated_api

# Canned elasticsearch response bodies, shaped like search results with
# optional aggregations.

ES_CONTAINER_STATS_RESP = {
    "timed_out": False,
    "aggregations": {
        "container_stats": {
            "buckets": {
                "is_preserved": {"doc_count": 461939},
                "in_kbart": {"doc_count": 461939},
                "in_web": {"doc_count": 2797},
            },
        },
        "preservation": {
            "buckets": [
                {"key": "bright", "doc_count": 444},
                {"key": "dark", "doc_count": 111},
            ],
            "sum_other_doc_count": 0,
        },
        "release_type": {
            "buckets": [
                {"key": "article-journal", "doc_count": 456},
                {"key": "book", "doc_count": 123},
            ],
            "sum_other_doc_count": 0,
        },
    },
    "hits": {"total": 461939, "hits": [], "max_score": 0.0},
    "_shards": {"successful": 5, "total": 5, "skipped": 0, "failed": 0},
    "took": 50,
}

# TODO: this should not be empty
ES_CONTAINER_RANDOM_RESP = {
    "timed_out": False,
    "hits": {"total": 461939, "hits": [], "max_score": 0.0},
    "_shards": {"successful": 5, "total": 5, "skipped": 0, "failed": 0},
    "took": 50,
}

ES_RELEASE_EMPTY_RESP = {
    "timed_out": False,
    "hits": {"total": 0, "hits": [], "max_score": 0.0},
    "_shards": {"successful": 5, "total": 5, "skipped": 0, "failed": 0},
    "took": 50,
}


@pytest.fixture
def full_app(mocker):
    """Return the `fatcat_web` application object set up for testing.

    Loads `./example.env`, enables testing mode, disables debug mode and
    CSRF checking, and replaces the application's elasticsearch client.
    """
    load_dotenv(dotenv_path="./example.env")

    web_app = fatcat_web.app
    web_app.testing = True
    web_app.debug = False
    web_app.config["WTF_CSRF_ENABLED"] = False

    # Point the ES client at a dummy backend and patch the transport-level
    # request method, so un-mocked ES requests at least fail fast.
    web_app.es_client = elasticsearch.Elasticsearch("mockbackend")
    mocker.patch("elasticsearch.connection.Urllib3HttpConnection.perform_request")
    return web_app


@pytest.fixture
def app(full_app):
    """Return a test client created from the `full_app` application."""
    client = full_app.test_client()
    return client


@pytest.fixture
def app_admin(app):
    """Return the `app` test client after a token login with the dev admin token.

    The login is posted to `/auth/token_login` (following redirects) and must
    come back with HTTP 200.
    """
    dev_token = "AgEPZGV2LmZhdGNhdC53aWtpAhYyMDE5MDEwMS1kZXYtZHVtbXkta2V5AAImZWRpdG9yX2lkID0gYWFhYWFhYWFhYWFhYmt2a2FhYWFhYWFhYWkAAht0aW1lID4gMjAxOS0wNC0wNFQyMzozMjo0NloAAAYgrN3jjy0mgEqIydTFfsOLYSS55dz6Fh2d1CGMNQFLwcQ="
    login_resp = app.post(
        "/auth/token_login",
        data={"token": dev_token},
        follow_redirects=True,
    )
    assert login_resp.status_code == 200
    return app


@pytest.fixture
def api():
    """Return an authenticated API client for `http://localhost:9411/v0`.

    `./example.env` is loaded first; the client's `editor_id` attribute is
    then set to a fixed editor identifier.
    """
    load_dotenv(dotenv_path="./example.env")
    client = authenticated_api("http://localhost:9411/v0")
    client.editor_id = "aaaaaaaaaaaabkvkaaaaaaaaae"
    return client


@pytest.fixture
def api_dummy_entities(api):
    """Create one entity of each type in a fresh editgroup and return them.

    The result is a dict holding the API client, the editgroup, and the
    re-fetched creator, container, file, fileset, webcapture, release and
    the release's work.

    This is a fairly rough fixture; a proper object/class with
    create/accept/clean-up methods would probably be better.
    """
    # Build the un-saved entity objects first.
    creator_draft = CreatorEntity(display_name="test creator deletion")
    container_draft = ContainerEntity(name="test journal deletion")
    release_draft = ReleaseEntity(
        title="test release creator deletion",
        ext_ids=ReleaseExtIds(),
    )
    file_draft = FileEntity()
    fileset_draft = FilesetEntity()
    webcapture_draft = WebcaptureEntity(
        timestamp="2000-01-01T12:34:56Z",
        original_url="http://example.com/",
    )

    editgroup = quick_eg(api)
    eg_id = editgroup.editgroup_id

    # For each entity: create it in the editgroup, then fetch it back by the
    # ident reported by the create call.
    creator_edit = api.create_creator(eg_id, creator_draft)
    creator = api.get_creator(creator_edit.ident)

    container_edit = api.create_container(eg_id, container_draft)
    container = api.get_container(container_edit.ident)

    release_edit = api.create_release(eg_id, release_draft)
    release = api.get_release(release_edit.ident)

    # The work is looked up through the fetched release's work_id.
    work = api.get_work(release.work_id)

    file_edit = api.create_file(eg_id, file_draft)
    file_entity = api.get_file(file_edit.ident)

    fileset_edit = api.create_fileset(eg_id, fileset_draft)
    fileset = api.get_fileset(fileset_edit.ident)

    webcapture_edit = api.create_webcapture(eg_id, webcapture_draft)
    webcapture = api.get_webcapture(webcapture_edit.ident)

    return {
        "api": api,
        "editgroup": editgroup,
        "creator": creator,
        "container": container,
        "file": file_entity,
        "fileset": fileset,
        "webcapture": webcapture,
        "release": release,
        "work": work,
    }


def test_get_changelog_entry(api):
    """Smoke-test the `api` fixture by fetching changelog entry 1."""
    entry = api.get_changelog_entry(1)
    assert entry


## Helpers ##################################################################

def quick_eg(api_inst):
    """Create a new, default-constructed editgroup via `api_inst` and return it."""
    return api_inst.create_editgroup(fatcat_openapi_client.Editgroup())