python_arango_ogm.db.pao_database
import os
from typing import Any, Dict, Sequence, Literal
import uuid

from arango import ArangoClient
from loguru import logger

from python_arango_ogm.db.pao_db_base import PAODBBase
from python_arango_ogm.db.pao_model_discovery import PAOModelDiscovery
from python_arango_ogm.db.pao_queries import PAOQueries
from python_arango_ogm.db.pao_migration_model import PAOMigrationModel

from python_arango_ogm.utils.singleton import Singleton


class PAODatabase(PAODBBase):
    """
    Database access layer built on python-arango.

    The constructor connects to the ``_system`` database as the root user,
    creates (or re-creates) the application database and injects this object
    into the discovered models. The remaining methods build AQL from the
    templates in PAOQueries and execute it against the application database.
    """
    # NOTE(review): Python 3 ignores a `__metaclass__` class attribute, so this
    # assignment does not turn the class into a singleton. Switching to
    # `metaclass=Singleton` may conflict with the metaclass of PAODBBase --
    # confirm before changing.
    __metaclass__ = Singleton
    VALID_SORT_VALUES = ["ASC", "DESC", ""]

    def __init__(self, delete_db: bool = False):
        """
        Read connection settings from the environment and connect.

        :param delete_db: When true, an existing app database is dropped and re-created.
        :raises ValueError: If PAO_APP_DB_NAME, PAO_DB_ROOT_USER or PAO_DB_ROOT_PASS is
            not set (PAO_APP_DB_PASS is checked by setup_app_database).
        """
        # TODO: These probably don't need to be members:
        logger.debug("CONSTRUCTING DB")
        self.app_db_name = os.getenv('PAO_APP_DB_NAME')
        host = os.getenv('PAO_DB_HOST', 'localhost')
        port = os.getenv('PAO_DB_PORT', '8529')
        protocol = os.getenv('PAO_DB_PROTOCOL', 'http')
        root_user = os.getenv('PAO_DB_ROOT_USER')
        root_password = os.getenv('PAO_DB_ROOT_PASS')

        # Checked in this order so the first missing variable is the one reported.
        required = {
            'PAO_APP_DB_NAME': self.app_db_name,
            'PAO_DB_ROOT_USER': root_user,
            'PAO_DB_ROOT_PASS': root_password,
        }
        for env_name, env_value in required.items():
            if env_value is None:
                raise ValueError(f"{env_name} needs to be defined in environment or in a .env file.")

        self.client = ArangoClient(hosts=f"{protocol}://{host}:{port}")

        # Connect to "_system" database as root user.
        # This returns an API wrapper for "_system" database.
        self.sys_db = self.client.db('_system', username=root_user, password=root_password)

        self.db = None

        # Create a new database if it does not exist.
        self.setup_app_database(delete_db)

    def setup_app_database(self, delete_db):
        """
        Set up the app database, deleting it first if specified by delete_db.

        This is called by the constructor, but it might be necessary to call
        it manually after migrations have been applied.

        :param delete_db: When true, an existing app database is dropped and re-created.
        :raises ValueError: If PAO_APP_DB_PASS is not set, or the database was not created.
        """
        app_user = os.getenv('PAO_APP_DB_USER', 'root')
        app_pass = os.getenv('PAO_APP_DB_PASS')

        if app_pass is None:
            raise ValueError('PAO_APP_DB_PASS needs to be defined in environment or in a .env file.')

        already_exists = self.sys_db.has_database(self.app_db_name)
        if already_exists and delete_db:
            self.sys_db.delete_database(self.app_db_name, ignore_missing=True)

        # Create when the database was missing or has just been dropped.
        if delete_db or not already_exists:
            app_users = [{
                'username': app_user,
                'password': app_pass,
                'active': True,
            }]
            if not self.sys_db.create_database(self.app_db_name, app_users):
                raise ValueError(f"Database {self.app_db_name} was not created.")

        self.db = self.client.db(self.app_db_name, username=app_user, password=app_pass)
        self.inject_into_models()

    def inject_into_models(self):
        """ Inject database into models, as PAODatabase is where the functionality is implemented."""
        models: Dict[str, Any] = PAOModelDiscovery().discover()
        for model_name, model in models.items():
            logger.debug(f"Injecting DB into model {model_name}")
            model.db = self

        # Inject into built-in models:
        PAOMigrationModel.db = self

    def get_db(self):
        """ Return underlying python-arango database"""
        return self.db

    def find_by_key(self, collection_name: str, key: Any):
        """
        Find document on collection by given key value.
        """
        return self.db.collection(collection_name).get(key)

    def get_related_edges(self, collection_name: str, association_collection_name: str, lookup_key_dict: Dict):
        """
        Gets `association_collection_name` edges of `collection_name`;
        looking up by the keys and values in `lookup_key_dict`.

        The edge collection name is `<collection_name>__<association_collection_name>`.

        :return: Generator over the documents returned by the query.
        """
        lookup_filter = self._format_lookup_filter(lookup_key_dict)
        aql = PAOQueries.AQL_QUERY_RELATED_EDGES.format(lookup_filter=lookup_filter)
        edge_collection_name = f"{collection_name}__{association_collection_name}"
        logger.debug(f"Association query on [{collection_name}]->[{edge_collection_name}] [{aql}]")
        cursor = self.db.aql.execute(aql, count=True, batch_size=10, bind_vars={
            '@collection': collection_name,
            '@edge_collection': edge_collection_name
        })

        return self._cursor_doc_generator(cursor)

    def get_related_vertices(self, collection_name: str, association_collection_name: str, lookup_key_dict: Dict):
        """
        Lookup associated vertices (`association_collection_name`) through edges,
        from given collection_name, using keys and values in lookup_key_dict.

        The edge collection name is `<collection_name>__<association_collection_name>`.

        :return: Generator over the documents returned by the query.
        """
        lookup_filter = self._format_lookup_filter(lookup_key_dict)
        aql = PAOQueries.AQL_QUERY_RELATED_VERTICES.format(lookup_filter=lookup_filter)
        edge_collection_name = f"{collection_name}__{association_collection_name}"
        logger.debug(f"Association query on [{collection_name}]->[{edge_collection_name}] [{aql}]")
        cursor = self.db.aql.execute(aql, count=True, batch_size=10, bind_vars={
            '@collection': collection_name,
            '@edge_collection': edge_collection_name,
            '@association_collection': association_collection_name
        })

        return self._cursor_doc_generator(cursor)

    def find_by_attributes(self, collection_name: str, lookup_key_dict: Dict = None):
        """
        Find a single document by given collection_name,
        looking up by the keys and values in lookup_key_dict.

        :return: The first matching document, or None when nothing matches.
        """
        return next(self.get_by_attributes(collection_name, lookup_key_dict), None)

    def remove_by_key(self, collection_name: str, key: str):
        """
        Remove a document identified by `key` from collection.

        :return: Generator over the documents returned by the query.
        """
        lookup_filter = self._format_lookup_filter({"_key": key})
        aql = PAOQueries.AQL_REMOVE_BY_ATTRS.format(lookup_filter=lookup_filter)

        logger.debug(f"REMOVE query: [{aql}]")
        cursor = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
        return self._cursor_doc_generator(cursor)

    def get_by_attributes(
            self,
            collection_name: str,
            lookup_key_dict: Dict[str, Any] = None,
            sort_key_dict: Dict[str, Literal['ASC', 'DESC', '']] = None
    ):
        """
        Gets documents from given collection_name, looking up by the keys and values
        in lookup_key_dict, sorting by keys and direction.

        :param collection_name: Name of the collection to query.
        :param lookup_key_dict: A dictionary of keys and corresponding values used to
            query this collection. None or empty means no filter.
        :param sort_key_dict: A dictionary of keys by which to sort documents. Values
            specify direction: [ASC, DESC, '']
        :return: Generator over the documents returned by the query.
        :raises ValueError: If a sort direction is not one of VALID_SORT_VALUES.
        """
        logger.debug(f"LOOKUP:{lookup_key_dict} and SORT:{sort_key_dict}")
        lookup_filter = self._format_lookup_filter(lookup_key_dict)
        sort_by = self._format_sort(sort_key_dict)
        aql = PAOQueries.AQL_QUERY_BY_ATTRS.format(lookup_filter=lookup_filter, sort_by=sort_by)
        logger.debug(f"LOOKUP query: {aql}")
        cursor = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
        return self._cursor_doc_generator(cursor)

    def insert_edge(self, collection_name: str, association_collection_name: str, from_key, to_key):
        """
        Insert edge document using keys (_from and _to are generated using collection name).
        Collection inferred from collection_name and association_collection_name.
        TODO: Add attributes to set on edge
        """
        edge_collection_name = f"{collection_name}__{association_collection_name}"
        doc = {
            "_from": f"{collection_name}/{from_key}",
            "_to": f"{association_collection_name}/{to_key}"
        }
        return self.insert_doc(edge_collection_name, doc)

    def insert_doc(self, collection_name: str, doc: Dict):
        """
        Insert a new doc in collection.

        A fresh `_key` and `_id` are generated; any supplied by the caller are replaced.

        :return: The first document returned by the insert query.
        :raises RuntimeError: If the query reports that nothing was inserted.
        """
        logger.debug(f"Inserting into collection {collection_name}: {doc}")
        new_doc = self.__autogen_keys(collection_name, doc)
        insert_attrs = self._format_query_attrs(new_doc)
        aql = PAOQueries.AQL_INSERT_DOC.format(insert_attrs=insert_attrs)
        logger.debug(f"INSERT QUERY: {aql}: {insert_attrs}")
        inserted_docs = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
        logger.debug(f"inserted_docs: {inserted_docs.count()}")
        if not inserted_docs.count():
            raise RuntimeError(f"Error: {collection_name} document {doc} was not inserted.")

        return inserted_docs.next()

    def insert_docs(self, collection_name: str, docs: Sequence[dict[str, Any]]):
        """
        Insert given documents into collection with a single query.

        Each document gets a freshly generated `_key` and `_id`.

        :return: Generator over the documents returned by the query.
        """
        # NOTE(review): the values go through the AQL text formatter even though
        # they are passed as a bind variable, so string values reach the server
        # wrapped in single quotes -- confirm this is what AQL_INSERT_DOCS expects.
        new_docs = [self.__autogen_keys(collection_name, doc) for doc in docs]
        new_docs = [self.__format_query_values(doc) for doc in new_docs]
        cursor = self.db.aql.execute(PAOQueries.AQL_INSERT_DOCS, bind_vars={
            '@collection': collection_name,
            'docs': new_docs
        })

        return self._cursor_doc_generator(cursor)

    def upsert_doc(
            self,
            collection_name: str,
            doc: Dict,
            lookup_keys: Sequence[str] = None,
            insert_dict=None,
            update_dict=None
    ):
        """
        Upsert a doc in collection using given lookup_keys.

        :param collection_name: Name of DB collection
        :param doc: Document containing keys and values to insert and update
        :param lookup_keys: Sequence of keys to lookup document to update
        :param insert_dict: Dictionary of values to insert only
        :param update_dict: Dictionary of values to update only
        :return: The first document returned by the upsert query.
        :raises TypeError: If doc is not empty and lookup_keys is None.
        :raises RuntimeError: If the query reports that nothing was upserted.
        """
        if doc and lookup_keys is None:
            # Same exception type the membership test below would raise, but
            # with a message that names the actual problem.
            raise TypeError("upsert_doc() requires lookup_keys to be a sequence of attribute names, not None")
        if update_dict is None:
            update_dict = {}
        if insert_dict is None:
            insert_dict = {}
        lookup_key_dict = {k: v for (k, v) in doc.items() if k in lookup_keys}

        new_doc = self.__autogen_keys(collection_name, doc)
        new_doc.update(insert_dict)
        update_doc = {k: v for (k, v) in doc.items() if k not in lookup_keys}
        update_doc.update(update_dict)

        # Create key:val pairs (dictionary without braces):
        key_attrs = self._format_query_attrs(lookup_key_dict)
        insert_attrs = self._format_query_attrs(new_doc)
        update_attrs = self._format_query_attrs(update_doc)

        logger.debug(f"key_attrs: {key_attrs}")
        logger.debug(f"insert_attrs: {insert_attrs}")
        logger.debug(f"update_attrs: {update_attrs}")

        upsert = PAOQueries.AQL_UPSERT_DOC.format(
            key_attrs=key_attrs,
            insert_attrs=insert_attrs,
            update_attrs=update_attrs)

        logger.debug(f"UPSERT QUERY: {upsert}")
        upserted_docs = self.db.aql.execute(upsert, count=True, bind_vars={'@collection': collection_name})
        logger.debug(f"upserted_docs: {upserted_docs.count()}")
        if not upserted_docs.count():
            raise RuntimeError(f"Error: {collection_name} document {doc} was not upserted.")

        return upserted_docs.next()

    def _format_sort(self, sort_list: Dict[str, str]) -> str:
        """
        Format a SORT clause from the keys and directions in sort_list.

        Returns a string in format "SORT doc.name ASC, doc.age DESC",
        or '' when sort_list is None or empty.

        :raises ValueError: If a direction is not one of VALID_SORT_VALUES.
        """
        if not sort_list:
            return ''

        sorts = []
        for k, v in sort_list.items():
            if v not in self.VALID_SORT_VALUES:
                raise ValueError(f"Sort value for {k} is should be one of {self.VALID_SORT_VALUES}")
            # '' means no explicit direction; drop the trailing blank it leaves.
            sorts.append(f"doc.{k} {v}".rstrip())

        return f"SORT {', '.join(sorts)}"

    def _format_lookup_filter(self, lookup_key_dict: Dict):
        """
        Format a lookup filter from keys and values in lookup_key_dict.

        Returns a string in format "FILTER doc.active == true AND doc.gender == 'f'",
        or '' when lookup_key_dict is None or empty.
        """
        if not lookup_key_dict:
            return ''

        # A list (not a set) keeps the conditions in dict order, so the same
        # input always produces the same query text.
        conditions = [f"doc.{k} == {self.__format_inline_value(v)}" for (k, v) in lookup_key_dict.items()]
        return f"FILTER {' AND '.join(conditions)}"

    def _format_query_attrs(self, doc: Dict) -> str:
        """
        Format keys and values for query, allowing for literals.

        Returns a string with format:
        'key1': 'val1', 'key2': 'val2', 'key3': literal_expression...
        """
        # A list (not a set) keeps the attributes in dict order.
        attrs = [f"'{k}': {self.__format_inline_value(v)}" for (k, v) in doc.items()]
        return ", ".join(attrs)

    @staticmethod
    def _cursor_doc_generator(cursor):
        """
        Yield every document of `cursor`, fetching further batches as needed.

        Iterating the cursor pops each document from the current batch and
        fetches the next batch when it runs empty. The previous implementation
        looped over `cursor.batch()` (which does not consume) and then called
        `cursor.next()` (which pops a single document), so results spanning
        more than one batch were repeated and some documents were skipped.
        """
        yield from cursor

    def __format_query_values(self, doc: Dict[str, Any]):
        """ Apply __format_query_value to every value of doc. """
        return {k: self.__format_query_value(v) for k, v in doc.items()}

    @staticmethod
    def __format_query_value(value: Any):
        """ Format value, allowing for literals """
        if isinstance(value, str):
            if value.startswith('`') and value.endswith('`'):
                # Literal, no quotes:
                query_expression = value.replace('`', '')
            else:
                # Non-literal; quote it:
                query_expression = f"'{value}'"
        else:
            query_expression = value

        return query_expression

    @staticmethod
    def __format_inline_value(value: Any):
        """
        Render `value` as text that can be interpolated into an AQL query string.

        - None becomes `null` and booleans become `true` / `false`.
        - A string wrapped in backticks is treated as a literal AQL expression
          and emitted without the backticks.
        - Any other string is single-quoted with backslashes and single quotes
          escaped, so the value cannot terminate the string literal early.
        - Everything else is returned unchanged and formatted by the caller.
        """
        if value is None:
            return "null"
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, str):
            if value.startswith('`') and value.endswith('`'):
                # Literal, no quotes:
                return value.replace('`', '')
            # Escape the backslash first so the quote escapes are not doubled.
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            return f"'{escaped}'"
        return value

    def __autogen_keys(self, collection_name: str, doc: dict):
        """
        Autogenerate key & id using UUID. Can probably be changed to a DB function.

        Returns a copy of doc; the input dictionary is not modified.
        """
        new_doc = dict(doc)
        new_doc["_key"] = self.__new_uuid()
        new_doc["_id"] = f"{collection_name}/{new_doc['_key']}"
        return new_doc

    @staticmethod
    def __new_uuid():
        """ Return a new UUID """
        return str(uuid.uuid1())
