python_arango_ogm.db.pao_migration_builder
```python
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Sequence

from python_arango_ogm.db import pao_model
from python_arango_ogm.db.pao_indexes import Index, IndexTypeEnum
from python_arango_ogm.db.pao_migration_model import PAOMigrationModel
from python_arango_ogm.db.pao_model_discovery import PAOModelDiscovery
from python_arango_ogm.utils import str_util

MIGRATION_FILE_TEMPLATE = """
def up(db):
    {migration_up}

def down(db):
    {migration_down}
"""

INDENT = ' ' * 4


class PAOMigrationBuilder:
    ADD_HASH_INDEX_STR = "{indent}{coll_var}.add_hash_index(name='{idx_name}', fields={fields}, unique={unique}, deduplicate=True)"
    ADD_TTL_INDEX_STR = "{indent}{coll_var}.add_ttl_index({fields}, name='{idx_name}', expiry_time={expiry_time})"

    def __init__(self, target_path: str = '.', overwrite: bool = False):
        self.target_path = target_path
        self.overwrite = overwrite
        app_package = os.getenv('PAO_APP_PACKAGE')
        if not app_package:
            raise RuntimeError("PAO_APP_PACKAGE must be defined in the environment (or a .env.test file)")
        self.models_module_name = f"{app_package}.models"
        app_root = app_package.replace('.', '/')
        p = Path(self.target_path).joinpath(app_root)
        self.migration_pathname = p.joinpath("migrations")

        print("MIGRATION_PATHNAME:", self.migration_pathname)
        if not self.migration_pathname.exists(follow_symlinks=False):
            self.migration_pathname.mkdir()

        self._sync_existing_migrations()

    def create_blank_migration(self, name):
        self._sync_existing_migrations()
        mig_filename = self.new_migration_filename(name)
        with open(self.migration_pathname.joinpath(mig_filename), 'w') as f:
            f.write(MIGRATION_FILE_TEMPLATE.format(migration_up='pass', migration_down='pass'))

    def create_model_migrations(self):
        discovery = PAOModelDiscovery()
        model_hash: Dict[str, type[pao_model.PAOModel]] = discovery.discover()

        graph_edges = []

        mig = self.build_migration(PAOMigrationModel, model_hash)

        # Special collection to track migrations; should always be first migration:
        if "pao_migrations" not in self.existing_migrations.values():
            self.create_model_migration(PAOMigrationModel, mig['mod_schema'], mig['hash_indexes'], mig['other_indexes'])

        for mod in model_hash.values():
            mig = self.build_migration(mod, model_hash)
            self.create_model_migration(mod, mig['mod_schema'], mig['hash_indexes'], mig['other_indexes'])
            graph_edges.extend(mig['graph_edges'])

        self.create_graph_migration(graph_edges)

    def create_model_migration(
            self,
            mod: type[pao_model.PAOModel],
            mod_schema: dict,
            hash_indexes: Sequence,
            other_indexes: Sequence
    ):
        """
        Create migration for given model, schema, hash_indexes and other_indexes
        """
        coll_name = mod.collection_name()
        coll_var = f"{coll_name}_collection"

        mig_filename, updating, existing_mig_filename = self._determine_filename_updating(coll_name)
        mig_text, schema_var = self._build_migration_file_text(
            coll_name=coll_name,
            coll_var=coll_var,
            mod_schema=mod_schema,
            hash_indexes=hash_indexes,
            other_indexes=other_indexes
        )

        noop = False
        if updating:
            with open(self.migration_pathname.joinpath(existing_mig_filename)) as f:
                existing_text = f.read()

            if existing_text == mig_text:
                # Don't save file:
                noop = True
                print("Migration is the same; skipping.")
            else:
                # Re-use the existing collection: reconfigure its schema and drop
                # its indexes so the new migration can re-add them from the model:
                mig = []
                mig.append(f"\n{INDENT}{coll_var}=db.collection('{coll_name}')")
                mig.append(f"{INDENT}{coll_var}.configure(schema={schema_var})")
                mig.append(f"{INDENT}{coll_var}_indexes={coll_var}.indexes()")
                mig.append(f"{INDENT}[{coll_var}.delete_index(idx['id'], ignore_missing=True) for idx in {coll_var}_indexes]")
                prepend_text = "\n".join(mig)
                mig_text, _ = self._build_migration_file_text(
                    coll_name=coll_name,
                    coll_var=coll_var,
                    mod_schema=mod_schema,
                    hash_indexes=hash_indexes,
                    other_indexes=other_indexes,
                    prepare_collection_txt=prepend_text
                )

        # Write to file:
        if not noop:
            pathname = self.migration_pathname.joinpath(mig_filename)
            with open(pathname, mode="w") as file:
                file.write(mig_text)
            self._sync_existing_migrations()

    def create_graph_migration(self, graph_edges: List[Dict]):
        """ Create migration for graph_edges """
        graph_name = os.getenv('PAO_GRAPH_NAME')
        if graph_name is None:
            raise ValueError("PAO_GRAPH_NAME must be defined in the environment (or a .env file)")

        print("Creating migration for graph: ", graph_edges)
        mig_text = self._build_graph_migration_text(graph_edges, graph_name)
        graph_mig_filename = self._find_existing_migration(graph_name, suffix=".py")
        noop = False
        if graph_mig_filename:
            with open(self.migration_pathname.joinpath(graph_mig_filename)) as f:
                existing_text = f.read()

            if existing_text == mig_text:
                # Don't save file:
                noop = True
                print("Graph Migration is the same; skipping.")

        if not noop:
            mig_filename = self.new_migration_filename(graph_name)
            mig_pathname = self.migration_pathname.joinpath(mig_filename)
            with open(mig_pathname, mode="w") as file:
                file.write(mig_text)

            self._sync_existing_migrations()

    def new_migration_filename(self, name: str):
        """ Return filename for a new migration using given name as suffix """
        print(f"NEW_MIGRATION: [{name}]", len(self.existing_migrations))
        print(self.existing_migrations)
        mig_num = len(self.existing_migrations) + 1
        mig_filename = f"{mig_num:04}_{name}.py"
        return mig_filename

    def build_index_migrations(self, coll_var: str, hash_indexes: Sequence, other_indexes: Sequence):
        """ Build migrations for indexes """
        up_migration = []
        down_migration = []

        # Build indexes defined on Field objects:
        self._build_field_index_migrations(coll_var, hash_indexes, up_migration, down_migration)

        # Build indexes defined as Index objects:
        self._build_other_indexes(coll_var, other_indexes, up_migration, down_migration)

        return up_migration, down_migration

    def build_migration(self, mod: type[pao_model.PAOModel], model_hash: Dict[str, type[pao_model.PAOModel]]) -> Dict[str, Any]:
        indexes = [e for e in dir(mod) if isinstance(getattr(mod, e), Index)]

        mod_schema, hash_indexes = self.build_schema(mod)
        graph_edges = self.build_model_edges(mod, model_hash)
        other_indexes = []
        for oi in indexes:
            index: Index = getattr(mod, oi)
            other_indexes.append({
                'fields': index.fields,
                'index_type': index.index_type,
                'name': index.name,
                'expiry_seconds': index.expiry_seconds,
                'unique': index.unique,
            })

        return {
            'mod_schema': mod_schema,
            'hash_indexes': hash_indexes,
            'graph_edges': graph_edges,
            'other_indexes': other_indexes
        }

    def build_schema(self, mod: type[pao_model.PAOModel]) -> tuple[Dict, Sequence]:
        required = []
        hash_indexes = []
        properties = {}

        fields = [f for f in dir(mod) if isinstance(getattr(mod, f), pao_model.Field)]

        for f in fields:
            field: pao_model.Field = getattr(mod, f)
            properties[f] = field.build_schema_properties()
            if field.required:
                required.append(f)
            if field.index_name or field.unique:
                hash_indexes.append({'name': field.index_name, 'fields': [f], 'unique': field.unique})

        print("MOD:", mod.__name__)
        mod_schema = dict(
            rule=dict(
                properties=properties,
                additionalProperties=mod.ADDITIONAL_PROPERTIES,
            ),
            level=mod.LEVEL,
        )
        if required:
            mod_schema['rule']['required'] = required

        return mod_schema, hash_indexes

    def build_model_edges(self, mod: type[pao_model.PAOModel], model_hash: Dict[str, type[pao_model.PAOModel]]) -> Sequence[Dict]:
        """ Build model edges and return as a list of dictionaries """
        graph_edges = []
        edges = [e for e in dir(mod) if isinstance(getattr(mod, e), pao_model.PAOEdgeDef)]
        for e in edges:
            edge: pao_model.PAOEdgeDef = getattr(mod, e)
            # to_model may be given as a model class or as its name:
            to_model: type[pao_model.PAOModel] = model_hash[edge.to_model] if isinstance(edge.to_model, str) else edge.to_model
            from_name = mod.collection_name()
            to_name = to_model.collection_name()
            edge_name = f"{from_name}__{to_name}"
            graph_edges.append({
                'edge_collection': edge_name,
                'from_vertex_collections': [from_name],
                'to_vertex_collections': [to_name]
            })
        return graph_edges

    def _build_migration_file_text(
            self,
            coll_name: str,
            coll_var: str,
            mod_schema: dict,
            hash_indexes: Sequence,
            other_indexes: Sequence,
            prepare_collection_txt: str = None
    ) -> tuple[str, str]:
        up_migration = []
        down_migration = []
        schema_var = self._add_migration_schema_up(coll_name, mod_schema, up_migration)

        if prepare_collection_txt:
            up_migration.append(prepare_collection_txt)
        else:
            up_migration.append(f"\n{INDENT}{coll_var}=db.create_collection('{coll_name}', schema={schema_var})")
            up_migration.append(f"{INDENT}{coll_var}.configure(schema={schema_var})")
        down_migration.append(f"{coll_var}=db.collection('{coll_name}')")

        # Index migrations:
        idx_up, idx_down = self.build_index_migrations(coll_var, hash_indexes, other_indexes)
        up_migration.extend(idx_up)
        down_migration.extend(idx_down)

        # Down-migration - Delete whole collection:
        down_migration.append(f"{INDENT}db.delete_collection('{coll_name}', ignore_missing=True)")

        # Format migration stuff into text for file:
        mig_up = "\n".join(up_migration)
        mig_down = "\n".join(down_migration)
        mig_text = MIGRATION_FILE_TEMPLATE.format(migration_up=mig_up, migration_down=mig_down)
        return mig_text, schema_var

    def _build_other_indexes(
            self,
            coll_var: str,
            other_indexes: Sequence,
            up_migration: List,
            down_migration: List
    ):
        """
        Build indexes defined as Index objects
        """
        for idx in other_indexes:
            idx_type: IndexTypeEnum = idx['index_type']
            idx_name = idx['name']
            if idx_type == IndexTypeEnum.INVERTED:
                up_migration.append(f"{INDENT}{coll_var}.add_inverted_index(name='{idx_name}', fields={idx['fields']})")
            elif idx_type == IndexTypeEnum.GEO:
                up_migration.append(f"{INDENT}{coll_var}.add_geo_index(name='{idx_name}', fields={idx['fields']})")
            elif idx_type == IndexTypeEnum.TTL:
                up_migration.append(self.ADD_TTL_INDEX_STR.format(
                    indent=INDENT,
                    coll_var=coll_var,
                    fields=idx['fields'],
                    idx_name=idx_name,
                    expiry_time=idx['expiry_seconds']
                ))
            elif idx_type == IndexTypeEnum.HASH:
                up_migration.append(self.ADD_HASH_INDEX_STR.format(
                    indent=INDENT,
                    coll_var=coll_var,
                    idx_name=idx_name,
                    fields=idx['fields'],
                    unique=idx['unique'],
                ))
            # Add down migration for index:
            down_migration.append(f"{INDENT}{coll_var}.delete_index('{idx_name}')")

    def _build_graph_migration_text(self, graph_edges, graph_name):
        delete_graph_str = f"db.delete_graph('{graph_name}', ignore_missing=True)"
        graph_json = self._fix_python_json(json.dumps(graph_edges, indent=4)[2:-2])
        graph_mig = []
        graph_mig.append(delete_graph_str)
        graph_mig.append(f"{INDENT}db.create_graph('{graph_name}', [")
        graph_mig.append(str_util.indent(graph_json, 2))
        graph_mig.append(f"{INDENT}])")
        mig_up = "\n".join(graph_mig)
        mig_down = delete_graph_str
        mig_text = MIGRATION_FILE_TEMPLATE.format(migration_up=mig_up, migration_down=mig_down)
        return mig_text

    def _build_field_index_migrations(
            self,
            coll_var: str,
            hash_indexes: Sequence,
            up_migration: List,
            down_migration: List
    ):
        """ Build indexes defined on Field objects """
        for hash_index in hash_indexes:
            idx_name = hash_index['name']
            up_migration.append(
                self.ADD_HASH_INDEX_STR.format(
                    indent=INDENT,
                    coll_var=coll_var,
                    idx_name=idx_name,
                    fields=hash_index['fields'],
                    unique=hash_index['unique'],
                )
            )
            down_migration.append(f"{INDENT}{coll_var}.delete_index('{idx_name}')")

    def _determine_filename_updating(self, coll_name: str) -> tuple[str, bool, str]:
        # Determine whether a migration already exists and whether we are
        # overwriting. If not, we need to replace the schema in a new migration.
        # Likewise, if there are existing indexes that differ from those
        # specified in the model, they should be removed and re-added.

        existing_mig_filename = self._find_existing_migration(coll_name, suffix=".py")
        updating = False
        if self.overwrite:
            mig_filename = existing_mig_filename or self.new_migration_filename(coll_name)
        else:
            # Don't overwrite the migration; add a new one that updates the schema:
            print(f"NOT OVERWRITE: [{coll_name}]")
            mig_filename = self.new_migration_filename(coll_name)
            updating = existing_mig_filename is not None

        return mig_filename, updating, existing_mig_filename

    def _add_migration_schema_up(self, coll_name: str, mod_schema: dict, up_migration: List) -> str:
        schema_json = self._fix_python_json(json.dumps(mod_schema, indent=4)[2:-2])
        print("SCHEMA_JSON:", schema_json)
        schema_var = f"{coll_name.upper()}_SCHEMA"
        up_migration.append(f"{schema_var}={{")
        up_migration.append(str_util.indent(schema_json, 2))
        up_migration.append(f"{INDENT}}}")
        return schema_var

    def _fix_python_json(self, json_str: str):
        # json.dumps() emits JSON literals; the migration files are Python source:
        json_str = json_str.replace(': true', ': True')
        return json_str.replace(': false', ': False')

    def _sync_existing_migrations(self):
        """ Synchronize self.existing_migrations with migrations on disk """

        def get_migration_name(migration_filename: str) -> str:
            # '0002_users' -> 'users'
            return migration_filename.split('_', 1)[-1]

        migs = [c.stem for c in self.migration_pathname.iterdir() if not c.is_dir() and c.suffix == '.py']
        self.existing_migrations = {m: get_migration_name(m) for m in sorted(migs)}
        print("self.existing_migrations:", self.existing_migrations)

    def _find_existing_migration(self, collection_name: str, suffix: str = None):
        """ Find existing migration based on collection name """

        try:
            values = list(self.existing_migrations.values())
            idx = values.index(collection_name)
        except ValueError:
            mig_name = None
        else:
            keys = list(self.existing_migrations.keys())
            print(f"Looking for {idx} in {len(keys)}", keys, keys[idx])
            mig_name = keys[idx] + str(suffix)
        return mig_name
```
MIGRATION_FILE_TEMPLATE = '\ndef up(db):\n    {migration_up}\n\ndef down(db):\n    {migration_down}\n'
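Every migration file the builder writes is this template with the two placeholders filled in. For example, create_blank_migration() fills both with pass, producing:

```python
def up(db):
    pass

def down(db):
    pass
```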
INDENT = '    '
class PAOMigrationBuilder:
PAOMigrationBuilder(target_path: str = '.', overwrite: bool = False)
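The constructor derives the migrations directory from the PAO_APP_PACKAGE environment variable, creates it if needed, and loads any existing migrations. A minimal usage sketch (the package name my_app is hypothetical; the variable would normally be set outside the program or via a .env file):

```python
import os

from python_arango_ogm.db.pao_migration_builder import PAOMigrationBuilder

os.environ['PAO_APP_PACKAGE'] = 'my_app'  # hypothetical package name

builder = PAOMigrationBuilder(target_path='.', overwrite=False)
# Migration files are read from and written to ./my_app/migrations/
```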
ADD_HASH_INDEX_STR = "{indent}{coll_var}.add_hash_index(name='{idx_name}', fields={fields}, unique={unique}, deduplicate=True)"
ADD_TTL_INDEX_STR = "{indent}{coll_var}.add_ttl_index({fields}, name='{idx_name}', expiry_time={expiry_time})"
def create_model_migrations(self):
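This is the main entry point: it discovers models, writes one numbered migration per model (ensuring the pao_migrations bookkeeping collection comes first), then writes a graph migration for all collected edges. A sketch, assuming PAO_APP_PACKAGE and PAO_GRAPH_NAME are set and the app's models module is importable:

```python
builder = PAOMigrationBuilder(overwrite=True)
builder.create_model_migrations()
# Typical result on a fresh project (model names hypothetical):
#   0001_pao_migrations.py
#   0002_users.py
#   0003_orders.py
#   0004_<PAO_GRAPH_NAME>.py
```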
def create_model_migration(self, mod: type[python_arango_ogm.db.pao_model.PAOModel], mod_schema: dict, hash_indexes: Sequence, other_indexes: Sequence):
Create migration for given model, schema, hash_indexes and other_indexes
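A sketch of how create_model_migrations() drives this method for a single model (User is a hypothetical PAOModel subclass):

```python
model_hash = {'User': User}  # as returned by PAOModelDiscovery.discover()
mig = builder.build_migration(User, model_hash)
builder.create_model_migration(
    User,
    mig['mod_schema'],
    mig['hash_indexes'],
    mig['other_indexes'],
)
# Writes NNNN_users.py, or skips the write if an identical migration exists.
```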
def create_graph_migration(self, graph_edges: List[Dict]):
Create migration for graph_edges
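The generated file drops and recreates the graph named by PAO_GRAPH_NAME. For a single users-to-orders edge (all names hypothetical), the written migration would look roughly like:

```python
def up(db):
    db.delete_graph('my_graph', ignore_missing=True)
    db.create_graph('my_graph', [
        {
            "edge_collection": "users__orders",
            "from_vertex_collections": ["users"],
            "to_vertex_collections": ["orders"]
        }
    ])

def down(db):
    db.delete_graph('my_graph', ignore_missing=True)
```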
def new_migration_filename(self, name: str):
Return filename for a new migration using given name as suffix
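Numbering is one-based and derived from the count of migrations already on disk, so with two existing migrations:

```python
builder.new_migration_filename('users')  # -> '0003_users.py'
```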
def build_index_migrations(self, coll_var: str, hash_indexes: Sequence, other_indexes: Sequence):
Build migrations for indexes
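A sketch of the paired up/down lines this returns (the index spec below is hypothetical):

```python
up_lines, down_lines = builder.build_index_migrations(
    coll_var='users_collection',
    hash_indexes=[{'name': 'users_email_idx', 'fields': ['email'], 'unique': True}],
    other_indexes=[],
)
# up_lines[0]:
#     users_collection.add_hash_index(name='users_email_idx', fields=['email'], unique=True, deduplicate=True)
# down_lines[0]:
#     users_collection.delete_index('users_email_idx')
```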
def build_migration(self, mod: type[python_arango_ogm.db.pao_model.PAOModel], model_hash: Dict[str, type[python_arango_ogm.db.pao_model.PAOModel]]) -> Dict[str, Any]:
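The returned dictionary bundles everything create_model_migration() needs (User is hypothetical):

```python
mig = builder.build_migration(User, model_hash)
mig['mod_schema']     # schema dict from build_schema()
mig['hash_indexes']   # hash indexes implied by Field(index_name=..., unique=...)
mig['other_indexes']  # Index attributes on the model, flattened to plain dicts
mig['graph_edges']    # edge definitions from the model's PAOEdgeDef attributes
```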
def build_schema(self, mod: type[python_arango_ogm.db.pao_model.PAOModel]) -> tuple[typing.Dict, typing.Sequence]:
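For a model with a single required, uniquely indexed email field, the returned pair would look roughly like this sketch (the property body and the values shown for additionalProperties and level are illustrative assumptions; they come from Field.build_schema_properties(), mod.ADDITIONAL_PROPERTIES, and mod.LEVEL):

```python
mod_schema = {
    'rule': {
        'properties': {
            'email': {'type': 'string'},  # illustrative build_schema_properties() output
        },
        'additionalProperties': False,    # mod.ADDITIONAL_PROPERTIES
        'required': ['email'],            # only present when some field is required
    },
    'level': 'strict',                    # mod.LEVEL
}
hash_indexes = [{'name': None, 'fields': ['email'], 'unique': True}]
```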
def build_model_edges(self, mod: type[python_arango_ogm.db.pao_model.PAOModel], model_hash: Dict[str, type[python_arango_ogm.db.pao_model.PAOModel]]) -> Sequence[Dict]:
Build model edges and return as a list of dictionaries
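Given a hypothetical User model with a PAOEdgeDef attribute pointing at an Order model (to_model may be the model class itself or its name as a string; the edge collection name is always from__to), the result would be:

```python
builder.build_model_edges(User, model_hash)
# [{'edge_collection': 'users__orders',
#   'from_vertex_collections': ['users'],
#   'to_vertex_collections': ['orders']}]
```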