# python_arango_ogm.db.pao_migration_builder

import json
import os
from pathlib import Path
from typing import Any, Dict, List, Sequence

from python_arango_ogm.db import pao_model
from python_arango_ogm.db.pao_indexes import Index, IndexTypeEnum
from python_arango_ogm.db.pao_migration_model import PAOMigrationModel
from python_arango_ogm.db.pao_model_discovery import PAOModelDiscovery
from python_arango_ogm.utils import str_util
 11
# Skeleton of every generated migration file.  ``up``/``down`` each receive
# the database handle; the ``{migration_up}``/``{migration_down}`` slots are
# filled with pre-indented generated statements via str.format().
MIGRATION_FILE_TEMPLATE = """
def up(db):
    {migration_up}
    
def down(db):
    {migration_down}
"""

# One indentation level used when emitting lines of generated migration code.
INDENT = ' ' * 4
 21
 22
 23class PAOMigrationBuilder:
 24    ADD_HASH_INDEX_STR = "{indent}{coll_var}.add_hash_index(name='{idx_name}', fields={fields}, unique={unique}, deduplicate=True)"
 25    ADD_TTL_INDEX_STR = "{indent}{coll_var}.add_ttl_index({fields}, name='{idx_name}', expiry_time={expiry_time}"
 26
 27    def __init__(self, target_path: str = '.', overwrite: bool = False):
 28        self.target_path = target_path
 29        self.overwrite = overwrite
 30        app_package = os.getenv('PAO_APP_PACKAGE')
 31        if not app_package:
 32            raise RuntimeError("PAO_APP_PACKAGE must be defined in the environment (or a .env.test file)")
 33        self.models_module_name = f"{app_package}.models"
 34        app_root = app_package.replace('.', '/')
 35        p = Path(self.target_path).joinpath(app_root)
 36        self.migration_pathname = p.joinpath("migrations")
 37
 38        print("MIGRATION_PATHNAME:", self.migration_pathname)
 39        if not self.migration_pathname.exists(follow_symlinks=False):
 40            self.migration_pathname.mkdir()
 41
 42        self._sync_existing_migrations()
 43
 44    def create_blank_migration(self, name):
 45        self._sync_existing_migrations()
 46        mig_filename = self.new_migration_filename(name)
 47        with open(mig_filename, 'w') as f:
 48            f.write(MIGRATION_FILE_TEMPLATE.format(migration_up='pass', migration_down='pass'))
 49
 50    def create_model_migrations(self):
 51        discovery = PAOModelDiscovery()
 52        model_hash: Dict[str, type[pao_model.PAOModel]] = discovery.discover()
 53
 54        graph_edges = []
 55
 56        mig = self.build_migration(PAOMigrationModel, model_hash)
 57
 58        # Special collection to track migrations; should always be first migration:
 59        if "pao_migrations" not in self.existing_migrations.values():
 60            self.create_model_migration(PAOMigrationModel, mig['mod_schema'], mig['hash_indexes'], mig['other_indexes'])
 61
 62        for mod in model_hash.values():
 63            mig = self.build_migration(mod, model_hash)
 64            self.create_model_migration(mod, mig['mod_schema'], mig['hash_indexes'], mig['other_indexes'])
 65            graph_edges.extend(mig['graph_edges'])
 66
 67        self.create_graph_migration(graph_edges)
 68
 69    def create_model_migration(
 70            self,
 71            mod: type[pao_model.PAOModel],
 72            mod_schema: dict,
 73            hash_indexes: Sequence,
 74            other_indexes: Sequence
 75    ):
 76        """
 77            Create migration for given model, schema, hash_indexes and other_indexes
 78        """
 79        coll_name = mod.collection_name()
 80        coll_var = f"{coll_name}_collection"
 81
 82        mig_filename, updating, existing_mig_filename = self._determine_filename_updating(coll_name)
 83        mig_text, schema_var = self._build_migration_file_text(
 84            coll_name=coll_name,
 85            coll_var=coll_var,
 86            mod_schema=mod_schema,
 87            hash_indexes=hash_indexes,
 88            other_indexes=other_indexes
 89        )
 90
 91        noop = False
 92        if updating:
 93            with open(self.migration_pathname.joinpath(existing_mig_filename)) as f:
 94                existing_text = f.read()
 95
 96            if existing_text == mig_text:
 97                # Don't save file
 98                noop = True
 99                print("Migration is the same; skipping.")
100            else:
101                mig = []
102                mig.append(f"\n{INDENT}{coll_var}=db.collections('{coll_name}')")
103                mig.append(f"{INDENT}{coll_var}.configure(schema={schema_var})")
104                mig.append(f"{INDENT}{coll_var}_indexes={coll_var}.indexes()")
105                mig.append(f"{INDENT}[{coll_var}.delete_index(idx, ignore_missing=True) for idx in {coll_var}_indexes]")
106                prepend_text = "\n".join(mig)
107                mig_text, _ = self._build_migration_file_text(
108                    coll_name=coll_name,
109                    coll_var=coll_var,
110                    mod_schema=mod_schema,
111                    hash_indexes=hash_indexes,
112                    other_indexes=other_indexes,
113                    prepare_collection_txt=prepend_text
114                )
115
116        # Write to file:
117        if not noop:
118            pathname = self.migration_pathname.joinpath(mig_filename)
119            with open(pathname, mode="w") as file:
120                file.write(mig_text)
121            self._sync_existing_migrations()
122
123    def create_graph_migration(self, graph_edges: []):
124        """ Create migration for graph_edges: """
125        graph_name = os.getenv('PAO_GRAPH_NAME')
126        if graph_name is None:
127            raise ValueError("PAO_GRAPH_NAME must be defined in the environment (or a .env file)")
128
129        print("Creating migration for graph: ", graph_edges)
130        mig_text = self._build_graph_migration_text(graph_edges, graph_name)
131        graph_mig_filename = self._find_existing_migration(graph_name, suffix=".py")
132        noop = False
133        if graph_mig_filename:
134            with open(self.migration_pathname.joinpath(graph_mig_filename)) as f:
135                existing_text = f.read()
136
137            if existing_text == mig_text:
138                # Don't save file
139                noop = True
140                print("Graph Migration is the same; skipping.")
141
142        if not noop:
143            mig_filename = self.new_migration_filename(graph_name)
144            mig_pathname = self.migration_pathname.joinpath(mig_filename)
145            with open(mig_pathname, mode="w") as file:
146                file.write(mig_text)
147
148            self._sync_existing_migrations()
149
150    def new_migration_filename(self, name: str):
151        """ Return filename for a new migration using given name as suffix """
152        print(f"NEW_MIGRATION: [{name}]", len(self.existing_migrations))
153        print(self.existing_migrations)
154        mig_num = len(self.existing_migrations) + 1
155        mig_filename = f"{mig_num:04}_{name}.py"
156        return mig_filename
157
158    def build_index_migrations(self, coll_var: str, hash_indexes: Sequence, other_indexes: Sequence):
159        """ Build migrations for indexes """
160        up_migration = []
161        down_migration = []
162
163        # Build indexes defined on Field objects:
164        self._build_field_index_migrations(coll_var, hash_indexes, up_migration, down_migration)
165
166        # Build indexes defined as Index objects:
167        self._build_other_indexes(coll_var, other_indexes, up_migration, down_migration)
168
169        return up_migration, down_migration
170
171    def build_migration(self, mod: type[pao_model.PAOModel], model_hash: Dict[str, type[pao_model.PAOModel]]) -> Dict[
172        str, any]:
173        indexes = [e for e in dir(mod) if isinstance(getattr(mod, e), Index)]
174
175        mod_schema, hash_indexes = self.build_schema(mod)
176        graph_edges = self.build_model_edges(mod, model_hash)
177        other_indexes = []
178        for oi in indexes:
179            index: Index = getattr(mod, oi)
180            other_indexes.append({
181                'fields': index.fields,
182                'index_type': index.index_type,
183                'name': index.name,
184                'expiry_seconds': index.expiry_seconds,
185                'unique': index.unique,
186            })
187
188        return {
189            'mod_schema': mod_schema,
190            'hash_indexes': hash_indexes,
191            'graph_edges': graph_edges,
192            'other_indexes': other_indexes
193        }
194
195    def build_schema(self, mod: type[pao_model.PAOModel]) -> tuple[Dict, Sequence]:
196        required = []
197        hash_indexes = []
198        properties = {}
199
200        fields = [f for f in dir(mod) if isinstance(getattr(mod, f), pao_model.Field)]
201
202        for f in fields:
203            field: pao_model.Field = getattr(mod, f)
204            properties[f] = field.build_schema_properties()
205            if field.required:
206                required.append(f)
207            if field.index_name or field.unique:
208                hash_indexes.append({'name': field.index_name, 'fields': [f], 'unique': field.unique, })
209
210        print("MOD:", mod.__name__)
211        mod_schema = dict(
212            rule=dict(
213                properties=properties,
214                additionalProperties=mod.ADDITIONAL_PROPERTIES,
215            ),
216            level=mod.LEVEL,
217        )
218        if len(required):
219            mod_schema['rule']['required'] = required
220
221        return mod_schema, hash_indexes
222
223    def build_model_edges(self, mod: type[pao_model.PAOModel], model_hash: Dict[str, type[pao_model.PAOModel]]) -> \
224            Sequence[Dict]:
225        """ Build model edges and return as a list of dictionaries """
226        graph_edges = []
227        edges = [e for e in dir(mod) if isinstance(getattr(mod, e), pao_model.PAOEdgeDef)]
228        for e in edges:
229            edge: pao_model.PAOEdgeDef = getattr(mod, e)
230            to_model: type[pao_model.PAOModel] = model_hash[edge.to_model] if isinstance(edge.to_model,
231                                                                                         str) else edge.to_model
232            from_name = mod.collection_name()
233            to_name = to_model.collection_name()
234            edge_name = f"{from_name}__{to_name}"
235            graph_edges.append({
236                'edge_collection': edge_name,
237                'from_vertex_collections': [from_name],
238                'to_vertex_collections': [to_name]
239            })
240        return graph_edges
241
242    def _build_migration_file_text(
243            self,
244            coll_name: str,
245            coll_var: str,
246            mod_schema: dict,
247            hash_indexes: Sequence,
248            other_indexes: Sequence,
249            prepare_collection_txt: str = None
250    ) -> tuple[str, str]:
251        up_migration = []
252        down_migration = []
253        schema_var = self._add_migration_schema_up(coll_name, mod_schema, up_migration)
254
255        if prepare_collection_txt:
256            up_migration.append(prepare_collection_txt)
257        else:
258            up_migration.append(f"\n{INDENT}{coll_var}=db.create_collection('{coll_name}', {schema_var})")
259            up_migration.append(f"{INDENT}{coll_var}.configure(schema={schema_var})")
260        down_migration.append(f"{coll_var}=db.collections('{coll_name}')")
261
262        # Index migrations:
263        idx_up, idx_down = self.build_index_migrations(coll_var, hash_indexes, other_indexes)
264        up_migration.extend(idx_up)
265        down_migration.extend(idx_down)
266
267        # Down-migration - Delete whole collection:
268        down_migration.append(f"{INDENT}db.delete_collection('{coll_name}', ignore_missing=True)")
269
270        # Format migration stuff into text for file:
271        mig_up = "\n".join(up_migration)
272        mig_down = "\n".join(down_migration)
273        mig_text = MIGRATION_FILE_TEMPLATE.format(migration_up=mig_up, migration_down=mig_down)
274        return mig_text, schema_var
275
276    def _build_other_indexes(
277            self,
278            coll_var: str,
279            other_indexes: Sequence,
280            up_migration: List,
281            down_migration: List
282    ):
283        """
284        Build indexes defined as Index objects:
285        """
286        for idx in other_indexes:
287            idx_type: IndexTypeEnum = idx['index_type']
288            idx_name = idx['name']
289            if idx_type == IndexTypeEnum.INVERTED:
290                up_migration.append(f"{INDENT}{coll_var}.add_inverted_index(name='{idx_name}', fields={idx['fields']}")
291            elif idx_type == IndexTypeEnum.GEO:
292                up_migration.append(f"{INDENT}{coll_var}.add_geo_index(name='{idx_name}', fields={idx['fields']}")
293            elif idx_type == IndexTypeEnum.TTL:
294                up_migration.append(self.ADD_TTL_INDEX_STR.format(
295                    indent=INDENT,
296                    coll_var=coll_var,
297                    fields=idx['fields'],
298                    idx_name=idx_name,
299                    expiry_time=idx['expiry_seconds']
300                ))
301            elif idx_type == IndexTypeEnum.HASH:
302                up_migration.append(self.ADD_HASH_INDEX_STR.format(
303                    indent=INDENT,
304                    coll_var=coll_var,
305                    idx_name=idx_name,
306                    fields=idx['fields'],
307                    unique=idx['unique'],
308                ))
309            # Add down migration for index:
310            down_migration.append(f"{INDENT}{coll_var}.delete_index('{idx_name}')")
311
312    def _build_graph_migration_text(self, graph_edges, graph_name):
313        delete_graph_str = f"db.delete_graph('{graph_name}', ignore_missing=True)"
314        graph_json = self._fix_python_json(json.dumps(graph_edges, indent=4)[2:-2])
315        graph_mig = []
316        graph_mig.append(delete_graph_str)
317        graph_mig.append(f"{INDENT}db.create_graph('{graph_name}', [")
318        graph_mig.append(str_util.indent(graph_json, 2))
319        graph_mig.append(f"{INDENT}])")
320        mig_up = "\n".join(graph_mig)
321        mig_down = delete_graph_str
322        mig_text = MIGRATION_FILE_TEMPLATE.format(migration_up=mig_up, migration_down=mig_down)
323        return mig_text
324
325    def _build_field_index_migrations(
326            self,
327            coll_var: str,
328            hash_indexes: Sequence,
329            up_migration: List,
330            down_migration: List
331    ):
332        """ Build indexes defined on Field objects: """
333        for hash_index in hash_indexes:
334            idx_name = hash_index['name']
335            up_migration.append(
336                self.ADD_HASH_INDEX_STR.format(
337                    indent=INDENT,
338                    coll_var=coll_var,
339                    idx_name=idx_name,
340                    fields=hash_index['fields'],
341                    unique=hash_index['unique'],
342                )
343            )
344            down_migration.append(f"{INDENT}{coll_var}.delete_index('{idx_name}')")
345
346    def _determine_filename_updating(self, coll_name: str) -> tuple[str, bool, str]:
347        # Determine whether migration already exists
348        # and whether we are overwriting.  If not, we
349        # to replace the schema in a new migration.
350        # Likewise, if there are existing indexes
351        # that differ from those specified in the model, they should
352        # be removed and re-added
353
354        existing_mig_filename = self._find_existing_migration(coll_name, suffix=".py")
355        updating = False
356        if self.overwrite:
357            mig_filename = existing_mig_filename or self.new_migration_filename(coll_name)
358        else:
359            # Don't overwrite the migration; add a new one that updates the schema:
360            print(f"NOT OVERWRITE: [{coll_name}]")
361            mig_filename = self.new_migration_filename(coll_name)
362            updating = existing_mig_filename is not None
363
364        return mig_filename, updating, existing_mig_filename
365
366    def _add_migration_schema_up(self, coll_name: str, mod_schema: dict, up_migration: List) -> str:
367        schema_json = self._fix_python_json(json.dumps(mod_schema, indent=4)[2:-2])
368        print("SCHEMA_JSON:", schema_json)
369        schema_var = f"{coll_name.upper()}_SCHEMA"
370        up_migration.append(f"{schema_var}={{")
371        up_migration.append(str_util.indent(schema_json, 2))
372        up_migration.append(f"{INDENT}}}")
373        return schema_var
374
375    def _fix_python_json(self, json_str: str):
376        json_str = json_str.replace(': true', ': True')
377        return json_str.replace(': false', ': False')
378
379    def _sync_existing_migrations(self):
380        """ Synchronize self.existing_migrations with migrations on disk """
381
382        def get_migration_name(migration_filename: str) -> str:
383            return migration_filename.split('_', 1)[-1]
384
385        migs = [c.stem for c in self.migration_pathname.iterdir() if not c.is_dir() and c.suffix == '.py']
386        self.existing_migrations = {m: get_migration_name(m) for m in sorted(migs)}
387        print("self.existing_migrations:", self.existing_migrations)
388
389    def _find_existing_migration(self, collection_name: str, suffix: str = None):
390        """ Find existing migration based on collection name """
391
392        try:
393            values = list(self.existing_migrations.values())
394            idx = values.index(collection_name)
395        except ValueError:
396            mig_name = None
397        else:
398            keys = list(self.existing_migrations.keys())
399            print(f"Looking for {idx} in {len(keys)}", keys, keys[idx])
400            mig_name = keys[idx] + str(suffix)
401        return mig_name
MIGRATION_FILE_TEMPLATE = '\ndef up(db):\n {migration_up}\n \ndef down(db):\n {migration_down}\n'
INDENT = ' '
class PAOMigrationBuilder:
 24class PAOMigrationBuilder:
 25    ADD_HASH_INDEX_STR = "{indent}{coll_var}.add_hash_index(name='{idx_name}', fields={fields}, unique={unique}, deduplicate=True)"
 26    ADD_TTL_INDEX_STR = "{indent}{coll_var}.add_ttl_index({fields}, name='{idx_name}', expiry_time={expiry_time}"
 27
 28    def __init__(self, target_path: str = '.', overwrite: bool = False):
 29        self.target_path = target_path
 30        self.overwrite = overwrite
 31        app_package = os.getenv('PAO_APP_PACKAGE')
 32        if not app_package:
 33            raise RuntimeError("PAO_APP_PACKAGE must be defined in the environment (or a .env.test file)")
 34        self.models_module_name = f"{app_package}.models"
 35        app_root = app_package.replace('.', '/')
 36        p = Path(self.target_path).joinpath(app_root)
 37        self.migration_pathname = p.joinpath("migrations")
 38
 39        print("MIGRATION_PATHNAME:", self.migration_pathname)
 40        if not self.migration_pathname.exists(follow_symlinks=False):
 41            self.migration_pathname.mkdir()
 42
 43        self._sync_existing_migrations()
 44
 45    def create_blank_migration(self, name):
 46        self._sync_existing_migrations()
 47        mig_filename = self.new_migration_filename(name)
 48        with open(mig_filename, 'w') as f:
 49            f.write(MIGRATION_FILE_TEMPLATE.format(migration_up='pass', migration_down='pass'))
 50
 51    def create_model_migrations(self):
 52        discovery = PAOModelDiscovery()
 53        model_hash: Dict[str, type[pao_model.PAOModel]] = discovery.discover()
 54
 55        graph_edges = []
 56
 57        mig = self.build_migration(PAOMigrationModel, model_hash)
 58
 59        # Special collection to track migrations; should always be first migration:
 60        if "pao_migrations" not in self.existing_migrations.values():
 61            self.create_model_migration(PAOMigrationModel, mig['mod_schema'], mig['hash_indexes'], mig['other_indexes'])
 62
 63        for mod in model_hash.values():
 64            mig = self.build_migration(mod, model_hash)
 65            self.create_model_migration(mod, mig['mod_schema'], mig['hash_indexes'], mig['other_indexes'])
 66            graph_edges.extend(mig['graph_edges'])
 67
 68        self.create_graph_migration(graph_edges)
 69
 70    def create_model_migration(
 71            self,
 72            mod: type[pao_model.PAOModel],
 73            mod_schema: dict,
 74            hash_indexes: Sequence,
 75            other_indexes: Sequence
 76    ):
 77        """
 78            Create migration for given model, schema, hash_indexes and other_indexes
 79        """
 80        coll_name = mod.collection_name()
 81        coll_var = f"{coll_name}_collection"
 82
 83        mig_filename, updating, existing_mig_filename = self._determine_filename_updating(coll_name)
 84        mig_text, schema_var = self._build_migration_file_text(
 85            coll_name=coll_name,
 86            coll_var=coll_var,
 87            mod_schema=mod_schema,
 88            hash_indexes=hash_indexes,
 89            other_indexes=other_indexes
 90        )
 91
 92        noop = False
 93        if updating:
 94            with open(self.migration_pathname.joinpath(existing_mig_filename)) as f:
 95                existing_text = f.read()
 96
 97            if existing_text == mig_text:
 98                # Don't save file
 99                noop = True
100                print("Migration is the same; skipping.")
101            else:
102                mig = []
103                mig.append(f"\n{INDENT}{coll_var}=db.collections('{coll_name}')")
104                mig.append(f"{INDENT}{coll_var}.configure(schema={schema_var})")
105                mig.append(f"{INDENT}{coll_var}_indexes={coll_var}.indexes()")
106                mig.append(f"{INDENT}[{coll_var}.delete_index(idx, ignore_missing=True) for idx in {coll_var}_indexes]")
107                prepend_text = "\n".join(mig)
108                mig_text, _ = self._build_migration_file_text(
109                    coll_name=coll_name,
110                    coll_var=coll_var,
111                    mod_schema=mod_schema,
112                    hash_indexes=hash_indexes,
113                    other_indexes=other_indexes,
114                    prepare_collection_txt=prepend_text
115                )
116
117        # Write to file:
118        if not noop:
119            pathname = self.migration_pathname.joinpath(mig_filename)
120            with open(pathname, mode="w") as file:
121                file.write(mig_text)
122            self._sync_existing_migrations()
123
124    def create_graph_migration(self, graph_edges: []):
125        """ Create migration for graph_edges: """
126        graph_name = os.getenv('PAO_GRAPH_NAME')
127        if graph_name is None:
128            raise ValueError("PAO_GRAPH_NAME must be defined in the environment (or a .env file)")
129
130        print("Creating migration for graph: ", graph_edges)
131        mig_text = self._build_graph_migration_text(graph_edges, graph_name)
132        graph_mig_filename = self._find_existing_migration(graph_name, suffix=".py")
133        noop = False
134        if graph_mig_filename:
135            with open(self.migration_pathname.joinpath(graph_mig_filename)) as f:
136                existing_text = f.read()
137
138            if existing_text == mig_text:
139                # Don't save file
140                noop = True
141                print("Graph Migration is the same; skipping.")
142
143        if not noop:
144            mig_filename = self.new_migration_filename(graph_name)
145            mig_pathname = self.migration_pathname.joinpath(mig_filename)
146            with open(mig_pathname, mode="w") as file:
147                file.write(mig_text)
148
149            self._sync_existing_migrations()
150
151    def new_migration_filename(self, name: str):
152        """ Return filename for a new migration using given name as suffix """
153        print(f"NEW_MIGRATION: [{name}]", len(self.existing_migrations))
154        print(self.existing_migrations)
155        mig_num = len(self.existing_migrations) + 1
156        mig_filename = f"{mig_num:04}_{name}.py"
157        return mig_filename
158
159    def build_index_migrations(self, coll_var: str, hash_indexes: Sequence, other_indexes: Sequence):
160        """ Build migrations for indexes """
161        up_migration = []
162        down_migration = []
163
164        # Build indexes defined on Field objects:
165        self._build_field_index_migrations(coll_var, hash_indexes, up_migration, down_migration)
166
167        # Build indexes defined as Index objects:
168        self._build_other_indexes(coll_var, other_indexes, up_migration, down_migration)
169
170        return up_migration, down_migration
171
172    def build_migration(self, mod: type[pao_model.PAOModel], model_hash: Dict[str, type[pao_model.PAOModel]]) -> Dict[
173        str, any]:
174        indexes = [e for e in dir(mod) if isinstance(getattr(mod, e), Index)]
175
176        mod_schema, hash_indexes = self.build_schema(mod)
177        graph_edges = self.build_model_edges(mod, model_hash)
178        other_indexes = []
179        for oi in indexes:
180            index: Index = getattr(mod, oi)
181            other_indexes.append({
182                'fields': index.fields,
183                'index_type': index.index_type,
184                'name': index.name,
185                'expiry_seconds': index.expiry_seconds,
186                'unique': index.unique,
187            })
188
189        return {
190            'mod_schema': mod_schema,
191            'hash_indexes': hash_indexes,
192            'graph_edges': graph_edges,
193            'other_indexes': other_indexes
194        }
195
196    def build_schema(self, mod: type[pao_model.PAOModel]) -> tuple[Dict, Sequence]:
197        required = []
198        hash_indexes = []
199        properties = {}
200
201        fields = [f for f in dir(mod) if isinstance(getattr(mod, f), pao_model.Field)]
202
203        for f in fields:
204            field: pao_model.Field = getattr(mod, f)
205            properties[f] = field.build_schema_properties()
206            if field.required:
207                required.append(f)
208            if field.index_name or field.unique:
209                hash_indexes.append({'name': field.index_name, 'fields': [f], 'unique': field.unique, })
210
211        print("MOD:", mod.__name__)
212        mod_schema = dict(
213            rule=dict(
214                properties=properties,
215                additionalProperties=mod.ADDITIONAL_PROPERTIES,
216            ),
217            level=mod.LEVEL,
218        )
219        if len(required):
220            mod_schema['rule']['required'] = required
221
222        return mod_schema, hash_indexes
223
224    def build_model_edges(self, mod: type[pao_model.PAOModel], model_hash: Dict[str, type[pao_model.PAOModel]]) -> \
225            Sequence[Dict]:
226        """ Build model edges and return as a list of dictionaries """
227        graph_edges = []
228        edges = [e for e in dir(mod) if isinstance(getattr(mod, e), pao_model.PAOEdgeDef)]
229        for e in edges:
230            edge: pao_model.PAOEdgeDef = getattr(mod, e)
231            to_model: type[pao_model.PAOModel] = model_hash[edge.to_model] if isinstance(edge.to_model,
232                                                                                         str) else edge.to_model
233            from_name = mod.collection_name()
234            to_name = to_model.collection_name()
235            edge_name = f"{from_name}__{to_name}"
236            graph_edges.append({
237                'edge_collection': edge_name,
238                'from_vertex_collections': [from_name],
239                'to_vertex_collections': [to_name]
240            })
241        return graph_edges
242
243    def _build_migration_file_text(
244            self,
245            coll_name: str,
246            coll_var: str,
247            mod_schema: dict,
248            hash_indexes: Sequence,
249            other_indexes: Sequence,
250            prepare_collection_txt: str = None
251    ) -> tuple[str, str]:
252        up_migration = []
253        down_migration = []
254        schema_var = self._add_migration_schema_up(coll_name, mod_schema, up_migration)
255
256        if prepare_collection_txt:
257            up_migration.append(prepare_collection_txt)
258        else:
259            up_migration.append(f"\n{INDENT}{coll_var}=db.create_collection('{coll_name}', {schema_var})")
260            up_migration.append(f"{INDENT}{coll_var}.configure(schema={schema_var})")
261        down_migration.append(f"{coll_var}=db.collections('{coll_name}')")
262
263        # Index migrations:
264        idx_up, idx_down = self.build_index_migrations(coll_var, hash_indexes, other_indexes)
265        up_migration.extend(idx_up)
266        down_migration.extend(idx_down)
267
268        # Down-migration - Delete whole collection:
269        down_migration.append(f"{INDENT}db.delete_collection('{coll_name}', ignore_missing=True)")
270
271        # Format migration stuff into text for file:
272        mig_up = "\n".join(up_migration)
273        mig_down = "\n".join(down_migration)
274        mig_text = MIGRATION_FILE_TEMPLATE.format(migration_up=mig_up, migration_down=mig_down)
275        return mig_text, schema_var
276
277    def _build_other_indexes(
278            self,
279            coll_var: str,
280            other_indexes: Sequence,
281            up_migration: List,
282            down_migration: List
283    ):
284        """
285        Build indexes defined as Index objects:
286        """
287        for idx in other_indexes:
288            idx_type: IndexTypeEnum = idx['index_type']
289            idx_name = idx['name']
290            if idx_type == IndexTypeEnum.INVERTED:
291                up_migration.append(f"{INDENT}{coll_var}.add_inverted_index(name='{idx_name}', fields={idx['fields']}")
292            elif idx_type == IndexTypeEnum.GEO:
293                up_migration.append(f"{INDENT}{coll_var}.add_geo_index(name='{idx_name}', fields={idx['fields']}")
294            elif idx_type == IndexTypeEnum.TTL:
295                up_migration.append(self.ADD_TTL_INDEX_STR.format(
296                    indent=INDENT,
297                    coll_var=coll_var,
298                    fields=idx['fields'],
299                    idx_name=idx_name,
300                    expiry_time=idx['expiry_seconds']
301                ))
302            elif idx_type == IndexTypeEnum.HASH:
303                up_migration.append(self.ADD_HASH_INDEX_STR.format(
304                    indent=INDENT,
305                    coll_var=coll_var,
306                    idx_name=idx_name,
307                    fields=idx['fields'],
308                    unique=idx['unique'],
309                ))
310            # Add down migration for index:
311            down_migration.append(f"{INDENT}{coll_var}.delete_index('{idx_name}')")
312
313    def _build_graph_migration_text(self, graph_edges, graph_name):
314        delete_graph_str = f"db.delete_graph('{graph_name}', ignore_missing=True)"
315        graph_json = self._fix_python_json(json.dumps(graph_edges, indent=4)[2:-2])
316        graph_mig = []
317        graph_mig.append(delete_graph_str)
318        graph_mig.append(f"{INDENT}db.create_graph('{graph_name}', [")
319        graph_mig.append(str_util.indent(graph_json, 2))
320        graph_mig.append(f"{INDENT}])")
321        mig_up = "\n".join(graph_mig)
322        mig_down = delete_graph_str
323        mig_text = MIGRATION_FILE_TEMPLATE.format(migration_up=mig_up, migration_down=mig_down)
324        return mig_text
325
326    def _build_field_index_migrations(
327            self,
328            coll_var: str,
329            hash_indexes: Sequence,
330            up_migration: List,
331            down_migration: List
332    ):
333        """ Build indexes defined on Field objects: """
334        for hash_index in hash_indexes:
335            idx_name = hash_index['name']
336            up_migration.append(
337                self.ADD_HASH_INDEX_STR.format(
338                    indent=INDENT,
339                    coll_var=coll_var,
340                    idx_name=idx_name,
341                    fields=hash_index['fields'],
342                    unique=hash_index['unique'],
343                )
344            )
345            down_migration.append(f"{INDENT}{coll_var}.delete_index('{idx_name}')")
346
347    def _determine_filename_updating(self, coll_name: str) -> tuple[str, bool, str]:
348        # Determine whether migration already exists
349        # and whether we are overwriting.  If not, we
350        # to replace the schema in a new migration.
351        # Likewise, if there are existing indexes
352        # that differ from those specified in the model, they should
353        # be removed and re-added
354
355        existing_mig_filename = self._find_existing_migration(coll_name, suffix=".py")
356        updating = False
357        if self.overwrite:
358            mig_filename = existing_mig_filename or self.new_migration_filename(coll_name)
359        else:
360            # Don't overwrite the migration; add a new one that updates the schema:
361            print(f"NOT OVERWRITE: [{coll_name}]")
362            mig_filename = self.new_migration_filename(coll_name)
363            updating = existing_mig_filename is not None
364
365        return mig_filename, updating, existing_mig_filename
366
367    def _add_migration_schema_up(self, coll_name: str, mod_schema: dict, up_migration: List) -> str:
368        schema_json = self._fix_python_json(json.dumps(mod_schema, indent=4)[2:-2])
369        print("SCHEMA_JSON:", schema_json)
370        schema_var = f"{coll_name.upper()}_SCHEMA"
371        up_migration.append(f"{schema_var}={{")
372        up_migration.append(str_util.indent(schema_json, 2))
373        up_migration.append(f"{INDENT}}}")
374        return schema_var
375
376    def _fix_python_json(self, json_str: str):
377        json_str = json_str.replace(': true', ': True')
378        return json_str.replace(': false', ': False')
379
380    def _sync_existing_migrations(self):
381        """ Synchronize self.existing_migrations with migrations on disk """
382
383        def get_migration_name(migration_filename: str) -> str:
384            return migration_filename.split('_', 1)[-1]
385
386        migs = [c.stem for c in self.migration_pathname.iterdir() if not c.is_dir() and c.suffix == '.py']
387        self.existing_migrations = {m: get_migration_name(m) for m in sorted(migs)}
388        print("self.existing_migrations:", self.existing_migrations)
389
390    def _find_existing_migration(self, collection_name: str, suffix: str = None):
391        """ Find existing migration based on collection name """
392
393        try:
394            values = list(self.existing_migrations.values())
395            idx = values.index(collection_name)
396        except ValueError:
397            mig_name = None
398        else:
399            keys = list(self.existing_migrations.keys())
400            print(f"Looking for {idx} in {len(keys)}", keys, keys[idx])
401            mig_name = keys[idx] + str(suffix)
402        return mig_name
PAOMigrationBuilder(target_path: str = '.', overwrite: bool = False)
28    def __init__(self, target_path: str = '.', overwrite: bool = False):
29        self.target_path = target_path
30        self.overwrite = overwrite
31        app_package = os.getenv('PAO_APP_PACKAGE')
32        if not app_package:
33            raise RuntimeError("PAO_APP_PACKAGE must be defined in the environment (or a .env.test file)")
34        self.models_module_name = f"{app_package}.models"
35        app_root = app_package.replace('.', '/')
36        p = Path(self.target_path).joinpath(app_root)
37        self.migration_pathname = p.joinpath("migrations")
38
39        print("MIGRATION_PATHNAME:", self.migration_pathname)
40        if not self.migration_pathname.exists(follow_symlinks=False):
41            self.migration_pathname.mkdir()
42
43        self._sync_existing_migrations()
ADD_HASH_INDEX_STR = "{indent}{coll_var}.add_hash_index(name='{idx_name}', fields={fields}, unique={unique}, deduplicate=True)"
ADD_TTL_INDEX_STR = "{indent}{coll_var}.add_ttl_index({fields}, name='{idx_name}', expiry_time={expiry_time}"
target_path
overwrite
models_module_name
migration_pathname
def create_blank_migration(self, name):
45    def create_blank_migration(self, name):
46        self._sync_existing_migrations()
47        mig_filename = self.new_migration_filename(name)
48        with open(mig_filename, 'w') as f:
49            f.write(MIGRATION_FILE_TEMPLATE.format(migration_up='pass', migration_down='pass'))
def create_model_migrations(self):
51    def create_model_migrations(self):
52        discovery = PAOModelDiscovery()
53        model_hash: Dict[str, type[pao_model.PAOModel]] = discovery.discover()
54
55        graph_edges = []
56
57        mig = self.build_migration(PAOMigrationModel, model_hash)
58
59        # Special collection to track migrations; should always be first migration:
60        if "pao_migrations" not in self.existing_migrations.values():
61            self.create_model_migration(PAOMigrationModel, mig['mod_schema'], mig['hash_indexes'], mig['other_indexes'])
62
63        for mod in model_hash.values():
64            mig = self.build_migration(mod, model_hash)
65            self.create_model_migration(mod, mig['mod_schema'], mig['hash_indexes'], mig['other_indexes'])
66            graph_edges.extend(mig['graph_edges'])
67
68        self.create_graph_migration(graph_edges)
def create_model_migration( self, mod: type[python_arango_ogm.db.pao_model.PAOModel], mod_schema: dict, hash_indexes: Sequence, other_indexes: Sequence):
    def create_model_migration(
            self,
            mod: type[pao_model.PAOModel],
            mod_schema: dict,
            hash_indexes: Sequence,
            other_indexes: Sequence
    ):
        """
            Create migration for given model, schema, hash_indexes and other_indexes
        """
        coll_name = mod.collection_name()
        coll_var = f"{coll_name}_collection"

        # Decide the output filename and whether an earlier migration for this
        # collection exists (updating=True: patch the schema, don't recreate):
        mig_filename, updating, existing_mig_filename = self._determine_filename_updating(coll_name)
        mig_text, schema_var = self._build_migration_file_text(
            coll_name=coll_name,
            coll_var=coll_var,
            mod_schema=mod_schema,
            hash_indexes=hash_indexes,
            other_indexes=other_indexes
        )

        noop = False
        if updating:
            # Compare against the existing migration's text; identical output
            # means nothing changed and no new file is needed.
            with open(self.migration_pathname.joinpath(existing_mig_filename)) as f:
                existing_text = f.read()

            if existing_text == mig_text:
                # Don't save file
                noop = True
                print("Migration is the same; skipping.")
            else:
                # Schema changed: rebuild the migration so it first fetches the
                # existing collection, re-applies the schema, and drops every
                # current index before the new ones are re-added.
                # NOTE(review): `db.collections(...)` looks like it should be
                # `db.collection('<name>')` (python-arango's single-collection
                # accessor takes a name; `collections()` takes none) — confirm
                # against the target driver API before relying on this path.
                mig = []
                mig.append(f"\n{INDENT}{coll_var}=db.collections('{coll_name}')")
                mig.append(f"{INDENT}{coll_var}.configure(schema={schema_var})")
                mig.append(f"{INDENT}{coll_var}_indexes={coll_var}.indexes()")
                mig.append(f"{INDENT}[{coll_var}.delete_index(idx, ignore_missing=True) for idx in {coll_var}_indexes]")
                prepend_text = "\n".join(mig)
                mig_text, _ = self._build_migration_file_text(
                    coll_name=coll_name,
                    coll_var=coll_var,
                    mod_schema=mod_schema,
                    hash_indexes=hash_indexes,
                    other_indexes=other_indexes,
                    prepare_collection_txt=prepend_text
                )

        # Write to file:
        if not noop:
            pathname = self.migration_pathname.joinpath(mig_filename)
            with open(pathname, mode="w") as file:
                file.write(mig_text)
            self._sync_existing_migrations()

Create migration for given model, schema, hash_indexes and other_indexes

def create_graph_migration(self, graph_edges: []):
124    def create_graph_migration(self, graph_edges: []):
125        """ Create migration for graph_edges: """
126        graph_name = os.getenv('PAO_GRAPH_NAME')
127        if graph_name is None:
128            raise ValueError("PAO_GRAPH_NAME must be defined in the environment (or a .env file)")
129
130        print("Creating migration for graph: ", graph_edges)
131        mig_text = self._build_graph_migration_text(graph_edges, graph_name)
132        graph_mig_filename = self._find_existing_migration(graph_name, suffix=".py")
133        noop = False
134        if graph_mig_filename:
135            with open(self.migration_pathname.joinpath(graph_mig_filename)) as f:
136                existing_text = f.read()
137
138            if existing_text == mig_text:
139                # Don't save file
140                noop = True
141                print("Graph Migration is the same; skipping.")
142
143        if not noop:
144            mig_filename = self.new_migration_filename(graph_name)
145            mig_pathname = self.migration_pathname.joinpath(mig_filename)
146            with open(mig_pathname, mode="w") as file:
147                file.write(mig_text)
148
149            self._sync_existing_migrations()

Create migration for graph_edges:

def new_migration_filename(self, name: str):
151    def new_migration_filename(self, name: str):
152        """ Return filename for a new migration using given name as suffix """
153        print(f"NEW_MIGRATION: [{name}]", len(self.existing_migrations))
154        print(self.existing_migrations)
155        mig_num = len(self.existing_migrations) + 1
156        mig_filename = f"{mig_num:04}_{name}.py"
157        return mig_filename

Return filename for a new migration using given name as suffix

def build_index_migrations(self, coll_var: str, hash_indexes: Sequence, other_indexes: Sequence):
159    def build_index_migrations(self, coll_var: str, hash_indexes: Sequence, other_indexes: Sequence):
160        """ Build migrations for indexes """
161        up_migration = []
162        down_migration = []
163
164        # Build indexes defined on Field objects:
165        self._build_field_index_migrations(coll_var, hash_indexes, up_migration, down_migration)
166
167        # Build indexes defined as Index objects:
168        self._build_other_indexes(coll_var, other_indexes, up_migration, down_migration)
169
170        return up_migration, down_migration

Build migrations for indexes

def build_migration( self, mod: type[python_arango_ogm.db.pao_model.PAOModel], model_hash: Dict[str, type[python_arango_ogm.db.pao_model.PAOModel]]) -> Dict[str, Any]:
172    def build_migration(self, mod: type[pao_model.PAOModel], model_hash: Dict[str, type[pao_model.PAOModel]]) -> Dict[
173        str, any]:
174        indexes = [e for e in dir(mod) if isinstance(getattr(mod, e), Index)]
175
176        mod_schema, hash_indexes = self.build_schema(mod)
177        graph_edges = self.build_model_edges(mod, model_hash)
178        other_indexes = []
179        for oi in indexes:
180            index: Index = getattr(mod, oi)
181            other_indexes.append({
182                'fields': index.fields,
183                'index_type': index.index_type,
184                'name': index.name,
185                'expiry_seconds': index.expiry_seconds,
186                'unique': index.unique,
187            })
188
189        return {
190            'mod_schema': mod_schema,
191            'hash_indexes': hash_indexes,
192            'graph_edges': graph_edges,
193            'other_indexes': other_indexes
194        }
def build_schema( self, mod: type[python_arango_ogm.db.pao_model.PAOModel]) -> tuple[typing.Dict, typing.Sequence]:
196    def build_schema(self, mod: type[pao_model.PAOModel]) -> tuple[Dict, Sequence]:
197        required = []
198        hash_indexes = []
199        properties = {}
200
201        fields = [f for f in dir(mod) if isinstance(getattr(mod, f), pao_model.Field)]
202
203        for f in fields:
204            field: pao_model.Field = getattr(mod, f)
205            properties[f] = field.build_schema_properties()
206            if field.required:
207                required.append(f)
208            if field.index_name or field.unique:
209                hash_indexes.append({'name': field.index_name, 'fields': [f], 'unique': field.unique, })
210
211        print("MOD:", mod.__name__)
212        mod_schema = dict(
213            rule=dict(
214                properties=properties,
215                additionalProperties=mod.ADDITIONAL_PROPERTIES,
216            ),
217            level=mod.LEVEL,
218        )
219        if len(required):
220            mod_schema['rule']['required'] = required
221
222        return mod_schema, hash_indexes
def build_model_edges( self, mod: type[python_arango_ogm.db.pao_model.PAOModel], model_hash: Dict[str, type[python_arango_ogm.db.pao_model.PAOModel]]) -> Sequence[Dict]:
224    def build_model_edges(self, mod: type[pao_model.PAOModel], model_hash: Dict[str, type[pao_model.PAOModel]]) -> \
225            Sequence[Dict]:
226        """ Build model edges and return as a list of dictionaries """
227        graph_edges = []
228        edges = [e for e in dir(mod) if isinstance(getattr(mod, e), pao_model.PAOEdgeDef)]
229        for e in edges:
230            edge: pao_model.PAOEdgeDef = getattr(mod, e)
231            to_model: type[pao_model.PAOModel] = model_hash[edge.to_model] if isinstance(edge.to_model,
232                                                                                         str) else edge.to_model
233            from_name = mod.collection_name()
234            to_name = to_model.collection_name()
235            edge_name = f"{from_name}__{to_name}"
236            graph_edges.append({
237                'edge_collection': edge_name,
238                'from_vertex_collections': [from_name],
239                'to_vertex_collections': [to_name]
240            })
241        return graph_edges

Build model edges and return as a list of dictionaries