class PAODatabase(PAODBBase):
    """
    Database access layer built on python-arango.

    The constructor connects to the ``_system`` database as the root user,
    creates (or re-creates) the application database and injects this object
    into the discovered models. The remaining methods build AQL from the
    templates in PAOQueries and execute it against the application database.
    """
    # NOTE(review): Python 3 ignores a `__metaclass__` class attribute, so this
    # assignment does not turn the class into a singleton. Switching to
    # `metaclass=Singleton` may conflict with the metaclass of PAODBBase --
    # confirm before changing.
    __metaclass__ = Singleton
    VALID_SORT_VALUES = ["ASC", "DESC", ""]

    def __init__(self, delete_db: bool = False):
        """
        Read connection settings from the environment and connect.

        :param delete_db: When true, an existing app database is dropped and re-created.
        :raises ValueError: If PAO_APP_DB_NAME, PAO_DB_ROOT_USER or PAO_DB_ROOT_PASS is
            not set (PAO_APP_DB_PASS is checked by setup_app_database).
        """
        # TODO: These probably don't need to be members:
        logger.debug("CONSTRUCTING DB")
        self.app_db_name = os.getenv('PAO_APP_DB_NAME')
        host = os.getenv('PAO_DB_HOST', 'localhost')
        port = os.getenv('PAO_DB_PORT', '8529')
        protocol = os.getenv('PAO_DB_PROTOCOL', 'http')
        root_user = os.getenv('PAO_DB_ROOT_USER')
        root_password = os.getenv('PAO_DB_ROOT_PASS')

        # Checked in this order so the first missing variable is the one reported.
        required = {
            'PAO_APP_DB_NAME': self.app_db_name,
            'PAO_DB_ROOT_USER': root_user,
            'PAO_DB_ROOT_PASS': root_password,
        }
        for env_name, env_value in required.items():
            if env_value is None:
                raise ValueError(f"{env_name} needs to be defined in environment or in a .env file.")

        self.client = ArangoClient(hosts=f"{protocol}://{host}:{port}")

        # Connect to "_system" database as root user.
        # This returns an API wrapper for "_system" database.
        self.sys_db = self.client.db('_system', username=root_user, password=root_password)

        self.db = None

        # Create a new database if it does not exist.
        self.setup_app_database(delete_db)

    def setup_app_database(self, delete_db):
        """
        Set up the app database, deleting it first if specified by delete_db.

        This is called by the constructor, but it might be necessary to call
        it manually after migrations have been applied.

        :param delete_db: When true, an existing app database is dropped and re-created.
        :raises ValueError: If PAO_APP_DB_PASS is not set, or the database was not created.
        """
        app_user = os.getenv('PAO_APP_DB_USER', 'root')
        app_pass = os.getenv('PAO_APP_DB_PASS')

        if app_pass is None:
            raise ValueError('PAO_APP_DB_PASS needs to be defined in environment or in a .env file.')

        already_exists = self.sys_db.has_database(self.app_db_name)
        if already_exists and delete_db:
            self.sys_db.delete_database(self.app_db_name, ignore_missing=True)

        # Create when the database was missing or has just been dropped.
        if delete_db or not already_exists:
            app_users = [{
                'username': app_user,
                'password': app_pass,
                'active': True,
            }]
            if not self.sys_db.create_database(self.app_db_name, app_users):
                raise ValueError(f"Database {self.app_db_name} was not created.")

        self.db = self.client.db(self.app_db_name, username=app_user, password=app_pass)
        self.inject_into_models()

    def inject_into_models(self):
        """ Inject database into models, as PAODatabase is where the functionality is implemented."""
        models: Dict[str, Any] = PAOModelDiscovery().discover()
        for model_name, model in models.items():
            logger.debug(f"Injecting DB into model {model_name}")
            model.db = self

        # Inject into built-in models:
        PAOMigrationModel.db = self

    def get_db(self):
        """ Return underlying python-arango database"""
        return self.db

    def find_by_key(self, collection_name: str, key: Any):
        """
        Find document on collection by given key value.
        """
        return self.db.collection(collection_name).get(key)

    def get_related_edges(self, collection_name: str, association_collection_name: str, lookup_key_dict: Dict):
        """
        Gets `association_collection_name` edges of `collection_name`;
        looking up by the keys and values in `lookup_key_dict`.

        The edge collection name is `<collection_name>__<association_collection_name>`.

        :return: Generator over the documents returned by the query.
        """
        lookup_filter = self._format_lookup_filter(lookup_key_dict)
        aql = PAOQueries.AQL_QUERY_RELATED_EDGES.format(lookup_filter=lookup_filter)
        edge_collection_name = f"{collection_name}__{association_collection_name}"
        logger.debug(f"Association query on [{collection_name}]->[{edge_collection_name}] [{aql}]")
        cursor = self.db.aql.execute(aql, count=True, batch_size=10, bind_vars={
            '@collection': collection_name,
            '@edge_collection': edge_collection_name
        })

        return self._cursor_doc_generator(cursor)

    def get_related_vertices(self, collection_name: str, association_collection_name: str, lookup_key_dict: Dict):
        """
        Lookup associated vertices (`association_collection_name`) through edges,
        from given collection_name, using keys and values in lookup_key_dict.

        The edge collection name is `<collection_name>__<association_collection_name>`.

        :return: Generator over the documents returned by the query.
        """
        lookup_filter = self._format_lookup_filter(lookup_key_dict)
        aql = PAOQueries.AQL_QUERY_RELATED_VERTICES.format(lookup_filter=lookup_filter)
        edge_collection_name = f"{collection_name}__{association_collection_name}"
        logger.debug(f"Association query on [{collection_name}]->[{edge_collection_name}] [{aql}]")
        cursor = self.db.aql.execute(aql, count=True, batch_size=10, bind_vars={
            '@collection': collection_name,
            '@edge_collection': edge_collection_name,
            '@association_collection': association_collection_name
        })

        return self._cursor_doc_generator(cursor)

    def find_by_attributes(self, collection_name: str, lookup_key_dict: Dict = None):
        """
        Find a single document by given collection_name,
        looking up by the keys and values in lookup_key_dict.

        :return: The first matching document, or None when nothing matches.
        """
        return next(self.get_by_attributes(collection_name, lookup_key_dict), None)

    def remove_by_key(self, collection_name: str, key: str):
        """
        Remove a document identified by `key` from collection.

        :return: Generator over the documents returned by the query.
        """
        lookup_filter = self._format_lookup_filter({"_key": key})
        aql = PAOQueries.AQL_REMOVE_BY_ATTRS.format(lookup_filter=lookup_filter)

        logger.debug(f"REMOVE query: [{aql}]")
        cursor = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
        return self._cursor_doc_generator(cursor)

    def get_by_attributes(
            self,
            collection_name: str,
            lookup_key_dict: Dict[str, Any] = None,
            sort_key_dict: Dict[str, Literal['ASC', 'DESC', '']] = None
    ):
        """
        Gets documents from given collection_name, looking up by the keys and values
        in lookup_key_dict, sorting by keys and direction.

        :param collection_name: Name of the collection to query.
        :param lookup_key_dict: A dictionary of keys and corresponding values used to
            query this collection. None or empty means no filter.
        :param sort_key_dict: A dictionary of keys by which to sort documents. Values
            specify direction: [ASC, DESC, '']
        :return: Generator over the documents returned by the query.
        :raises ValueError: If a sort direction is not one of VALID_SORT_VALUES.
        """
        logger.debug(f"LOOKUP:{lookup_key_dict} and SORT:{sort_key_dict}")
        lookup_filter = self._format_lookup_filter(lookup_key_dict)
        sort_by = self._format_sort(sort_key_dict)
        aql = PAOQueries.AQL_QUERY_BY_ATTRS.format(lookup_filter=lookup_filter, sort_by=sort_by)
        logger.debug(f"LOOKUP query: {aql}")
        cursor = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
        return self._cursor_doc_generator(cursor)

    def insert_edge(self, collection_name: str, association_collection_name: str, from_key, to_key):
        """
        Insert edge document using keys (_from and _to are generated using collection name).
        Collection inferred from collection_name and association_collection_name.
        TODO: Add attributes to set on edge
        """
        edge_collection_name = f"{collection_name}__{association_collection_name}"
        doc = {
            "_from": f"{collection_name}/{from_key}",
            "_to": f"{association_collection_name}/{to_key}"
        }
        return self.insert_doc(edge_collection_name, doc)

    def insert_doc(self, collection_name: str, doc: Dict):
        """
        Insert a new doc in collection.

        A fresh `_key` and `_id` are generated; any supplied by the caller are replaced.

        :return: The first document returned by the insert query.
        :raises RuntimeError: If the query reports that nothing was inserted.
        """
        logger.debug(f"Inserting into collection {collection_name}: {doc}")
        new_doc = self.__autogen_keys(collection_name, doc)
        insert_attrs = self._format_query_attrs(new_doc)
        aql = PAOQueries.AQL_INSERT_DOC.format(insert_attrs=insert_attrs)
        logger.debug(f"INSERT QUERY: {aql}: {insert_attrs}")
        inserted_docs = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
        logger.debug(f"inserted_docs: {inserted_docs.count()}")
        if not inserted_docs.count():
            raise RuntimeError(f"Error: {collection_name} document {doc} was not inserted.")

        return inserted_docs.next()

    def insert_docs(self, collection_name: str, docs: Sequence[dict[str, Any]]):
        """
        Insert given documents into collection with a single query.

        Each document gets a freshly generated `_key` and `_id`.

        :return: Generator over the documents returned by the query.
        """
        # NOTE(review): the values go through the AQL text formatter even though
        # they are passed as a bind variable, so string values reach the server
        # wrapped in single quotes -- confirm this is what AQL_INSERT_DOCS expects.
        new_docs = [self.__autogen_keys(collection_name, doc) for doc in docs]
        new_docs = [self.__format_query_values(doc) for doc in new_docs]
        cursor = self.db.aql.execute(PAOQueries.AQL_INSERT_DOCS, bind_vars={
            '@collection': collection_name,
            'docs': new_docs
        })

        return self._cursor_doc_generator(cursor)

    def upsert_doc(
            self,
            collection_name: str,
            doc: Dict,
            lookup_keys: Sequence[str] = None,
            insert_dict=None,
            update_dict=None
    ):
        """
        Upsert a doc in collection using given lookup_keys.

        :param collection_name: Name of DB collection
        :param doc: Document containing keys and values to insert and update
        :param lookup_keys: Sequence of keys to lookup document to update
        :param insert_dict: Dictionary of values to insert only
        :param update_dict: Dictionary of values to update only
        :return: The first document returned by the upsert query.
        :raises TypeError: If doc is not empty and lookup_keys is None.
        :raises RuntimeError: If the query reports that nothing was upserted.
        """
        if doc and lookup_keys is None:
            # Same exception type the membership test below would raise, but
            # with a message that names the actual problem.
            raise TypeError("upsert_doc() requires lookup_keys to be a sequence of attribute names, not None")
        if update_dict is None:
            update_dict = {}
        if insert_dict is None:
            insert_dict = {}
        lookup_key_dict = {k: v for (k, v) in doc.items() if k in lookup_keys}

        new_doc = self.__autogen_keys(collection_name, doc)
        new_doc.update(insert_dict)
        update_doc = {k: v for (k, v) in doc.items() if k not in lookup_keys}
        update_doc.update(update_dict)

        # Create key:val pairs (dictionary without braces):
        key_attrs = self._format_query_attrs(lookup_key_dict)
        insert_attrs = self._format_query_attrs(new_doc)
        update_attrs = self._format_query_attrs(update_doc)

        logger.debug(f"key_attrs: {key_attrs}")
        logger.debug(f"insert_attrs: {insert_attrs}")
        logger.debug(f"update_attrs: {update_attrs}")

        upsert = PAOQueries.AQL_UPSERT_DOC.format(
            key_attrs=key_attrs,
            insert_attrs=insert_attrs,
            update_attrs=update_attrs)

        logger.debug(f"UPSERT QUERY: {upsert}")
        upserted_docs = self.db.aql.execute(upsert, count=True, bind_vars={'@collection': collection_name})
        logger.debug(f"upserted_docs: {upserted_docs.count()}")
        if not upserted_docs.count():
            raise RuntimeError(f"Error: {collection_name} document {doc} was not upserted.")

        return upserted_docs.next()

    def _format_sort(self, sort_list: Dict[str, str]) -> str:
        """
        Format a SORT clause from the keys and directions in sort_list.

        Returns a string in format "SORT doc.name ASC, doc.age DESC",
        or '' when sort_list is None or empty.

        :raises ValueError: If a direction is not one of VALID_SORT_VALUES.
        """
        if not sort_list:
            return ''

        sorts = []
        for k, v in sort_list.items():
            if v not in self.VALID_SORT_VALUES:
                raise ValueError(f"Sort value for {k} is should be one of {self.VALID_SORT_VALUES}")
            # '' means no explicit direction; drop the trailing blank it leaves.
            sorts.append(f"doc.{k} {v}".rstrip())

        return f"SORT {', '.join(sorts)}"

    def _format_lookup_filter(self, lookup_key_dict: Dict):
        """
        Format a lookup filter from keys and values in lookup_key_dict.

        Returns a string in format "FILTER doc.active == true AND doc.gender == 'f'",
        or '' when lookup_key_dict is None or empty.
        """
        if not lookup_key_dict:
            return ''

        # A list (not a set) keeps the conditions in dict order, so the same
        # input always produces the same query text.
        conditions = [f"doc.{k} == {self.__format_inline_value(v)}" for (k, v) in lookup_key_dict.items()]
        return f"FILTER {' AND '.join(conditions)}"

    def _format_query_attrs(self, doc: Dict) -> str:
        """
        Format keys and values for query, allowing for literals.

        Returns a string with format:
        'key1': 'val1', 'key2': 'val2', 'key3': literal_expression...
        """
        # A list (not a set) keeps the attributes in dict order.
        attrs = [f"'{k}': {self.__format_inline_value(v)}" for (k, v) in doc.items()]
        return ", ".join(attrs)

    @staticmethod
    def _cursor_doc_generator(cursor):
        """
        Yield every document of `cursor`, fetching further batches as needed.

        Iterating the cursor pops each document from the current batch and
        fetches the next batch when it runs empty. The previous implementation
        looped over `cursor.batch()` (which does not consume) and then called
        `cursor.next()` (which pops a single document), so results spanning
        more than one batch were repeated and some documents were skipped.
        """
        yield from cursor

    def __format_query_values(self, doc: Dict[str, Any]):
        """ Apply __format_query_value to every value of doc. """
        return {k: self.__format_query_value(v) for k, v in doc.items()}

    @staticmethod
    def __format_query_value(value: Any):
        """ Format value, allowing for literals """
        if isinstance(value, str):
            if value.startswith('`') and value.endswith('`'):
                # Literal, no quotes:
                query_expression = value.replace('`', '')
            else:
                # Non-literal; quote it:
                query_expression = f"'{value}'"
        else:
            query_expression = value

        return query_expression

    @staticmethod
    def __format_inline_value(value: Any):
        """
        Render `value` as text that can be interpolated into an AQL query string.

        - None becomes `null` and booleans become `true` / `false`.
        - A string wrapped in backticks is treated as a literal AQL expression
          and emitted without the backticks.
        - Any other string is single-quoted with backslashes and single quotes
          escaped, so the value cannot terminate the string literal early.
        - Everything else is returned unchanged and formatted by the caller.
        """
        if value is None:
            return "null"
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, str):
            if value.startswith('`') and value.endswith('`'):
                # Literal, no quotes:
                return value.replace('`', '')
            # Escape the backslash first so the quote escapes are not doubled.
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            return f"'{escaped}'"
        return value

    def __autogen_keys(self, collection_name: str, doc: dict):
        """
        Autogenerate key & id using UUID. Can probably be changed to a DB function.

        Returns a copy of doc; the input dictionary is not modified.
        """
        new_doc = dict(doc)
        new_doc["_key"] = self.__new_uuid()
        new_doc["_id"] = f"{collection_name}/{new_doc['_key']}"
        return new_doc

    @staticmethod
    def __new_uuid():
        """ Return a new UUID """
        return str(uuid.uuid1())
