Commit 397f2566 authored by xiang's avatar xiang
Browse files

Merge branch 'CD-13_file_handler' into 'master'

Cd 13 file handler

See merge request ceotr/app_common!22
parents bc962109 6f94bba1
......@@ -6,5 +6,4 @@ from .file_system_manager import FileSystemManager
fsm = FileSystemManager()
# !!! not finish don't use it
raise NotImplementedError
......@@ -3,7 +3,6 @@ import abc
import tarfile
import os
from subprocess import Popen, PIPE
from functools import partial
from typing import (
Any,
......@@ -13,6 +12,7 @@ from typing import (
from .utils import WrapperProxy
from .linux_command import LinuxCommand
from .remote_server_connector import RemoteServer
from .loacl_command_executor import execute_local_command
class LinuxFileSystemController(WrapperProxy):
......@@ -65,51 +65,17 @@ class LinuxFileSystemController(WrapperProxy):
return self._result_cache
class DummyLinuxFileSystemController(LinuxFileSystemController):
"""Fake controller doesn't do anything for testing"""
def commit(self, parallel=True) -> None:
for command in self._un_commit_list:
self._result_cache.append(command)
self._un_commit_list = []
class DummyLinuxFileSystemControllerUsingOsSystem(LinuxFileSystemController):
"""
For testing purpose
"""
def commit(self, parallel=True) -> None:
"""Do nothing with the commands"""
for command in self._un_commit_list:
os.system(command)
self._un_commit_list = []
class LocalLinuxFileSystemController(LinuxFileSystemController):
"""Run commands on local linux server"""
command_execute_function = execute_local_command
def __init__(self, user: str = None, host: str = None) -> None:
super().__init__()
self.user = user
self.host = host
def commit(self, parallel=True) -> None:
if parallel:
for command in self._un_commit_list:
with Popen(command, shell=True, stdout=PIPE, stderr=PIPE) as popen_res:
popen_res.wait()
out = popen_res.stdout.readlines()
self._result_cache.append((out, popen_res.returncode))
else:
the_comment = ";".join(self._un_commit_list)
with Popen(the_comment, shell=True, stdout=PIPE, stderr=PIPE) as popen_res:
popen_res.wait()
out = popen_res.stdout.readlines()
self._result_cache.append((out, popen_res.returncode))
def commit(self, parallel: bool = True) -> None:
execute_local_command(parallel, self._un_commit_list, self._result_cache)
self._un_commit_list = []
@staticmethod
......@@ -136,19 +102,20 @@ class LocalLinuxFileSystemController(LinuxFileSystemController):
class RemoteLinuxFileSystemController(LinuxFileSystemController):
"""Run command on remote linux server"""
RemoteServerClass = RemoteServer
def __init__(self, host: str, user: str, password: str = None) -> None:
super().__init__()
self.user = user
self.host = host
self.remote_server = RemoteServer(user, host, password)
self.remote_server = self.RemoteServerClass(user, host, password)
def back_up_remote_dir(self, abs_dir: str) -> None:
back_up_dir = abs_dir + ".bak"
self.move_files(abs_dir, back_up_dir)
self.commit()
def commit(self, parallel=True) -> None:
def commit(self, parallel: bool = True) -> None:
with self.remote_server as remote_server_handler:
for command in self._un_commit_list:
ouput_res = remote_server_handler.send_command(command)
......
"""Linux system commands wrapped by Python code"""
import os
from .utils import absolute_path_complement, convert_list_to_str
from .utils import path_complement, convert_list_to_str
REMOTE_USER_HOST = "{user}@{remote_host}"
......@@ -48,33 +48,33 @@ class LinuxCommand:
@staticmethod
def current_path() -> str:
return "pwd"
@staticmethod
def compress_files(file_dir_or_files_list, result_path, res_name):
"""Linux command for compress one file or a list of files
"""
file_input_type = type(file_dir_or_files_list)
basic_command_format = "tar -cjvf {} {}"
result_path = os.path.join(result_path, res_name)
if file_input_type is str:
return basic_command_format.format(result_path, file_dir_or_files_list)
elif file_input_type is list:
return basic_command_format.format(result_path, convert_list_to_str(file_dir_or_files_list))
else:
raise AttributeError(
"file_dir_or_files_list:() must be file path, or a list of file path".format(file_dir_or_files_list))
@staticmethod
def uncompress_files(file_path, path_to_extract_dir=None):
"""
Uncompress the file
"""
basic_command_format = "tar -xzvf {} {}"
if path_to_extract_dir:
extractdir = "-C {}".format(path_to_extract_dir)
else:
extractdir = ""
return basic_command_format.format(file_path, extractdir)
#
# @staticmethod
# def compress_files(file_dir_or_files_list, result_path, res_name):
# """Linux command for compress one file or a list of files
# """
# file_input_type = type(file_dir_or_files_list)
# basic_command_format = "tar -cjvf {} {}"
# result_path = os.path.join(result_path, res_name)
# if file_input_type is str:
# return basic_command_format.format(result_path, file_dir_or_files_list)
# elif file_input_type is list:
# return basic_command_format.format(result_path, convert_list_to_str(file_dir_or_files_list))
# else:
# raise AttributeError(
# "file_dir_or_files_list:() must be file path, or a list of file path".format(file_dir_or_files_list))
#
# @staticmethod
# def uncompress_files(file_path, path_to_extract_dir=None):
# """
# Uncompress the file
# """
# basic_command_format = "tar -xzvf {} {}"
# if path_to_extract_dir:
# extractdir = "-C {}".format(path_to_extract_dir)
# else:
# extractdir = ""
# return basic_command_format.format(file_path, extractdir)
@staticmethod
def copy_file(src_path: str, dst_dir: str, new_name=None):
......@@ -131,7 +131,7 @@ class LinuxCommand:
"""
if folder:
recursive = "-r"
scp_from = absolute_path_complement(scp_from)
scp_from = path_complement(scp_from)
else:
recursive = ""
user_host = REMOTE_USER_HOST.format(user=user, remote_host=host, dst=scp_to)
......
from subprocess import Popen, PIPE
def execute_local_command(parallel, un_commit_list, result_cache):
if parallel:
for command in un_commit_list:
with Popen(command, shell=True, stdout=PIPE, stderr=PIPE) as popen_res:
popen_res.wait()
out = popen_res.stdout.readlines()
result_cache.append((out, popen_res.returncode))
else:
the_comment = ";".join(un_commit_list)
with Popen(the_comment, shell=True, stdout=PIPE, stderr=PIPE) as popen_res:
popen_res.wait()
out = popen_res.stdout.readlines()
result_cache.append((out, popen_res.returncode))
......@@ -6,55 +6,70 @@ class RemoteServer:
"""
This class allow user to build and store a ssh connection session.
"""
pxssh_session = pxssh.pxssh()
exception_pxssh = pxssh.ExceptionPxssh
def __init__(self, user: str, host: str, password: str = None) -> None:
self.user = user
self.host = host
self.password = password
self._session = None
self.session = self.pxssh_session
self.connected = False
def __login(self) -> None:
connected = False
errors = []
def _password_login(self, errors):
"""Login remote server with password
put raise in errors
"""
try:
self.session.login(server=self.host, username=self.user, password=self.password)
except self.exception_pxssh as e:
errors.append(e.value)
else:
self.connected = True
def _password_free_login(self, errors):
"""Login remote server without password
put raise in errors
"""
try:
self.session.login(server=self.host, username=self.user)
except self.exception_pxssh as e:
errors.append(e.value)
else:
self.connected = True
def _login(self) -> None:
"""
Login with or without password
raise if tried both way and failed
"""
erros = []
if self.password:
try:
self.session.login(server=self.host, username=self.user, password=self.password)
except pxssh.ExceptionPxssh as e:
errors.append(e)
else:
connected = True
if not connected:
try:
self.session.login(server=self.host, username=self.user)
except pxssh.ExceptionPxssh as e:
connected = False
errors.append(e)
else:
connected = True
if errors:
if len(errors) == 2:
...
def __logout(self) -> None:
self._password_login(erros)
if not self.connected:
self._password_free_login(erros)
if len(erros) > 0 and not self.connected:
raise self.exception_pxssh(erros)
def _logout(self) -> None:
"""close connection"""
self.session.logout()
self.session.close()
@property
def session(self) -> pxssh.pxssh:
if self._session is None:
self._session = pxssh.pxssh()
self.__login()
return self._session
def send_command(self, command) -> str:
"""
send command to remote and return response
"""
if not self.connected:
self._login()
self.session.sendline(command)
self.session.prompt()
text = self.session.before
return text
def __enter__(self):
self.session
self._login()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.__logout()
self._logout()
......@@ -21,18 +21,18 @@ def get_files(file_dir):
def convert_list_to_str(files: list or str) -> str:
"""convert a list of files name to a string
["name1","name2"] -> "name1 name2"
["name1","name2"] -> "/name1 /name2"
"""
file_lists_str = ""
if type(files) is str:
return absolute_path_complement(files)
return path_complement(files)
for result in files:
file_lists_str = file_lists_str + absolute_path_complement(result) + " "
file_lists_str = file_lists_str + path_complement(result) + " "
return file_lists_str
def absolute_path_complement(file: str, folder: bool = False) -> str:
"""To check is give path a absolute path"""
def path_complement(file: str, folder: bool = False) -> str:
"""To check is give path a absolute path format"""
if not file.startswith("/"):
file = '/' + file
......@@ -48,7 +48,9 @@ def absolute_path_complement(file: str, folder: bool = False) -> str:
class WrapperProxy(ABC):
"""proxy for wrapped object
Subclass can access all public variable or method from wrapped object
Subclass can access all public variable or public method from wrapped object
overwrite the _setup method to get wrapped obj
"""
def __init__(self, decorator=None) -> None:
......
......@@ -17,7 +17,6 @@ setup(name='app_common',
"PyYAML",
"pexpect",
"mockssh"
],
zip_safe=True
)
from pexpect import pxssh
class DummyMockPxssh:
def __init__(self, success=True):
self.res = []
self.success = success
def login(self, *args, **kwargs):
self.res.append((args, kwargs))
if not self.success:
raise pxssh.ExceptionPxssh("connection fail")
def logout(self):
self.res.append("logout")
def close(self):
self.res.append("close")
def sendline(self, command):
self.res.append(command)
def prompt(self):
self.res.append("prompt")
@property
def before(self):
self.res.append("before")
return None
import os
import unittest
import getpass
from unittest import mock
from .common_mock import DummyMockPxssh
from app_common.linux_file_handler.file_system_controller import (LocalLinuxFileSystemController,
RemoteLinuxFileSystemController)
from app_common.utilities.file_prepare import check_create_dir
from app_common.test_common.base import TestBase
from app_common.linux_file_handler.remote_server_connector import RemoteServer
def test_fun1(a):
def example_test_fun1(a):
return a + "_decorated"
......@@ -24,7 +23,7 @@ class TestLinuxFileSystemController(unittest.TestCase):
:return:
"""
decorated_func = self.file_controller.send_decorate(test_fun1)
decorated_func = self.file_controller.send_decorate(example_test_fun1)
test_res = decorated_func('ls')
res = self.file_controller._un_commit_list
expected_res = 'ls_decorated'
......@@ -44,6 +43,9 @@ class TestLinuxFileSystemController(unittest.TestCase):
self.file_controller.make_directory(dir2)
self.assertEqual([expect_value1, expect_value2], res_from_un_commit_list)
def test_get_result(self):
self.assertEqual([], self.file_controller.get_result())
class TestLocalLinuxFileSystemController(TestBase):
def setUp(self):
......@@ -89,6 +91,7 @@ class TestLocalLinuxFileSystemController(TestBase):
"""
send multiple commands and test change directory command
"""
# create a test file first
create_dir_name = "test"
folder_path = os.path.join(self.output_dir, create_dir_name)
create_file_name = "test.txt"
......@@ -99,15 +102,11 @@ class TestLocalLinuxFileSystemController(TestBase):
self.file_controller.create_file(file_path)
self.file_controller.list_files()
self.file_controller.commit(parallel=False)
result = self.file_controller.get_result()
# test if the command finished successfully
self.assertEqual(0, self.file_controller.get_result()[0][1])
# was in root directory
self.assertTrue(b'output\n' in self.file_controller.get_result()[0][0])
# changed into output_dir
self.assertTrue(b'test.txt\n' in self.file_controller.get_result()[0][0])
def test_commit_command_for_move_files(self):
...
res = self.file_controller.get_result()[0][0]
self.assertTrue(b'test.txt\n' in res)
def test_commit_command_for_remove_files(self):
create_file_name = "test.txt"
......@@ -131,39 +130,38 @@ class TestLocalLinuxFileSystemController(TestBase):
self.assertTrue(username in str(res[0][0]))
def test_commit_command_for_current_path(self):
current_path = os.path.dirname(os.path.abspath(__file__))
self.file_controller.change_directory(current_path)
self.file_controller.current_path()
self.file_controller.commit()
self.file_controller.commit(parallel=False)
res = self.file_controller.get_result()
current_path = os.path.dirname(os.path.abspath(__file__))
self.assertTrue(current_path in str(res[0][0]))
# todo test those commands which require a mock FTP server
def test_compress_dir(self):
""" create a file first and then compress it
"""
# first test compress a dir
dir_name = "test"
dir_path = os.path.join(self.output_dir, dir_name)
res_name = "test.bar.gz"
res_path = os.path.join(self.output_dir, res_name)
os.mkdir(dir_path)
self.file_controller.compress_files(dir_path, self.output_dir, res_name)
self.file_controller.commit()
self.assertTrue(os.path.isfile(res_path))
# second test compress a list of dirs
file_name1 = "sample1.txt"
file_name2 = "sample2.txt"
file_name3 = "sample3.txt"
file_name_list = [file_name1, file_name2, file_name3]
file_path_list = []
res_name2 = "test2.bar.gz"
res_path2 = os.path.join(self.output_dir, res_name2)
for x in file_name_list:
file_path_list.append(os.path.join(self.output_dir, x))
self.file_controller.compress_files(file_name_list, self.output_dir, res_name2)
self.file_controller.commit()
self.assertTrue(os.path.isfile(res_path2))
# def test_compress_dir(self):
# """ create a file first and then compress it
# """
# # first test compress a dir
# dir_name = "test"
# dir_path = os.path.join(self.output_dir, dir_name)
# res_name = "test.bar.gz"
# res_path = os.path.join(self.output_dir, res_name)
# os.mkdir(dir_path)
# self.file_controller.compress_files(dir_path, self.output_dir, res_name)
# self.file_controller.commit()
# self.assertTrue(os.path.isfile(res_path))
# # second test compress a list of dirs
# file_name1 = "sample1.txt"
# file_name2 = "sample2.txt"
# file_name3 = "sample3.txt"
# file_name_list = [file_name1, file_name2, file_name3]
# file_path_list = []
# res_name2 = "test2.bar.gz"
# res_path2 = os.path.join(self.output_dir, res_name2)
# for x in file_name_list:
# file_path_list.append(os.path.join(self.output_dir, x))
# self.file_controller.compress_files(file_name_list, self.output_dir, res_name2)
# self.file_controller.commit()
# self.assertTrue(os.path.isfile(res_path2))
def test_make_soft_link(self):
"""
......@@ -216,4 +214,12 @@ class TestLocalLinuxFileSystemController(TestBase):
class TestLinuxRemoteFileSystemController(unittest.TestCase):
def setUp(self):
self.file_controller = RemoteLinuxFileSystemController()
RemoteServer.pxssh_session = DummyMockPxssh()
RemoteLinuxFileSystemController.RemoteServerClass = RemoteServer
self.file_controller = RemoteLinuxFileSystemController("test_host", "the_user")
def test_commit(self):
self.file_controller._un_commit_list = ["a", "b"]
self.file_controller.commit()
self.assertEqual([None, None], self.file_controller.get_result())
self.assertEqual([], self.file_controller._un_commit_list)
import unittest
import os
from app_common.linux_file_handler.linux_command import LinuxCommand
......@@ -98,86 +99,86 @@ class TestLinuxCommand(unittest.TestCase):
Test Compress file command
"""
def test_compress_file1(self):
"""test compress a file with no dst_dir
"""
test_file_name = "/test/file/name/file_name.txt"
test_dst_file_name = "test/dst/dir/dst_file_name.tar"
expected_command1 = "tar -cjvf {} {}".format(test_dst_file_name, test_file_name)
res1 = LinuxCommand.compress_files(test_file_name, test_dst_file_name)
self.assertEqual(expected_command1, res1)
def test_compress_file2(self):
""" test compress a one list with one file with dst_dir and dst_name
"""
test_dst_file_name = "test/dst/dir/dst_file_name.tar"
test_file_list1 = ["/test/file/name/file_name.txt"]
expected_command2 = "tar -cjvf {} {}".format(test_dst_file_name, "/test/file/name/file_name.txt")
res2 = LinuxCommand.compress_files(test_file_list1, test_dst_file_name)
self.assertEqual(expected_command2.strip(), res2.strip())
def test_compress_file3(self):
"""test compress a one list with two file with dst_dir and dst_name
"""
test_dst_file_name = "test/dst/dir/dst_file_name.tar"
test_file_list = ["/test/file/name/file_name.txt", "/test/file/name/file_name.txt2"]
expected_command = "tar -cjvf {} {}".format(test_dst_file_name,
"/test/file/name/file_name.txt /test/file/name/file_name.txt2")
res = LinuxCommand.compress_files(test_file_list, test_dst_file_name)
self.assertEqual(expected_command.strip(), res.strip())
def test_compress_file4(self):
"""test compress a one list with no file with dst_dir and dst_name
"""
test_dst_file_name = "test/dst/dir/dst_file_name.tar"
test_file_list4 = []
expected_command4 = "tar -cjvf {} {}".format(test_dst_file_name,
" ")
res4 = LinuxCommand.compress_files(test_file_list4, test_dst_file_name)
self.assertEqual(expected_command4.strip(), res4.strip())
def test_compress_file5(self):
"""Test given invalid file name
It should raise Attribute error since It is not suppose to
"""
test_file_list4 = 1
test_dst_file_name = "test/dst/dir/dst_file_name.tar"
try:
LinuxCommand.compress_files(test_file_list4, 1)
except AttributeError:
self.assertTrue(True)
else:
self.assertTrue(False)
# def test_compress_file1(self):
# """test compress a file with no dst_dir
# """
# test_file_name = "/test/file/name/file_name.txt"
# test_dst_file_dir = "test/dst/dir/"
# test_dst_file_name = "dst_file_name.tar"
# expected_command1 = "tar -cjvf {} {}".format(test_file_name, os.path.join(test_dst_file_dir, test_dst_file_name))
# res1 = LinuxCommand.compress_files(test_file_name, test_dst_file_dir, test_dst_file_name)
# self.assertEqual(expected_command1, res1)
# def test_compress_file2(self):
# """ test compress a one list with one file with dst_dir and dst_name
# """
# test_dst_file_name = "test/dst/dir/dst_file_name.tar"
# test_file_list1 = ["/test/file/name/file_name.txt"]
# expected_command2 = "tar -cjvf {} {}".format(test_dst_file_name, "/test/file/name/file_name.txt")