PostgreSQL materialized views with Django ¶
My implementation:
import csv from django.db import models from django.db import connection from django.utils import timezone from django.db import connection from django.contrib.humanize.templatetags.humanize import intcomma from ccms_lib.backend.models import CODE_MAX_LENGTH class SqlViewMixin(): @classmethod def create_view(cls, verbose=False): view_name = cls._meta.db_table cls_sql = cls.sql().strip() if cls_sql.endswith(';'): cls_sql = cls_sql[:-1] if cls.materialized: sql = f"CREATE materialized VIEW IF NOT EXISTS {view_name} AS " sql += cls_sql + ';\n' pk_name = cls._meta.pk.name sql += f'CREATE UNIQUE INDEX IF NOT EXISTS idx_{view_name} ON {view_name}({pk_name});' else: sql = f"CREATE VIEW {view_name} AS " sql += cls_sql + ';\n' cls.execute(sql, verbose) @classmethod def drop_view(cls, verbose=False): view_name = cls._meta.db_table sql = "DROP %sVIEW IF EXISTS %s CASCADE;\n" % ( 'materialized ' if cls.materialized else '', view_name, ) pk_name = cls._meta.pk.name sql += f"DROP INDEX IF EXISTS idx_{view_name};" cls.execute(sql, verbose) @classmethod def refresh_view(cls, concurrently=False, verbose=False): if cls.materialized: #sql = "REFRESH MATERIALIZED VIEW CONCURRENTLY {cls._meta.db_table};" sql = "REFRESH MATERIALIZED VIEW %s%s;" % ( "CONCURRENTLY " if concurrently else '', cls._meta.db_table, ) cls.execute(sql, verbose) @classmethod def execute(cls, sql, verbose): with connection.cursor() as cursor: if verbose: print(sql) cursor.execute(sql) @classmethod def export_view_as_csv(cls, verbose, filename=None, delimiter=','): def export_rows(cursor, sql, offset, page_size, writer, verbose): if page_size > 0: sql = sql + " OFFSET %d LIMIT %d" % (offset, page_size) if verbose: print(sql) cursor.execute(sql) if offset <= 0: writer.writerow([f.name for f in cursor.description]) for row in cursor.fetchall(): writer.writerow(row) view_name = cls._meta.db_table if filename is None: filename = timezone.now().strftime('%Y-%m-%d_%H-%M-%S__') + view_name + '.csv' #page_size = 100000 # 0 = no pagination page_size = 1000000 # 0 = no pagination sql = str(cls.objects.all().query) sql += ' ORDER BY "%s"' % cls._meta.pk.name n = 0 step = 0 if verbose: n = cls.objects.count() step = int(n / 100) print('Exporting file "%s"; records: %s' % (filename, intcomma(n))) with open(filename, 'w', newline='') as csvfile: writer = csv.writer(csvfile, dialect='excel', delimiter=delimiter) with connection.cursor() as cursor: if page_size <= 0: # no pagination cursor.execute(sql) writer.writerow([f.name for f in cursor.description]) j = 0 row = cursor.fetchone() while row is not None: j += 1 if verbose and j==1 or ((j/10) % step) == 0: progress = int((j * 100) / n) + 1 print('%d%% (%s/%s)' % (progress, intcomma(j), intcomma(n))) writer.writerow(row) row = cursor.fetchone() else: # paginate num_pages = (n // page_size) + (1 if n % page_size else 0) offset = 0 j = 0 while offset < n: if verbose: progress = int(((j*page_size) * 100) / n) print('page %d/%d (%d%%)' % (j+1, num_pages, progress)) export_rows(cursor, sql, offset, page_size, writer, verbose=False) offset += page_size j += 1 ################################################################################ # Example ... class DwhBase(SqlViewMixin, models.Model): materialized = True id = models.CharField(null=False, max_length=256, primary_key=True) base_code = models.CharField(max_length=CODE_MAX_LENGTH, null=False) base_name = models.CharField(max_length=256, null=False) class Meta: managed = False db_table = "dwhm_base" def __str__(self): return str(self.id) @staticmethod def sql(): return """ SELECT code AS id, code as base_code, description AS base_name FROM backend_base; """
References: