PyCon Italia 2019
Internet delle cose con Redis e django-channels
Development of vertical software solutions for medium to large businesses
Intranet, extranet, web services and IOT
Focused on technologies such as Python, Django, Redis, WebSockets, HTMX, IoT.
Real-time data acquisition, from Arduino to the web, using PubSub with Redis, Django and other friends
A Django app which provides advanced integration for a Django project with the jQuery Javascript library DataTables.net, when used in server-side processing mode.
Demo site
A Django helper app to add editing capabilities to the frontend using modal forms.
Demo site
A Django app to run new background tasks from either admin or cron, and inspect task history from admin; based on django-rq
A collection of tools to trace, analyze and render Querysets
Helper used to remove oldest records from specific db tables in a Django project
.
Strategy:
function print(text) { element.textContent = text } function returnValueAfterDelay(value, delay) { let promise = new Promise(function(resolve, reject) { function callback() { resolve(value) } setTimeout(callback, delay); }) return promise } function main() { returnValueAfterDelay(123, 1000).then( function(value) { print(value) } ) } main()
or the similar version using "arrow functions" instead of "regular functions" ... (but does anyone really think it is more readable?) ...
print = (text) => { element.textContent = text } let returnValueAfterDelay = (value, delay) => { let promise = new Promise((resolve, reject) => { let callback = () => resolve(value) setTimeout(callback, delay) }) return promise } let main = () => { returnValueAfterDelay(123, 1000).then( (value) => print(value) ) } main()
However:
It is recommended to use regular functions when dealing with Promises, Callback functions with dynamic context, and Object methods.
.
Strategy:
function print(text) { element.textContent = text } function returnValueAfterDelay(value, delay) { let promise = new Promise(function(resolve, reject) { function callback() { resolve(value) } setTimeout(callback, delay); }) return promise } function main() { returnValueAfterDelay(123, 1000).then( function(value) { print(value) } ) } main()
or the similar version using "arrow functions" instead of "regular functions" ... (but does anyone really think it is more readable?) ...
print = (text) => { element.textContent = text } let returnValueAfterDelay = (value, delay) => { let promise = new Promise((resolve, reject) => { let callback = () => resolve(value) setTimeout(callback, delay) }) return promise } let main = () => { returnValueAfterDelay(123, 1000).then( (value) => print(value) ) } main()
However:
It is recommended to use regular functions when dealing with Promises, Callback functions with dynamic context, and Object methods.
Using requests:
import requests # https://stackoverflow.com/questions/16694907/download-large-file-in-python-with-requests#16696317 def download_file(url): local_filename = url.split('/')[-1] # NOTE the stream=True parameter below with requests.get(url, stream=True) as r: r.raise_for_status() with open(local_filename, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): # If you have chunk encoded response uncomment if # and set chunk_size parameter to None. #if chunk: f.write(chunk) return local_filename
Using urlib.request:
import urllib.request def download_file2(url): """ Use urlib.request instead of requests to avoid "403 Forbidden" errors when the remote server tries to prevent scraping (for example when is protected behind cloudflare) """ request = urllib.request.Request(url) request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:106.0) Gecko/20100101 Firefox/106.0') request.add_header('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8') request.add_header('Accept-Language', 'en-US,en;q=0.5') response = urllib.request.urlopen(request) local_filename = url.split('/')[-1] chunk_size = 8192 with open(local_filename, 'wb') as f: size = 0 while True: info = response.read(chunk_size) if len(info) < 1: break size = size + len(info) f.write(info) return local_filename
Using requests:
import requests # https://stackoverflow.com/questions/16694907/download-large-file-in-python-with-requests#16696317 def download_file(url): local_filename = url.split('/')[-1] # NOTE the stream=True parameter below with requests.get(url, stream=True) as r: r.raise_for_status() with open(local_filename, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): # If you have chunk encoded response uncomment if # and set chunk_size parameter to None. #if chunk: f.write(chunk) return local_filename
Using urlib.request:
import urllib.request def download_file2(url): """ Use urlib.request instead of requests to avoid "403 Forbidden" errors when the remote server tries to prevent scraping (for example when is protected behind cloudflare) """ request = urllib.request.Request(url) request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:106.0) Gecko/20100101 Firefox/106.0') request.add_header('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8') request.add_header('Accept-Language', 'en-US,en;q=0.5') response = urllib.request.urlopen(request) local_filename = url.split('/')[-1] chunk_size = 8192 with open(local_filename, 'wb') as f: size = 0 while True: info = response.read(chunk_size) if len(info) < 1: break size = size + len(info) f.write(info) return local_filename
from ninja import Router v1_router = Router(tags=["cclib"]) @v1_router.get("/hello") def hello(request): return "Hello world!"
from typing import List import uuid from django.shortcuts import get_object_or_404 from ninja import Router from ninja import Schema, ModelSchema from ninja.security import django_auth from cclib.resources.models import Document v1_router = Router(tags=["cclib"]) class DocumentSchema(ModelSchema): class Meta: model = Document fields = "__all__" @v1_router.get("/documents/{uuid:document_id}", response=DocumentSchema, auth=django_auth) def get_document(request, document_id: uuid.UUID): return get_object_or_404(Document, pk=document_id) @v1_router.get("/documents", response=List[DocumentSchema], auth=django_auth) def list_documents(request): qs = Document.objects.all() return qs
There is also often a need to return responses with some nested/child objects ...
https://django-ninja.dev/guides/response/#nested-objects
See also:
Example:
import traceback from typing import List from ninja import Router from ninja import Schema, ModelSchema from ninja.errors import HttpError from ninja.security import django_auth from ninja.security import django_auth_superuser from django.conf import settings from django.contrib.auth.models import Group from django.contrib.auth import get_user_model v1_router = Router(tags=["ccserver"]) class GroupSchema(ModelSchema): class Meta: model = Group fields = "__all__" class UserSchema(ModelSchema): # See: "Nested objects" # https://django-ninja.dev/guides/response/#nested-objects groups: List[GroupSchema] = [] class Meta: model = get_user_model() #fields = "__all__" fields = [ 'id', 'username', ] @v1_router.get("/users", response=List[UserSchema], auth=django_auth_superuser) def list_users(request): try: User = get_user_model() qs = User.objects.all() except Exception as e: # See: "Throwing HTTP responses with exceptions" # https://django-ninja.dev/guides/errors/#throwing-http-responses-with-exceptions raise HttpError(500, str(e) + "\n" + traceback.format_exc()) return qs
Instead of a nested response, you may want to just flatten the response output.
The Ninja Schema object extends Pydantic's Field(..., alias="") format to work with dotted responses.
https://django-ninja.dev/guides/authentication/
Examples:
from ninja import NinjaAPI from ninja.security import django_auth api = NinjaAPI(csrf=True) @api.get("/pets", auth=django_auth) def pets(request): return f"Authenticated user {request.auth}"
If you need to authorize only a superuser, you can use from ninja.security import django_auth_superuser instead.
Here's an example where the client, in order to authenticate, needs to pass a header:
Authorization: Bearer supersecret:
from ninja.security import HttpBearer class AuthBearer(HttpBearer): def authenticate(self, request, token): if token == "supersecret": return token @api.get("/bearer", auth=AuthBearer()) def bearer(request): return {"token": request.auth}
As an alternative to custom exceptions and writing handlers for it - you can as well throw http exception that will lead to returning a http response with desired code
https://django-ninja.dev/guides/errors/#throwing-http-responses-with-exceptions
from ninja.errors import HttpError @api.get("/some/resource") def some_operation(request): if True: raise HttpError(503, "Service Unavailable. Please retry later.")
Have separate routers for each domain:
https://django-ninja.dev/guides/routers/
See also the following discussion:
https://www.reddit.com/r/django/comments/13zg8uw/django_ninja_architecture/
from ninja import Router v1_router = Router(tags=["cclib"]) @v1_router.get("/hello") def hello(request): return "Hello world!"
from typing import List import uuid from django.shortcuts import get_object_or_404 from ninja import Router from ninja import Schema, ModelSchema from ninja.security import django_auth from cclib.resources.models import Document v1_router = Router(tags=["cclib"]) class DocumentSchema(ModelSchema): class Meta: model = Document fields = "__all__" @v1_router.get("/documents/{uuid:document_id}", response=DocumentSchema, auth=django_auth) def get_document(request, document_id: uuid.UUID): return get_object_or_404(Document, pk=document_id) @v1_router.get("/documents", response=List[DocumentSchema], auth=django_auth) def list_documents(request): qs = Document.objects.all() return qs
There is also often a need to return responses with some nested/child objects ...
https://django-ninja.dev/guides/response/#nested-objects
See also:
Example:
import traceback from typing import List from ninja import Router from ninja import Schema, ModelSchema from ninja.errors import HttpError from ninja.security import django_auth from ninja.security import django_auth_superuser from django.conf import settings from django.contrib.auth.models import Group from django.contrib.auth import get_user_model v1_router = Router(tags=["ccserver"]) class GroupSchema(ModelSchema): class Meta: model = Group fields = "__all__" class UserSchema(ModelSchema): # See: "Nested objects" # https://django-ninja.dev/guides/response/#nested-objects groups: List[GroupSchema] = [] class Meta: model = get_user_model() #fields = "__all__" fields = [ 'id', 'username', ] @v1_router.get("/users", response=List[UserSchema], auth=django_auth_superuser) def list_users(request): try: User = get_user_model() qs = User.objects.all() except Exception as e: # See: "Throwing HTTP responses with exceptions" # https://django-ninja.dev/guides/errors/#throwing-http-responses-with-exceptions raise HttpError(500, str(e) + "\n" + traceback.format_exc()) return qs
Instead of a nested response, you may want to just flatten the response output.
The Ninja Schema object extends Pydantic's Field(..., alias="") format to work with dotted responses.
https://django-ninja.dev/guides/authentication/
Examples:
from ninja import NinjaAPI from ninja.security import django_auth api = NinjaAPI(csrf=True) @api.get("/pets", auth=django_auth) def pets(request): return f"Authenticated user {request.auth}"
If you need to authorize only a superuser, you can use from ninja.security import django_auth_superuser instead.
Here's an example where the client, in order to authenticate, needs to pass a header:
Authorization: Bearer supersecret:
from ninja.security import HttpBearer class AuthBearer(HttpBearer): def authenticate(self, request, token): if token == "supersecret": return token @api.get("/bearer", auth=AuthBearer()) def bearer(request): return {"token": request.auth}
As an alternative to custom exceptions and writing handlers for it - you can as well throw http exception that will lead to returning a http response with desired code
https://django-ninja.dev/guides/errors/#throwing-http-responses-with-exceptions
from ninja.errors import HttpError @api.get("/some/resource") def some_operation(request): if True: raise HttpError(503, "Service Unavailable. Please retry later.")
Have separate routers for each domain:
https://django-ninja.dev/guides/routers/
See also the following discussion:
https://www.reddit.com/r/django/comments/13zg8uw/django_ninja_architecture/
sudo dd if=/dev/zero of=testfile bs=1G count=1 oflag=dsync && sudo rm testfile
sudo dd if=/dev/zero of=testfile bs=1G count=1 oflag=dsync && sudo rm testfile
Quick and easy:
fetch('https://reqbin.com/echo/get/json', { method: 'GET', headers: { 'Accept': 'application/json', }, }) .then(response => response.json()) .then(response => console.log(JSON.stringify(response)))
The following is more elaborated with the purpose to collect all errors is a single catch handler:
In the latter case, the response might contain further details JSON-serialized and we want to inspect them
fetch( '/backend/j/mto_evaluate_final_stock_total', { method: "post", body: data, mode: 'cors', // 'no-cors', cache: 'no-cache', credentials: 'same-origin', headers: { // make sure request.is_ajax() return True on the server 'X-Requested-With': 'XMLHttpRequest', 'X-CSRFToken': getCookie('csrftoken') } } ) .then(function(response) { if (response.ok) { return response.json() } // if not 200-OK, reject instead of throw // Here, the plan is to get {status: 4XX, message: 'a message'} format as a result in both cases. // See: https://stackoverflow.com/questions/38235715/fetch-reject-promise-and-catch-the-error-if-status-is-not-ok#67660773 return Promise.reject(response) }) .then(function(data) { console.log(data) ... use data ... }) .catch(function(error) { // Show error console.error(error.status, error.statusText); // also get error messages, if any // Again, see: https://stackoverflow.com/questions/38235715/fetch-reject-promise-and-catch-the-error-if-status-is-not-ok#67660773 error.json().then(function(json) { console.error(json); }) }) .finally(function() { })
// bind to the form’s submit event form.addEventListener('submit', function(event) { // prevent the form from performing its default submit action event.preventDefault(); header.classList.add('loading'); // serialize the form’s content and send via an AJAX call // using the form’s defined method and action let url = form.getAttribute('action') || self.options.url; let method = form.getAttribute('method') || 'post'; // We use FormData // to allow files upload (i.e. process <input type="file"> as expected) // Note that, using FormData, we also need (with jQuery): // - processData: false // - contentType: false let data = new FormData(form); //console.log('form data: %o', new URLSearchParams(data).toString()); self._notify('submitting', {method: method, url: url, data:data}); let promise = fetch( self.options.url, { method: "post", body: data, mode: 'cors', // 'no-cors', cache: 'no-cache', credentials: 'same-origin', headers: { // make sure request.is_ajax() return True on the server 'X-Requested-With': 'XMLHttpRequest', 'X-CSRFToken': getCookie('csrftoken') } } ); promise.then(response => { if (response.ok) { // Upon receiving a JSON response, we assume that the form has been validated, // so we can close the modal if (response.headers.get('Content-Type') === 'application/json') { response.json().then(data => { self._notify('submitted', {method: method, url: url, data: data}); self.close(); }) .catch(error => { FrontendForms.display_server_error(error); }) } else { response.text().then(data => { // update the modal body with the new form body.innerHTML = data; // Does the response contain a form ? let form = self.element.querySelector('.dialog-content .dialog-body form'); if (form !== null) { // If the server sends back a successful response, // we need to further check the HTML received // If xhr contains any field errors, // the form did not validate successfully, // so we keep it open for further editing //if (jQuery(xhr).find('.has-error').length > 0) { if (form.querySelectorAll('.has-error').length > 0 || form.querySelectorAll('.errorlist').length > 0) { self._notify('loaded', {url: url}); self._form_ajax_submit(true); } else { // otherwise, we've done and can close the modal self._notify('submitted', {method: method, url: url, data: data}); self.close(); } } // If not, assume we received a feedback for the user after successfull submission, so: // - keep the dialog open // - hide the save button else { // We also notify the user about successful submission self._notify('submitted', {method: method, url: url, data: data}); btn_save.style.display = 'none'; } }); } } else { self._notify('submission_failure', {method: method, url: url, data: data, error: response.statusText}); FrontendForms.display_server_error(response.statusText); } }).catch(error => { self._notify('submission_failure', {method: method, url: url, data: data, error:error}); FrontendForms.display_server_error(error.toString()); }).finally(() => { header.classList.remove('loading'); });
where:
function getCookie(name) { var value = '; ' + document.cookie, parts = value.split('; ' + name + '='); if (parts.length == 2) return parts.pop().split(';').shift(); }
Quick and easy:
fetch('https://reqbin.com/echo/get/json', { method: 'GET', headers: { 'Accept': 'application/json', }, }) .then(response => response.json()) .then(response => console.log(JSON.stringify(response)))
The following is more elaborated with the purpose to collect all errors is a single catch handler:
In the latter case, the response might contain further details JSON-serialized and we want to inspect them
fetch( '/backend/j/mto_evaluate_final_stock_total', { method: "post", body: data, mode: 'cors', // 'no-cors', cache: 'no-cache', credentials: 'same-origin', headers: { // make sure request.is_ajax() return True on the server 'X-Requested-With': 'XMLHttpRequest', 'X-CSRFToken': getCookie('csrftoken') } } ) .then(function(response) { if (response.ok) { return response.json() } // if not 200-OK, reject instead of throw // Here, the plan is to get {status: 4XX, message: 'a message'} format as a result in both cases. // See: https://stackoverflow.com/questions/38235715/fetch-reject-promise-and-catch-the-error-if-status-is-not-ok#67660773 return Promise.reject(response) }) .then(function(data) { console.log(data) ... use data ... }) .catch(function(error) { // Show error console.error(error.status, error.statusText); // also get error messages, if any // Again, see: https://stackoverflow.com/questions/38235715/fetch-reject-promise-and-catch-the-error-if-status-is-not-ok#67660773 error.json().then(function(json) { console.error(json); }) }) .finally(function() { })
// bind to the form’s submit event form.addEventListener('submit', function(event) { // prevent the form from performing its default submit action event.preventDefault(); header.classList.add('loading'); // serialize the form’s content and send via an AJAX call // using the form’s defined method and action let url = form.getAttribute('action') || self.options.url; let method = form.getAttribute('method') || 'post'; // We use FormData // to allow files upload (i.e. process <input type="file"> as expected) // Note that, using FormData, we also need (with jQuery): // - processData: false // - contentType: false let data = new FormData(form); //console.log('form data: %o', new URLSearchParams(data).toString()); self._notify('submitting', {method: method, url: url, data:data}); let promise = fetch( self.options.url, { method: "post", body: data, mode: 'cors', // 'no-cors', cache: 'no-cache', credentials: 'same-origin', headers: { // make sure request.is_ajax() return True on the server 'X-Requested-With': 'XMLHttpRequest', 'X-CSRFToken': getCookie('csrftoken') } } ); promise.then(response => { if (response.ok) { // Upon receiving a JSON response, we assume that the form has been validated, // so we can close the modal if (response.headers.get('Content-Type') === 'application/json') { response.json().then(data => { self._notify('submitted', {method: method, url: url, data: data}); self.close(); }) .catch(error => { FrontendForms.display_server_error(error); }) } else { response.text().then(data => { // update the modal body with the new form body.innerHTML = data; // Does the response contain a form ? let form = self.element.querySelector('.dialog-content .dialog-body form'); if (form !== null) { // If the server sends back a successful response, // we need to further check the HTML received // If xhr contains any field errors, // the form did not validate successfully, // so we keep it open for further editing //if (jQuery(xhr).find('.has-error').length > 0) { if (form.querySelectorAll('.has-error').length > 0 || form.querySelectorAll('.errorlist').length > 0) { self._notify('loaded', {url: url}); self._form_ajax_submit(true); } else { // otherwise, we've done and can close the modal self._notify('submitted', {method: method, url: url, data: data}); self.close(); } } // If not, assume we received a feedback for the user after successfull submission, so: // - keep the dialog open // - hide the save button else { // We also notify the user about successful submission self._notify('submitted', {method: method, url: url, data: data}); btn_save.style.display = 'none'; } }); } } else { self._notify('submission_failure', {method: method, url: url, data: data, error: response.statusText}); FrontendForms.display_server_error(response.statusText); } }).catch(error => { self._notify('submission_failure', {method: method, url: url, data: data, error:error}); FrontendForms.display_server_error(error.toString()); }).finally(() => { header.classList.remove('loading'); });
where:
function getCookie(name) { var value = '; ' + document.cookie, parts = value.split('; ' + name + '='); if (parts.length == 2) return parts.pop().split(';').shift(); }
#!/usr/bin/env python3 import pprint import json import gitlab import argparse # Requirements # python-gitlab==4.11.1 GITLAB_URL = "https://gitlab.brainstorm.it" GITLAB_PRIVATE_TOKEN = "glpat-******************" GITLAB_PROJECT = 77 PAGE_BREAK_STRING = '<div style="page-break-after: always;"></div>' class GitlabClient(gitlab.Gitlab): def __init__(self, project_id): super().__init__(GITLAB_URL, GITLAB_PRIVATE_TOKEN, order_by='created_at') self.project = self.projects.get(project_id) def retrieve_issue(self, issue_id): issue = self.project.issues.get(issue_id) base_url = issue.web_url # Es: 'https://gitlab.brainstorm.it/group/project/-/issues/36' position = base_url.find('/-/') if position >= 0: base_url = base_url[:position] # Es: 'https://gitlab.brainstorm.it/group/project' data = json.loads(issue.to_json()) self.fix_image_links(data, base_url) data['notes'] = [] notes = issue.notes.list(all=True) notes = sorted([n for n in notes if not n.system], key=lambda k: k.created_at) for n in notes: note = json.loads(n.to_json()) self.fix_image_links(note, base_url) data['notes'].append(note) return data def fix_image_links(self, data, base_url): text = data.get('description', data.get('body')) text = text.replace('/uploads/', base_url+'/uploads/') if 'description' in data: data['description'] = text else: data['body'] = text def to_markdown(data): text = "" for k, v in data.items(): text += '\n# [%d] %s\n\n' % (k, v['title']) text += "### %s (%s)\n\n" % (v['author']['name'], v['created_at']) text += "**Link**: %s\n\n" % v['web_url'] text += v['description'] text += "\n\n" for n in v['notes']: text += "\n\n" + (80*'-') + "\n\n" text += "### %s (%s)\n\n" % (n['author']['name'], v['created_at']) text += n['body'] text += "\n\n" text += "\n\n" + PAGE_BREAK_STRING return text def main(): parser = argparse.ArgumentParser(description="...") parser.add_argument('issue_ids', nargs="+", type=int) parser.add_argument("--project_id", "-p", type=int, default=GITLAB_PROJECT, help="Default: %d" % GITLAB_PROJECT) parser.add_argument("--format", type=str, choices=["", "markdown", ], default="") args = parser.parse_args() project_id = args.project_id client = GitlabClient(GITLAB_PROJECT) data = {} for issue_id in args.issue_ids: issue_data = client.retrieve_issue(issue_id) data[issue_id] = issue_data if args.format == 'markdown': print(to_markdown(data)) else: pprint.pprint(data) if __name__ == '__main__': main()
#!/usr/bin/env python3 import pprint import json import gitlab import argparse # Requirements # python-gitlab==4.11.1 GITLAB_URL = "https://gitlab.brainstorm.it" GITLAB_PRIVATE_TOKEN = "glpat-******************" GITLAB_PROJECT = 77 PAGE_BREAK_STRING = '<div style="page-break-after: always;"></div>' class GitlabClient(gitlab.Gitlab): def __init__(self, project_id): super().__init__(GITLAB_URL, GITLAB_PRIVATE_TOKEN, order_by='created_at') self.project = self.projects.get(project_id) def retrieve_issue(self, issue_id): issue = self.project.issues.get(issue_id) base_url = issue.web_url # Es: 'https://gitlab.brainstorm.it/group/project/-/issues/36' position = base_url.find('/-/') if position >= 0: base_url = base_url[:position] # Es: 'https://gitlab.brainstorm.it/group/project' data = json.loads(issue.to_json()) self.fix_image_links(data, base_url) data['notes'] = [] notes = issue.notes.list(all=True) notes = sorted([n for n in notes if not n.system], key=lambda k: k.created_at) for n in notes: note = json.loads(n.to_json()) self.fix_image_links(note, base_url) data['notes'].append(note) return data def fix_image_links(self, data, base_url): text = data.get('description', data.get('body')) text = text.replace('/uploads/', base_url+'/uploads/') if 'description' in data: data['description'] = text else: data['body'] = text def to_markdown(data): text = "" for k, v in data.items(): text += '\n# [%d] %s\n\n' % (k, v['title']) text += "### %s (%s)\n\n" % (v['author']['name'], v['created_at']) text += "**Link**: %s\n\n" % v['web_url'] text += v['description'] text += "\n\n" for n in v['notes']: text += "\n\n" + (80*'-') + "\n\n" text += "### %s (%s)\n\n" % (n['author']['name'], v['created_at']) text += n['body'] text += "\n\n" text += "\n\n" + PAGE_BREAK_STRING return text def main(): parser = argparse.ArgumentParser(description="...") parser.add_argument('issue_ids', nargs="+", type=int) parser.add_argument("--project_id", "-p", type=int, default=GITLAB_PROJECT, help="Default: %d" % GITLAB_PROJECT) parser.add_argument("--format", type=str, choices=["", "markdown", ], default="") args = parser.parse_args() project_id = args.project_id client = GitlabClient(GITLAB_PROJECT) data = {} for issue_id in args.issue_ids: issue_data = client.retrieve_issue(issue_id) data[issue_id] = issue_data if args.format == 'markdown': print(to_markdown(data)) else: pprint.pprint(data) if __name__ == '__main__': main()
My implementation:
import csv from django.db import models from django.db import connection from django.utils import timezone from django.db import connection from django.contrib.humanize.templatetags.humanize import intcomma from ccms_lib.backend.models import CODE_MAX_LENGTH class SqlViewMixin(): @classmethod def create_view(cls, verbose=False): view_name = cls._meta.db_table cls_sql = cls.sql().strip() if cls_sql.endswith(';'): cls_sql = cls_sql[:-1] if cls.materialized: sql = f"CREATE materialized VIEW IF NOT EXISTS {view_name} AS " sql += cls_sql + ';\n' pk_name = cls._meta.pk.name sql += f'CREATE UNIQUE INDEX IF NOT EXISTS idx_{view_name} ON {view_name}({pk_name});' else: sql = f"CREATE VIEW {view_name} AS " sql += cls_sql + ';\n' cls.execute(sql, verbose) @classmethod def drop_view(cls, verbose=False): view_name = cls._meta.db_table sql = "DROP %sVIEW IF EXISTS %s CASCADE;\n" % ( 'materialized ' if cls.materialized else '', view_name, ) pk_name = cls._meta.pk.name sql += f"DROP INDEX IF EXISTS idx_{view_name};" cls.execute(sql, verbose) @classmethod def refresh_view(cls, concurrently=False, verbose=False): if cls.materialized: #sql = "REFRESH MATERIALIZED VIEW CONCURRENTLY {cls._meta.db_table};" sql = "REFRESH MATERIALIZED VIEW %s%s;" % ( "CONCURRENTLY " if concurrently else '', cls._meta.db_table, ) cls.execute(sql, verbose) @classmethod def execute(cls, sql, verbose): with connection.cursor() as cursor: if verbose: print(sql) cursor.execute(sql) @classmethod def export_view_as_csv(cls, verbose, filename=None, delimiter=','): def export_rows(cursor, sql, offset, page_size, writer, verbose): if page_size > 0: sql = sql + " OFFSET %d LIMIT %d" % (offset, page_size) if verbose: print(sql) cursor.execute(sql) if offset <= 0: writer.writerow([f.name for f in cursor.description]) for row in cursor.fetchall(): writer.writerow(row) view_name = cls._meta.db_table if filename is None: filename = timezone.now().strftime('%Y-%m-%d_%H-%M-%S__') + view_name + '.csv' #page_size = 100000 # 0 = no pagination page_size = 1000000 # 0 = no pagination sql = str(cls.objects.all().query) sql += ' ORDER BY "%s"' % cls._meta.pk.name n = 0 step = 0 if verbose: n = cls.objects.count() step = int(n / 100) print('Exporting file "%s"; records: %s' % (filename, intcomma(n))) with open(filename, 'w', newline='') as csvfile: writer = csv.writer(csvfile, dialect='excel', delimiter=delimiter) with connection.cursor() as cursor: if page_size <= 0: # no pagination cursor.execute(sql) writer.writerow([f.name for f in cursor.description]) j = 0 row = cursor.fetchone() while row is not None: j += 1 if verbose and j==1 or ((j/10) % step) == 0: progress = int((j * 100) / n) + 1 print('%d%% (%s/%s)' % (progress, intcomma(j), intcomma(n))) writer.writerow(row) row = cursor.fetchone() else: # paginate num_pages = (n // page_size) + (1 if n % page_size else 0) offset = 0 j = 0 while offset < n: if verbose: progress = int(((j*page_size) * 100) / n) print('page %d/%d (%d%%)' % (j+1, num_pages, progress)) export_rows(cursor, sql, offset, page_size, writer, verbose=False) offset += page_size j += 1 ################################################################################ # Example ... class DwhBase(SqlViewMixin, models.Model): materialized = True id = models.CharField(null=False, max_length=256, primary_key=True) base_code = models.CharField(max_length=CODE_MAX_LENGTH, null=False) base_name = models.CharField(max_length=256, null=False) class Meta: managed = False db_table = "dwhm_base" def __str__(self): return str(self.id) @staticmethod def sql(): return """ SELECT code AS id, code as base_code, description AS base_name FROM backend_base; """
References:
My implementation:
import csv from django.db import models from django.db import connection from django.utils import timezone from django.db import connection from django.contrib.humanize.templatetags.humanize import intcomma from ccms_lib.backend.models import CODE_MAX_LENGTH class SqlViewMixin(): @classmethod def create_view(cls, verbose=False): view_name = cls._meta.db_table cls_sql = cls.sql().strip() if cls_sql.endswith(';'): cls_sql = cls_sql[:-1] if cls.materialized: sql = f"CREATE materialized VIEW IF NOT EXISTS {view_name} AS " sql += cls_sql + ';\n' pk_name = cls._meta.pk.name sql += f'CREATE UNIQUE INDEX IF NOT EXISTS idx_{view_name} ON {view_name}({pk_name});' else: sql = f"CREATE VIEW {view_name} AS " sql += cls_sql + ';\n' cls.execute(sql, verbose) @classmethod def drop_view(cls, verbose=False): view_name = cls._meta.db_table sql = "DROP %sVIEW IF EXISTS %s CASCADE;\n" % ( 'materialized ' if cls.materialized else '', view_name, ) pk_name = cls._meta.pk.name sql += f"DROP INDEX IF EXISTS idx_{view_name};" cls.execute(sql, verbose) @classmethod def refresh_view(cls, concurrently=False, verbose=False): if cls.materialized: #sql = "REFRESH MATERIALIZED VIEW CONCURRENTLY {cls._meta.db_table};" sql = "REFRESH MATERIALIZED VIEW %s%s;" % ( "CONCURRENTLY " if concurrently else '', cls._meta.db_table, ) cls.execute(sql, verbose) @classmethod def execute(cls, sql, verbose): with connection.cursor() as cursor: if verbose: print(sql) cursor.execute(sql) @classmethod def export_view_as_csv(cls, verbose, filename=None, delimiter=','): def export_rows(cursor, sql, offset, page_size, writer, verbose): if page_size > 0: sql = sql + " OFFSET %d LIMIT %d" % (offset, page_size) if verbose: print(sql) cursor.execute(sql) if offset <= 0: writer.writerow([f.name for f in cursor.description]) for row in cursor.fetchall(): writer.writerow(row) view_name = cls._meta.db_table if filename is None: filename = timezone.now().strftime('%Y-%m-%d_%H-%M-%S__') + view_name + '.csv' #page_size = 100000 # 0 = no pagination page_size = 1000000 # 0 = no pagination sql = str(cls.objects.all().query) sql += ' ORDER BY "%s"' % cls._meta.pk.name n = 0 step = 0 if verbose: n = cls.objects.count() step = int(n / 100) print('Exporting file "%s"; records: %s' % (filename, intcomma(n))) with open(filename, 'w', newline='') as csvfile: writer = csv.writer(csvfile, dialect='excel', delimiter=delimiter) with connection.cursor() as cursor: if page_size <= 0: # no pagination cursor.execute(sql) writer.writerow([f.name for f in cursor.description]) j = 0 row = cursor.fetchone() while row is not None: j += 1 if verbose and j==1 or ((j/10) % step) == 0: progress = int((j * 100) / n) + 1 print('%d%% (%s/%s)' % (progress, intcomma(j), intcomma(n))) writer.writerow(row) row = cursor.fetchone() else: # paginate num_pages = (n // page_size) + (1 if n % page_size else 0) offset = 0 j = 0 while offset < n: if verbose: progress = int(((j*page_size) * 100) / n) print('page %d/%d (%d%%)' % (j+1, num_pages, progress)) export_rows(cursor, sql, offset, page_size, writer, verbose=False) offset += page_size j += 1 ################################################################################ # Example ... class DwhBase(SqlViewMixin, models.Model): materialized = True id = models.CharField(null=False, max_length=256, primary_key=True) base_code = models.CharField(max_length=CODE_MAX_LENGTH, null=False) base_name = models.CharField(max_length=256, null=False) class Meta: managed = False db_table = "dwhm_base" def __str__(self): return str(self.id) @staticmethod def sql(): return """ SELECT code AS id, code as base_code, description AS base_name FROM backend_base; """
References:
Adapted from:
"Django: Basic Auth for one view (avoid middleware)"
file decorators.py:
import base64 from django.http import HttpResponse from django.contrib.auth import authenticate def basicauth(view): """ Adapted from: "Django: Basic Auth for one view (avoid middleware)" https://stackoverflow.com/questions/46426683/django-basic-auth-for-one-view-avoid-middleware#47902577 """ def wrap(request, *args, **kwargs): if 'HTTP_AUTHORIZATION' in request.META: auth = request.META['HTTP_AUTHORIZATION'].split() if len(auth) == 2: if auth[0].lower() == "basic": uname, passwd = base64.b64decode(auth[1]).decode( "utf8" ).split(':', 1) user = authenticate(username=uname, password=passwd) if user is not None and user.is_active: request.user = user return view(request, *args, **kwargs) response = HttpResponse() response.status_code = 401 response['WWW-Authenticate'] = 'Basic realm="{}"'.format( #settings.BASIC_AUTH_REALM 'api' ) return response return wrap
file view.py:
from django.http import JsonResponse from .decorators import basicauth @basicauth def get_data(request, query_name): try: if not request.user.is_authenticated: raise Exception("Authentication required") ... data = { whatever } response = JsonResponse( { "success": True, "data": data, } ) except Exception as e: response = JsonResponse( { "success": False, "errors": [ { "status": 400, "source": { "view": "get_dw_data()", "traceback": traceback.format_exc() if config.EXCHANGE_API_TRACEBACK else "", }, "title": "Error", "detail": str(e), } ], }, safe=True, status=400, ) return response
Adapted from:
"Django: Basic Auth for one view (avoid middleware)"
file decorators.py:
import base64 from django.http import HttpResponse from django.contrib.auth import authenticate def basicauth(view): """ Adapted from: "Django: Basic Auth for one view (avoid middleware)" https://stackoverflow.com/questions/46426683/django-basic-auth-for-one-view-avoid-middleware#47902577 """ def wrap(request, *args, **kwargs): if 'HTTP_AUTHORIZATION' in request.META: auth = request.META['HTTP_AUTHORIZATION'].split() if len(auth) == 2: if auth[0].lower() == "basic": uname, passwd = base64.b64decode(auth[1]).decode( "utf8" ).split(':', 1) user = authenticate(username=uname, password=passwd) if user is not None and user.is_active: request.user = user return view(request, *args, **kwargs) response = HttpResponse() response.status_code = 401 response['WWW-Authenticate'] = 'Basic realm="{}"'.format( #settings.BASIC_AUTH_REALM 'api' ) return response return wrap
file view.py:
from django.http import JsonResponse from .decorators import basicauth @basicauth def get_data(request, query_name): try: if not request.user.is_authenticated: raise Exception("Authentication required") ... data = { whatever } response = JsonResponse( { "success": True, "data": data, } ) except Exception as e: response = JsonResponse( { "success": False, "errors": [ { "status": 400, "source": { "view": "get_dw_data()", "traceback": traceback.format_exc() if config.EXCHANGE_API_TRACEBACK else "", }, "title": "Error", "detail": str(e), } ], }, safe=True, status=400, ) return response
file main/middleware.py
import logging from django.utils.deprecation import MiddlewareMixin from django.conf import settings logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(message)s") def to_string(value): text = str(value) nmax = settings.LOG_REQUEST_DETAIL_MAX_LENGTH if nmax > 0 and len(text) > nmax: text = text[:nmax] + "......" return text def user_ip_address(request): x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', None) if x_forwarded_for: remote_address = x_forwarded_for.split(',')[0].strip() else: remote_address = request.META.get('REMOTE_ADDR', None) return remote_address class LoggingMiddleware(MiddlewareMixin): """ Log requests and responses Adapted from: https://scripting4ever.wordpress.com/2020/07/27/how-to-log-the-request-and-response-via-django-middleware/ """ def process_request(self, request): if settings.LOG_REQUEST_DETAILS: logging.info("=" * 80) logging.info( "Request Method: " + to_string(request.META.get("REQUEST_METHOD", "???")) ) logging.info("URL requested: " + to_string(request.path)) logging.info("Request Body Contents: " + to_string(request.body)) logging.info("Request Headers: " + to_string(request.headers)) logging.info( "Content Length: " + to_string(request.META.get("CONTENT_LENGTH", "???")) ) logging.info( #"Client IP Address: " + to_string(request.META.get("REMOTE_ADDR", "???")) "Client IP Address: " + to_string(user_ip_address(request)) ) logging.info( "Host Name of Client: " + to_string(request.META.get("REMOTE_HOST", "???")) ) logging.info( "Host Name or Server: " + to_string(request.META.get("SERVER_NAME", "???")) ) logging.info( "Port of the Server: " + to_string(request.META.get("SERVER_PORT", "???")) ) logging.info("-" * 80) return None def process_response(self, request, response): if settings.LOG_REQUEST_DETAILS: logging.info("Response Content: " + to_string(response.content)) logging.info("Response Code: " + to_string(response.status_code)) logging.info("Response Headers: " + to_string(response.headers)) logging.info("=" * 80) return response
then, in settings.py:
MIDDLEWARE = [ ... 'main.middleware.LoggingMiddleware', ] ... LOG_REQUEST_DETAILS = True LOG_REQUEST_DETAIL_MAX_LENGTH = 200
Adapted from:
file main/middleware.py
import logging from django.utils.deprecation import MiddlewareMixin from django.conf import settings logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(levelname)s:%(message)s") def to_string(value): text = str(value) nmax = settings.LOG_REQUEST_DETAIL_MAX_LENGTH if nmax > 0 and len(text) > nmax: text = text[:nmax] + "......" return text def user_ip_address(request): x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', None) if x_forwarded_for: remote_address = x_forwarded_for.split(',')[0].strip() else: remote_address = request.META.get('REMOTE_ADDR', None) return remote_address class LoggingMiddleware(MiddlewareMixin): """ Log requests and responses Adapted from: https://scripting4ever.wordpress.com/2020/07/27/how-to-log-the-request-and-response-via-django-middleware/ """ def process_request(self, request): if settings.LOG_REQUEST_DETAILS: logging.info("=" * 80) logging.info( "Request Method: " + to_string(request.META.get("REQUEST_METHOD", "???")) ) logging.info("URL requested: " + to_string(request.path)) logging.info("Request Body Contents: " + to_string(request.body)) logging.info("Request Headers: " + to_string(request.headers)) logging.info( "Content Length: " + to_string(request.META.get("CONTENT_LENGTH", "???")) ) logging.info( #"Client IP Address: " + to_string(request.META.get("REMOTE_ADDR", "???")) "Client IP Address: " + to_string(user_ip_address(request)) ) logging.info( "Host Name of Client: " + to_string(request.META.get("REMOTE_HOST", "???")) ) logging.info( "Host Name or Server: " + to_string(request.META.get("SERVER_NAME", "???")) ) logging.info( "Port of the Server: " + to_string(request.META.get("SERVER_PORT", "???")) ) logging.info("-" * 80) return None def process_response(self, request, response): if settings.LOG_REQUEST_DETAILS: logging.info("Response Content: " + to_string(response.content)) logging.info("Response Code: " + to_string(response.status_code)) logging.info("Response Headers: " + to_string(response.headers)) logging.info("=" * 80) return response
then, in settings.py:
MIDDLEWARE = [ ... 'main.middleware.LoggingMiddleware', ] ... LOG_REQUEST_DETAILS = True LOG_REQUEST_DETAIL_MAX_LENGTH = 200
Adapted from:
Example of a simple C# console application that prints "Hello, World!" to the console using top-level statements:
dotnet new console -n HelloWorldApp cd HelloWorldApp cat Program.cs // Program.cs using System; Console.WriteLine("Hello, World!"); dotnet run
Example of a simple C# console application that prints "Hello, World!" to the console using top-level statements:
dotnet new console -n HelloWorldApp cd HelloWorldApp cat Program.cs // Program.cs using System; Console.WriteLine("Hello, World!"); dotnet run
Realizzato allo scopo di verificare il codice utilizzato internamente nelle nostre applicazioni per l'accesso ai databases mediante l'interfaccia ADO, consente di eseguire query interattive su un generico database (MySQL, Access, SQL Server, Oracle, ecc...). E' inoltre possibile esportare la struttura e il contentuto di un generico database in forma di script SQL compatibile con MySQL oppure SQL Server.
Click here to downloadConsente di inviare dati ASCII oppure binari ad un generico dispositivo, sia utilizzando la comunicazione seriale, che via ethernet (TCP). E' inoltre possibile impostare risposte automatiche per simulare un generico protocollo di comunicazione. Supporta connessioni seriali e/o Ethernet, TCP o UDP (sia lato client che lato server), HTTP, e il protocollo seriale XModem.
Click here to downloadUtilizzato per la distribuzione dei sorgenti in forma crittata. La chiave di decodifica viene fornita singolarmente a ciascun cliente.
Click here to downloadBRAINSTORM S.r.l. di Mario Orlandi & C.
Viale Crispi, 2
41121 Modena, Italy
P.IVA 02409140361
Codice SDI: USAL8PV
Indirizzo Pec: brainstormsnc@pec.it
Tel: (+39) 059 216138
Insert your credentials to access protected services
Questa Applicazione raccoglie alcuni Dati Personali dei propri Utenti.
Fra i Dati Personali raccolti da questa Applicazione, in modo autonomo o tramite terze parti, ci sono: Cookie e Dati di utilizzo.
I Dati Personali possono essere inseriti volontariamente dall’Utente, oppure raccolti in modo automatico durante l'uso di questa Applicazione.
L’eventuale utilizzo di Cookie - o di altri strumenti di tracciamento - da parte di questa Applicazione o dei titolari dei servizi terzi utilizzati da questa Applicazione, ove non diversamente precisato, ha la finalità di identificare l’Utente e registrare le relative preferenze per finalità strettamente legate all'erogazione del servizio richiesto dall’Utente.
Il mancato conferimento da parte dell’Utente di alcuni Dati Personali potrebbe impedire a questa Applicazione di erogare i propri servizi.
L'Utente si assume la responsabilità dei Dati Personali di terzi pubblicati o condivisi mediante questa Applicazione e garantisce di avere il diritto di comunicarli o diffonderli, liberando il Titolare da qualsiasi responsabilità verso terzi.
Il Titolare tratta i Dati Personali degli Utenti adottando le opportune misure di sicurezza volte ad impedire l’accesso, la divulgazione, la modifica o la distruzione non autorizzate dei Dati Personali.
Il trattamento viene effettuato mediante strumenti informatici e/o telematici, con modalità organizzative e con logiche strettamente correlate alle finalità indicate. Oltre al Titolare, in alcuni casi, potrebbero avere accesso ai Dati categorie di incaricati coinvolti nell’organizzazione del sito (personale amministrativo, commerciale, marketing, legali, amministratori di sistema) ovvero soggetti esterni (come fornitori di servizi tecnici terzi, corrieri postali, hosting provider, società informatiche, agenzie di comunicazione) nominati anche, se necessario, Responsabili del Trattamento da parte del Titolare. L’elenco aggiornato dei Responsabili potrà sempre essere richiesto al Titolare del Trattamento.