Privacy Policy
Snippets index

  PostgreSQL materialized views with Django

My implementation:

import csv
from django.db import models
from django.db import connection
from django.utils import timezone
from django.db import connection
from django.contrib.humanize.templatetags.humanize import intcomma
from ccms_lib.backend.models import CODE_MAX_LENGTH


class SqlViewMixin:
    """
    Mixin that lets a Django model manage the PostgreSQL view it maps onto.

    The concrete model is expected to provide:

    - ``materialized``: truthy when the view is a materialized view;
    - ``sql()``: a callable returning the ``SELECT`` statement that defines
      the view (surrounding whitespace and one trailing ``;`` are tolerated);
    - the usual Django ``_meta`` and ``objects`` attributes; ``_meta.db_table``
      is used as the view name and ``_meta.pk.name`` as the key column.
    """

    @classmethod
    def create_view(cls, verbose=False):
        """
        Create the view in the database.

        A materialized view is created with ``IF NOT EXISTS`` together with a
        unique index on the primary key (PostgreSQL needs a unique index to
        allow ``REFRESH MATERIALIZED VIEW CONCURRENTLY``). A plain view is
        created with a bare ``CREATE VIEW``.

        :param verbose: print the generated SQL before executing it
        """
        view_name = cls._meta.db_table

        # The SELECT is embedded in a larger statement, so its own
        # terminator has to go.
        select_sql = cls.sql().strip()
        if select_sql.endswith(';'):
            select_sql = select_sql[:-1]

        if cls.materialized:
            sql = f"CREATE materialized VIEW IF NOT EXISTS {view_name} AS "
            sql += select_sql + ';\n'
            pk_name = cls._meta.pk.name
            sql += f'CREATE UNIQUE INDEX IF NOT EXISTS idx_{view_name} ON {view_name}({pk_name});'
        else:
            sql = f"CREATE VIEW {view_name} AS "
            sql += select_sql + ';\n'
        cls.execute(sql, verbose)

    @classmethod
    def drop_view(cls, verbose=False):
        """
        Drop the view (with ``CASCADE``) and the index made by ``create_view``.

        Both statements use ``IF EXISTS``, so calling this when nothing has
        been created yet is harmless.

        :param verbose: print the generated SQL before executing it
        """
        view_name = cls._meta.db_table
        sql = "DROP %sVIEW IF EXISTS %s CASCADE;\n" % (
            'materialized ' if cls.materialized else '',
            view_name,
        )
        sql += f"DROP INDEX IF EXISTS idx_{view_name};"
        cls.execute(sql, verbose)

    @classmethod
    def refresh_view(cls, concurrently=False, verbose=False):
        """
        Refresh the materialized view; does nothing for a plain view.

        :param concurrently: add ``CONCURRENTLY`` to the refresh statement
        :param verbose: print the generated SQL before executing it
        """
        if not cls.materialized:
            return
        sql = "REFRESH MATERIALIZED VIEW %s%s;" % (
            "CONCURRENTLY " if concurrently else '',
            cls._meta.db_table,
        )
        cls.execute(sql, verbose)

    @classmethod
    def execute(cls, sql, verbose):
        """
        Run ``sql`` on the default database connection.

        :param sql: SQL text to execute (may contain several statements)
        :param verbose: print ``sql`` before executing it
        """
        with connection.cursor() as cursor:
            if verbose:
                print(sql)
            cursor.execute(sql)

    @classmethod
    def export_view_as_csv(cls, verbose, filename=None, delimiter=',',
                           page_size=1000000):
        """
        Dump every row of the view to a CSV file, ordered by primary key.

        The first line of the file is a header with the column names; it is
        written even when the view is empty.

        :param verbose: print progress information while exporting
        :param filename: target path; defaults to
            ``<timestamp>__<view name>.csv`` in the current directory
        :param delimiter: CSV field delimiter
        :param page_size: number of rows fetched per query; ``0`` (or less)
            disables pagination and streams a single query row by row
        """
        view_name = cls._meta.db_table

        if filename is None:
            filename = timezone.now().strftime('%Y-%m-%d_%H-%M-%S__') + view_name + '.csv'

        # NOTE: str(query) does not quote parameters; that is fine here
        # because an unfiltered .all() carries none.
        sql = str(cls.objects.all().query)
        sql += ' ORDER BY "%s"' % cls._meta.pk.name

        # Pagination is driven by the row count, so it must be known even
        # when no progress is being reported.
        paginate = page_size > 0
        n = cls.objects.count() if (verbose or paginate) else 0

        if verbose:
            print('Exporting file "%s"; records: %s' % (filename, intcomma(n)))

        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, dialect='excel', delimiter=delimiter)
            with connection.cursor() as cursor:
                if paginate:
                    cls._export_paginated(cursor, sql, writer, n, page_size, verbose)
                else:
                    cls._export_unpaginated(cursor, sql, writer, n, verbose)

    @staticmethod
    def _export_paginated(cursor, sql, writer, total, page_size, verbose):
        """Write header and rows, fetching ``page_size`` rows per query."""
        # Always run at least one query so the header can be read from the
        # cursor description, even for an empty view.
        num_pages = max(1, -(-total // page_size))
        for page in range(num_pages):
            offset = page * page_size
            if verbose:
                progress = (offset * 100) // total if total else 0
                print('page %d/%d (%d%%)' % (page + 1, num_pages, progress))
            cursor.execute(sql + " OFFSET %d LIMIT %d" % (offset, page_size))
            if page == 0:
                writer.writerow([f.name for f in cursor.description])
            writer.writerows(cursor.fetchall())

    @staticmethod
    def _export_unpaginated(cursor, sql, writer, total, verbose):
        """Write header and rows from one query, fetched one row at a time."""
        cursor.execute(sql)
        writer.writerow([f.name for f in cursor.description])

        # Report roughly every 10% of the rows; never less than every row,
        # which also keeps the modulo below away from zero.
        report_every = max(1, total // 10)
        done = 0
        row = cursor.fetchone()
        while row is not None:
            done += 1
            if verbose and (done == 1 or done % report_every == 0):
                progress = min(100, (done * 100) // total) if total else 100
                print('%d%% (%s/%s)' % (progress, intcomma(done), intcomma(total)))
            writer.writerow(row)
            row = cursor.fetchone()

################################################################################
# Example: a model backed by a materialized view

class DwhBase(SqlViewMixin, models.Model):
    """
    Example model mapped onto the ``dwhm_base`` materialized view.

    The view selects from ``backend_base``, exposing ``code`` both as the
    primary key ``id`` and as ``base_code``, and ``description`` as
    ``base_name``.
    """

    # Read by SqlViewMixin to choose materialized-view statements.
    materialized = True

    id = models.CharField(primary_key=True, max_length=256, null=False)
    base_code = models.CharField(null=False, max_length=CODE_MAX_LENGTH)
    base_name = models.CharField(null=False, max_length=256)

    class Meta:
        db_table = "dwhm_base"
        # Django must not create or migrate this relation itself.
        managed = False

    def __str__(self):
        # ``id`` is the primary key, so ``pk`` resolves to the same value.
        return str(self.pk)

    @staticmethod
    def sql():
        """Return the SELECT statement that defines the view."""
        return """
SELECT code AS id,
    code as base_code,
    description AS base_name
   FROM backend_base;
"""

References: