python_arango_ogm.db.pao_database

  1import os
  2from typing import Any, Dict, Sequence, Literal
  3import uuid
  4
  5from arango import ArangoClient
  6from loguru import logger
  7
  8from python_arango_ogm.db.pao_db_base import PAODBBase
  9from python_arango_ogm.db.pao_model_discovery import PAOModelDiscovery
 10from python_arango_ogm.db.pao_queries import PAOQueries
 11from python_arango_ogm.db.pao_migration_model import PAOMigrationModel
 12
 13from python_arango_ogm.utils.singleton import Singleton
 14
 15
 16class PAODatabase(PAODBBase):
 17    __metaclass__ = Singleton
 18    VALID_SORT_VALUES = ["ASC", "DESC", ""]
 19
 20    def __init__(self, delete_db: bool = False):
 21        # TODO: These probably don't need to be members:
 22        print("CONSTRUCTING DB")
 23        self.app_db_name = os.getenv('PAO_APP_DB_NAME')
 24        host = os.getenv('PAO_DB_HOST', 'localhost')
 25        port = os.getenv('PAO_DB_PORT', 8529)
 26        protocol = os.getenv('PAO_DB_PROTOCOL', 'http')
 27        root_user = os.getenv('PAO_DB_ROOT_USER')
 28        root_password = os.getenv('PAO_DB_ROOT_PASS')
 29
 30        if self.app_db_name is None:
 31            raise ValueError("PAO_APP_DB_NAME needs to be defined in environment or in a .env file.")
 32
 33        if root_user is None:
 34            raise ValueError("PAO_DB_ROOT_USER needs to be defined in environment or in a .env file.")
 35
 36        if root_password is None:
 37            raise ValueError('PAO_DB_ROOT_PASS needs to be defined in environment or in a .env file.')
 38
 39        self.client = ArangoClient(hosts=f"{protocol}://{host}:{port}")
 40
 41        # Connect to "_system" database as root user.
 42        # This returns an API wrapper for "_system" database.
 43        # logger.debug(f"Connecting to system DB with {root_user}:{root_password}...")
 44        self.sys_db = self.client.db('_system', username=root_user, password=root_password)
 45
 46        self.db = None
 47
 48        # Create a new database if it does not exist.
 49        self.setup_app_database(delete_db)
 50
 51    def setup_app_database(self, delete_db):
 52        """
 53        Setup app databases; deleting if specified by delete_db
 54        This is called by constructor, but since this based on a
 55        Singleton metaclass, it might neccesary to call it manually
 56        after migrations have been appplied and whatnot.
 57        """
 58        app_user = os.getenv('PAO_APP_DB_USER', 'root')
 59        app_pass = os.getenv('PAO_APP_DB_PASS')
 60
 61        if app_pass is None:
 62            raise ValueError('PAO_APP_DB_PASS needs to be defined in environment or in a .env file.')
 63
 64        create_db = True
 65        if self.sys_db.has_database(self.app_db_name):
 66            if delete_db:
 67                self.sys_db.delete_database(self.app_db_name, ignore_missing=True)
 68            else:
 69                create_db = False
 70        if create_db:
 71            was_created = self.sys_db.create_database(self.app_db_name, [{
 72                'username': app_user,
 73                'password': app_pass,
 74                'active': True,
 75            }])
 76            if not was_created:
 77                raise ValueError(f"Database {self.app_db_name} was not created.")
 78
 79        self.db = self.client.db(self.app_db_name, username=app_user, password=app_pass)
 80        self.inject_into_models()
 81
 82    def inject_into_models(self):
 83        """ Inject database into models, as PAODatabase is where the functionality is implemented."""
 84        discoverer = PAOModelDiscovery()
 85        model_hash: Dict[str, any] = discoverer.discover()
 86        for m, model in model_hash.items():
 87            logger.debug(f"Injecting DB into model {m}")
 88            model.db = self
 89
 90        # Inject into built-in models:
 91        PAOMigrationModel.db = self
 92
 93    def get_db(self):
 94        """ Return underlying python-arango database"""
 95        return self.db
 96
 97    def find_by_key(self, collection_name: str, key: Any):
 98        """
 99          Find document on collection by given key value:
100        """
101        return self.db.collection(collection_name).get(key)
102
103    def get_related_edges(self, collection_name: str, association_collection_name: str, lookup_key_dict: Dict):
104        """
105        Gets `association_collection_name` edges of `collection_name`;
106        looking up by the keys and values in `lookup_key_dict`:
107        """
108        lookup_filter = self._format_lookup_filter(lookup_key_dict)
109        aql = PAOQueries.AQL_QUERY_RELATED_EDGES.format(lookup_filter=lookup_filter)
110        edge_collection_name = f"{collection_name}__{association_collection_name}"
111        logger.debug(f"Association query on [{collection_name}]->[{edge_collection_name}] [aql]")
112        cursor = self.db.aql.execute(aql, count=True, batch_size=10, bind_vars={
113            '@collection': collection_name,
114            '@edge_collection': edge_collection_name
115        })
116
117        return self._cursor_doc_generator(cursor)
118
119    def get_related_vertices(self, collection_name: str, association_collection_name: str, lookup_key_dict: Dict):
120        """
121        Lookup associated vertices (`association_collection_name`) through edges,
122        from given collection_name, using keys and values in lookup_key_dict:
123        """
124        lookup_filter = self._format_lookup_filter(lookup_key_dict)
125        aql = PAOQueries.AQL_QUERY_RELATED_VERTICES.format(lookup_filter=lookup_filter)
126        edge_collection_name = f"{collection_name}__{association_collection_name}"
127        logger.debug(f"Association query on [{collection_name}]->[{edge_collection_name}] [aql]")
128        cursor = self.db.aql.execute(aql, count=True, batch_size=10, bind_vars={
129            '@collection': collection_name,
130            '@edge_collection': edge_collection_name,
131            '@association_collection': association_collection_name
132        })
133
134        return self._cursor_doc_generator(cursor)
135
136    def find_by_attributes(self, collection_name: str, lookup_key_dict: Dict = None):
137        """
138        Find a single document by given collection_name,
139        looking up by the keys and values in lookup_key_dict:
140        """
141        docs = self.get_by_attributes(collection_name, lookup_key_dict)
142        try:
143            result = docs.__next__()
144        except StopIteration:
145            result = None
146
147        return result
148
149    def remove_by_key(self, collection_name: str, key: str):
150        """ Remove a document identified by `key` from collection """
151        lookup_filter = self._format_lookup_filter({"_key": key})
152        aql = PAOQueries.AQL_REMOVE_BY_ATTRS.format(lookup_filter=lookup_filter)
153
154        logger.debug(f"REMOVE query: [{aql}]")
155        cursor = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
156        return self._cursor_doc_generator(cursor)
157
158    def get_by_attributes(
159            self,
160            collection_name: str,
161            lookup_key_dict: Dict[str, Any] = None,
162            sort_key_dict: Dict[str, Literal['ASC', 'DESC', '']] = None
163    ):
164        """
165        Gets documents from given collection_name, looking up by the keys and values
166        in lookup_key_dict, sorting by keys and direction
167
168        :param lookup_key_dict: A dictionary of keys and corresponding values used to query this collection values
169        (ASC, DESC): :param sort_key_dict: A dictionary of keys by which to sort documents.  Values specify
170        direction: [ASC, DESC, '']
171        """
172        logger.debug(f"LOOKUP:{lookup_key_dict} and SORT:{sort_key_dict}")
173        lookup_filter = self._format_lookup_filter(lookup_key_dict)
174        sort_by = self._format_sort(sort_key_dict)
175        aql = PAOQueries.AQL_QUERY_BY_ATTRS.format(lookup_filter=lookup_filter, sort_by=sort_by)
176        logger.debug(f"LOOKUP query: {aql}")
177        cursor = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
178        return self._cursor_doc_generator(cursor)
179
180    def insert_edge(self, collection_name: str, association_collection_name: str, from_key, to_key):
181        """
182          Insert edge document using keys (_from and _to are generated using collection name).
183          Collection inferred from collection_name and association_collection_name.
184          TODO: Add attributes to set on edge
185        """
186        edge_collection_name = f"{collection_name}__{association_collection_name}"
187        doc = {
188            "_from": f"{collection_name}/{from_key}",
189            "_to": f"{association_collection_name}/{to_key}"
190        }
191        return self.insert_doc(edge_collection_name, doc)
192
193    def insert_doc(self, collection_name: str, doc: Dict):
194        """
195          Insert a new doc in collection:
196        """
197        print(f"Inserting into collection {collection_name}: ", doc)
198        new_doc = self.__autogen_keys(collection_name, doc)
199        insert_attrs = self._format_query_attrs(new_doc)
200        aql = PAOQueries.AQL_INSERT_DOC.format(insert_attrs=insert_attrs)
201        logger.debug(f"INSERT QUERY: {aql}: {insert_attrs}")
202        inserted_docs = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
203        logger.debug(f"inserted_docs: {inserted_docs.count()}")
204        if not inserted_docs.count():
205            raise RuntimeError(f"Error: collection_name document {doc} was not inserted.")
206
207        return inserted_docs.next()
208
209    def insert_docs(self, collection_name: str, docs: Sequence[dict[str, Any]]):
210        """ Insert given documents into collection with a single query"""
211        new_docs = [self.__autogen_keys(collection_name, doc) for doc in docs]
212        new_docs = [self.__format_query_values(doc) for doc in new_docs]
213        cursor = self.db.aql.execute(PAOQueries.AQL_INSERT_DOCS, bind_vars={
214            '@collection': collection_name,
215            'docs': new_docs
216        })
217
218        return self._cursor_doc_generator(cursor)
219
220    def upsert_doc(
221            self,
222            collection_name: str,
223            doc: Dict,
224            lookup_keys: Sequence[str] = None,
225            insert_dict=None,
226            update_dict=None
227    ):
228        """
229          Upsert a doc in collection using given lookup_keys
230          collection_name: Name of DB collection
231          doc:Dict: Document containing keys and values to insert and update
232          lookup_keys: Sequence of keys to lookup document to update
233          insert_dict: Dictionary of values to insert only
234          update_dict: Dictionary of values to update only
235        """
236        if update_dict is None:
237            update_dict = {}
238        if insert_dict is None:
239            insert_dict = {}
240        lookup_key_dict = {k: v for (k, v) in doc.items() if k in lookup_keys}
241
242        new_doc = self.__autogen_keys(collection_name, doc)
243        new_doc.update(insert_dict)
244        update_doc = {k: v for (k, v) in doc.items() if k not in lookup_keys}
245        update_doc.update(update_dict)
246
247        # Create key:val pairs (dictionary without braces):
248        key_attrs = self._format_query_attrs(lookup_key_dict)  # str(lookup_key_dict)[1:-1]
249        insert_attrs = self._format_query_attrs(new_doc)  # str(new_doc)[1:-1]
250        update_attrs = self._format_query_attrs(update_doc)  # str(update_attrs)[1:-1]
251
252        logger.debug(f"key_attrs: {key_attrs}")
253        logger.debug(f"insert_attrs: {insert_attrs}")
254        logger.debug(f"update_attrs: {update_attrs}")
255
256        upsert = PAOQueries.AQL_UPSERT_DOC.format(
257            key_attrs=key_attrs,
258            insert_attrs=insert_attrs,
259            update_attrs=update_attrs)
260
261        logger.debug(f"UPSERT QUERY: {upsert}")
262        upserted_docs = self.db.aql.execute(upsert, count=True, bind_vars={'@collection': collection_name})
263        logger.debug(f"upserted_docs: {upserted_docs.count()}")
264        if not upserted_docs.count():
265            raise RuntimeError(f"Error: collection_name document {doc} was not upserted.")
266
267        return upserted_docs.next()
268
269    def _format_sort(self, sort_list: Dict[str, str]) -> str:
270        """
271          Format a lookup filter from keys and values in lookup_key_dict
272          Returns string in format "doc.active == true AND doc.gender == 'f'"
273        """
274        result = ''
275        if sort_list and len(sort_list):
276            sorts = []
277            for k, v in sort_list.items():
278                if v not in self.VALID_SORT_VALUES:
279                    raise ValueError(f"Sort value for {k} is should be one of {self.VALID_SORT_VALUES}")
280                sorts.append(f"doc.{k} {v}")
281
282            sort_by = " ,".join(sorts)
283            result = f"SORT {sort_by}" if sort_by else ""
284        return result
285
286    def _format_lookup_filter(self, lookup_key_dict: Dict):
287        """
288          Format a lookup filter from keys and values in lookup_key_dict
289          Returns string in format "doc.active == true AND doc.gender == 'f'"
290        """
291        result = ''
292        if lookup_key_dict and len(lookup_key_dict) > 0:
293            attrs = {f"doc.{k} == {self.__format_query_value(v)}" for (k, v) in lookup_key_dict.items()}
294            filter_by = " AND ".join(attrs)
295            result = f"FILTER {filter_by}" if filter_by else ""
296        return result
297
298    def _format_query_attrs(self, doc: Dict) -> str:
299        """
300          Format keys and values for query, allowing for literals.
301          Returns a string with format:
302            'key1':'val1', 'key2':'val2", 'key3': literal_expression...
303        """
304        attrs = {f"'{k}': {self.__format_query_value(v)}" for (k, v) in doc.items()}
305        return ", ".join(attrs)
306
307    @staticmethod
308    def _cursor_doc_generator(cursor):
309        while True:
310            # batch_items = cursor.batch()
311            # logger.debug(f"batch_items", batch_items)
312            for doc in cursor.batch():
313                yield doc
314            if not cursor.has_more():
315                break
316            cursor.next()
317
318    def __format_query_values(self, doc: Dict[str, Any]):
319        return {k: self.__format_query_value(v) for k, v in doc.items()}
320
321    @staticmethod
322    def __format_query_value(value: Any):
323        """ Format value, allowing for literals """
324        if isinstance(value, str):
325            if value.startswith('`') and value.endswith('`'):
326                # Literal, no quotes:
327                query_expression = value.replace('`', '')
328            else:
329                # Non-literal; quote it:
330                query_expression = f"'{value}'"
331        else:
332            query_expression = value
333
334        return query_expression
335
336    def __autogen_keys(self, collection_name: str, doc: dict):
337        """ Autogenerate key & id using UUID.  Can probably be changed to a DB function.  """
338        new_doc = dict(doc)
339        new_doc["_key"] = self.__new_uuid()
340        new_doc["_id"] = f"{collection_name}/{new_doc['_key']}"
341        return new_doc
342
343    @staticmethod
344    def __new_uuid():
345        """ Return a new UUID  """
346        return str(uuid.uuid1())
class PAODatabase(python_arango_ogm.db.pao_db_base.PAODBBase):
 17class PAODatabase(PAODBBase):
 18    __metaclass__ = Singleton
 19    VALID_SORT_VALUES = ["ASC", "DESC", ""]
 20
 21    def __init__(self, delete_db: bool = False):
 22        # TODO: These probably don't need to be members:
 23        print("CONSTRUCTING DB")
 24        self.app_db_name = os.getenv('PAO_APP_DB_NAME')
 25        host = os.getenv('PAO_DB_HOST', 'localhost')
 26        port = os.getenv('PAO_DB_PORT', 8529)
 27        protocol = os.getenv('PAO_DB_PROTOCOL', 'http')
 28        root_user = os.getenv('PAO_DB_ROOT_USER')
 29        root_password = os.getenv('PAO_DB_ROOT_PASS')
 30
 31        if self.app_db_name is None:
 32            raise ValueError("PAO_APP_DB_NAME needs to be defined in environment or in a .env file.")
 33
 34        if root_user is None:
 35            raise ValueError("PAO_DB_ROOT_USER needs to be defined in environment or in a .env file.")
 36
 37        if root_password is None:
 38            raise ValueError('PAO_DB_ROOT_PASS needs to be defined in environment or in a .env file.')
 39
 40        self.client = ArangoClient(hosts=f"{protocol}://{host}:{port}")
 41
 42        # Connect to "_system" database as root user.
 43        # This returns an API wrapper for "_system" database.
 44        # logger.debug(f"Connecting to system DB with {root_user}:{root_password}...")
 45        self.sys_db = self.client.db('_system', username=root_user, password=root_password)
 46
 47        self.db = None
 48
 49        # Create a new database if it does not exist.
 50        self.setup_app_database(delete_db)
 51
 52    def setup_app_database(self, delete_db):
 53        """
 54        Setup app databases; deleting if specified by delete_db
 55        This is called by constructor, but since this based on a
 56        Singleton metaclass, it might neccesary to call it manually
 57        after migrations have been appplied and whatnot.
 58        """
 59        app_user = os.getenv('PAO_APP_DB_USER', 'root')
 60        app_pass = os.getenv('PAO_APP_DB_PASS')
 61
 62        if app_pass is None:
 63            raise ValueError('PAO_APP_DB_PASS needs to be defined in environment or in a .env file.')
 64
 65        create_db = True
 66        if self.sys_db.has_database(self.app_db_name):
 67            if delete_db:
 68                self.sys_db.delete_database(self.app_db_name, ignore_missing=True)
 69            else:
 70                create_db = False
 71        if create_db:
 72            was_created = self.sys_db.create_database(self.app_db_name, [{
 73                'username': app_user,
 74                'password': app_pass,
 75                'active': True,
 76            }])
 77            if not was_created:
 78                raise ValueError(f"Database {self.app_db_name} was not created.")
 79
 80        self.db = self.client.db(self.app_db_name, username=app_user, password=app_pass)
 81        self.inject_into_models()
 82
 83    def inject_into_models(self):
 84        """ Inject database into models, as PAODatabase is where the functionality is implemented."""
 85        discoverer = PAOModelDiscovery()
 86        model_hash: Dict[str, any] = discoverer.discover()
 87        for m, model in model_hash.items():
 88            logger.debug(f"Injecting DB into model {m}")
 89            model.db = self
 90
 91        # Inject into built-in models:
 92        PAOMigrationModel.db = self
 93
 94    def get_db(self):
 95        """ Return underlying python-arango database"""
 96        return self.db
 97
 98    def find_by_key(self, collection_name: str, key: Any):
 99        """
100          Find document on collection by given key value:
101        """
102        return self.db.collection(collection_name).get(key)
103
104    def get_related_edges(self, collection_name: str, association_collection_name: str, lookup_key_dict: Dict):
105        """
106        Gets `association_collection_name` edges of `collection_name`;
107        looking up by the keys and values in `lookup_key_dict`:
108        """
109        lookup_filter = self._format_lookup_filter(lookup_key_dict)
110        aql = PAOQueries.AQL_QUERY_RELATED_EDGES.format(lookup_filter=lookup_filter)
111        edge_collection_name = f"{collection_name}__{association_collection_name}"
112        logger.debug(f"Association query on [{collection_name}]->[{edge_collection_name}] [aql]")
113        cursor = self.db.aql.execute(aql, count=True, batch_size=10, bind_vars={
114            '@collection': collection_name,
115            '@edge_collection': edge_collection_name
116        })
117
118        return self._cursor_doc_generator(cursor)
119
120    def get_related_vertices(self, collection_name: str, association_collection_name: str, lookup_key_dict: Dict):
121        """
122        Lookup associated vertices (`association_collection_name`) through edges,
123        from given collection_name, using keys and values in lookup_key_dict:
124        """
125        lookup_filter = self._format_lookup_filter(lookup_key_dict)
126        aql = PAOQueries.AQL_QUERY_RELATED_VERTICES.format(lookup_filter=lookup_filter)
127        edge_collection_name = f"{collection_name}__{association_collection_name}"
128        logger.debug(f"Association query on [{collection_name}]->[{edge_collection_name}] [aql]")
129        cursor = self.db.aql.execute(aql, count=True, batch_size=10, bind_vars={
130            '@collection': collection_name,
131            '@edge_collection': edge_collection_name,
132            '@association_collection': association_collection_name
133        })
134
135        return self._cursor_doc_generator(cursor)
136
137    def find_by_attributes(self, collection_name: str, lookup_key_dict: Dict = None):
138        """
139        Find a single document by given collection_name,
140        looking up by the keys and values in lookup_key_dict:
141        """
142        docs = self.get_by_attributes(collection_name, lookup_key_dict)
143        try:
144            result = docs.__next__()
145        except StopIteration:
146            result = None
147
148        return result
149
150    def remove_by_key(self, collection_name: str, key: str):
151        """ Remove a document identified by `key` from collection """
152        lookup_filter = self._format_lookup_filter({"_key": key})
153        aql = PAOQueries.AQL_REMOVE_BY_ATTRS.format(lookup_filter=lookup_filter)
154
155        logger.debug(f"REMOVE query: [{aql}]")
156        cursor = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
157        return self._cursor_doc_generator(cursor)
158
159    def get_by_attributes(
160            self,
161            collection_name: str,
162            lookup_key_dict: Dict[str, Any] = None,
163            sort_key_dict: Dict[str, Literal['ASC', 'DESC', '']] = None
164    ):
165        """
166        Gets documents from given collection_name, looking up by the keys and values
167        in lookup_key_dict, sorting by keys and direction
168
169        :param lookup_key_dict: A dictionary of keys and corresponding values used to query this collection values
170        (ASC, DESC): :param sort_key_dict: A dictionary of keys by which to sort documents.  Values specify
171        direction: [ASC, DESC, '']
172        """
173        logger.debug(f"LOOKUP:{lookup_key_dict} and SORT:{sort_key_dict}")
174        lookup_filter = self._format_lookup_filter(lookup_key_dict)
175        sort_by = self._format_sort(sort_key_dict)
176        aql = PAOQueries.AQL_QUERY_BY_ATTRS.format(lookup_filter=lookup_filter, sort_by=sort_by)
177        logger.debug(f"LOOKUP query: {aql}")
178        cursor = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
179        return self._cursor_doc_generator(cursor)
180
181    def insert_edge(self, collection_name: str, association_collection_name: str, from_key, to_key):
182        """
183          Insert edge document using keys (_from and _to are generated using collection name).
184          Collection inferred from collection_name and association_collection_name.
185          TODO: Add attributes to set on edge
186        """
187        edge_collection_name = f"{collection_name}__{association_collection_name}"
188        doc = {
189            "_from": f"{collection_name}/{from_key}",
190            "_to": f"{association_collection_name}/{to_key}"
191        }
192        return self.insert_doc(edge_collection_name, doc)
193
194    def insert_doc(self, collection_name: str, doc: Dict):
195        """
196          Insert a new doc in collection:
197        """
198        print(f"Inserting into collection {collection_name}: ", doc)
199        new_doc = self.__autogen_keys(collection_name, doc)
200        insert_attrs = self._format_query_attrs(new_doc)
201        aql = PAOQueries.AQL_INSERT_DOC.format(insert_attrs=insert_attrs)
202        logger.debug(f"INSERT QUERY: {aql}: {insert_attrs}")
203        inserted_docs = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
204        logger.debug(f"inserted_docs: {inserted_docs.count()}")
205        if not inserted_docs.count():
206            raise RuntimeError(f"Error: collection_name document {doc} was not inserted.")
207
208        return inserted_docs.next()
209
210    def insert_docs(self, collection_name: str, docs: Sequence[dict[str, Any]]):
211        """ Insert given documents into collection with a single query"""
212        new_docs = [self.__autogen_keys(collection_name, doc) for doc in docs]
213        new_docs = [self.__format_query_values(doc) for doc in new_docs]
214        cursor = self.db.aql.execute(PAOQueries.AQL_INSERT_DOCS, bind_vars={
215            '@collection': collection_name,
216            'docs': new_docs
217        })
218
219        return self._cursor_doc_generator(cursor)
220
221    def upsert_doc(
222            self,
223            collection_name: str,
224            doc: Dict,
225            lookup_keys: Sequence[str] = None,
226            insert_dict=None,
227            update_dict=None
228    ):
229        """
230          Upsert a doc in collection using given lookup_keys
231          collection_name: Name of DB collection
232          doc:Dict: Document containing keys and values to insert and update
233          lookup_keys: Sequence of keys to lookup document to update
234          insert_dict: Dictionary of values to insert only
235          update_dict: Dictionary of values to update only
236        """
237        if update_dict is None:
238            update_dict = {}
239        if insert_dict is None:
240            insert_dict = {}
241        lookup_key_dict = {k: v for (k, v) in doc.items() if k in lookup_keys}
242
243        new_doc = self.__autogen_keys(collection_name, doc)
244        new_doc.update(insert_dict)
245        update_doc = {k: v for (k, v) in doc.items() if k not in lookup_keys}
246        update_doc.update(update_dict)
247
248        # Create key:val pairs (dictionary without braces):
249        key_attrs = self._format_query_attrs(lookup_key_dict)  # str(lookup_key_dict)[1:-1]
250        insert_attrs = self._format_query_attrs(new_doc)  # str(new_doc)[1:-1]
251        update_attrs = self._format_query_attrs(update_doc)  # str(update_attrs)[1:-1]
252
253        logger.debug(f"key_attrs: {key_attrs}")
254        logger.debug(f"insert_attrs: {insert_attrs}")
255        logger.debug(f"update_attrs: {update_attrs}")
256
257        upsert = PAOQueries.AQL_UPSERT_DOC.format(
258            key_attrs=key_attrs,
259            insert_attrs=insert_attrs,
260            update_attrs=update_attrs)
261
262        logger.debug(f"UPSERT QUERY: {upsert}")
263        upserted_docs = self.db.aql.execute(upsert, count=True, bind_vars={'@collection': collection_name})
264        logger.debug(f"upserted_docs: {upserted_docs.count()}")
265        if not upserted_docs.count():
266            raise RuntimeError(f"Error: collection_name document {doc} was not upserted.")
267
268        return upserted_docs.next()
269
270    def _format_sort(self, sort_list: Dict[str, str]) -> str:
271        """
272          Format a lookup filter from keys and values in lookup_key_dict
273          Returns string in format "doc.active == true AND doc.gender == 'f'"
274        """
275        result = ''
276        if sort_list and len(sort_list):
277            sorts = []
278            for k, v in sort_list.items():
279                if v not in self.VALID_SORT_VALUES:
280                    raise ValueError(f"Sort value for {k} is should be one of {self.VALID_SORT_VALUES}")
281                sorts.append(f"doc.{k} {v}")
282
283            sort_by = " ,".join(sorts)
284            result = f"SORT {sort_by}" if sort_by else ""
285        return result
286
287    def _format_lookup_filter(self, lookup_key_dict: Dict):
288        """
289          Format a lookup filter from keys and values in lookup_key_dict
290          Returns string in format "doc.active == true AND doc.gender == 'f'"
291        """
292        result = ''
293        if lookup_key_dict and len(lookup_key_dict) > 0:
294            attrs = {f"doc.{k} == {self.__format_query_value(v)}" for (k, v) in lookup_key_dict.items()}
295            filter_by = " AND ".join(attrs)
296            result = f"FILTER {filter_by}" if filter_by else ""
297        return result
298
299    def _format_query_attrs(self, doc: Dict) -> str:
300        """
301          Format keys and values for query, allowing for literals.
302          Returns a string with format:
303            'key1':'val1', 'key2':'val2", 'key3': literal_expression...
304        """
305        attrs = {f"'{k}': {self.__format_query_value(v)}" for (k, v) in doc.items()}
306        return ", ".join(attrs)
307
308    @staticmethod
309    def _cursor_doc_generator(cursor):
310        while True:
311            # batch_items = cursor.batch()
312            # logger.debug(f"batch_items", batch_items)
313            for doc in cursor.batch():
314                yield doc
315            if not cursor.has_more():
316                break
317            cursor.next()
318
319    def __format_query_values(self, doc: Dict[str, Any]):
320        return {k: self.__format_query_value(v) for k, v in doc.items()}
321
322    @staticmethod
323    def __format_query_value(value: Any):
324        """ Format value, allowing for literals """
325        if isinstance(value, str):
326            if value.startswith('`') and value.endswith('`'):
327                # Literal, no quotes:
328                query_expression = value.replace('`', '')
329            else:
330                # Non-literal; quote it:
331                query_expression = f"'{value}'"
332        else:
333            query_expression = value
334
335        return query_expression
336
337    def __autogen_keys(self, collection_name: str, doc: dict):
338        """ Autogenerate key & id using UUID.  Can probably be changed to a DB function.  """
339        new_doc = dict(doc)
340        new_doc["_key"] = self.__new_uuid()
341        new_doc["_id"] = f"{collection_name}/{new_doc['_key']}"
342        return new_doc
343
344    @staticmethod
345    def __new_uuid():
346        """ Return a new UUID  """
347        return str(uuid.uuid1())

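Database access layer for python_arango_ogm: reads connection settings from environment variables, connects to ArangoDB, creates the application database when needed, and injects itself into the discovered models. (PAODatabase defines no docstring of its own; the generic `abc.ABC` description "Helper class that provides a standard way to create an ABC using inheritance." was previously shown here in its place.)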

PAODatabase(delete_db: bool = False)
def __init__(self, delete_db: bool = False):
    """
    Connect to ArangoDB and prepare the application database.

    Reads PAO_APP_DB_NAME, PAO_DB_HOST (default 'localhost'), PAO_DB_PORT
    (default 8529), PAO_DB_PROTOCOL (default 'http'), PAO_DB_ROOT_USER and
    PAO_DB_ROOT_PASS from the environment.

    :param delete_db: When True, drop the application database first if it
        already exists (see setup_app_database).
    :raises ValueError: If PAO_APP_DB_NAME, PAO_DB_ROOT_USER or
        PAO_DB_ROOT_PASS is not set.
    """
    # Library code should not write to stdout; route through the logger.
    logger.debug("CONSTRUCTING DB")

    # TODO: These probably don't need to be members:
    self.app_db_name = os.getenv('PAO_APP_DB_NAME')
    host = os.getenv('PAO_DB_HOST', 'localhost')
    port = os.getenv('PAO_DB_PORT', 8529)
    protocol = os.getenv('PAO_DB_PROTOCOL', 'http')
    root_user = os.getenv('PAO_DB_ROOT_USER')
    root_password = os.getenv('PAO_DB_ROOT_PASS')

    if self.app_db_name is None:
        raise ValueError("PAO_APP_DB_NAME needs to be defined in environment or in a .env file.")

    if root_user is None:
        raise ValueError("PAO_DB_ROOT_USER needs to be defined in environment or in a .env file.")

    if root_password is None:
        raise ValueError('PAO_DB_ROOT_PASS needs to be defined in environment or in a .env file.')

    self.client = ArangoClient(hosts=f"{protocol}://{host}:{port}")

    # API wrapper for the "_system" database, authenticated as root; used to
    # check for, create and delete the application database.
    self.sys_db = self.client.db('_system', username=root_user, password=root_password)

    # Assigned by setup_app_database().
    self.db = None

    # Create the application database if it does not exist.
    self.setup_app_database(delete_db)
VALID_SORT_VALUES = ['ASC', 'DESC', '']
app_db_name
client
sys_db
db
def setup_app_database(self, delete_db):
    """
    Ensure the application database exists, then connect to it.

    An existing database is reused unless ``delete_db`` is truthy, in which case it is
    deleted and recreated.  A newly created database is given the app user
    (PAO_APP_DB_USER, default 'root') with password PAO_APP_DB_PASS.  Afterwards
    ``self.db`` is bound to the app database and injected into the models.

    The constructor calls this, but since the class is meant to be a singleton it may
    need to be called manually, e.g. after migrations have been applied.

    :raises ValueError: if PAO_APP_DB_PASS is not set, or the database was not created.
    """
    app_user = os.getenv('PAO_APP_DB_USER', 'root')
    app_pass = os.getenv('PAO_APP_DB_PASS')
    if app_pass is None:
        raise ValueError('PAO_APP_DB_PASS needs to be defined in environment or in a .env file.')

    db_name = self.app_db_name
    if self.sys_db.has_database(db_name):
        # Existing DB: recreate only when explicitly asked to.
        must_create = bool(delete_db)
        if must_create:
            self.sys_db.delete_database(db_name, ignore_missing=True)
    else:
        must_create = True

    if must_create:
        app_users = [{
            'username': app_user,
            'password': app_pass,
            'active': True,
        }]
        if not self.sys_db.create_database(db_name, app_users):
            raise ValueError(f"Database {self.app_db_name} was not created.")

    self.db = self.client.db(db_name, username=app_user, password=app_pass)
    self.inject_into_models()

Set up the app database, deleting it first if delete_db is set. This is called by the constructor, but since this class is based on a Singleton metaclass, it may be necessary to call it manually after migrations have been applied and similar changes.

def inject_into_models(self):
    """
    Inject this database into every model class, as PAODatabase is where the query
    functionality is implemented.

    Every model returned by ``PAOModelDiscovery().discover()`` (a mapping keyed by model
    name) and the built-in ``PAOMigrationModel`` get their ``db`` attribute set to ``self``.
    """
    discoverer = PAOModelDiscovery()
    # typing.Any, not the builtin any() function.
    model_hash: Dict[str, Any] = discoverer.discover()
    for model_name, model in model_hash.items():
        logger.debug(f"Injecting DB into model {model_name}")
        model.db = self

    # Inject into built-in models:
    PAOMigrationModel.db = self

Inject database into models, as PAODatabase is where the functionality is implemented.

def get_db(self):
    """
    Expose the python-arango database handle for the application database.

    This is ``self.db``, which setup_app_database binds to the app database.
    """
    handle = self.db
    return handle

Return underlying python-arango database

def find_by_key(self, collection_name: str, key: Any):
    """
    Fetch a single document from ``collection_name`` by its key.

    Delegates to the python-arango collection's ``get(key)``.
    """
    collection = self.db.collection(collection_name)
    return collection.get(key)

Find a document in the collection by the given key value.

def find_by_attributes(self, collection_name: str, lookup_key_dict: Dict = None):
    """
    Find a single document in ``collection_name`` matching the keys and values in
    ``lookup_key_dict``.

    Returns the first document yielded by ``get_by_attributes`` (no sort order is
    requested), or ``None`` when no document matches.
    """
    docs = self.get_by_attributes(collection_name, lookup_key_dict)
    # next() with a default replaces the manual __next__() / StopIteration handling.
    return next(docs, None)

Find a single document in collection_name, looking up by the keys and values in lookup_key_dict; returns None if no document matches.

def remove_by_key(self, collection_name: str, key: str):
    """
    Remove the document whose ``_key`` equals ``key`` from ``collection_name``.

    The AQL comes from PAOQueries.AQL_REMOVE_BY_ATTRS with a ``_key`` lookup filter, and
    the collection name is bound as ``@collection``.  The query cursor is returned
    wrapped by ``_cursor_doc_generator``.
    """
    aql = PAOQueries.AQL_REMOVE_BY_ATTRS.format(
        lookup_filter=self._format_lookup_filter({"_key": key})
    )
    logger.debug(f"REMOVE query: [{aql}]")
    bind_vars = {'@collection': collection_name}
    return self._cursor_doc_generator(self.db.aql.execute(aql, count=True, bind_vars=bind_vars))

Remove a document identified by key from collection

def get_by_attributes(
        self,
        collection_name: str,
        lookup_key_dict: Dict[str, Any] = None,
        sort_key_dict: Dict[str, Literal['ASC', 'DESC', '']] = None
):
    """
    Query ``collection_name`` for documents matching ``lookup_key_dict``, optionally sorted.

    :param collection_name: Collection to query (bound as ``@collection``).
    :param lookup_key_dict: Keys and corresponding values that documents must match.
    :param sort_key_dict: Keys by which to sort documents; each value gives the
        direction: 'ASC', 'DESC' or ''.
    :return: The query cursor wrapped by ``_cursor_doc_generator``.
    """
    logger.debug(f"LOOKUP:{lookup_key_dict} and SORT:{sort_key_dict}")
    query_parts = {
        "lookup_filter": self._format_lookup_filter(lookup_key_dict),
        "sort_by": self._format_sort(sort_key_dict),
    }
    aql = PAOQueries.AQL_QUERY_BY_ATTRS.format(**query_parts)
    logger.debug(f"LOOKUP query: {aql}")
    bind_vars = {'@collection': collection_name}
    cursor = self.db.aql.execute(aql, count=True, bind_vars=bind_vars)
    return self._cursor_doc_generator(cursor)

Gets documents from the given collection_name, looking up by the keys and values in lookup_key_dict and sorting by the keys and directions in sort_key_dict.

Parameters
  • lookup_key_dict: A dictionary of keys and corresponding values used to query this collection.
  • sort_key_dict: A dictionary of keys by which to sort documents. Values specify the direction: ASC, DESC, or ''.
def insert_edge(self, collection_name: str, association_collection_name: str, from_key, to_key,
                attributes: Dict[str, Any] = None):
    """
    Insert an edge document linking ``collection_name/from_key`` to
    ``association_collection_name/to_key``.

    The edge collection is inferred as ``"<collection_name>__<association_collection_name>"``.

    :param attributes: Optional extra attributes to store on the edge.  ``_from`` and
        ``_to`` are always computed from the keys and override same-named entries.
        The given dict is not modified.
    :return: Whatever ``insert_doc`` returns for the new edge.
    """
    edge_collection_name = f"{collection_name}__{association_collection_name}"
    doc = dict(attributes) if attributes else {}
    # Endpoints are set last so they cannot be overridden by caller-supplied attributes.
    doc["_from"] = f"{collection_name}/{from_key}"
    doc["_to"] = f"{association_collection_name}/{to_key}"
    return self.insert_doc(edge_collection_name, doc)

Insert an edge document using keys (_from and _to are built from the collection names and the given keys). The edge collection is named <collection_name>__<association_collection_name>. TODO: Add attributes to set on edge

def insert_doc(self, collection_name: str, doc: Dict):
    """
    Insert a new document into ``collection_name``.

    A copy of ``doc`` is given a generated ``_key``/``_id`` before the AQL insert
    (PAOQueries.AQL_INSERT_DOC) runs.

    :return: The first document returned by the insert query.
    :raises RuntimeError: if the query reports zero inserted documents.
    """
    # Go through the module logger like the rest of the class instead of print().
    logger.debug(f"Inserting into collection {collection_name}: {doc}")
    new_doc = self.__autogen_keys(collection_name, doc)
    insert_attrs = self._format_query_attrs(new_doc)
    aql = PAOQueries.AQL_INSERT_DOC.format(insert_attrs=insert_attrs)
    logger.debug(f"INSERT QUERY: {aql}: {insert_attrs}")
    inserted_docs = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
    inserted_count = inserted_docs.count()
    logger.debug(f"inserted_docs: {inserted_count}")
    if not inserted_count:
        # Interpolate the real collection name (the message used to print "collection_name").
        raise RuntimeError(f"Error: {collection_name} document {doc} was not inserted.")

    return inserted_docs.next()

Insert a new document into the collection.

def insert_docs(self, collection_name: str, docs: Sequence[dict[str, Any]]):
    """
    Insert several documents into ``collection_name`` with a single AQL query.

    All documents are first given a generated ``_key``/``_id``; then each is passed
    through ``__format_query_values``.  The resulting list is bound as ``docs`` for
    PAOQueries.AQL_INSERT_DOCS, and the cursor is returned wrapped by
    ``_cursor_doc_generator``.
    """
    keyed = [self.__autogen_keys(collection_name, raw) for raw in docs]
    prepared = list(map(self.__format_query_values, keyed))
    bind_vars = {
        '@collection': collection_name,
        'docs': prepared,
    }
    cursor = self.db.aql.execute(PAOQueries.AQL_INSERT_DOCS, bind_vars=bind_vars)
    return self._cursor_doc_generator(cursor)

Insert given documents into collection with a single query

def upsert_doc(
        self,
        collection_name: str,
        doc: Dict,
        lookup_keys: Sequence[str] = None,
        insert_dict=None,
        update_dict=None
):
    """
    Upsert a document in ``collection_name``, using ``lookup_keys`` to find an existing one.

    :param collection_name: Name of the DB collection.
    :param doc: Document containing the keys and values to insert and update.
    :param lookup_keys: Keys of ``doc`` used to look up the document to update.  A single
        key may be given as a plain string.  Required despite the ``None`` default.
    :param insert_dict: Values applied only when a new document is inserted.
    :param update_dict: Values applied only when an existing document is updated.
    :return: The first document returned by the upsert query.
    :raises ValueError: if ``lookup_keys`` is None.
    :raises RuntimeError: if the query reports zero upserted documents.
    """
    if lookup_keys is None:
        # Previously this surfaced as an opaque TypeError from `k in None`.
        raise ValueError("upsert_doc requires lookup_keys to identify the document to update.")
    if isinstance(lookup_keys, str):
        # A bare string would otherwise be matched by substring ("na" in "name").
        lookup_keys = (lookup_keys,)
    lookup_key_set = frozenset(lookup_keys)

    if update_dict is None:
        update_dict = {}
    if insert_dict is None:
        insert_dict = {}
    lookup_key_dict = {k: v for (k, v) in doc.items() if k in lookup_key_set}

    # Insert side: the full doc plus generated _key/_id plus insert-only values.
    new_doc = self.__autogen_keys(collection_name, doc)
    new_doc.update(insert_dict)
    # Update side: everything except the lookup keys, plus update-only values.
    update_doc = {k: v for (k, v) in doc.items() if k not in lookup_key_set}
    update_doc.update(update_dict)

    # Create key:val pairs (dictionary without braces):
    key_attrs = self._format_query_attrs(lookup_key_dict)
    insert_attrs = self._format_query_attrs(new_doc)
    update_attrs = self._format_query_attrs(update_doc)

    logger.debug(f"key_attrs: {key_attrs}")
    logger.debug(f"insert_attrs: {insert_attrs}")
    logger.debug(f"update_attrs: {update_attrs}")

    upsert = PAOQueries.AQL_UPSERT_DOC.format(
        key_attrs=key_attrs,
        insert_attrs=insert_attrs,
        update_attrs=update_attrs)

    logger.debug(f"UPSERT QUERY: {upsert}")
    upserted_docs = self.db.aql.execute(upsert, count=True, bind_vars={'@collection': collection_name})
    upserted_count = upserted_docs.count()
    logger.debug(f"upserted_docs: {upserted_count}")
    if not upserted_count:
        # Interpolate the real collection name (the message used to print "collection_name").
        raise RuntimeError(f"Error: {collection_name} document {doc} was not upserted.")

    return upserted_docs.next()

Upsert a document in the collection, using the given lookup_keys to find the document to update. collection_name: name of the DB collection; doc: document containing the keys and values to insert and update; lookup_keys: sequence of keys used to look up the document to update; insert_dict: dictionary of values to set on insert only; update_dict: dictionary of values to set on update only.