Brainstorm's snippets (20/279)


  How to serialize Django queryset.values() into json?
django, json
  PostgreSQL: log slow queries
postgresql
  Cutting the videos based on start and end time using ffmpeg
ffmpeg, video
  Schedule Linux server automatic reboot once per week
linux
  NPM: Do I commit the package-lock.json file created by npm?
npm
  Switch from master to main when 'git checkout main' does not work
git
  Closing the DB connections after accessing Django ORM in secondary threads
postgresql, django, threading
  FakeLogger: a stupid simple Python logger replacement
logging
  How to Install Sass compiler on a Mac
mac, sass
  Set the directory of a BASH script as the current working directory
bash
  Delete email spam messages using IMAP
imap
  Markdown notes
markdown
  mirror_gitlab_projects
gitlab
  Github: Checking out pull requests locally
github
  django: how to filter out GET static and media messages with logging
django, logging
  elapsed_time(): Human-readable time interval between two values of time.time()
python
  Symmetrical editing of an M2M relation in Django admin
m2m
  Sort a list of dictionaries in Python3
sort
  Prevent Mac from falling asleep
mac, sleep
  How to Install and upgrade Node.js and NPM on Ubuntu
ubuntu, npm, nodejs

  How to serialize Django queryset.values() into json?

Django's serializers can't handle a ValuesQuerySet.

However, you can serialize by using a standard json.dumps() and transforming your ValuesQuerySet to a list by using list().

If your set includes Django fields such as Decimals or UUIDs, you will need to pass in DjangoJSONEncoder.

Thus:

import json
from django.core.serializers.json import DjangoJSONEncoder
from django.http import HttpResponse

queryset = myModel.objects.filter(foo__icontains=bar).values('f1', 'f2', 'f3')
serialized_q = json.dumps(list(queryset), cls=DjangoJSONEncoder)

return HttpResponse(serialized_q, content_type='application/json')

Or just convert the ValuesQuerySet into a Python list, so you can further manipulate it, and return it with JsonResponse (which uses DjangoJSONEncoder by default, so Decimals and UUIDs are still handled):

from django.http import JsonResponse

queryset = myModel.objects.filter(foo__icontains=bar).values('f1', 'f2', 'f3')
data = list(queryset)

...

return JsonResponse(data, safe=False)


  PostgreSQL: log slow queries

Example:

# cat /etc/postgresql/14/main/conf.d/slow_query_logging.conf

log_min_duration_statement = 5000   # expressed in [ms]
logging_collector = on

# log_directory is an absolute path, or relative to the cluster data directory,
# which can be discovered as follows:
# sudo -u postgres psql -c "SHOW data_directory"
log_directory = '/var/log/postgresql/'

log_filename = 'postgresql-14-slow-queries.log'
log_file_mode = 0644
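
To verify the setup, issue a deliberately slow query and check the log file; a minimal sketch using psycopg2 (connection parameters are placeholders). Note that changing logging_collector requires a server restart, while log_min_duration_statement only needs a reload:

import psycopg2  # assumption: psycopg2 is installed

conn = psycopg2.connect(dbname='mydb', user='myuser')  # placeholder credentials
with conn.cursor() as cur:
    cur.execute("SELECT pg_sleep(6)")  # exceeds the 5000 ms threshold above
conn.close()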

  Cutting the videos based on start and end time using ffmpeg

ffmpeg -ss 00:01:00 -to 00:02:00 -i input.mp4 -c copy output.mp4

Note that -c copy avoids re-encoding, so the cut snaps to the nearest keyframes and may not be frame-accurate; drop -c copy and re-encode when exact cut points matter.

References:

https://stackoverflow.com/questions/18444194/cutting-the-videos-based-on-start-and-end-time-using-ffmpeg/42827058#42827058

  Schedule Linux server automatic reboot once per week

$ cat /etc/cron.d/reboot

# Once per week, on Saturday morning; the "+5" reboots after a 5-minute warning
0 6 * * SAT     root    /sbin/shutdown --reboot +5

  NPM: Do I commit the package-lock.json file created by npm?

post by "k0pernikus"

https://stackoverflow.com/questions/44206782/do-i-commit-the-package-lock-json-file-created-by-npm-5#56254478

Yes, you SHOULD:

  • commit the package-lock.json.
  • use npm ci instead of npm install when building your applications, both on your CI and on your local development machine.

The npm ci workflow requires the existence of a package-lock.json.

A big downside of the npm install command is its unexpected behavior: it may mutate the package-lock.json, whereas npm ci only uses the versions specified in the lockfile and produces an error

  • if the package-lock.json and package.json are out of sync
  • if a package-lock.json is missing.

Hence, running npm install locally, especially in larger teams with multiple developers, may lead to lots of conflicts within the package-lock.json, tempting developers to delete the file entirely instead.

Yet there is a strong use-case for being able to trust that the project's dependencies resolve repeatably in a reliable way across different machines.

From a package-lock.json you get exactly that: a known-to-work state.

In the past, I had projects without package-lock.json / npm-shrinkwrap.json / yarn.lock files whose build would fail one day because a random dependency got a breaking update.

Those issues are hard to resolve, as you sometimes have to guess what the last working version was.

  • If you want to add a new dependency, you still run npm install {dependency}.
  • If you want to upgrade, use either npm update {dependency} or npm install {dependency}@{version} and commit the changed package-lock.json.

If an upgrade fails, you can revert to the last known working package-lock.json.

  Switch from master to main when 'git checkout main' does not work

git checkout --track origin/main

If git does not know about the remote branch yet, run git fetch origin first.

See also:

Difference between git checkout --track origin/branch and git checkout -b branch origin/branch

  Closing the DB connections after accessing Django ORM in secondary threads

Since Django will create a new connection per thread when you access the ORM, and connections are left open even when the threads are terminated, we need to close the database connection explicitly when terminating a thread:

import threading
from django.db import connection


class MyThread(threading.Thread):

    def __init__(self):
        super().__init__()
        self.keep_running = True

    def run(self):
        while self.keep_running:
            ...
            ...

        # When the device's communication loop exits, we still need to close the
        # database connection, since Django will create a new connection per thread
        # when you access the ORM, and connections are left open even when the threads
        # are terminated; see:
        # "Make sure you are closing the DB connections after accessing Django ORM in your threads":
        # https://james.lin.net.nz/2016/04/22/make-sure-you-are-closing-the-db-connections-after-accessing-django-orm-in-your-threads/
        connection.close()
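
A minimal usage sketch (the caller stops the worker by clearing keep_running):

worker = MyThread()
worker.start()
# ... main program ...
worker.keep_running = False  # ask the loop to exit
worker.join()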

If the connection is not closed, possible errors include:

  • Too many connections
  • "Database" is being accessed by other users


  FakeLogger: a stupid simple Python logger replacement

import threading
import datetime
# Adapted from logging.__init__.py

CRITICAL = 50
ERROR = 40
WARNING = 30
INFO = 20
DEBUG = 10
NOTSET = 0

_levelToName = {
    CRITICAL: 'CRITICAL',
    ERROR: 'ERROR',
    WARNING: 'WARNING',
    INFO: 'INFO',
    DEBUG: 'DEBUG',
    NOTSET: 'NOTSET',
}
_nameToLevel = {
    'CRITICAL': CRITICAL,
    'ERROR': ERROR,
    'WARNING': WARNING,
    'INFO': INFO,
    'DEBUG': DEBUG,
    'NOTSET': NOTSET,
}

_lock = threading.RLock()

def _acquireLock():
    """
    Acquire the module-level lock for serializing access to shared data.

    This should be released with _releaseLock().
    """
    if _lock:
        _lock.acquire()

def _releaseLock():
    """
    Release the module-level lock acquired by calling _acquireLock().
    """
    if _lock:
        _lock.release()


class FakeLogger:

    def __init__(self, level=NOTSET):
        self.setLevel(level)

    @staticmethod
    def verbosity_to_log_level(verbosity):
        levels = [WARNING, INFO, DEBUG]
        log_level = levels[min(len(levels)-1, verbosity)]  # capped to number of levels
        return log_level

    def setLevel(self, level):
        self.level = level
        self.disabled = level <= NOTSET

    def _log(self, level, msg, args, **kwargs):
        _acquireLock()
        try:

            msg = str(msg)
            if args:
                msg = msg % args

            # e.g.: [DEBUG]    2022-03-14 12:08:05.044044|MainThread|Server bound to 0.0.0.0:6000
            level_name = '[' + _levelToName[level] + ']'
            line = '%-10.10s %s|%s|%s' % (
                level_name,
                datetime.datetime.now().isoformat().replace('T',' '),
                threading.current_thread().name,
                msg
            )

            print(line, flush=True)
        finally:
            _releaseLock()
        return

    def isEnabledFor(self, level):
        """
        Is this logger enabled for level 'level'?
        """
        if self.disabled:
            return False

        _acquireLock()
        try:
            is_enabled = (level >= self.level)
        finally:
            _releaseLock()
        return is_enabled

    def debug(self, msg, *args, **kwargs):
        if self.isEnabledFor(DEBUG):
            self._log(DEBUG, msg, args, **kwargs)

    def info(self, msg, *args, **kwargs):
        if self.isEnabledFor(INFO):
            self._log(INFO, msg, args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        if self.isEnabledFor(WARNING):
            self._log(WARNING, msg, args, **kwargs)

    def error(self, msg, *args, **kwargs):
        if self.isEnabledFor(ERROR):
            self._log(ERROR, msg, args, **kwargs)

    # def exception(self, msg, *args, exc_info=True, **kwargs):
    #     """
    #     Convenience method for logging an ERROR with exception information.
    #     """
    #     self.error(msg, *args, exc_info=exc_info, **kwargs)

    def critical(self, msg, *args, **kwargs):
        if self.isEnabledFor(CRITICAL):
            self._log(CRITICAL, msg, args, **kwargs)


if __name__ == '__main__':

    print('With Level: ERROR ...')
    logger = FakeLogger(level=ERROR)
    logger.debug('...debug...')
    logger.info('...info...')
    logger.warning('...warning...')
    logger.error('...error...')
    logger.critical('...critical...')

    print('With Level: DEBUG ...')
    logger.setLevel(DEBUG)
    logger.debug('...debug...')
    logger.info('...info...')
    logger.warning('...warning...')
    logger.error('...error...')
    logger.critical('...critical...')
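
The verbosity_to_log_level() helper maps an argparse-style -v count to a log level, for example:

logger = FakeLogger(level=FakeLogger.verbosity_to_log_level(0))  # WARNING
logger = FakeLogger(level=FakeLogger.verbosity_to_log_level(2))  # DEBUG (higher counts are capped)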

  How to Install Sass compiler on a Mac

Check if ruby is available:

ruby -v

If not, install it:

brew install ruby

Install Sass:

sudo gem install sass

In case of the SSL error "Unable to pull data from 'https://rubygems.org/'", try switching the gem source (note that this falls back to plain, unencrypted HTTP):

sudo gem sources -r https://rubygems.org
sudo gem sources -a http://rubygems.org

then try again:

sudo gem install sass

and verify with:

sass -v

  Set the directory of a BASH script as the current working directory

#!/bin/bash

# Set the directory of this script as the current working directory
cd "$(dirname "$0")"

See also:

https://stackoverflow.com/questions/3349105/how-can-i-set-the-current-working-directory-to-the-directory-of-the-script-in-ba

  Delete email spam messages using IMAP

import imaplib
import email  # used by the commented-out inspection code below
import datetime

def delete_spam(imap, date, dry_run):

    spam_subjects = [
        '[SPAM]',
        'Suspect subject',
        # and so on ...
    ]

    dt_from = date.strftime('%d-%b-%Y')
    dt_to = (date + datetime.timedelta(days=1)).strftime('%d-%b-%Y')
    for spam_subject in spam_subjects:

        # List messages with given subject in specified "date";
        # Matching should already be partial and case insensitive (TODO: check this)
        pattern = 'SUBJECT "%s" SINCE "%s" BEFORE "%s"' % (
            spam_subject,
            dt_from,
            dt_to,
        )

        result, mails_data = imap.search(None, pattern)
        mails_id_list = mails_data[0].split()
        #print(mails_id_list)
        print('deleting %d messages for: %s ...' % (len(mails_id_list), pattern))

        # for i in mails_id_list:
        #     result, mail_data = imap.fetch(i, "(RFC822)")
        #     raw_email = mail_data[0][1].decode()
        #     this_email = email.message_from_string(raw_email)
        #     print(this_email.get('subject'))

        if not dry_run:
            for num in mails_id_list:
                imap.store(num, '+FLAGS', '\\Deleted')
            imap.expunge()


imap_host = 'ssl0.xyz.net'
imap_user = 'info@whatever.it'
imap_pass = '*******************'

imap = imaplib.IMAP4_SSL(imap_host)
imap.login(imap_user, imap_pass)
imap.select('Inbox')

# Elaborate from today and backward for the last 30 days (for example)
first_date = datetime.date.today()
last_date = first_date - datetime.timedelta(days=30)

date = first_date
while True:
    print(date)
    delete_spam(imap, date, dry_run=False)  # set dry_run=True first to preview what would be deleted
    date = date - datetime.timedelta(days=1)
    if date < last_date:
        break

imap.close()

  Markdown notes

Table of contents (the [[_TOC_]] macro is GitLab-flavored Markdown; GitHub does not support it):

[[_TOC_]]
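
A minimal example of a wiki page using the macro (hypothetical headings):

[[_TOC_]]

# Introduction
## Setup
# Usage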

  mirror_gitlab_projects

#!/usr/bin/env python3
import gitlab
import os
import sys
import signal
import argparse
import logging
# requires: python-gitlab


GITLAB_URL = 'https://gitlab.somewhere.com'
PRIVATE_TOKEN = '********************'


# Get an instance of a logger
logger = logging.getLogger('main_module')
gl = gitlab.Gitlab(GITLAB_URL, private_token=PRIVATE_TOKEN)


def say_cwd():
    logger.debug('cwd: "%s"', os.getcwd())

def run_command(command):
    logger.info(command)
    rc = os.system(command)
    return rc

def new_dir(name):
    say_cwd()
    logger.info('mkdir: "%s"', name)
    os.mkdir(name)

def clone_repo(url):
    say_cwd()
    run_command("git clone " + url)

def fetch_all():
    say_cwd()
    run_command("git fetch --all")


def mirror_project(group, project, wiki):
    logger.info('handle %s ...', 'wiki' if wiki else 'code')
    cwd = os.getcwd()
    say_cwd()

    try:
        # Move into group folder
        if not os.path.isdir(group.path):
            new_dir(group.path)
        os.chdir(group.path)

        # Select either code repo or wiki
        path = project.path
        url = project.ssh_url_to_repo
        if wiki:
            path += '.wiki'
            url = url[:-4] + '.wiki.git'

        # Clone repo, in case
        if not os.path.isdir(path):
            clone_repo(url)

        # Update repo
        os.chdir(path)
        fetch_all()

    finally:
        os.chdir(cwd)


def signal_handler(signum, frame):
    sys.exit(0)


def main():

    signal.signal(signal.SIGINT, signal_handler)

    parser = argparse.ArgumentParser(description='Clone and/or fetch repos and wikis from remote Gitlab host')
    parser.add_argument('-l', '--logfile', metavar='logfile', help='log filename; defaults to stdout')
    parser.add_argument('-v', '--verbosity', type=int, choices=range(4), default=2, action='store', help="log verbosity level")
    parser.add_argument('-g', '--group', help='filter by group path')
    args = parser.parse_args()

    # Setup logging
    loglevel = logging.WARN
    if args.verbosity == 0:
        loglevel = logging.ERROR
    elif args.verbosity == 1:
        loglevel = logging.WARN
    elif args.verbosity == 2:  # default
        loglevel = logging.INFO
    elif args.verbosity > 2:
        loglevel = logging.DEBUG

    logging.basicConfig(
        filename=args.logfile,
        level=loglevel,
        format='%(asctime)s|%(levelname)-8s| %(message)s',
    )

    filter_group = args.group

    path = os.path.realpath(__file__)
    os.chdir(os.path.split(path)[0])
    say_cwd()

    groups = gl.groups.list(per_page=1000)
    for g in groups:

        group = gl.groups.get(g.id)
        if filter_group is not None and filter_group != group.path:
            continue

        logger.info('GROUP:   "%s"', group.path)

        projects = group.projects.list(per_page=1000)
        for project in projects:
            logger.info('PROJECT: "%s/%s"', group.path, project.path)
            try:
                mirror_project(group, project, wiki=False)
            except Exception as e:
                logger.error(str(e))
            try:
                mirror_project(group, project, wiki=True)
            except Exception as e:
                logger.error(str(e))


if __name__ == '__main__':
    main()
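
Example invocation (assuming the script is saved as mirror_gitlab_projects.py):

$ python3 mirror_gitlab_projects.py -g mygroup -v 3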

  Github: Checking out pull requests locally

  • Find the ID number of the pull request. This is the sequence of digits right after the pull request's title.

  • Fetch the reference to the pull request based on its ID number, creating a new branch in the process:

    $ git fetch origin pull/ID/head:BRANCHNAME
    
  • Switch to the new branch that's based on this pull request:

    $ git checkout BRANCHNAME
    
  • At this point, you can do anything you want with this branch. You can run some local tests, or merge other branches into the branch.

When you're ready, you can push the new branch up:

$ git push origin BRANCHNAME

References: Modifying an inactive pull request locally

  django: how to filter out GET static and media messages with logging

# "django: how to filter out GET static and media messages with logging?"
# https://stackoverflow.com/questions/23833642/django-how-to-filter-out-get-static-and-media-messages-with-logging#41620949
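# Both this function and the LOGGING dict below go in settings.py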
def skip_static_requests(record):
    # filter out requests for static and media files (adjust to taste)
    if record.args and str(record.args[0]).startswith(('GET /static/', 'GET /media/')):
        return False
    return True

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
        # use Django's built in CallbackFilter to point to your filter
        'skip_static_requests': {
            '()': 'django.utils.log.CallbackFilter',
            'callback': skip_static_requests
        }
    },
    'formatters': {
        # django's default formatter
        'django.server': {
            '()': 'django.utils.log.ServerFormatter',
            'format': '[%(server_time)s] %(message)s',
        }
    },
    'handlers': {
        # django's default handler...
        'django.server': {
            'level': 'INFO',
            'filters': ['skip_static_requests'],  # <- ...with one change
            'class': 'logging.StreamHandler',
            'formatter': 'django.server',
        },
    },
    'loggers': {
        # django's default logger
        'django.server': {
            'handlers': ['django.server'],
            'level': 'INFO',
            'propagate': False,
        },
    }
}

  elapsed_time(): Human-readable time interval between two values of time.time()

import math
import time
import datetime

def elapsed_time(t0, t1, with_milliseconds=False):
    """
    Human readable time interval between two values of time.time()
    """
    milliseconds, seconds = math.modf(t1 - t0)
    #dt = time.strftime("%H:%M:%S", time.gmtime(seconds))
    dt = "{:0>8}".format(str(datetime.timedelta(seconds=seconds)))
    if with_milliseconds:
        dt += ('%.3f' % milliseconds)[1:]
    return dt
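
For example (relying on the imports above):

t0 = time.time()
time.sleep(1.5)
print(elapsed_time(t0, time.time(), with_milliseconds=True))  # ~ "00:00:01.500"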

One-liner:

import time, datetime
t0 = time.time()
...

dt = str(datetime.timedelta(seconds=time.time() - t0))

  Symmetrical editing of an M2M relation in Django admin

Given:

class Team(models.Model):

    name = models.CharField(_('name'), max_length=150, unique=True)
    jobs = models.ManyToManyField(Job, verbose_name=u'Jobs', blank=True,
        related_name='teams')


@admin.register(Team)
class TeamAdmin(admin.ModelAdmin):
    filter_horizontal = ['jobs',]
    ...

Django already provides a widget for editing the relation in the Team change form.

To have a similar behaviour in the Job change form as well, proceed as follows:

from django import forms
from django.contrib.admin.widgets import FilteredSelectMultiple
from django.utils.translation import gettext_lazy as _


class JobAdminForm(forms.ModelForm):
    teams = forms.ModelMultipleChoiceField(
        queryset=Team.objects.all(),
        required=False,
        widget=FilteredSelectMultiple(
            verbose_name=_('Teams'),
            is_stacked=False
        )
    )

    class Meta:
        model = Job
        exclude = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.instance and self.instance.pk:
            self.fields['teams'].initial = self.instance.teams.all()

    def save(self, commit=True):
        job = super().save(commit=commit)
        if commit:
            job.teams.set(self.cleaned_data['teams'])
        else:
            old_save_m2m = self.save_m2m
            def new_save_m2m():
                old_save_m2m()
                job.teams.set(self.cleaned_data['teams'])
            self.save_m2m = new_save_m2m
        return job


@admin.register(Job)
class JobAdmin(admin.ModelAdmin):
    form = JobAdminForm
    ...

Similarly, you can add multiple M2M relations to the JobAdminForm:

class JobAdminForm(forms.ModelForm):
    users = forms.ModelMultipleChoiceField(
        queryset=User.objects.all(),
        required=False,
        widget=FilteredSelectMultiple(
            verbose_name=_('Users'),
            is_stacked=False
        )
    )
    teams = forms.ModelMultipleChoiceField(
        queryset=Team.objects.all(),
        required=False,
        widget=FilteredSelectMultiple(
            verbose_name=_('Teams'),
            is_stacked=False
        )
    )

    class Meta:
        model = Job
        exclude = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.instance and self.instance.pk:
            self.fields['users'].initial = self.instance.users.all()
            self.fields['teams'].initial = self.instance.teams.all()

    def save(self, commit=True):
        job = super().save(commit=commit)
        if commit:
            job.users.set(self.cleaned_data['users'])
            job.teams.set(self.cleaned_data['teams'])
        else:
            old_save_m2m = self.save_m2m
            def new_save_m2m():
                old_save_m2m()
                job.users.set(self.cleaned_data['users'])
                job.teams.set(self.cleaned_data['teams'])
            self.save_m2m = new_save_m2m
        return job

  Sort a list of dictionaries in Python3

import functools


def sort_results(data):
    """
    Sort on "anno", than on "label"
    """

    def mycmp(a, b):
        if a < b:
            return -1
        elif a > b:
            return 1
        return 0

    def compare(x, y):
        if x['anno'] == y['anno']:
            return mycmp(x['label'], y['label'])
        return mycmp(x['anno'], y['anno'])

    data.sort(key=functools.cmp_to_key(compare), reverse=False)
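
When both keys sort ascending, cmp_to_key() is not strictly needed; a tuple key gives the same result:

data.sort(key=lambda d: (d['anno'], d['label']))

The cmp_to_key() approach pays off when the ordering cannot be expressed as a plain key, e.g. mixed ascending/descending ordering on non-numeric fields.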

  Prevent Mac from falling asleep

The caffeinate command in terminal keeps the Mac from falling asleep.

To keep the Mac awake indefinitely:

caffeinate

For a specific time, e.g. 5 hours (-t takes seconds, -i prevents idle sleep):

caffeinate -i -t 18000

  How to Install and upgrade Node.js and NPM on Ubuntu

Installation (on Ubuntu 18)

sudo apt-get install nodejs
sudo apt-get install npm

Results:

$ which node
/usr/bin/node
$ which nodejs
/usr/bin/nodejs
$ which npm
/usr/bin/npm
$ node -v
v8.10.0
$ nodejs -v
v8.10.0
$ npm -v
3.5.2

Upgrade (on Ubuntu 18)

npm install -g n
n stable

Note: n installs the new node to /usr/local/bin, while the apt packages stay in /usr/bin; if your shell still resolves the old binary, run hash -r or open a new shell.

Results:

$ which node
/usr/local/bin/node
$ which nodejs
/usr/bin/nodejs
$ which npm
/usr/local/bin/npm
$ node -v
v14.16.0
$ nodejs -v
v8.10.0
$ npm -v
6.14.11