# core_operations
"""
Copyright (C) 2020 New Entity Operations Inc.
ALL RIGHTS RESERVED
core_operations is the default helper stack for C.ORE containing useful classes and methods
Copyright 2020 (C) Ryan McKenna All Rights Reserved
Operations File: This should have any periodical operation methods inside
Load in the dependency Files
Main utils: (A)
Import additional utility functions
"""
## Imports: Standard
from argparse import (
ArgumentParser,
)
from glob import (
glob,
)
from hashlib import (
sha512,
)
from os import (
chdir, environ, getcwd, listdir, mkdir, path, remove, stat, system, walk
)
# Imports: Standard->from
from tempfile import NamedTemporaryFile
from zipfile import ZipFile
from time import sleep
from shutil import copyfile, make_archive, rmtree
from pathlib import Path
from datetime import datetime
## Establish archive types
import gzip
import tarfile
# ABOUT Function Here: (C)
from core_middlelayer import (
# AirEP
ABOUT_ACCESS, ABOUT_ASSET, ABOUT_DS, ABOUT_ENTITY, ABOUT_IDENTITY,
ABOUT_RING, ABOUT_SUPPORTING,
ACCESSPATH, ACTIVEES,
ALLOWED_LANG_BASH, ALLOWED_LANG_PHP,
ALLOWED_LANG_PYTHON, ALLOWED_LANG_RUBY,
ARCHIVETYPE,
AUTHORS,
CHANGELOG,
chosen_entity_path,
CONFIGINI, CONFIGINIBACKUP,
CONSTRUCTEDCOREES, CONTEXTENGINE, CONTROLS, COREES,
DEFAULT_CHAR_OUTPUT,
DIRCONFIG, DIRDATA, DIRDOCS, DIRLICENSES, DIRLOCATION,
INSTALLED,
LICENSE, LOCATIONSOURCE,
malware_guard_clam_log, malware_guard_event_log, malware_guard_inotify_log,
***********************
LEFT OUT OF PREVIEW BUILD
*********, malware_guard_runner_slug,
MASTERLISTPATH, MEMBERFILE, MY_SYSTEM_HOME,
NAMEARCHIVE, PACKAGES,
proxy_cache_slug, proxy_log_slug, proxy_path, proxy_path_runner_slug,
README, RINGENTITY_LOC,
SLUGARCHIVE, SLUG_BROWSEMEH, SLUG_ENTITY_LEDGER,
USAGE,
USR_SYSTEM_LOCATION,
VAR_LOG_LOCATION,
VIRTUALPACKAGE
)
from OPENPACKAGER.internal.INDEX_ORGANIZER.KEY_LISTER_INDEX \
import (
KEY_WALKER
)
class KEY_TEMP:
    """
    Scratch holder shared through the class itself.
    'key_holder_plain' is a class attribute, so every reader of KEY_TEMP
    sees the same list object.
    """
    key_holder_plain = list()
