Read file#
try:
content = open(file, 'r').read()
lines = open(file, 'r').readline()
except IOError as err:
print(err)
sys.exit()
Read csv#
import csv
with open('FOO.csv', newline='') as csvfile:
reader = csv.reader(csvfile, delimiter=' ', quotechar='|')
next(reader, None) # skip header row
for row in reader:
print(', '.join(row))
Date and time#
time_object = datetime.strptime("19-11-20", '%y-%m-%d').date() # read from string
time_object.strftime("%Y/%m/%d") # print to string
One liner yaml to json#
python -c 'import sys, yaml, json; json.dump(yaml.safe_load(sys.stdin), sys.stdout, indent=4)' < $FILE
Logging#
import logging
loggers = {}
def mylogger(name):
global loggers
if loggers.get(name):
return loggers.get(name)
else:
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s %(levelname)s %(module)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
loggers.update(dict(name=logger))
return logger
from log import mylogger
logger = mylogger(__name__)
logger.info("TEXT")
logger.debug("!!!!")
JSON#
import json
json.load(open(file.json, 'r'))
json.loads('{"hello": "world}')
Functional Programming#
Iterate hash
list(map(lambda x: x, out))
Upgrade all pip packages#
sudo pip freeze --local | grep -v '"'"'^\-e'"'"' | cut -d = -f 1 | xargs -n1 sudo pip install -U
pip proxy#
Windows %APPDATA%\pip\pip.ini
/ Linux pip.conf
[global]
proxy = http://[domain name]%5C[username]:[password]@[proxy address]:[proxy port]
pip certificate validation#
pip
pip install install --trusted-host pypi.org --trusted-host files.pythonhosted.org
or in pip.ini (windowns) / pip.conf (linux)
[global]
trusted-host = pypi.python.org
pypi.org
files.pythonhosted.org
or provide certificate (in pem format) with pip3 --cert path/to/cert install PACAKGE
conda
conda config --set ssl_verify false
LDAP testing#
#!/usr/bin/env python
from ldap3 import Server, Connection, ALL
def ldap_initialize(server, port, user, password):
"""
Simple ldap connection test function
:param server: mail server address
:param port: mail server port
:param user: user to auth
:param password: password to auth
"""
s = Server(server, port, get_info=ALL)
# define the connection
c = Connection(s, user, password, auto_bind='NONE', version=3, authentication='SIMPLE', \
client_strategy='SYNC', auto_referrals=True, check_names=True, read_only=True, lazy=False, raise_exceptions=False)
# perform the Bind operation
if not c.bind():
print('error in bind', c.result)
SMTP testing#
import smtplib
from email.mime.text import MIMEText
def send_mail(host, me, you, user, password):
"""
Send a test mail containing a simple ASCII message
:param host: mail server + port
:param me: sender address
:param you: receiver mail address
:param user: user to auth
:param password: password to auth
"""
msg = MIMEText("Hello World!")
msg['Subject'] = 'Test mail from Python'
msg['From'] = me
msg['To'] = you
s = smtplib.SMTP(host)
s.set_debuglevel(True)
s.login(user, password)
s.sendmail(me, [you], msg.as_string())
s.quit()
Generate sha1 password#
For example for Jupyter notebooks credentials
In [1]: from IPython.lib import passwd
In [2]: passwd()
CLI arguments and parameters#
import click
@click.group()
def main():
"""Main click group"""
pass
@main.command()
@click.option('--count', default=10, help='number of entries')
@click.option('--path', default='pckt.db', help='path to sqlite3 file')
def update(count, path):
"""Updates/recreates the database containing all information"""
clear_db(path)
update_db(path, get_data(fetch_items(P, count)))
@main.command()
@click.option('--path', default='pckt.db', help='path to sqlite3 file')
def tags(path):
"""Prints tags and their figures as json"""
print(json.dumps(get_tags_stats(select_row(path, 2))))
@main.command()
@click.option('--path', default='pckt.db', help='path to sqlite3 file')
@click.option('--row', default='complete', help='Which column to search')
@click.argument('keywords', nargs=-1)
def search(path, row, keywords):
"""Full text search all rows or specified row"""
print(filter_entries(path, row, keywords))
if __name__ == '__main__':
main()
CLI table output#
import texttable
table = texttable.Texttable()
table.set_cols_width([width, width, 15, 10, 15])
for row in entries:
table.add_row(list(row))
print(table.draw() + "\n")
Count occurrence with hash values#
‘0’ if the key is set for the first time. Otherwise adds ‘1’ every assignment
tags[tag] = tags.get(tag, 0) + 1
Check keys of nested dicts#
def keys_exists(element, *keys):
"""Check if *keys (nested) exists in `element` (dict).
Args:
element (dict): the dictionary to be checked
keys (string): one or more (nested) keys to check for existence
Returns:
bool: True if key exists
"""
if not isinstance(element, dict):
raise AttributeError('keys_exists() expects dict as first argument.')
if len(keys) == 0:
raise AttributeError('keys_exists() expects at least two arguments, one given.')
for key in keys:
try:
element = element[key]
except (KeyError, TypeError):
return False
return True
Profiling#
cProfile
python -m cProfile -o cprofile.profile main.py
visualize with snakeviz cprofile.profile
PyCallGrap*
pycallgraph graphviz -- ./main.py
open pycallgraph.png
Static code analysis#
[Pyan}(https://github.com/davidfraser/pyan)
Verify Redis connection#
import redis
import os
try:
conn = redis.StrictRedis(
host="localhost",
port=6379,
password="s3cret",
ssl=True,
)
print(conn)
conn.ping()
print("Connected!")
except Exception as ex:
print("Error ", ex)
exit("Failed to connect, terminating.")
Create container in Azure storage account#
def createStorageAccountContainer(storage_account_name, container_name):
account_url = f"https://{storage_account_name}.blob.core.windows.net"
azure_credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url, credential=azure_credential)
try:
blob_service_client.create_container(name=container_name) # private by default
except ResourceExistsError:
print(
f"A container in {storage_account_name} named {container_name} already exists"
)
Upload blob to Azure storage account#
def uploadBlob(storage_account_name, container_name):
account_url = f"https://{storage_account_name}.blob.core.windows.net"
azure_credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url, credential=azure_credential)
ctime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
fp = tempfile.NamedTemporaryFile()
fp.write(ctime.encode("utf-8"))
blob_client = blob_service_client.get_blob_client(
container=container_name, blob=ctime
)
with open(fp.name, "rb") as data:
blob_client.upload_blob(data)
fp.close()