import logging
import os
import shutil
import time
import typer
from typing import Optional
from pathlib import Path
from pyautomail import __app_name__, __version__, EmailSender
from pyautomail import utils
from pyautomail.utils import get_config_dict, init_logger
from pyautomail.storage import get_session, create_tables
from pyautomail.storage import util, Process, Record
app = typer.Typer()
[docs]def get_logger():
return init_logger('cli', filename='cli.log', level=logging.DEBUG)
[docs]def run_process(pid, is_resume=True, wait_time=.05):
"""
This function will run a process with a specific id
Parameters
----------
pid : int
the process id
is_resume : bool
if True, the program will resume the process if it is paused, otherwise it prints a warning message and \
return without doing anything
wait_time : int
the time to wait between sending emails
"""
# initialize the logger
logger = get_logger()
logger.info("Reading arguments...")
# get the database session
session, engin = get_session()
create_tables(engin)
# get the process and check if it exists
process = session.query(Process).filter(Process.id == pid).first()
if not process:
logger.error(f"ID:{pid} => Process not found. You can see all processes with 'automail list' command")
return
sender_email = process.sender
subject = process.subject
temp_file = process.temp_file
if is_resume:
if process.status != "paused":
logger.error(f"ID:{pid} => Program is already {process.status}. "
f"For resuming the program, it should be paused")
return
contacts = session.query(Record).filter(Record.process_id == pid, Record.status == "unsent").all()
else:
if process.status in ["paused", "in progress", 'finished']:
logger.warning(f"ID:{pid} => Program is already {process.status}")
logger.warning(f"the `automail start {id}` command will start the program from the beginning")
check = typer.confirm(f"Are you sure you want to start the program from the beginning?", default=False)
if not check:
logger.info(f"ID:{pid} => Program not started")
return
contacts = session.query(Record).filter(Record.process_id == pid).all()
# get the password
logger.info("Creating EmailSender obj...")
if typer.confirm(f"Do you want to use the initial password for {sender_email}?", default=True):
sender = EmailSender(cfg="config.cfg", user=sender_email)
else:
password = typer.prompt(f"Enter password for {sender_email}", hide_input=True)
sender = EmailSender(cfg="config.cfg", user=sender_email, password=password)
# Create a secure SSL context
if temp_file:
sender.set_template(temp_file)
process.status = "in progress"
session.commit()
pause = False
for contact in contacts:
process = session.query(Process).filter(Process.id == pid).first()
if process.status == "paused":
logger.info(f"ID:{pid} => is paused by other process")
session.close()
engin.dispose()
return
logger.info(f"Sending email to: {contact.receiver}")
sender.send(contact.receiver, subject, contact.data)
contact.status = "sent"
session.commit()
logger.info(f"ID:{contact.id} => Email sent to {contact.receiver}")
time.sleep(wait_time)
process.status = "finished"
session.commit()
logger.info(f"ID:{pid} => Program finished successfully")
def _version_callback(value: bool) -> None:
"""Show the application's version and exit."""
if value:
typer.echo(f"{__app_name__} v{__version__}")
raise typer.Exit()
[docs]@app.callback()
def main(
version: Optional[bool] = typer.Option(
None,
"--version",
"-v",
help="Show the application's version and exit.",
callback=_version_callback,
is_eager=True,
)
) -> None:
return
[docs]@app.command()
def init(
db_path: str = typer.Option(
'./automail-workspace',
"--db-path",
"-db",
prompt="Where do you want to initialize the automail project?",
),
smtp_server: str = typer.Option(
'smtp.gmail.com',
"--smtp-server",
"-ss",
prompt="What is your smtp server?",
help="Your smtp server.",
),
smtp_port: int = typer.Option(
465,
"--smtp-port",
"-sp",
prompt="What is your smtp port?",
help="Your smtp port.",
),
email: str = typer.Option(
'<your-email>',
"--email",
"-e",
help="Your email address.",
prompt="Enter a default email address?",
),
password: str = typer.Option(
'None',
"--password",
"-p",
help="Your email password.",
hide_input=True,
prompt="If you want to save the password of default email please enter your password"
" otherwise press enter to continue?",
),
is_test: bool = typer.Option(
False,
"--test",
"-t",
help="Test output.",
),
) -> None:
"""Initialize the mail database."""
logger = get_logger()
if not shutil.os.path.exists(db_path):
shutil.os.makedirs(db_path)
elif not os.listdir(db_path):
pass
else:
logger.error(f"Directory {db_path} already exists.")
if shutil.os.path.exists(db_path + '/mail.db') and shutil.os.path.exists(db_path + '/config.cfg'):
# delete the folder if it exists
if typer.confirm(f"Do you want to delete {db_path} directory?", default=False):
shutil.rmtree(db_path)
logger.info(f"Deleting {db_path} directory...")
time.sleep(0.5)
shutil.os.mkdir(db_path)
else:
logger.info(f"Aborted!")
raise typer.Exit(code=0)
else:
logger.error(f"This directory is not a previous automail project."
f" please choose another directory or project name.")
raise typer.Exit(code=0)
# create the database
logger.info(f"Initializing {__app_name__} database...")
os.chdir(db_path)
session, engin = get_session()
create_tables(engin)
logger.info(f"Creating tables...")
logger.info(f"Done!")
# create the config file
logger.info(f"Creating config file...")
utils.create_config_file(
smtp_server=smtp_server, smtp_port=smtp_port, password=password, sender_email=email, is_test=is_test
)
logger.info(f"Done!")
logger.info(f"You can see and change the configuration file in the {os.path.abspath('./config.cfg')}!")
os.chdir("../")
session.close()
engin.dispose()
return
[docs]@app.command()
def register(
email: str = typer.Option(
"default(see config.cfg)",
"--email",
"-e",
prompt="What is your email address?",
help="Your email address.",
),
contact_list: Path = typer.Argument(
...,
exists=True,
help="The path to your contact list.",
),
title: str = typer.Option(
"Contact List",
"--title",
"-T",
prompt="What is the title of your contact list?",
help="The title of your contact list.",
),
custom_pdf: bool = typer.Option(
False,
"--custom-pdf",
"-CA",
help="Convert your contact list to pdf.",
),
attachment: Path = typer.Option(
None,
"--attachment",
"-a",
help="The path to the attachment file.",
),
custom_pdf_dir: Path = typer.Option(
None,
"--custom-pdf-dir",
"-cpd",
help="The path to the custom pdf directory.",
),
subject: str = typer.Option(
'None',
"--subject",
"-s",
prompt="What is the subject of your email?",
help="The subject of your email.",
),
template: Optional[Path] = typer.Option(
None,
"--template",
"-t",
help="The body of your email.",
),
) -> None:
"""Register your email account."""
logger = get_logger()
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
if template is None:
tmp = typer.confirm("Do you want to use template?", default=False)
if tmp:
while True:
template = typer.prompt("Please enter the path to your template file.", type=Path)
if os.path.isfile(template) and (template.suffix == '.txt' or template.suffix == '.html'):
template = str(template)
break
else:
typer.secho(f"File {template} does not exist. or the file type is not supported. "
f"the file type should be .txt or .html", fg=typer.colors.RED)
continue
if not os.path.exists(title):
shutil.os.mkdir(title)
else:
logger.error(f"Directory {title} already exists. Please choose another title.")
raise typer.Exit(code=0)
shutil.copy(contact_list, title)
contact_list = os.path.join(title, os.path.basename(contact_list))
if attachment:
shutil.copy(attachment, title)
attachment = os.path.join(title, os.path.basename(attachment))
if template:
shutil.copy(template, title)
template = os.path.join(title, os.path.basename(template))
logger.info(f"Registering your email account...")
if email == "default(see config.cfg)":
email = get_config_dict()["user"]
util.register_new_process(
email=email,
contact_list=contact_list,
title=title,
custom_pdf=custom_pdf,
attachment=attachment,
custom_pdf_dir=str(custom_pdf_dir),
subject=subject,
template=template,
)
else:
logger.error("Please initialize the automail project first.")
raise typer.Exit(code=0)
[docs]@app.command()
def start(
pid: Optional[int] = typer.Argument(
None,
help="The id of the process.",
),
) -> None:
"""Start sending emails."""
logger = get_logger()
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
logger.info("Starting...")
if pid is None:
logger.error("Please enter the id of the process.")
raise typer.Exit(code=0)
else:
run_process(pid=pid, is_resume=False)
else:
logger.error("Please initialize the automail project first.")
raise typer.Exit(code=0)
[docs]@app.command()
def resume(
pid: Optional[int] = typer.Argument(
None,
help="The id of the process.",
),
) -> None:
"""Resume sending emails."""
logger = get_logger()
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
logger.info("Resuming...")
if pid is None:
typer.secho("Please enter the id of the process.", fg=typer.colors.RED)
raise typer.Exit(code=0)
else:
run_process(pid=pid, is_resume=True)
else:
logger.error("Please initialize the automail project first.")
raise typer.Exit(code=0)
[docs]@app.command()
def stop(
pid: Optional[int] = typer.Argument(
None,
help="The id of the process.",
),
) -> None:
"""Stop a process."""
logger = get_logger()
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
logger.info("Stopping...")
if pid is None:
logger.error("Please enter the id of the process.")
raise typer.Exit(code=0)
else:
session, engin = get_session()
process = session.query(Process).filter(Process.id == pid).first()
if process is None:
logger.error(f"Process with id {pid} does not exist.")
session.close()
engin.dispose()
raise typer.Exit(code=0)
if process.status == "in progress":
process.status = "paused"
session.commit()
logger.info(f"ID: {pid} => Pausing the program.")
else:
logger.warning(f"ID: {pid} => Program is not in progress.")
session.close()
engin.dispose()
else:
logger.error("Please initialize the automail project first.")
raise typer.Exit(code=0)
[docs]@app.command()
def delete_process(
pid: Optional[int] = typer.Argument(
None,
help="The id of the process.",
),
dry_run: bool = typer.Option(
False,
"--dry-run",
"-d",
help="Test output.",
),
) -> None:
"""Delete a process."""
logger = get_logger()
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
logger.info("Deleting...")
if pid is None:
typer.secho("Please enter the id of the process.", fg=typer.colors.RED)
raise typer.Exit(code=0)
else:
session, engin = get_session()
process = session.query(Process).filter(Process.id == pid).first()
if process is None:
logger.error(f"Process with id {pid} does not exist.")
raise typer.Exit(code=0)
else:
if dry_run:
logger.info(f"Process with id {pid} will be deleted.")
else:
session.delete(process)
session.commit()
logger.info(f"Process with id {pid} has been deleted.")
shutil.rmtree(process.title)
session.close()
engin.dispose()
else:
logger.error("Please initialize the automail project first.")
raise typer.Exit(code=0)
[docs]@app.command()
def delete_record(
rid: Optional[int] = typer.Argument(
None,
help="The id of the record.",
),
dry_run: bool = typer.Option(
False,
"--dry-run",
"-d",
help="Test output.",
),
) -> None:
"""Delete a record by ID."""
logger = get_logger()
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
logger.info("Deleting...")
if rid is None:
logger.error("Please enter the id of the record.")
raise typer.Exit(code=0)
else:
session, engin = get_session()
record = session.query(Record).filter(Record.id == rid).first()
if record is None:
logger.error(f"Record with id {rid} does not exist.")
raise typer.Exit(code=0)
else:
if dry_run:
logger.info(f"Record with id {rid} will be deleted.")
else:
session.delete(record)
session.commit()
logger.info(f"Record with id {rid} has been deleted.")
session.close()
engin.dispose()
else:
logger.error("Please initialize the automail project first.")
raise typer.Exit(code=0)
[docs]@app.command("list")
def list_processes() -> None:
"""List all processes."""
logger = get_logger()
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
logger.info("Listing...")
session, engin = get_session()
create_tables(engin)
processes = session.query(Process).all()
if len(processes) == 0:
logger.warning("There is no process.")
else:
for process in processes:
logger.info(f"ID: {process.id} | Title: {process.title} | Status: {process.status}")
session.close()
engin.dispose()
[docs]@app.command()
def list_records(
process_id: Optional[int] = typer.Argument(
None,
help="The id of the process.",
)) -> None:
"""List all records of one process."""
logger = get_logger()
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
logger.info("Listing...")
session, engin = get_session()
create_tables(engin)
if process_id is None:
records = session.query(Record).all()
else:
records = (
session.query(Record).filter(Record.process_id == process_id).all()
)
if len(records) == 0:
logger.warning("There is no record.")
else:
for record in records:
logger.info(f"ID: {record.id} | Process ID: {record.process_id} | Email: {record.receiver} |"
f" Status: {record.status}"
)
session.close()
engin.dispose()
else:
logger.error("Please initialize the automail project first.")
raise typer.Exit(code=0)
if __name__ == "__main__":
app(prog_name=__app_name__)