## Suppress Loader functions during import
#def main():
# print("A MAIN FUNCTION")
#if __name__ == 'core_operations':
# main()
## Class Overview
class Structure:
"""
Generate your current STRUCTURE.es file in the [DATADIR].
In this case, in ~/.CORE/DATA/
Holds two helpers: 'generate', which assembles the 'find' command
for the listing, and 'generate_default', a disabled placeholder.
"""
def generate():
"""
Assemble the 'find' command that would write 'DATA/STRUCTURE.es',
excluding the directories that are considered 'non-essential' or
personal.
There is also a system-level function on top of this
NOTE(review): the command text is only bound to the local name
'structure'; nothing visible here executes or returns it.
"""
# Generate the structure listing without personalized data
# This function can be run anywhere from a bash alias
# NOTE(review): the literal below uses backslash line continuations inside
# the quotes, so any leading whitespace on a continued line becomes part of
# the command text. Re-indent with care.
structure='find . -type d \
-not -path "./__pycache__" \
-not -path "./core-env/*" \
-not -path "./CONFIG/*" \
-not -path "./DATA/LOGS/*" \
-not -path "./DOCUMENTATION/*" \
-not -path "./FILTER/\|_\|' \
'FILTER_MACHINES/MACHINE_DATA/vpn/*" \
-not -path "./FILTER/\|_\|' \
'FILTER_MACHINES/MACHINES/RECOVERY/*" \
-not -path "./FILTER/\|_\|' \
'FILTER_MACHINES/MACHINES/STARTING/*" \
-not -path "./FILTER/\|_\|' \
'FILTER_MACHINES/MACHINES/EXPLORER_RIG/RIG_VM/Virtual_Hard_Disks/*" \
-not -path "./FILTER/\|_\|' \
'FILTER_MACHINES/MACHINES/EXPLORER_RIG/RIG_VM/Virtual_Machines/*" \
-not -path "./FILTER/\|_\|' \
'FILTER_MACHINES/MACHINES/EXPLORER_RIG/universe/*" \
-not -path "./IDENTITY/PHOTO/background/*" \
-not -path "./IDENTITY/PHOTO/media/*" \
-not -path "./LEARNING/RRR_RECOVERY/*" \
-not -path "./LEARNING/ZZZ_DEPRECATED/*" \
-not -path "./LEARNING/TOPICS/*" \
-not -path "./OPENPACKAGER/corehost/*" \
-not -path "./OPENPACKAGER/jupyter/*" \
-not -path "./RING/BACKUP/*" \
-not -path "./RING/DOCUMENTS/*" \
-not -path "./RING/ENTITY/FIGMENTS/*" \
-not -path "./RING/PROGRAMS/AIREP/*" \
-not -path "./RING/PROGRAMS/COLLECTIONS/*" \
-not -path "./RING/PROGRAMS/LINUX/*" \
-not -path "./RING/PROGRAMS/WINE/*" \
| sort \
> DATA/STRUCTURE.es'
def generate_default():
"""
Generate the default folder structure.
This is stored in 'core_build.py'
If the folder structure is not empty, alert!
Disabled by default: the body is currently a no-op.
"""
# The import that would enable this routine is intentionally commented out.
# import core_build
pass
class GLOBAL_ALIAS_OPTIONS:
    """
    Operate the Global Member settings
    """
    def entity_form():
        """
        Console-driven form for adding an ALIAS.

        Asks whether to add an alias, then whether to automate, then walks
        the manual menu (Alias / Context / Figments / Human / Location).
        The Figments, Human and Location branches rebind the module-level
        'chosen_entity_path'. Returns None in every case.

        Anything other than 0 or 1 at either yes/no prompt (including
        non-numeric text, which used to raise ValueError) prints
        "Please, only 0 or 1" and ends the form.
        """
        # Export chosen_entity_path at module level once a branch picks it.
        global chosen_entity_path

        def _ask_zero_or_one(prompt):
            # Return 0 or 1; print the hint and return None for anything else.
            try:
                answer = int(input(prompt))
            except ValueError:
                answer = None
            if answer not in (0, 1):
                print("Please, only 0 or 1")
                return None
            return answer

        # 0 means "nothing to create"; an invalid answer was already reported.
        if _ask_zero_or_one(
            "Would you like to add an ALIAS? (1) for Yes - (0) for No"
        ) != 1:
            return
        # 1 selects the automation routine, which is blank for the time being.
        if _ask_zero_or_one(
            "Would you like to automate this process? : 1 for Yes - 0 for No"
        ) != 0:
            return

        # Manual path: exact adherence to the structure is required until
        # regex is used to clean the inputs.
        MEMBERPATH = ACCESSPATH+MASTERLISTPATH
        # Establish the base folders here
        ext_path_context = INSTALLED+CONTEXTENGINE
        ext_path_controls = INSTALLED+CONTROLS
        ext_path_virtual_package = INSTALLED+VIRTUALPACKAGE
        # Ask the question that states the creator
        entity_type_selector = input(
            "Would you like to generate this process for... "
            ": (a) for Alias OR {for ENTITY: (b) Context - (c) Figments - (d) Human - (e) Location}?"
        )
        if entity_type_selector == 'a':
            alias_type_selector = input(
                "What are you doing? : (a) for Add Member - (b) for Modify your VCN Alias List (VCN)"
            )
            # Outline relevant access lists
            ext_path = MEMBERPATH+MEMBERFILE
            if alias_type_selector == 'a':
                # Writing the member in still has to be implemented; the
                # target file must first be made writeable by an admin.
                print('Add Member to: '+ext_path)
            elif alias_type_selector == 'b':
                # The modify routine is likewise not implemented yet.
                print('Modify VCN Alias List (VCN) at location: '+ext_path)
            else:
                print('Please, only a or b')
        elif entity_type_selector == 'b':
            alias_type_selector = input(
                "What type of entity are you creating? "
                ": (a) Context Engine - (b) Controller - (c) Virtual Package"
            )
            if alias_type_selector == 'a':
                print('Creating a Context Engine at path: '+
                    chosen_entity_path+ext_path_context
                )
            elif alias_type_selector == 'b':
                # Uses 'ext_path_controls'; the previous 'ext_path_controller'
                # was never defined and raised NameError.
                print('Creating a Controller at path: '+
                    chosen_entity_path+ext_path_controls
                )
            elif alias_type_selector == 'c':
                print('Creating a Virtual Package at path: '+
                    chosen_entity_path+ext_path_virtual_package
                )
            else:
                print('Please, only a, b, or c')
        elif entity_type_selector == 'c':
            alias_type_selector = input(
                "What are you doing? : a for Quick Format - b for Staging"
            )
            chosen_entity_path = 'FIGMENTS/'
            # Sub-folders per choice; nothing consumes them yet.
            figment_folders = {'a': 'QUICK_FORMAT/', 'b': 'STAGING/'}
            if alias_type_selector not in figment_folders:
                print('Please, only a or b')
        elif entity_type_selector == 'd':
            alias_type_selector = input(
                "What type of Human are you adding? a for Contact - b for Ficticious - c for a Person"
            )
            chosen_entity_path = 'HUMAN/'
            # Sub-folders per choice; nothing consumes them yet.
            human_folders = {
                'a': 'CONTACT/', 'b': 'FICTICIOUS/', 'c': 'PERSON/'
            }
            if alias_type_selector not in human_folders:
                print('Please, only a, b, or c')
        elif entity_type_selector == 'e':
            alias_type_selector = input(
                "Are you adding a picture's geo-location? If so, what type? : a for JSON geo-location data"
            )
            chosen_entity_path = \
                'ALIAS/LOCATION/pic_json_data/'
            if alias_type_selector != 'a':
                print('Only JSON supported for now')
        # Any other entity type selector is ignored (no fallback defined yet).
class GLOBAL_ENTITY_OPTIONS:
    """
    Operate the Global Entity settings
    """
    def entity_form():
        """
        Console-driven form for creating or modifying an 'ENTITY'.

        Asks whether to proceed, then whether to automate, then walks the
        manual menu (Alias / Context / Figments / Human / Location). The
        Figments, Human and Location branches rebind the module-level
        'chosen_entity_path'. Returns None in every case.

        Anything other than 0 or 1 at either yes/no prompt (including
        non-numeric text, which used to raise ValueError) prints
        "Please, only 0 or 1" and ends the form.
        """
        # Export chosen_entity_path at module level once a branch picks it.
        global chosen_entity_path

        def _ask_zero_or_one(prompt):
            # Return 0 or 1; print the hint and return None for anything else.
            try:
                answer = int(input(prompt))
            except ValueError:
                answer = None
            if answer not in (0, 1):
                print("Please, only 0 or 1")
                return None
            return answer

        # 0 means "nothing to create"; an invalid answer was already reported.
        if _ask_zero_or_one(
            "Would you like to create or modify an 'ENTITY'? (1) for Yes - (0) for No"
        ) != 1:
            return
        # 1 selects the automation routine, which is blank for the time being.
        if _ask_zero_or_one(
            "Would you like to automate this process? : 1 for Yes - 0 for No"
        ) != 0:
            return

        # Manual path: exact adherence to the structure is required until
        # regex is used to clean the inputs.
        MEMBERPATH = ACCESSPATH+MASTERLISTPATH
        # Establish the base folders here
        ext_path_context = INSTALLED+CONTEXTENGINE
        ext_path_controls = INSTALLED+CONTROLS
        ext_path_virtual_package = INSTALLED+VIRTUALPACKAGE
        # Ask the question that states the creator
        entity_type_selector = input(
            "Would you like to generate this process for... "
            ": (a) for Alias OR {for ENTITY: (b) Context - (c) Figments - (d) Human - (e) Location}?"
        )
        if entity_type_selector == 'a':
            alias_type_selector = input(
                "What are you doing? : (a) for Add Member - (b) for Modify your VCN Alias List (VCN)"
            )
            # Outline relevant access lists
            ext_path = MEMBERPATH+MEMBERFILE
            if alias_type_selector == 'a':
                # Writing the member in still has to be implemented; the
                # target file must first be made writeable by an admin.
                print('Add Member to: '+ext_path)
            elif alias_type_selector == 'b':
                # The modify routine is likewise not implemented yet.
                print('Modify VCN Alias List (VCN) at location: '+ext_path)
            else:
                print('Please, only a or b')
        elif entity_type_selector == 'b':
            # The two literals are concatenated, so the first one carries the
            # separating space (it was missing before).
            alias_type_selector = input(
                "What type of entity are you creating? "
                ": (a) Context Engine - (b) Controller - (c) Virtual Package"
            )
            if alias_type_selector == 'a':
                print('Creating a Context Engine at path: '+chosen_entity_path+ext_path_context)
            elif alias_type_selector == 'b':
                # Uses 'ext_path_controls'; the previous 'ext_path_controller'
                # was never defined and raised NameError.
                print('Creating a Controller at path: '+chosen_entity_path+ext_path_controls)
            elif alias_type_selector == 'c':
                print('Creating a Virtual Package at path: '+chosen_entity_path+ext_path_virtual_package)
            else:
                print('Please, only a, b, or c')
        elif entity_type_selector == 'c':
            alias_type_selector = input(
                "What are you doing? : a for Quick Format - b for Staging"
            )
            chosen_entity_path = 'FIGMENTS/'
            # Sub-folders per choice; nothing consumes them yet.
            figment_folders = {'a': 'QUICK_FORMAT/', 'b': 'STAGING/'}
            if alias_type_selector not in figment_folders:
                print('Please, only a or b')
        elif entity_type_selector == 'd':
            alias_type_selector = input(
                "What type of Human are you adding? a for Contact - b for Ficticious - c for a Person"
            )
            chosen_entity_path = 'HUMAN/'
            # Sub-folders per choice; nothing consumes them yet.
            human_folders = {
                'a': 'CONTACT/', 'b': 'FICTICIOUS/', 'c': 'PERSON/'
            }
            if alias_type_selector not in human_folders:
                print('Please, only a, b, or c')
        elif entity_type_selector == 'e':
            # Space added between the two concatenated sentences.
            alias_type_selector = input(
                "Are you adding a picture's geo-location? "
                "If so, what type? : a for JSON geo-location data"
            )
            chosen_entity_path = \
                'ALIAS/LOCATION/pic_json_data/'
            if alias_type_selector != 'a':
                print('Only JSON supported for now')
        # Any other entity type selector is ignored (no fallback defined yet).
## Class: Error checker
def ErrorStateMachine(object_option='a'):
    """
    Basic state checker and logging machine.

    'a' prints the UNITTEST_ALL report, 'b' prints the three
    ALERT_LOGGING_* reports, 'c' prints the LOADER_DS and LOADER_ENTITY
    reports on one line. Any other value does nothing. The 'core_alerts'
    names are imported only inside the branch that needs them.
    """
    if object_option == 'a':
        # Default option; plans are to expand this eventually.
        from core_alerts import UNITTEST_ALL
        print(UNITTEST_ALL.absorb())
        return
    if object_option == 'b':
        # Check against the standard criteria for the 'ALERT' status messages.
        from core_alerts import (
            ALERT_LOGGING_INFO,
            ALERT_LOGGING_WARNING,
            ALERT_LOGGING_FAILURE
        )
        for alert_level in (
            ALERT_LOGGING_INFO, ALERT_LOGGING_WARNING, ALERT_LOGGING_FAILURE
        ):
            print(alert_level.absorb())
        return
    if object_option == 'c':
        # Check the configuration of the .entity formats and .ds
        # 'datascript' files.
        from core_alerts import LOADER_DS
        from core_alerts import LOADER_ENTITY
        print(LOADER_DS.absorb()+" : "+LOADER_ENTITY.absorb())
    # Unrecognised options fall through without doing anything.
## Skipper can be applied to skip certain unit functions with the pass param
skipper = ErrorStateMachine(object_option='pass')
## Write a new configuration file
def config_clone():
    """
    Back up the configuration file: copy DIRCONFIG+CONFIGINI over
    DIRCONFIG+CONFIGINIBACKUP.
    """
    live_config = DIRCONFIG+CONFIGINI
    backup_config = DIRCONFIG+CONFIGINIBACKUP
    copyfile(live_config, backup_config)
def archive_entities():
    """
    Create a backup archive of your ENTITY db.
    The archive format comes from ARCHIVETYPE; '~' is expanded in both the
    archive name and the source root.
    """
    archive_base = path.join(SLUGARCHIVE, NAMEARCHIVE)
    entity_root = path.join(LOCATIONSOURCE, DIRLOCATION)
    make_archive(
        base_name=path.expanduser(archive_base),
        format=ARCHIVETYPE,
        root_dir=path.expanduser(entity_root),
    )
## Setup the argument parser
class PARSER_reseed_entities:
"""
PARSER_ classes are followed by an import skipping mechanism
This allows you to skip certain processes at import
Passing the global parser options will potentially activate this
NOTE(review): the statements below appear to sit in the class body, so
they would run when the class statement is evaluated rather than when
the class is instantiated -- confirm against the original indentation.
"""
# When this module is loaded under the name 'core_operations' nothing is done.
if __name__ == 'core_operations':
pass
else:
def reseed_entities():
"""
Reseed allows you to organize a file setup structure using external
programming environments
"""
****************
LEFT OUT OF PREVIEW BUILD
****************
# Establish part 1 of the teardown function by creating a reseed manifest
maintainme_files_proxy = [
# squid
proxy_path+proxy_log_slug,
proxy_path+proxy_cache_slug
]
# NOTE(review): 'maintainme_files_maldetect' is not referenced again in the
# visible code -- presumably used in an omitted section.
maintainme_files_maldetect = [
# clamscan
USR_SYSTEM_LOCATION+malware_guard_clam_log, \
USR_SYSTEM_LOCATION+malware_guard_event_log, \
USR_SYSTEM_LOCATION+malware_guard_inotify_log
]
def reseed_proxy():
"""
proxy reseed
Custom teardown and setup function follows reseed_*TEARDOWNTYPE*
"""
try:
# NOTE(review): 'maintainme_proxy_files' differs from the list assigned above
# ('maintainme_files_proxy'); unless it is defined in an omitted section this
# raises NameError, which the bare 'except' below absorbs.
for x in maintainme_proxy_files:
remove(x)
print("success")
# NOTE(review): bare 'except' -- the 'PermissionError' line is only an
# expression statement, so every exception type is handled here.
except:
PermissionError
print(
"ALERT: proxy files -> " \
"You need elevated permissions to proceed, dropping to shell-script"
)
try:
# NOTE(review): 'runner1' is not defined in the visible code -- presumably
# set in an omitted section; confirm.
system(runner1)
print("STATUS: Complete")
# NOTE(review): bare 'except' again; 'FileNotFoundError' is a no-op expression.
except:
FileNotFoundError
print("STATUS: No action taken")
***************
LEFT OUT OF PREVIEW BUILD
***************
try:
# NOTE(review): 'runner2' is not defined in the visible code -- confirm.
system(runner2)
print("STATUS: Complete")
except:
FileNotFoundError
print("STATUS: No action taken")
def reseed_nova():
"""
reseed_*ITEM* is the setup function for whatever you name your
default backup routine. This links together the teardown list
"""
reseed_proxy()
******************
LEFT OUT OF PREVIEW BUILD
******************
reseed_nova()
def maintainmesummary():
# Summary Section
******************
LEFT OUT OF PREVIEW BUILD
******************
# NOTE(review): 'PROXY_log_dir' is not defined in the visible code --
# presumably assigned in the omitted section above; confirm.
print(stat(PROXY_log_dir))
for dirpath, dirnames, filenames in walk(PROXY_log_dir):
print('Current Path: ', dirpath)
print('Directories: ', dirnames)
print('Files: ', filenames)
print('---')
# NOTE(review): stat() is given 'proxy_log_slug' alone here, whereas the
# manifest above joins it onto 'proxy_path' -- confirm which path is meant.
last_mod_time = stat(proxy_log_slug).st_mtime
print(datetime.fromtimestamp(last_mod_time))
maintainmesummary()
reseed_entities()
#*******************************#
# START: STEP 4: CUSTOM OPTIONS #
#*******************************#
# Output functions
print(
"---------- START: STEP 4 - CUSTOM OPTIONS ----------"
)
print(
'------ ARGS -------'
)
print(
"--reseed=a was provided to the argument parser"
)
print(
'------ DONE -------'
)
print(
"---------- STOP: STEP 4 - CUSTOM OPTIONS ----------\n"
)
#******************************#
# END: STEP 4 - CUSTOM OPTIONS #
#******************************#
def P():
    """
    Default pass: print the STEP 4 'CUSTOM OPTIONS' banner stating that no
    arguments were supplied.
    """
    # One entry per printed line, top to bottom.
    step_four_banner = (
        "--------- START: STEP 4 - CUSTOM OPTIONS ----------",
        '------ ARGS -------',
        'No arguments were supplied',
        '------ DONE -------',
        "---------- END: STEP 4 - CUSTOM OPTIONS ----------\n",
    )
    for banner_line in step_four_banner:
        print(banner_line)
# Parse Entity: Visual Output
def PARSE_ENTITY(object):
    """
    Copy the first file under RINGENTITY_LOC whose name ends with 'object'
    into DIRDATA+ACTIVEES, writing a dashed separator line (length
    DEFAULT_CHAR_OUTPUT) after every row.

    .entity files must double quote both headings and blocks in order to run
    advanced options

    object: file-name suffix to search for recursively (e.g. '.entity').
    Returns None. When nothing matches, ACTIVEES receives
    "Couldn't locate the Datascript file."; when the match cannot be read,
    it receives "The file wasn't found.".
    """
    active_entity_file = DIRDATA+ACTIVEES
    matching_files = glob(RINGENTITY_LOC+"**/*"+object, recursive=True)
    # An empty result is reported explicitly. It used to surface as an
    # IndexError that the inner bare 'except' swallowed, so this message
    # could never be written for the no-match case.
    if not matching_files:
        with open(active_entity_file, 'w') as AE:
            AE.write("Couldn't locate the Datascript file.")
        return
    separator = "-" * int(DEFAULT_CHAR_OUTPUT)+"\n"
    try:
        with open(matching_files[0]) as f, \
                open(active_entity_file, 'w') as AE:
            for row in f:
                AE.write(row)
                AE.write(separator)
    except (OSError, UnicodeError):
        # Unreadable or vanished source: replace any partial output.
        with open(active_entity_file, 'w') as AE:
            AE.write("The file wasn't found.")
class Binary:
    """Namespace reserved for binary record helpers."""
    class writer:
        """Empty placeholder; no writer behaviour is defined yet."""
# from struct import Struct
# import time
#
# def write_string(records, format, f):
# """
# Binary Tuple Writer
# """
# record_struct = Struct(format)
# for r in records:
# f.write(record_struct.pack(*r))
# def read_records(format, f):
# record_struct = Struct(format)
# chunks = iter(lambda: f.read(record_struct.size), b'')
# return (record_struct.unpack(chunk) for chunk in chunks)
#if __name__ == '__main__':
# Write
#records = [(1, 22, 49),(43, 2, 22),(45, 89, 3)]
# with open('data.b', 'wb') as f:
# write_string(records, ' for big endian or ! for network byte order, and > for little endian
#record_struct_a = Struct('idd', 1, 30, 22.4)
#struct.unpack('>idd', _)
#f = open('data.b', 'rb')
#chunks = iter(lambda: f.read(20), b'')
#chunks
#for chk in chunks:
# print(chk)
# You can also create a generator expression
#def read_records_b(format, f):
# record_struct_b = Struct(format)
# while True:
# chk = f.read(record_struct_b.size)
# if chk == b'':
# break
# yield record_struct_b.unpack(chk)
# QR CODE GENERATION ON PUSH
# 1. Alert or message type delivered
# 2. Interest and engagement attained
# 3. Button is pushed to generate QR link
# 4. Link is generated, engagement count increases by 1
# 5. Link is stored in the lookup table for additional engagement
# 6. QR code is archived for X time
# 7. Are you done style confirmation with X time out follows
# 8. Engagement ended
# Extensible Assistant v.0.1
#class printAlpha:
# def addName(self, name):
# self.name=name
# def listerName(self):
# return(self.name)
# def utilizedName(self):
# print("""This is a utilized name: %s""" % self.name)
#
#class HumanBuilder():
# def __init__(self, name, gender, trine):
# self.name = name
***********
LEFT OUT OF PREVIEW BUILD
***********
#
#def add(a, b):
# return a + b
#information.values()
# early stage prototype idea below
#{score if x >= 515 else score):(print('
# You will probably be admitted') if score >= \
# 515 else print('This will be hard')): value for score, value in information.items()}
## Establish an argument parser (ap)
# This section is set up upon each import too. The functions above run
# only after the __name__ convention is satisfied, to prevent 'active' or
# 'state-changing' functions from running every time you want to import the
# main module. In this case 'core_operations'
ap = ArgumentParser(
    # Adjacent literals are concatenated, so the first one carries the space
    # (the help text used to read "Run programswith ...").
    description="Run programs "
    "with 'core_operate' by utilizing Argument Parsing options."
)
ap.add_argument('--reseed',
    help='Reseed desired file types'
)
# Parsed namespace plus its string form; both names are kept at module level.
ARGUMENT_LOGIC = ap.parse_args()
REPRESENTED_LOGIC = str(ARGUMENT_LOGIC)
# Sanity Testers
#print(ARGUMENT_LOGIC)
#print(REPRESENTED_LOGIC)
# Dispatch on the parsed value itself instead of comparing the namespace's
# repr with hard-coded strings. The former '== None' branch is gone: str()
# never returns None, so it could not be reached.
if ARGUMENT_LOGIC.reseed is None:
    # The default ARGS runner will run at every import
    P()
elif ARGUMENT_LOGIC.reseed == 'a':
    PARSER_reseed_entities()
else:
    print(
        '------ ARGS -------'
    )
    print(
        "Unknown option supplied"
    )
    print(
        '------ DONE -------'
    )
## Verbose Read Dir
class VerboseAboutDir:
    """
    Placeholder for a helper that reports more information about, and the
    command options for, a method or object. No behaviour is defined yet.
    """
#dir_to_inspect = 'os'
#for item in dir(dir_to_inspect):
# print('#',item,':', item)
# This is the resulted format we're looking for on an unpacked loop
# print('#__breakpointhook__:', sys.__breakpointhook__)
class GenerateEntityLedger:
    """
    Build the entity ledger from the .entity files under RINGENTITY_LOC.
    """
    def clean_entity_list():
        """
        Scan every '*.entity' file below RINGENTITY_LOC, pull out the known
        field lines, and write one sorted record per file to the ledger at
        RINGENTITY_LOC+SLUG_ENTITY_LEDGER.

        For each line, the first marker found (in the order listed below) is
        stripped together with the closing "']"; whatever remains of the
        line, including its line ending, is kept as the field value.
        Returns None. Also prints the list of files that were scanned.
        """
        ENTITY_LEDGER_FILE = \
            RINGENTITY_LOC+SLUG_ENTITY_LEDGER
        # Order matters: only the first marker found in a line is applied.
        field_markers = (
            "datascript_id = ['",
            "DLIST = ['",
            "title = ['",
            "package_type = ['",
            "package_id = ['",
        )
        entity_summary_list = sorted(glob(
            RINGENTITY_LOC+'**/*.entity', recursive=True
        ))
        # Sanity Test: Passing...
        print(entity_summary_list)
        entity_summary_list_clean = []
        # Entity Snapshot
        for entity_file in entity_summary_list:
            local_list = []
            with open(entity_file, 'r') as f:
                for line in f:
                    marker = next(
                        (m for m in field_markers if m in line), None
                    )
                    if marker is None:
                        continue
                    local_list.append(
                        line.replace(marker, "").replace("']", "")
                    )
            local_list.sort()
            entity_summary_list_clean.append(local_list)
        entity_summary_list_clean.sort()
        # Write to the path computed above. The previous code referenced
        # 'ENTITY_LEDGER_SLUG', which is not defined, and failed with
        # NameError before anything was written.
        with open(ENTITY_LEDGER_FILE, 'w') as f:
            for ledger_entry in entity_summary_list_clean:
                f.write(str(ledger_entry)+'\n')
## GenerateEntityLedger.clean_entity_list()
# Make a Verbose Index of your Entity Library
def generate_entity_library():
    """
    Not implemented yet (no-op).
    Planned: scan your active .entity files and make an archive of them
    in the file DS0000000.ds, with all relevant items in entity form.
    """
    return None
def check_entity_structure():
    """
    Not implemented yet (no-op).
    Planned: scan all .entity files listed in DS0000000.ds and check each
    for schematics.
    """
    return None
## Make a Verbose Index of your Entity Library
def generate_entity_list():
    """
    Not implemented yet (no-op).
    Planned: scan active category lists and create a sub-list for entity
    scanning.
    """
    return None
def append_to_entity_list():
    """
    Not implemented yet (no-op).
    Planned: take an entity list, 'el', and add an entry to the entity
    list file.
    """
    return None
def delete_from_entity_list():
    """
    Not implemented yet (no-op).
    Planned: take an entity list, 'el', and delete the entry at a defined
    position from the entity list file.
    """
    return None
def modify_from_entity_list():
    """
    Not implemented yet (no-op).
    Planned: take an entity list, 'el', and modify the entry at the
    defined position, 'p'.
    """
    return None
## Archive Handling: gzip
class ArchiveHandler:
    """
    Control various types of archives

    'archive_TEMP_UNZIP' extracts a user-chosen archive into a temp folder
    under SLUG_BROWSEMEH; 'close_temp' closes the last archive handle and
    removes that folder again. The open archive is shared between the two
    through the module-level name TEMP_FILE.
    """
    # Folder, relative to SLUG_BROWSEMEH, that receives the extracted files.
    # 'close_temp' used to read an undefined 'TEMP_DIR_NAMING_SLUG' instead
    # and failed with NameError; both methods now share this one name.
    _TEMP_DIR_NAME = 'temp1'

    def _extract_to_temp(opener, archive_path, pause_seconds):
        """
        Open 'archive_path' with 'opener' (ZipFile or tarfile.open), switch
        the working directory to SLUG_BROWSEMEH and extract everything into
        the temp folder, then wait 'pause_seconds' before reporting.
        """
        global TEMP_FILE
        with opener(archive_path, 'r') as TEMP_FILE:
            print(TEMP_FILE)
            # The working directory is changed on purpose and left there, as
            # before: the extraction target is relative to it.
            chdir(Path(SLUG_BROWSEMEH))
            # NOTE(review): extractall() trusts member paths inside the
            # archive. The file name comes from user input, so consider
            # validating members (or an extraction filter) before
            # extracting.
            TEMP_FILE.extractall(ArchiveHandler._TEMP_DIR_NAME)
            sleep(pause_seconds)
            print("Done...")
        # Leaving the 'with' block closes the archive; no explicit close().

    def archive_TEMP_UNZIP():
        """
        Build and operate 'on the fly' archive extractions

        Prompts for a file name and extracts '.zip', '.tar' or '.gz'
        archives; any other extension only prints a notice. Returns None.
        The unused NamedTemporaryFile(delete=False) that was created (and
        left on disk) on every call has been dropped, along with the debug
        prints of its name.
        """
        FILE_TO_OPEN = input(
            "What file would you like to open? "
        )
        archive_name = Path(FILE_TO_OPEN).name.lower()
        if archive_name.endswith('.zip'):
            print("Unpacking a zip archive into a temp directory.")
            ArchiveHandler._extract_to_temp(ZipFile, FILE_TO_OPEN, 2)
        elif archive_name.endswith('.tar'):
            ArchiveHandler._extract_to_temp(tarfile.open, FILE_TO_OPEN, 2)
        elif archive_name.endswith('.gz'):
            # Opened with tarfile as before, so this covers gzip-compressed
            # tar archives; the longer pause is unchanged.
            ArchiveHandler._extract_to_temp(tarfile.open, FILE_TO_OPEN, 6)
        else:
            # Don't allow files of undefined 'type'
            print(
                "Only .tar, .tar.gz, and .zip archive types included at this time."
            )

    def close_temp():
        """
        Close the archive handle left in TEMP_FILE and delete the temp
        folder under SLUG_BROWSEMEH.

        Prints "TEMP_FILE is not defined" when no archive has been opened
        yet (or TEMP_FILE is None). Returns None.
        """
        try:
            open_archive = TEMP_FILE
        except NameError:
            open_archive = None
        if open_archive is None:
            print("TEMP_FILE is not defined")
            return
        open_archive.close()
        # For automatic deleting of the temp directory
        chdir(Path(SLUG_BROWSEMEH))
        # ignore_errors=True already tolerates a missing folder, so the old
        # FileNotFoundError handler around this call could never run.
        rmtree(ArchiveHandler._TEMP_DIR_NAME, ignore_errors=True)
        print("Folder tree cleared...")
## Get Operations Below
# V2TEMPEST WATERFALL: get_*category*_*NAME_OF_FUNCTION*
# Release output - Documentation
# README
def get_information_README():
    """
    Read and Display the README.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <README>" instead
    of raising. Returns None.
    """
    document_path = DIRDOCS+README
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+README)
## changelog
def get_information_CHANGELOG():
    """
    Read and Display the changelog.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <CHANGELOG>"
    instead of raising. Returns None.
    """
    document_path = DIRDOCS+CHANGELOG
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+CHANGELOG)
## Get CORE logs
def GET_local_COREES_LOG():
    """
    Return the lines of DIRDATA+COREES as a list in reverse file order
    (last line first). Line endings are kept.
    """
    with open(DIRDATA+COREES, 'r') as log_file:
        log_lines = log_file.readlines()
    return log_lines[::-1]
def GET_global_CONSTRUCTEDCOREES_LOG():
    """
    Return the lines of DIRDATA+CONSTRUCTEDCOREES as a list in reverse file
    order (last line first). Line endings are kept.
    """
    with open(DIRDATA+CONSTRUCTEDCOREES, 'r') as log_file:
        log_lines = log_file.readlines()
    return log_lines[::-1]
## allowed C.ORE languages
class get_information_ALLOWED_LANGUAGES:
    """
    Printers for the allowed C.ORE programming languages; each method
    prints one ALLOWED_LANG_* value.
    """
    def allowed_lang_BASH():
        """Print the allowed-language entry for Bash."""
        print(ALLOWED_LANG_BASH)

    def allowed_lang_PHP():
        """Print the allowed-language entry for PHP."""
        print(ALLOWED_LANG_PHP)

    def allowed_lang_PYTHON():
        """Print the allowed-language entry for Python."""
        print(ALLOWED_LANG_PYTHON)

    def allowed_lang_RUBY():
        """Print the allowed-language entry for Ruby."""
        print(ALLOWED_LANG_RUBY)
## AIREP
def get_information_ABOUT_ACCESS():
    """
    ABOUT_ACCESS.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <ABOUT_ACCESS>"
    instead of raising. Returns None.
    """
    document_path = DIRDOCS+ABOUT_ACCESS
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+ABOUT_ACCESS)
def get_information_ABOUT_IDENTITY():
    """
    ABOUT_IDENTITY.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <ABOUT_IDENTITY>"
    instead of raising. Returns None.
    """
    document_path = DIRDOCS+ABOUT_IDENTITY
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+ABOUT_IDENTITY)
def get_information_ABOUT_RING():
    """
    ABOUT_RING.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <ABOUT_RING>"
    instead of raising. Returns None.
    """
    document_path = DIRDOCS+ABOUT_RING
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+ABOUT_RING)
## Additional
def get_information_ABOUT_ASSET():
    """
    ABOUT_ASSET.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <ABOUT_ASSET>"
    instead of raising. Returns None.
    """
    document_path = DIRDOCS+ABOUT_ASSET
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+ABOUT_ASSET)
def get_information_ABOUT_SUPPORTING():
    """
    SUPPORTING.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <ABOUT_SUPPORTING>"
    instead of raising. Returns None.
    """
    document_path = DIRDOCS+ABOUT_SUPPORTING
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+ABOUT_SUPPORTING)
## Formats
def get_information_ABOUT_DS():
    """
    ABOUT_DS.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <ABOUT_DS>" instead
    of raising. Returns None.
    """
    document_path = DIRDOCS+ABOUT_DS
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+ABOUT_DS)
def get_information_ABOUT_ENTITY():
    """
    ABOUT_ENTITY.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <ABOUT_ENTITY>"
    instead of raising. Returns None.
    """
    document_path = DIRDOCS+ABOUT_ENTITY
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+ABOUT_ENTITY)
## Usage
def get_information_USAGE():
    """
    USAGE.ds

    Prints the file location, then each line of the file. If the file
    cannot be opened or read, prints "Unable to locate <USAGE>" instead of
    raising. Returns None.
    """
    document_path = DIRDOCS+USAGE
    print(document_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(document_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+USAGE)
# #Packages
def get_information_CORE_PACKAGES():
    """
    Get all additionally installed env-packages

    Prints the location of the packages file (DIRDATA+PACKAGES), then each
    of its lines. If the file cannot be opened or read, prints
    "Unable to locate <PACKAGES>" instead of raising. Returns None.
    """
    packages_path = DIRDATA+PACKAGES
    print(packages_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(packages_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+PACKAGES)
## Print your copyright information
def get_licenses_LICENSE():
    """
    Print the LICENSE of the full program

    Prints the location of the license file (DIRLICENSES+LICENSE), then
    each of its lines. If the file cannot be opened or read, prints
    "Unable to locate <LICENSE>" instead of raising. Returns None.
    """
    license_path = DIRLICENSES+LICENSE
    print(license_path+": ")
    try:
        # open() sits inside the 'try' so a missing file reaches the handler.
        with open(license_path, 'r') as f:
            for line in f:
                print(line)
    except (OSError, UnicodeError):
        print("Unable to locate "+LICENSE)
class AlienVault:
    """
    SHA-512 helpers for login keys: 'make_ALIEN_VAULT_CODES' produces a
    digest, 'get_ALIEN_VAULT_CODES' checks a key against the digest held in
    KEY_TEMP.key_holder_plain.
    """
    def get_ALIEN_VAULT_CODES(HASHED_LOGIN_KEY):
        """
        Check a login key against the stored digest.

        HASHED_LOGIN_KEY: text whose SHA-512 hex digest is compared with
        KEY_TEMP.key_holder_plain[0].
        Returns True on a match, otherwise False (also when the holder is
        empty, which used to raise IndexError).
        KEY_TEMP.key_holder_plain is emptied after every check. The original
        'clear()' calls sat after the 'return' statements and never ran.
        """
        # Local import: the module does not import hmac at file level.
        from hmac import compare_digest
        sent = sha512(HASHED_LOGIN_KEY.encode()).hexdigest()
        try:
            if not KEY_TEMP.key_holder_plain:
                return False
            stored = KEY_TEMP.key_holder_plain[0]
            # Constant-time comparison; a non-text entry can never match.
            return isinstance(stored, str) and compare_digest(
                sent.encode(), stored.encode()
            )
        finally:
            KEY_TEMP.key_holder_plain.clear()

    def make_ALIEN_VAULT_CODES(HASHED_LOGIN_KEY):
        """
        Return the SHA-512 hex digest (128 hex characters) of the given
        text, encoded with str.encode()'s default encoding.
        """
        return sha512(HASHED_LOGIN_KEY.encode()).hexdigest()
class PathRetrieverBrowsemeh:
    """
    Path lookups for the SLUG_BROWSEMEH location.
    """
    def get_CONSTRUCTED_PATH():
        """
        Return the absolute form of SLUG_BROWSEMEH as a URI string
        (Path.as_uri()) for 'core_FRONTEND'
        """
        browse_root = Path(SLUG_BROWSEMEH).absolute()
        return browse_root.as_uri()
# Return HOME