Helper class that provides a standard way to create an ABC using inheritance.
def __init__(self, delete_db: bool = False):
    """
    Read connection settings from the environment and connect.

    Reads PAO_APP_DB_NAME, PAO_DB_HOST, PAO_DB_PORT, PAO_DB_PROTOCOL,
    PAO_DB_ROOT_USER and PAO_DB_ROOT_PASS, opens the ``_system`` database
    as the root user, then sets up the app database.

    :param delete_db: Passed on to setup_app_database.
    :raises ValueError: If PAO_APP_DB_NAME, PAO_DB_ROOT_USER or PAO_DB_ROOT_PASS is not set.
    """
    # TODO: These probably don't need to be members:
    # Logged instead of printed so a library constructor does not write to stdout.
    logger.debug("CONSTRUCTING DB")
    self.app_db_name = os.getenv('PAO_APP_DB_NAME')
    host = os.getenv('PAO_DB_HOST', 'localhost')
    port = os.getenv('PAO_DB_PORT', '8529')
    protocol = os.getenv('PAO_DB_PROTOCOL', 'http')
    root_user = os.getenv('PAO_DB_ROOT_USER')
    root_password = os.getenv('PAO_DB_ROOT_PASS')

    # Checked in this order so the first missing variable is the one reported.
    required = {
        'PAO_APP_DB_NAME': self.app_db_name,
        'PAO_DB_ROOT_USER': root_user,
        'PAO_DB_ROOT_PASS': root_password,
    }
    for env_name, env_value in required.items():
        if env_value is None:
            raise ValueError(f"{env_name} needs to be defined in environment or in a .env file.")

    self.client = ArangoClient(hosts=f"{protocol}://{host}:{port}")

    # Connect to "_system" database as root user.
    # This returns an API wrapper for "_system" database.
    self.sys_db = self.client.db('_system', username=root_user, password=root_password)

    self.db = None

    # Create a new database if it does not exist.
    self.setup_app_database(delete_db)
def setup_app_database(self, delete_db):
    """
    Set up the app database, deleting it first if specified by delete_db.

    This is called by the constructor, but it might be necessary to call
    it manually after migrations have been applied.

    :param delete_db: When true, an existing app database is dropped and re-created.
    :raises ValueError: If PAO_APP_DB_PASS is not set, or the database was not created.
    """
    user_name = os.getenv('PAO_APP_DB_USER', 'root')
    user_password = os.getenv('PAO_APP_DB_PASS')
    if user_password is None:
        raise ValueError('PAO_APP_DB_PASS needs to be defined in environment or in a .env file.')

    already_exists = self.sys_db.has_database(self.app_db_name)
    if already_exists and delete_db:
        self.sys_db.delete_database(self.app_db_name, ignore_missing=True)

    # Create when the database was missing or has just been dropped.
    if delete_db or not already_exists:
        db_users = [{
            'username': user_name,
            'password': user_password,
            'active': True,
        }]
        if not self.sys_db.create_database(self.app_db_name, db_users):
            raise ValueError(f"Database {self.app_db_name} was not created.")

    self.db = self.client.db(self.app_db_name, username=user_name, password=user_password)
    self.inject_into_models()
Set up the app database, deleting it first if specified by delete_db. This is called by the constructor, but since this class is based on a Singleton metaclass, it might be necessary to call it manually after migrations have been applied and whatnot.
def inject_into_models(self):
    """
    Make this database available to the models.

    Sets `db` on every model returned by PAOModelDiscovery.discover() and on
    the built-in PAOMigrationModel.
    """
    discovered: Dict[str, Any] = PAOModelDiscovery().discover()
    for model_name in discovered:
        logger.debug(f"Injecting DB into model {model_name}")
        discovered[model_name].db = self

    # The migration model is built in, so it is wired up explicitly.
    PAOMigrationModel.db = self
Inject database into models, as PAODatabase is where the functionality is implemented.
def find_by_key(self, collection_name: str, key: Any):
    """
    Look up a single document in `collection_name` by its key.

    Returns whatever the collection's `get(key)` returns.
    """
    collection = self.db.collection(collection_name)
    return collection.get(key)
Find document on collection by given key value:
def find_by_attributes(self, collection_name: str, lookup_key_dict: Dict = None):
    """
    Find a single document in `collection_name` matching the keys and
    values in lookup_key_dict.

    :return: The first document produced by get_by_attributes, or None when
        it produces nothing.
    """
    matches = self.get_by_attributes(collection_name, lookup_key_dict)
    return next(matches, None)
Find a single document by given collection_name, looking up by the keys and values in lookup_key_dict:
def remove_by_key(self, collection_name: str, key: str):
    """
    Remove the document identified by `key` from `collection_name`.

    :return: Generator over the documents returned by the remove query.
    """
    key_filter = self._format_lookup_filter({"_key": key})
    remove_query = PAOQueries.AQL_REMOVE_BY_ATTRS.format(lookup_filter=key_filter)
    logger.debug(f"REMOVE query: [{remove_query}]")

    bind_vars = {'@collection': collection_name}
    cursor = self.db.aql.execute(remove_query, count=True, bind_vars=bind_vars)
    return self._cursor_doc_generator(cursor)
