Commit 3658fe21 authored by dani rusdan's avatar dani rusdan

helper

parent bf3eaaf9
...@@ -42,11 +42,16 @@ def create_app(): ...@@ -42,11 +42,16 @@ def create_app():
from app.routes.auth import auth_bp from app.routes.auth import auth_bp
from app.routes.user import user_bp from app.routes.user import user_bp
from app.routes.email import email_bp from app.routes.email import email_bp
from app.routes.tester import tester
from app.routes.registerapp import registerapp
app.register_blueprint(auth_bp, url_prefix='/auth') app.register_blueprint(auth_bp, url_prefix='/auth')
app.register_blueprint(user_bp, url_prefix='/user') app.register_blueprint(user_bp, url_prefix='/user')
app.register_blueprint(email_bp, url_prefix='/email') app.register_blueprint(email_bp, url_prefix='/email')
app.register_blueprint(tester)
app.register_blueprint(registerapp)
swagger = Swagger(app, config={ swagger = Swagger(app, config={
"headers": [ "headers": [
('Access-Control-Allow-Origin', '*') ('Access-Control-Allow-Origin', '*')
......
#!/usr/bin/python
import psycopg2
from configparser import ConfigParser
class connection():
def config_db(filename='/database.ini', section='postgresql'):
# create a parser
parser = ConfigParser()
# read config file
parser.read(filename)
# get section, default to postgresql
db = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
db[param[0]] = param[1]
else:
raise Exception('Section {0} not found in the {1} file'.format(section, filename))
return db
def connect():
""" Connect to the PostgreSQL database server """
conn = None
try:
# read connection parameters
params = connection.config_db()
# connect to the PostgreSQL server
print('Connecting to the PostgreSQL database...')
conn = psycopg2.connect(**params)
# create a cursor
cur = conn.cursor()
# execute a statement
print('PostgreSQL database version:')
cur.execute('SELECT version()')
# display the PostgreSQL database server version
db_version = cur.fetchone()
print(db_version)
# close the communication with the PostgreSQL
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
print('Database connection closed.')
def dbengine():
conn = None
try:
# read connection parameters
conn = psycopg2.connect(
database = "BUILDER",
user = "postgres",
password = "khansia215758",
host = "103.126.28.68",
port = "8082"
)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
return conn
return conn
# if __name__ == '__main__':
# connection.connect()
\ No newline at end of file
[postgresql]
host=103.126.28.68
database=hansiajaya
user=postgres
password=khansia215758
port=8082
from django.http import JsonResponse
# from application.helper.Patterns import Patterns
from django.shortcuts import redirect
from datetime import datetime
from dateutil.relativedelta import relativedelta
from django.db import connection
# from cryptography.fernet import Fernet
import base64
import binascii
import json
# https://pypi.org/project/pycryptodome/
from Crypto import Random
from Crypto.Cipher import AES
import json
import sys
# PHRASE = 'AygqrawSf1GI2FFzUX8AnRjLxb0V1paOuwMR22UR_1I='
PHRASE_KY = b'0123456789abcdef' # Key should be 16 bytes (128 bits)
PHRASE_IV = b'abcdef9876543210' # IV should be 16 bytes
class Master(object):
PHRASE_KY = b'0123456789abcdef' # Key should be 16 bytes (128 bits)
PHRASE_IV = b'abcdef9876543210' # IV should be 16 bytes
CODE_SUCCESS = 0
INFO_SUCCESS = 'Success'
CODE_FAILED = 1
INFO_FAILED = 'Failed'
INPUTEXT = 1
TEXTAREA = 2
SELECTOPT = 3
RADIONBTN = 4
CHECKBOX = 5
RADIO_IMAGE = 9
P_MAP = 7
M_MAP = 8
L_MAP = 10
C_MAP = 11
PROD_URI = 'http://223.27.149.202'
# ENVIRONMENT = setting.env('ENVIRONMENT')
def __init__(self, value):
self._request = value
self._view = None
# self._env = ENVIRONMENT
def display(self):
return self._request
def getSession(self, value):
v = None
if value == 'userid':
v = self._request.session['userid']
if value == 'roleid':
v = self._request.session['role_id']
if value == 'appsid':
v = self._request.session['apps_id']
if value == 'data':
if 'my_data' in self._request.session:
v = self._request.session['my_data']
if value == 'role_name':
if 'roled' in self._request.session:
v = self._request.session['roled']
if value == 'f_name':
v = self._request.session['f_name']
if value == 'l_name':
v = self._request.session['l_name']
return v
def querydict_to_dict(query_dict):
colll = []
for key in query_dict.keys():
data = {}
v = query_dict.getlist(key)
if len(v) == 1:
v = v[0]
data[key] = v
colll.append(data)
return colll
def defaultsError(self):
results = {
"draw" : int(0),
"recordsTotal" : int(0),
"recordsFiltered" : int(0),
"data" : None
}
return results
def checkArray(isData):
if 'guid' not in isData : isData['guid'] = 0
if 'info' not in isData : isData['info'] = Master.INFO_SUCCESS
if 'code' not in isData : isData['code'] = Master.CODE_SUCCESS
if 'data' not in isData : isData['data'] = None
return isData
def Results(self, isData = {'guid': 0, 'code': CODE_SUCCESS, 'info': INFO_SUCCESS, 'data': None}, asJson = False):
if asJson == False :
myData = Master.checkArray(isData)
setArr = {
'guid' : myData['guid'],
'code' : myData['code'],
'info' : myData['info'],
'data' : myData['data'],
}
for item in isData:
if item not in setArr :
setArr[item] = isData[item]
else:
setArr = isData
return JsonResponse(setArr)
def jsonLoads(self, param):
return(json.loads(param))
def getDate(self):
isDate = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return isDate
def getYear(self):
isDate = datetime.now().strftime("%Y")
return isDate
def getDatePlusOneYear(self):
currentTimeDate = datetime.now() + relativedelta(years=1)
currentTime = currentTimeDate.strftime("%Y-%m-%d %H:%M:%S")
return currentTime
def getDatePlusOneMonth(self):
currentTimeDate = datetime.now() + relativedelta(months=1)
currentTime = currentTimeDate.strftime("%Y-%m-%d %H:%M:%S")
return currentTime
def directLoad(self, param):
return json.dumps(param)
def checkCookieSession(request):
cook = request.COOKIES
ex = 'sessionid' in cook
if ex:
reason = request.COOKIES['sessionid']
if reason == None:
sys.exit('asss')
else:
sys.exit('asss')
def jsonDumps(self, param):
return(json.dumps(param))
def isset(self, key, data):
setData = None
if key in data:
setData = data[str(key)]
return setData
def issetOne(key, data):
setData = None
if key in data:
setData = data[str(key)]
return setData
def set_cookie(response, key, value, days_expire=7):
if days_expire is None:
max_age = 365 * 24 * 60 * 60 # one year
else:
max_age = days_expire * 24 * 60 * 60
expires = datetime.datetime.strftime(
datetime.datetime.utcnow() + datetime.timedelta(seconds=max_age),
"%a, %d-%b-%Y %H:%M:%S GMT",
)
response.set_cookie(
key,
value,
max_age=max_age,
expires=expires,
)
def encr(data):
# BUKA KOMEN UNTUK GENERAATE KEY BARU
# key = Fernet.generate_key()
# print(key)
cipher_suite = Fernet(PHRASE)
data_str = json.dumps(data)
encrypted_data = cipher_suite.encrypt(data_str.encode())
# print(f"Encrypted data: {encrypted_data.decode()}")
return encrypted_data.decode()
def pycrdome(data):
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
# Generate a key and initialization vector (iv)
key = PHRASE_KY # Key should be 16 bytes (128 bits)
iv = PHRASE_IV # IV should be 16 bytes
# BUKA KOMEN UNTUK GENERAATE KEY BARU
# from Crypto.Random import get_random_bytes
# key = get_random_bytes(16)
# iv = get_random_bytes(16)
# print(f"Key: {base64.b64encode(key).decode()}")
# print(f"IV: {base64.b64encode(iv).decode()}")
# print(get_random_bytes(16) )
# Convert the object to a JSON string
data_str = json.dumps(data)
# Encrypt the JSON string
cipher = AES.new(key, AES.MODE_CBC, iv)
encrypted_data = cipher.encrypt(pad(data_str.encode(), AES.block_size))
# Encode the encrypted data and IV as base64 for transmission
encrypted_data_b64 = base64.b64encode(encrypted_data).decode()
# iv_b64 = base64.b64encode(iv).decode()
return encrypted_data_b64
# from ctypes.wintypes import BOOLEAN
# from telnetlib import PRAGMA_HEARTBEAT
# from pickle import TRUE
# from django.db import models, connection, transaction
from app.config_db import connection
from app.helper.Master import Master
from app.models.storages import storages
from var_dump import var_dump
class param:
DEFAULT_RETURN = 'id'
CUSTOM_RETURN = 'DANIRUSDAN'
# TRANSACTION = transaction
def __init__(self, _tables = None):
self._data = {}
self._tables = _tables
self._condition = None
self._return_id = None
self._return_fixed = None # digunakan jika tidak memakai seq tapi by kolom, ya gitu yach
self._schema = 'public'
self.connection = connection.dbengine()
self._cols = None
def createTable(self):
try:
storage = storages(self._schema)
storage._datum = self._data
storage.callStatmentsCreateTable()
sql = storage.createTableIfNotExists()
sql += self._schema+'.'+self._tables
if self._cols == None:
sql += storage.build()[:-3]+')'
else:
sql += storage.build()
# print(sql)
with connection.dbengine().cursor() as load:
load.execute(sql)
id = load.fetchone()[0]
load.close()
if id is not None:
y = {
'code' : Master.CODE_SUCCESS,
'info' : Master.INFO_SUCCESS,
'data' : id,
}
else:
y = {
'code' : Master.CODE_FAILED,
'info' : Master.INFO_FAILED,
}
except Exception as e:
y = {
'code' : Master.CODE_FAILED,
'info' : str(e),
}
return y
def copyTable(self):
param.dynamicQuery(f""" CREATE TABLE {self._schema}.{self._tables} AS TABLE {self._tables} WITH NO DATA """)
def createSchema(self):
param.dynamicQuery(" CREATE SCHEMA IF NOT EXISTS "+self._schema+" AUTHORIZATION postgres ")
def infomationSchema(self):
infr = param.dynamicQuery(" SELECT column_name, udt_name, character_maximum_length, numeric_precision FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = '"+self._tables+"' AND table_schema NOT LIKE '%gen_application%' ", 1)
self._data = infr['data']
return infr
def checkTableExist(schema_name, table_name):
b = False
infr = param.dynamicQuery(" SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = '"+schema_name+"' AND tablename = '"+table_name+"' ) ")
if infr['code'] == Master.CODE_SUCCESS:
if infr['data'][0]['exists'] == True:
b = infr['data'][0]['exists']
return b
def saveGlobal(self):
try:
id = None
storage = storages(self._tables)
storage._seq = self._return_id
storage._datum = self._data
storage._condition = self._condition
if self._return_fixed != None:
storage._seq = self._return_fixed
self._return_id = self._return_fixed
elif (self._return_id is not param.DEFAULT_RETURN or self._return_id is None) and self._return_fixed == None:
self._return_id = storage._granat()
if self._return_id is not None:
storage.callStatments()
if self._condition: # is update
sql = storage.update()
sql += storage.set()
sql += storage.where()
else:
sql = storage.insert()
sql += storage.into()
sql += storage.columns()
sql += storage.values()
# print(sql)
with connection.dbengine().cursor() as load:
load.execute(sql)
id = load.fetchone()[0]
load.close()
if id is not None:
# del self._data
y = {
'code' : Master.CODE_SUCCESS,
'info' : Master.INFO_SUCCESS,
'data' : id,
}
else:
y = {
'code' : Master.CODE_FAILED,
'info' : Master.INFO_FAILED,
}
else:
y = {
'code' : 92323898940,
'info' : 'GRANT SEQUNCE NOT FOUND',
}
except Exception as e:
y = {
'code' : Master.CODE_FAILED,
'info' : str(e),
}
# finally:
# if connection is not None:
# connection.close()
return y
def deleteGlobal(self, where):
try:
deleted = 0
storage = storages(self._tables)
sql = storage.delete()
sql += storage.where(where)
with connection.cursor() as load:
load.execute(sql)
deleted = load.rowcount
load.close()
if deleted is not None:
y = {
'code' : Master.CODE_SUCCESS,
'info' : Master.INFO_SUCCESS,
'data' : deleted,
}
else:
y = {
'code' : Master.CODE_FAILED,
'info' : Master.INFO_FAILED,
}
except Exception as e:
y = {
'code' : Master.CODE_FAILED,
'info' : str(e),
}
# finally:
# if connection is not None:
# connection.close()
return y
def loadGlobal(_column, _tabel = None, _where = None):
try:
sql = " SELECT "+_column
if _tabel:
sql += " FROM "+_tabel
if _where:
sql += " WHERE "+_where
# print(sql)
with connection.dbengine().cursor() as load:
load.execute(sql)
ret = load.fetchall()
columns = load.description
load.close()
colll = []
for i in ret:
column = {}
for n,k in enumerate(columns):
column[k.name] = i[n]
colll.append(column)
if ret:
y = {
'code' : Master.CODE_SUCCESS,
'info' : Master.INFO_SUCCESS,
'data' : colll,
}
else:
y = {
'code' : Master.CODE_FAILED,
'info' : Master.INFO_FAILED,
'data' : None
}
except Exception as e:
y = {
'code' : Master.CODE_FAILED,
'info' : str(e),
'data' : None
}
return y
def dynamicQuery(sql, dumps = False):
try:
sql = " "+sql+ " "
if dumps == True:
print(sql)
with connection.dbengine().cursor() as load:
load.execute(sql)
ret = load.fetchall()
columns = load.description
load.close()
colll = []
for i in ret:
column = {}
for n,k in enumerate(columns):
column[k.name] = i[n]
colll.append(column)
if ret:
y = {
'code' : Master.CODE_SUCCESS,
'info' : Master.INFO_SUCCESS,
'data' : colll,
}
else:
y = {
'code' : Master.CODE_FAILED,
'info' : Master.INFO_FAILED,
'data' : None
}
except Exception as e:
y = {
'code' : Master.CODE_FAILED,
'info' : str(e),
'data' : None
}
return y
from app.helper.Master import Master
from app.models.param import param
class tester:
def getuser():
sql = f'select * from auth_user'
res = param.dynamicQuery(sql)
return res
def getrole():
sql = f'select * from auth_role'
res = param.dynamicQuery(sql)
return res
from app.config_db import connection
from var_dump import var_dump
# @ DANIRUSDAN MEMANG TAMVAN
class storages:
SPECIFICATION_INSERT = 'insert'
def __init__(self, _grants = None):
self._grants = _grants
self._condition = ''
self.SPECIFICATION_DELETE = ' DELETE FROM '
self.SPECIFICATION_INSERT = ' INSERT INTO '
self.SPECIFICATION_UPDATE = ' UPDATE '
self.SPECIFICATION_VALUES = ' VALUES '
self._seq = None
self.SPECIFICATION_SET = ' SET '
self.SPECIFICATION_WHERE = ' WHERE '
self._datum = {}
self.value = ''
self.column = ''
self.CREATE_TABLE = 'CREATE TABLE IF NOT EXISTS '
self._condition_one = 'geo'
self._condition_two = 'id'
self._condition_tri = 'geom'
def callStatmentsCreateTable(self):
for i, k in enumerate(self._datum):
if i == 0:
self.column += " id serial PRIMARY KEY, "
# self.column += " new_deleted int, "
self.column += " new_create_date TIMESTAMP, "
if i == len(self._datum)-1:
if k['column_name'] != self._condition_one:
if k['column_name'] != self._condition_two and k['column_name'] != self._condition_tri:
if k['character_maximum_length'] != None:
self.column += k['column_name'] +" "+ k['udt_name'] +" ("+str(k['character_maximum_length'])+") "
elif k['numeric_precision'] != None:
self.column += k['column_name'] +" "+ k['udt_name'] +" "
else:
if k['udt_name'] == 'timestamp':
self.column += k['column_name'] +" "+ k['udt_name'] +" "
else:
if k['column_name'] == self._condition_two:
self.column += " ref_id int "
elif k['column_name'] == self._condition_tri:
self.column += " geom geometry "
else:
if k['column_name'] != self._condition_one:
if k['column_name'] != self._condition_two and k['column_name'] != self._condition_tri:
if k['character_maximum_length'] != None:
self.column += k['column_name'] +" "+ k['udt_name'] +" ("+str(k['character_maximum_length'])+"), "
elif k['numeric_precision'] != None:
self.column += k['column_name'] +" "+ k['udt_name'] +", "
else:
if k['udt_name'] == 'timestamp':
self.column += k['column_name'] +" "+ k['udt_name'] +", "
else:
if k['column_name'] == self._condition_two:
self.column += " ref_id int, "
elif k['column_name'] == self._condition_tri:
self.column += " geom geometry, "
def createTableIfNotExists(self):
return self.CREATE_TABLE
def build(self):
return '('+self.column+')'
def _granat(self):
try:
sql = " SELECT table_name, column_name, column_default from information_schema.columns where table_name='"+self._grants+"' "
with connection.dbengine().cursor() as load:
load.execute(sql)
ret = load.fetchall()
columns = load.description
load.close()
colll = []
for i in ret:
column = {}
for n,k in enumerate(columns):
column[k.name] = i[n]
colll.append(column)
b = None
if ret:
for j in colll:
if j['column_default'] is not None:
if (j['column_default'].find('nextval') != -1):
b = j['column_name']
self._seq = j['column_name']
except Exception as e:
b = None
return b
def checkTypeData(_var):
if _var == 'null':
_var = None
if _var != 'None' and _var != None:
if _var != '':
if _var.isnumeric() is False:
_var = "'"+_var+"'"
else:
_var = 'NULL'
else:
_var = 'NULL'
return _var
def callStatments(self):
for i, k in enumerate(self._datum):
if i == len(self._datum)-1:
if self._condition is None:
self.column += k
self.value += storages.checkTypeData(str(self._datum[k]))
else:
self.value += k + ' = ' +storages.checkTypeData(str(self._datum[k]))
else:
if self._condition is None:
self.column += k+', '
self.value += storages.checkTypeData(str(self._datum[k]))+', '
else:
self.value += k + ' = ' +storages.checkTypeData(str(self._datum[k]))+', '
def insert(self):
return self.SPECIFICATION_INSERT
def update(self):
return self.SPECIFICATION_UPDATE
def into(self):
return self._grants
def set(self):
return self._grants + ' ' + self.SPECIFICATION_SET + '' + self.value
def columns(self):
return ' ('+self.column+') '
def values(self):
return self.SPECIFICATION_VALUES+ '('+self.value+') RETURNING '+self._seq
def where(self, _tabs = None):
if _tabs is None:
return self.SPECIFICATION_WHERE +" "+self._condition+" RETURNING " +self._seq
else:
return self.SPECIFICATION_WHERE + " "+_tabs
def delete(self):
return self.SPECIFICATION_DELETE + " " + self._grants
\ No newline at end of file
import requests
import json
from app.models.param import param
from flask import Flask, render_template, Blueprint
registerapp = Blueprint('registerapp', __name__)
class jsondata():
@registerapp.route('/tester/loadroleuser')
def loadRoleUser():
from app.models.querys.tester import tester
res = tester.getrole()
return res
@registerapp.route('/tester/loadlistuser')
def loadListUser():
from app.models.querys.tester import tester
res = tester.getuser()
return res
class viewrenders():
@registerapp.route('/app/')
def onmodule():
return render_template('registerapp.html')
import requests
import json
from app.models.param import param
from flask import Flask, render_template, Blueprint
tester = Blueprint('tester', __name__)
class jsondata():
@tester.route('/tester/loadroleuser')
def loadRoleUser():
from app.models.querys.tester import tester
res = tester.getrole()
return res
@tester.route('/tester/loadlistuser')
def loadListUser():
from app.models.querys.tester import tester
res = tester.getuser()
return res
class viewrenders():
@tester.route('/tester/')
def tester():
return render_template('tester.html')
This diff is collapsed.
uwu
\ No newline at end of file
...@@ -13,4 +13,4 @@ def setup(): ...@@ -13,4 +13,4 @@ def setup():
if __name__ == "__main__": if __name__ == "__main__":
port = app.config['PORT'] port = app.config['PORT']
debug = app.config['DEBUG'] debug = app.config['DEBUG']
app.run(port=port, debug=debug) app.run(port=port, debug=True)
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment