Source code for pyautomail.manager

"""This module is responsible for managing the pyautomail pkg.
It has below commands:

1. register: register a new contact list and a new process this command will get below arguments:
    - username: the email address of the sender
    - contacts: the path to the csv file which contains the contacts
    - name(optional): an arbitrary name for the process
    - subject(optional): the subject of the email
    - template(optional): the path to the template file
    - cpdf(optional): this is a flag that if it is set, the pdf file will be customized for each contact based on the \
    path to the pdf file which is in the contacts csv file in the column 'cpdf'
    - attachment(optional): the path to the attachment file if you want to attach a same file to all emails
    - pdf_dir(optional): the path to the directory which contains the pdf files if you want to use cpdf flag
2. start: this command will start a process and get the process id as argument you can find the process id in the list\
 of processes by: `pyautomail list` command
3. list: list processes
4. stop: this command will stop one specific process by process id
5. resume: this command will resume one specific process by process id
"""
import argparse
from pyautomail.storage import Record, Process, get_session, create_tables
import datetime
import pandas as pd
import os
import getpass
from pyautomail import EmailSender
from pyautomail.utils import init_logger
import time
# TODO: add a command to delete a process
# TODO: add a command to delete a record
# TODO: table for process in process_list function
# TODO: table for record
# TODO: add a command to edit a process
# TODO: add a command to edit a record
# TODO: create a logger pkg

__all__ = ['registration', 'start', 'stop', 'resume_process', 'list_processes', 'run']


def init(*args, **kwargs):
    project_name = input('Enter the name of your project: ')
    path = input('Enter the path to your project (default is current directory): ')
    if path == '':
        path = os.getcwd()
    else:
        path = os.path.abspath(path)
    if not os.path.exists(path):
        os.mkdir(path)
    os.chdir(path)
    session, engin = get_session()
    create_tables(engin)

    session.close()
    engin.dispose()


[docs]def registration(username, contacts, name="", cpdf=False, attachment="", pdf_dir="", **args): """This function will register a new process and a new contact list Parameters ---------- username : str the email address of the sender contacts : str the path to the csv file which contains the contacts name : str an arbitrary name for the process cpdf : bool this is a flag that if it is set, the pdf file will be customized for each contact based on the \ path to the pdf file which is in the contacts csv file in the column 'cpdf' attachment : str the path to the attachment file if you want to attach a same file to all emails pdf_dir : str the path to the directory which contains the pdf files if you want to use cpdf flag args : dict the other arguments which are passed to the function Returns ------- None Notes ----- This function will register a new process plus records for each contact in the contacts csv file. """ from pyautomail.storage import register_new_process register_new_process(title=name, email=username, contact_list=contacts, custom_pdf=cpdf, attachment=attachment, custom_pdf_dir=pdf_dir, subject=args.get('subject', ""), template=args.get('template', "")) print("Process registered successfully!") print("You can start the process with 'pyautomail start <process_id>' command")
[docs]def run(pid, resume=True): """ This function will run a process with a specific id Parameters ---------- pid : int the process id resume : bool if True, the program will resume the process if it is paused, otherwise it prints a warning message and \ return without doing anything """ session, engin = get_session() create_tables(engin) session.close() engin.dispose() logger = init_logger('manager') logger.info("Reading arguments...") process = session.query(Process).filter(Process.id == pid).first() if not process: logger.error(f"ID:{pid} => Process not found. You can see all processes with 'mailmanager list' command") return sender_email = process.sender subject = process.subject temp_file = process.temp_file if resume: if process.status != "paused": print(f"ID:{pid} => Program is already {process.status}") return contacts = session.query(Record).filter(Record.process_id == pid, Record.status == "unsent").all() else: if process.status in ["paused", "in progress", 'finished']: logger.warning(f"ID:{pid} => Program is already {process.status}") print(f"ID:{pid} => Program is already {process.status}") print(f"You can resume the program with 'mailmanager resume {pid}' command") print(f"You can see all processes with 'mailmanager list' command") while True: check = input(f"Are you sure you want to sent all email again? (y/n)") if check == "n": return elif check == "y": break else: print("Please enter y or n!") contacts = session.query(Record).filter(Record.process_id == pid).all() password = getpass.getpass("Enter you email password :") # Create a secure SSL context logger.info("Creating EmailSender obj...") sender = EmailSender(cfg="config.cfg", user=sender_email, password=password) if temp_file: sender.set_template(temp_file) process.status = "in progress" session.commit() pause = False for contact in contacts: process = session.query(Process).filter(Process.id == pid).first() if process.status == "paused": logger.info(f"ID:{pid} => Pausing the program") pause = True break logger.info(f"Sending email to {contact.receiver}") sender.send(contact.receiver, subject, contact.data) contact.status = "sent" session.commit() logger.info(f"ID:{contact.id} => Email sent to {contact.receiver}") time.sleep(10) if not pause: process.status = "finished" session.commit() logger.info(f"ID:{pid} => Program finished successfully")
[docs]def start(pid, **args): """This function will start a process with a specific id""" run(pid, resume=False)
[docs]def stop(pid, **args): """This function will stop a process with a specific id""" session, engin = get_session() create_tables(engin) session.close() engin.dispose() process = session.query(Process).filter(Process.id == pid).first() if process.status == "in progress": process.status = "paused" session.commit() print(f"ID: {pid} => Pausing the program") else: print(f"ID: {pid} => Program is not running")
[docs]def resume_process(pid, **args): """This function will resume a process with a specific id""" run(pid, resume=True)
[docs]def list_processes(pid=None, **kwargs): """This function will print the list of all processes or a specific process with a specific id Parameters ---------- pid : int the process id if you want to see the information of a specific process or None if you want to see the \ information of all processes """ session, engin = get_session() create_tables(engin) session.close() engin.dispose() if pid is None: processes = session.query(Process).all() else: processes = session.query(Process).filter(Process.id == pid).all() for process in processes: print(process.id, process.title, process.status, len(session.query(Record).filter(Record.process_id == process.id, Record.status == "unsent").all()), len(session.query(Record).filter(Record.process_id == process.id, Record.status == "sent").all()))
def _parser(): parser_obj = argparse.ArgumentParser(description='Automail', prog="pyautomail") sud_parser = parser_obj.add_subparsers(dest='command') init_parser = sud_parser.add_parser('init', help='Initialize the database') init_parser.set_defaults(func=init) register_parser = sud_parser.add_parser('register', help='Register a new user') register_parser.add_argument('username', help='Username') register_parser.add_argument('contacts', help='Contacts') register_parser.add_argument('--name', help='A name') register_parser.add_argument('--template', help='Path to HTML or TXT Template') register_parser.add_argument('--subject', help='Subject of the email') register_parser.add_argument("--attachment", help="path to pdf file that you want to attach to your E-mail") register_parser.add_argument("--cpdf", action='store_true', help="custom pdf file that you want to attach to" " your E-mail", default=False) register_parser.add_argument("--pdf_dir", default='') register_parser.set_defaults(func=registration) start_parser = sud_parser.add_parser('start', help='Start a process that is registered by id') start_parser.add_argument('pid', help='process id') start_parser.set_defaults(func=start) stop_parser = sud_parser.add_parser('stop', help='Stop a process that is running by id') stop_parser.add_argument('pid', help='process id') stop_parser.set_defaults(func=stop) resume_parser = sud_parser.add_parser('resume', help='Resume a process that is paused by id') resume_parser.add_argument('pid', help='process id') resume_parser.set_defaults(func=resume_process) list_parser = sud_parser.add_parser('list', help='List processes (all or a specific proces by id') list_parser.add_argument('--pid', help='process id', default=None, required=False) list_parser.set_defaults(func=list_processes) return parser_obj def main(args=None): parser_ = _parser() args_ = parser_.parse_args(args) args_.func(**vars(args_)) if __name__ == '__main__': main()