Remove a document identified by `key` from collection
def get_by_attributes(
        self,
        collection_name: str,
        lookup_key_dict: Dict[str, Any] = None,
        sort_key_dict: Dict[str, Literal['ASC', 'DESC', '']] = None
):
    """
    Get documents from `collection_name`, looking up by the keys and values
    in `lookup_key_dict` and sorting by the keys and directions in
    `sort_key_dict`.

    :param collection_name: Name of the collection to query.
    :param lookup_key_dict: A dictionary of keys and corresponding values used to query this collection.
    :param sort_key_dict: A dictionary of keys by which to sort documents.
        Values specify direction: [ASC, DESC, ''].
    :return: The result of `_cursor_doc_generator` for the query's cursor.
    """
    logger.debug(f"LOOKUP:{lookup_key_dict} and SORT:{sort_key_dict}")
    query = PAOQueries.AQL_QUERY_BY_ATTRS.format(
        lookup_filter=self._format_lookup_filter(lookup_key_dict),
        sort_by=self._format_sort(sort_key_dict),
    )
    logger.debug(f"LOOKUP query: {query}")

    bind_vars = {'@collection': collection_name}
    cursor = self.db.aql.execute(query, count=True, bind_vars=bind_vars)
    return self._cursor_doc_generator(cursor)
Gets documents from given collection_name, looking up by the keys and values in lookup_key_dict, sorting by keys and direction
Parameters
- lookup_key_dict: A dictionary of keys and corresponding values used to query this collection
- sort_key_dict: A dictionary of keys by which to sort documents. Values specify direction: [ASC, DESC, '']
def insert_edge(self, collection_name: str, association_collection_name: str, from_key, to_key,
                attributes: Dict[str, Any] = None):
    """
    Insert an edge document using keys (`_from` and `_to` are generated
    using the collection names).

    The edge collection name is inferred as
    `{collection_name}__{association_collection_name}`.

    :param collection_name: Collection of the edge's source document.
    :param association_collection_name: Collection of the edge's target document.
    :param from_key: Key of the source document.
    :param to_key: Key of the target document.
    :param attributes: Optional extra attributes to set on the edge. The
        mapping is copied, not mutated; `_from` and `_to` entries in it are
        overridden by the generated values.
    :return: Whatever `insert_doc` returns for the edge document.
    """
    edge_collection_name = f"{collection_name}__{association_collection_name}"
    doc = dict(attributes) if attributes else {}
    # Endpoints are written last so caller-supplied attributes can never redirect the edge.
    doc["_from"] = f"{collection_name}/{from_key}"
    doc["_to"] = f"{association_collection_name}/{to_key}"
    return self.insert_doc(edge_collection_name, doc)
Insert edge document using keys (_from and _to are generated using collection name). Collection inferred from collection_name and association_collection_name. TODO: Add attributes to set on edge
def insert_doc(self, collection_name: str, doc: Dict):
    """
    Insert a new doc in collection.

    The doc is first passed through the key auto-generation helper, then
    formatted into the `PAOQueries.AQL_INSERT_DOC` query.

    :param collection_name: Name of the collection to insert into.
    :param doc: Document to insert.
    :return: The first result from the insert query's cursor.
    :raises RuntimeError: If the cursor reports a count of zero.
    """
    # Use the logger (as the rest of this class does) rather than print().
    logger.debug(f"Inserting into collection {collection_name}: {doc}")
    new_doc = self.__autogen_keys(collection_name, doc)
    insert_attrs = self._format_query_attrs(new_doc)
    aql = PAOQueries.AQL_INSERT_DOC.format(insert_attrs=insert_attrs)
    logger.debug(f"INSERT QUERY: {aql}: {insert_attrs}")
    inserted_docs = self.db.aql.execute(aql, count=True, bind_vars={'@collection': collection_name})
    inserted_count = inserted_docs.count()
    logger.debug(f"inserted_docs: {inserted_count}")
    if not inserted_count:
        # Report the actual collection name, not the literal text "collection_name".
        raise RuntimeError(f"Error: {collection_name} document {doc} was not inserted.")

    return inserted_docs.next()
Insert a new doc in collection:
def insert_docs(self, collection_name: str, docs: Sequence[dict[str, Any]]):
    """
    Insert the given documents into `collection_name` with a single query.

    :return: The result of `_cursor_doc_generator` for the insert query's cursor.
    """
    # Two passes: key generation for every doc first, then value formatting.
    keyed_docs = [self.__autogen_keys(collection_name, raw_doc) for raw_doc in docs]
    query_docs = list(map(self.__format_query_values, keyed_docs))

    bind_vars = {
        '@collection': collection_name,
        'docs': query_docs,
    }
    cursor = self.db.aql.execute(PAOQueries.AQL_INSERT_DOCS, bind_vars=bind_vars)
    return self._cursor_doc_generator(cursor)
Insert given documents into collection with a single query
def upsert_doc(
        self,
        collection_name: str,
        doc: Dict,
        lookup_keys: Sequence[str] = None,
        insert_dict=None,
        update_dict=None
):
    """
    Upsert a doc in collection using given lookup_keys.

    :param collection_name: Name of DB collection.
    :param doc: Document containing keys and values to insert and update.
    :param lookup_keys: Sequence of keys (present in `doc`) used to look up
        the document to update. Required despite the None default.
    :param insert_dict: Dictionary of values to insert only.
    :param update_dict: Dictionary of values to update only.
    :return: The first result from the upsert query's cursor.
    :raises TypeError: If `lookup_keys` is None.
    :raises RuntimeError: If the cursor reports a count of zero.
    """
    if lookup_keys is None:
        # Previously this surfaced as an opaque "argument of type 'NoneType'
        # is not iterable"; fail with the same exception type but say why.
        raise TypeError(
            "lookup_keys is required: pass the sequence of doc keys used to look up the document to upsert."
        )
    if update_dict is None:
        update_dict = {}
    if insert_dict is None:
        insert_dict = {}
    lookup_key_dict = {k: v for (k, v) in doc.items() if k in lookup_keys}

    new_doc = self.__autogen_keys(collection_name, doc)
    new_doc.update(insert_dict)
    update_doc = {k: v for (k, v) in doc.items() if k not in lookup_keys}
    update_doc.update(update_dict)

    # Create key:val pairs (dictionary without braces):
    key_attrs = self._format_query_attrs(lookup_key_dict)
    insert_attrs = self._format_query_attrs(new_doc)
    update_attrs = self._format_query_attrs(update_doc)

    logger.debug(f"key_attrs: {key_attrs}")
    logger.debug(f"insert_attrs: {insert_attrs}")
    logger.debug(f"update_attrs: {update_attrs}")

    upsert = PAOQueries.AQL_UPSERT_DOC.format(
        key_attrs=key_attrs,
        insert_attrs=insert_attrs,
        update_attrs=update_attrs)

    logger.debug(f"UPSERT QUERY: {upsert}")
    upserted_docs = self.db.aql.execute(upsert, count=True, bind_vars={'@collection': collection_name})
    upserted_count = upserted_docs.count()
    logger.debug(f"upserted_docs: {upserted_count}")
    if not upserted_count:
        # Report the actual collection name, not the literal text "collection_name".
        raise RuntimeError(f"Error: {collection_name} document {doc} was not upserted.")

    return upserted_docs.next()
Upsert a doc in collection using given lookup_keys.
- collection_name: Name of DB collection
- doc: Document containing keys and values to insert and update
- lookup_keys: Sequence of keys to lookup document to update
- insert_dict: Dictionary of values to insert only
- update_dict: Dictionary of values to update only