From 3930557150ef123137bafbc442cd39bf946eaf3d Mon Sep 17 00:00:00 2001 From: LinuxSquare Date: Sun, 1 Dec 2024 21:13:41 +0100 Subject: [PATCH] Rework mysql_secure_installation (again) --- mysql/library/mysql_secure_installation.py | 478 --------------------- mysql/pkg.ansible.yml | 2 +- mysql/secure.ansible.yml | 30 +- 3 files changed, 19 insertions(+), 491 deletions(-) delete mode 100644 mysql/library/mysql_secure_installation.py diff --git a/mysql/library/mysql_secure_installation.py b/mysql/library/mysql_secure_installation.py deleted file mode 100644 index 42cb6c4..0000000 --- a/mysql/library/mysql_secure_installation.py +++ /dev/null @@ -1,478 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2020, eslam.gomaa - -DOCUMENTATION = ''' ---- -module: mysql_secure_installation - -short_description: An idempotent module to perform "mysql_secure_installation" script steps - -version_added: "1.0" - -description: - - An idempotent module to perform "mysql_secure_installation" script steps - - Change MySQL Root Password - for a list of hosts i.e [localhost, 127.0.0.1, ::1,] .etc. - - Remove Anonymous User - - Disallow Root Login Remotely - - Remove Test Database - -options: - login_password: - description: - - Root's password to login to MySQL - required: true - type: str - - new_password: - description: - - New desired Root password - required: true - type: str - user: - description: - - MySQL user to login - default: "root" - type: str - - login_host: - description: - - host to connect to - default: "localhost" - type: str - hosts: - description: - - List of hosts for the provided user i.e ['localhost', '127.0.0.1', '::1'], Note - all will have the same new password - default: ["localhost"] - type: list - change_root_password: - description: - - whether or not to change root password - default: True - type: bool - - remove_anonymous_user: - description: - - whether or not to remove anonymous user - default: True - type: bool - - disallow_root_login_remotely: - description: - - whether or not to disallow root login remotely - default: False - type: bool - - remove_test_db: - description: - - whether or not to remove test db - default: True - type: bool - - -author: - - eslam.gomaa (linkedin.com/in/eslam-sa3dany) -''' - -EXAMPLES = ''' -# with a fresh MySQL Installation -- name: test mysql_secure_installation - mysql_secure_installation: - login_password: '' - new_password: password22 - user: root - login_host: localhost - hosts: ['localhost', '127.0.0.1', '::1'] - change_root_password: true - remove_anonymous_user: true - disallow_root_login_remotely: true - remove_test_db: true - register: mysql_secure - -- debug: - var: mysql_secure - -# Change an existing password -- name: test mysql_secure_installation - mysql_secure_installation: - login_password: password22 - new_password: password23 - user: root - login_host: localhost - hosts: ['localhost', '127.0.0.1', '::1'] - change_root_password: true - remove_anonymous_user: true - disallow_root_login_remotely: true - remove_test_db: true -''' - -RETURN = ''' -change_root_pwd: - description: 0 == Success, 1 == Fail - type: Int - returned: always -disallow_root_remotely: - description: 0 == Success, 1 == Fail - type: Int - returned: always -hosts_success: - description: provides a list of the hosts succeeded to change password to i.e ['root@localhost', 'root@mysql.example.com'] - type: List - returned: always -hosts_failed: - description: provides a list of the hosts failed to change password to i.e ['root@localhost', 'root@mysql.example.com'] - type: List - returned: always -remove_anonymous_user: - description: 0 == Success, 1 == Fail - type: Int - returned: always -remove_test_db: - description: 0 == Success, 1 == Fail - type: Int - returned: always -stdout: - description: Prints the result of the code as compared to the desired state - type: Str - returned: always -stderr: - description: Prints an error message in case of code error - type: Str - returned: In case of code error -''' - -############################################## -####################### -############ -# import MySQLdb as mysql # deprecated -import re -import pymysql as mysql -from itertools import chain -import os - -def runcommand(cmd): - """ - function to execute shell commands and returns a dic of - """ - import subprocess - - info = {} - proc = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - universal_newlines=True, ) - std_out, std_err = proc.communicate() - info['rc'] = proc.returncode - info['stdout'] = std_out.rstrip() - info['stderr'] = std_err.rstrip() - ## Python's rstrip() method - # strips all kinds of trailing whitespace by default, not just one newline - return info - -# Checking mysql version first, as Mariadb 10.4.x requires a different command to update user password. -mysql_version_above_10_3 = False -# Get socket path -socket_path = runcommand('mysqladmin variables | grep -w "sock" | xargs | cut -d " " -f 4') -connected_with_socket = False -connected_with_password = False -connected_with_new_password = False -pasword_incorrect_warn = False -mysql_version = None - - -def check_mysql_connection(host, user, password='', unix_socket=True): - """ - A function used to check the ability to login to MySQL/Mariadb - :param host: ie. 'localhost' - :type String - :param user: mysql user ie. 'root' - :type String - :param password: mysql user's password - :type String - :return: True||False - """ - - def connect_socket(host=host, user=user, socket_path=socket_path['stdout']): - if unix_socket: - if socket_path: - if os.path.exists(socket_path): - try: - mysql.connect(host=host, user=user, passwd=password, unix_socket=socket_path) - global connected_with_socket - connected_with_socket = True - return True - # except mysql.err.InternalError: - except: - return False - - def connect_password(host=host, user=user, password=password): - try: - mysql.connect(host=host, user=user, passwd=password) - return True - except: - return False - - if connect_password() or connect_socket(): - return True - return False - -def mysql_secure_installation(login_password, new_password, user='root', login_host='localhost', hosts=['hostname'], - change_root_password=True, remove_anonymous_user=True, disallow_root_login_remotely=False, - remove_test_db=True, disable_unix_socket=False): - """ - A function to perform the steps of mysql_secure_installation script - :param login_password: Root's password to login to MySQL - :param new_password: New desired Root password :type String - :param user: MySQL user - default: 'root' :type String - :param login_host: host to connect to - default: 'localhost' :type String - :param hosts: List of hosts for the provided user i.e ['localhost', '127.0.0.1', '::1'] :type List - :param change_root_password: default: True - :type Boolean - :param remove_anonymous_user: default: True - :type: Boolean - :param disallow_root_login_remotely: default: False - :type Boolean - :param remove_test_db: default: True - :type: Boolean - :return: - """ - if isinstance(hosts, str): - hosts = hosts.split(',') - info = {'change_root_pwd': None, 'hosts_failed': [], 'hosts_success': [], 'remove_anonymous_user': None, - 'remove_test_db': None, 'disallow_root_remotely': None} - - - - def remove_anon_user(cursor): - if remove_anonymous_user: - cursor.execute("select user, host from mysql.user where user='';") - anon_user = cursor.fetchall() - if len(anon_user) >= 1: - cursor.execute('use mysql;') - cursor.execute("DELETE FROM user WHERE user='';") - #cursor.execute("update mysql.user set plugin=null where user='root';") # That shouldn't be here :D (used for testing) - cursor.execute("select user, host from mysql.user where user='';") - check = cursor.fetchall() - info['remove_anonymous_user'] = True - if len(check) >= 1: - info['remove_anonymous_user'] = False - else: - info['remove_anonymous_user'] = "False -- meets the desired state" - - def remove_testdb(cursor): - if remove_test_db: - cursor.execute("show databases;") - testdb = cursor.fetchall() - if 'test' in list(chain.from_iterable(testdb)): # if database "test" exists in the "db's list" - cursor.execute("drop database test;") - info['remove_test_db'] = True - - cursor.execute("show databases;") # Test if the "test" db deleted - check_test_db = cursor.fetchall() - if 'test' in list(chain.from_iterable(check_test_db)): # return 1 if the db still exists - info['remove_test_db'] = False - else: # means "test" db does not exist - info['remove_test_db'] = "False -- meets the desired state" - - def disallow_root_remotely(cursor): - if disallow_root_login_remotely: - cursor.execute( - "select user, host from mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');") - remote = cursor.fetchall() - if len(remote) >= 1: - cursor.execute( - "DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');") - cursor.execute("flush privileges;") - info['disallow_root_remotely'] = True - - cursor.execute( - "select user, host from mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');") - check_remote = cursor.fetchall() - if len(check_remote) >= 1: # Check if the hosts still exist - info['disallow_root_remotely'] = "False -- Failed" - - else: - info['disallow_root_remotely'] = "False -- meets the desired state" - - if check_mysql_connection(host=login_host, user=user, password=new_password, unix_socket=False): - global connected_with_new_password - connected_with_new_password = True - - if check_mysql_connection(host=login_host, user=user, password=login_password, unix_socket=True): - try: - if globals()['connected_with_socket']: - connection = mysql.connect(host=login_host, user=user, passwd=login_password, db='mysql', - unix_socket=socket_path['stdout']) - mysql_version = runcommand('mysqladmin version | grep -wi server | xargs | cut -d " " -f 3') - else: - # print("socket? " + str(globals()['connected_with_socket'])) - connection = mysql.connect(host=login_host, user=user, passwd=login_password, db='mysql') - if not login_password: - mysql_version = runcommand( - "mysqladmin version -u {} -h {} | grep -wi server | xargs | cut -d ' ' -f 3".format( - user, - login_host)) - else: - mysql_version = runcommand( - "mysqladmin version -u {} -h {} -p{}| grep -wi server | xargs | cut -d ' ' -f 3".format( - user, - login_host, - login_password)) - - cursor = connection.cursor() - - # Get MySQL Version - mysql_version_n = re.findall("[0-9]+.[0-9]+", mysql_version['stdout']) - if float(mysql_version_n[0]) >= 10.4: - global mysql_version_above_10_3 - mysql_version_above_10_3 = True - - if disable_unix_socket: - cursor.execute( - "SELECT host, user, password, plugin FROM mysql.user where User='{}' LIMIT 0,1;".format(user)) - socket_exists = cursor.fetchall() - if 'unix_socket' in list(chain.from_iterable(socket_exists)): - cursor.execute("UPDATE mysql.user SET plugin = '' WHERE user = '{}';".format(user)) - - remove_anon_user(cursor) - remove_testdb(cursor) - disallow_root_remotely(cursor) - - if not globals()['connected_with_new_password']: - if change_root_password: - pwd = {} - for host in hosts: - cursor.execute('use mysql;') - # Need to use SET PASSWORD starting from Mariadb 10.4 - # https://mariadb.com/kb/en/set-password/ - if globals()['mysql_version_above_10_3']: - # Will return an error if the host does NOT exist, so checking first. - cursor.execute( - 'select user, host, password from mysql.user where user="{}" and host="{}";'.format( - user, host)) - data = cursor.fetchall() - for d in data: - if d[1] == host: - cursor.execute( - "SET PASSWORD FOR '{}'@'{}' = PASSWORD('{}');".format(user, host, new_password)) - else: - cursor.execute( - "update user set password=PASSWORD('{}') where User='{}' AND Host='{}';".format( - new_password, - user, host)) - cursor.execute('flush privileges;') - cursor.execute('select user, host, password from mysql.user where user="{}";'.format(user)) - data = cursor.fetchall() - for d in data: - if d[1] == host: - pwd['{}'.format(d[1])] = d[2] - - out = set(hosts).symmetric_difference(set(pwd.keys())) - - info['hosts_failed'] = list(out) - hosts_ = list(set(hosts) - set(list(out))) - - for host in hosts_: - if pwd[host] == pwd[login_host]: - info['hosts_success'].append(host) - else: - info['hosts_failed'].append(login_host) - - if len(info['hosts_success']) >= 1: - info['stdout'] = 'Password for user: {} @ Hosts: {} changed to the desired state'.format(user, - info[ - 'hosts_success']) - - if len(info['hosts_failed']) >= 1 and len(info['hosts_success']) >= 1: - info['change_root_pwd'] = "True -- But not for all of the hosts" - elif len(info['hosts_failed']) == 0 and len(info['hosts_success']) >= 1: - info['change_root_pwd'] = True - elif len(info['hosts_failed']) == 1 and len(info['hosts_success']) == 0: - info['change_root_pwd'] = False - else: - info['change_root_pwd'] = "False -- meets the desired state" - - - connection.close() - except mysql.Error as e: - info['change_root_pwd'] = False - info['stderr'] = e - - elif check_mysql_connection(host=login_host, user=user, password=new_password, unix_socket=True): - - - if globals()['connected_with_socket']: - connection = mysql.connect(host=login_host, user=user, passwd=new_password, db='mysql', - unix_socket=socket_path['stdout']) - else: - if not new_password: - connection = mysql.connect(host=login_host, user=user, db='mysql') - else: - connection = mysql.connect(host=login_host, user=user, passwd=new_password, db='mysql') - - cursor_ = connection.cursor() - remove_anon_user(cursor_) - remove_testdb(cursor_) - disallow_root_remotely(cursor_) - info['change_root_pwd'] = "False -- meets the desired state" - - else: - info['change_root_pwd'] = False - info['stdout'] = 'Neither the provided old passwd nor the new passwd are correct' - pasword_incorrect_warn = True - if globals()['connected_with_new_password']: - info['stdout'] = 'Password of {}@{} Already meets the desired state'.format(user, login_host) - info['connected_with_socket?'] = globals()['connected_with_socket'] - info['new_password_correct?'] = globals()['connected_with_new_password'] - info['mysql_version_above_10_3?'] = globals()['mysql_version_above_10_3'] - return info - -### Testing ### -# check = check_mysql_connection('localhost', 'root', '', unix_socket=True) -# print(check) -# print("") -# -# secure = mysql_secure_installation(login_password="", new_password='password22', hosts=['localhost', '127.0.0.1'], login_host='localhost', remove_test_db=True, disallow_root_login_remotely=True) -# print(secure) -############ -####################### -############################################## - -from ansible.module_utils.basic import * - - -def main(): - fields = { - "login_password": {"required": True, "type": "str", "no_log": True}, - "new_password": {"required": True, "type": "str", "no_log": True}, - "user": {"type": "str", "default": "root"}, - "login_host": {"type": "str", "default": "localhost"}, - "hosts": {"type": "list", "default": ["localhost"]}, - "change_root_password": {"type": "bool", "default": True, "choices": [True, False]}, - "remove_anonymous_user": {"type": "bool", "default": True, "choices": [True, False]}, - "disallow_root_login_remotely": {"type": "bool", "default": False, "choices": [True, False]}, - "remove_test_db": {"type": "bool", "default": True, "choices": [True, False]}, - "disable_unix_socket": {"type": "bool", "default": True, "choices": [True, False]} - } - - module = AnsibleModule(argument_spec=fields) - - run = mysql_secure_installation(login_password=module.params['login_password'], - new_password=module.params['new_password'], - user=module.params['user'], - login_host=module.params['login_host'], - hosts=module.params['hosts'], - change_root_password=module.params['change_root_password'], - remove_anonymous_user=module.params['remove_anonymous_user'], - disallow_root_login_remotely=module.params['disallow_root_login_remotely'], - remove_test_db=module.params['remove_test_db'], - disable_unix_socket=module.params['disable_unix_socket']) - - if pasword_incorrect_warn: - module.warn('mysql_secure_installation --> Neither the provided old passwd nor the new passwd are correct -- Skipping') - - if len(run["hosts_success"]) >= 1: - changed_ = True - else: - changed_ = False - - module.exit_json(changed=changed_, meta=run) - - -if __name__ == '__main__': - main() diff --git a/mysql/pkg.ansible.yml b/mysql/pkg.ansible.yml index c6eadb7..1a22931 100644 --- a/mysql/pkg.ansible.yml +++ b/mysql/pkg.ansible.yml @@ -7,6 +7,6 @@ - mariadb - mariadb-client - mariadb-openrc - - py3-pymysql + - py3-mysqlclient \ No newline at end of file diff --git a/mysql/secure.ansible.yml b/mysql/secure.ansible.yml index 00b75a2..4100056 100644 --- a/mysql/secure.ansible.yml +++ b/mysql/secure.ansible.yml @@ -5,18 +5,24 @@ path: /var/lib/mysql/.secured register: MYSQL_SECURED_RESULT ignore_errors: true - - name: mysql_secure_installation - mysql_secure_installation: - login_password: '' - new_password: "{{mysql_root_password}}" - user: root - login_host: localhost - hosts: ['localhost', '127.0.0.1', '::1'] - change_root_password: true - remove_anonymous_user: true - disallow_root_login_remotely: true - remove_test_db: true - disable_unix_socket: false + - name: Set the root password + mysql_user: login_user=root login_password="" user=root password="{{mysql_root_password}}" host="{{item}}" + with_items: + - 127.0.0.1 + - ::1 + - localhost + when: not MYSQL_SECURED_RESULT.stat.exists + - name: Delete anonymous MySQL user + mysql_user: login_user=root login_password="{{mysql_root_password}}" name="" host="{{item}}" state=absent + with_items: + - localhost + - "{{ansible_nodename}}" + when: not MYSQL_SECURED_RESULT.stat.exists + - name: Delete Hostname based MySQL user + mysql_user: login_user=root login_password="{{mysql_root_password}}" user=root host="{{ansible_nodename}}" state=absent + when: not MYSQL_SECURED_RESULT.stat.exists + - name: Remove MySQL test database + mysql_db: login_user=root login_password="{{mysql_root_password}}" name=test state=absent when: not MYSQL_SECURED_RESULT.stat.exists - name: Create .secured lock file file: