Skip to content

Commit

Permalink
fixes #705 and improves parsing (#761)
Browse files Browse the repository at this point in the history
* fixes #705 and improves parsing

* fixes for python 2

* adds changelog

* fixes spelling

* fixes stuff; adds tests

* fixes useless tests

* fixes useless tests

* fixes changelog

* improves handling of addresses and other pr-comments

* small mr-fix
  • Loading branch information
toydarian authored Nov 11, 2024
1 parent 8492943 commit ea9d620
Show file tree
Hide file tree
Showing 5 changed files with 418 additions and 28 deletions.
3 changes: 3 additions & 0 deletions changelogs/fragments/705-pg_hba-invalid-and-parsing.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
bugfixes:
- "postgresql_pg_hba - fixes #705 by preventing invalid strings to be written (https://github.com/ansible-collections/community.postgresql/pull/761)"
- "postgresql_pg_hba - improves parsing of quoted strings and escaped newlines (https://github.com/ansible-collections/community.postgresql/pull/761)"
310 changes: 284 additions & 26 deletions plugins/modules/postgresql_pg_hba.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,13 @@
PG_HBA_HDR = ['type', 'db', 'usr', 'src', 'mask', 'method', 'options']

WHITESPACES_RE = re.compile(r'\s+')
TOKEN_SPLIT_RE = re.compile(r'(?<=[\s"])')
WHITESPACE_OR_QUOTE_RE = re.compile(r'[\s"]')
ONLY_SPACES_RE = re.compile(r"^\s+$")
OPTION_RE = re.compile(r"([^=]+)=(.+)")
IPV4_ADDR_RE = re.compile(r'^"?((\d{1,3}\.){3}\d{1,3})(/(\d{1,2}))?"?$')
# this regex allows for some invalid IPv6 addresses like ':::', but I honestly don't care
IPV6_ADDR_RE = re.compile(r'^"?([a-f0-9]*:[a-f0-9:]*:[a-f0-9]*)(/(\d{1,3}))?"?$')


class PgHbaError(Exception):
Expand Down Expand Up @@ -305,6 +312,126 @@ class PgHbaRuleValueError(PgHbaRuleError):
'''


class TokenizerException(Exception):
"""
This exception is raised when a string can't be tokenized
"""


def parse_hba_file(input_string):
"""
This function parses a complete pg_hba.conf file into a list of tuples where each tuple represents a rule in the
file.
:param input_string: The whole string in the pg_hba.conf file (not just a single line)
:return: A list of Rule objects that represents the contents of the input string.
"""
rules = []
line_iter = iter(input_string.split("\n"))
line = next(line_iter, None)
while line is not None:
# if that line continues, we just glue the next line onto the end until it ends
# we can and have to do that, as continuation even applies within comments and quoted strings [sic]
# https://www.postgresql.org/docs/current/auth-pg-hba-conf.html#AUTH-PG-HBA-CONF
comment = None
while line.endswith("\\"):
cont_line = next(line_iter, None)
if cont_line is None:
# we got a line continuation, but there was no more line
raise PgHbaRuleError("The last line ended with a '\\' (line continuation).")
line += "\n" + cont_line # add the newline so we don't lose that information
# handle comment-only lines
if line.strip().startswith('#'):
parsed_line = "COMMENT"
comment = line
# handle empty lines
elif not line.strip():
parsed_line = "EMPTY"
# handle "normal" lines
else:
# handle lines with comments
sanitized_line = line
if line.find('#') >= 0:
comment = sanitized_line[sanitized_line.index("#"):]
sanitized_line = sanitized_line[0:sanitized_line.index("#")]
# remove continuation tokens
sanitized_line = sanitized_line.replace("\\\n", "")
tokens = tokenize(sanitized_line)
parsed_line = tokens
# create Rule
rules.append({"tokens": parsed_line, "line": line, "comment": comment})
line = next(line_iter, None)
return rules


def tokenize(string):
"""
This function tokenizes a string respecting quotes. It needs to be fed a complete string where all quotes are
properly closed (there needs to be an even amount of `"`) otherwise it raises an exception.
You can, for example use this to tokenize a full line of a pg_hba-file (make sure to handle any escaped newlines or
comments before) or a string of options.
:param string: A string to tokenize
:return: The tokenized string as a list of strings
"""

# We need to do this charade for splitting to be compatible with Python 3.6 which has been EOL for three years
# at the time of writing. If you come across this after support for Python 3.6 has been dropped, please replace
# WHITESPACE_OR_QUOTE_RE in the beginning of the file with `TOKEN_SPLIT_RE = re.compile(r'(?<=[\s"])')`
# and the next 8 lines (including bare_tokens.append) with `bare_tokens = TOKEN_SPLIT_RE.split(string)`
bare_tokens = []
lastpos = 0
nextmatch = WHITESPACE_OR_QUOTE_RE.search(string)
while nextmatch:
bare_tokens.append(string[lastpos:nextmatch.end()])
lastpos = nextmatch.end()
nextmatch = WHITESPACE_OR_QUOTE_RE.search(string, lastpos)
bare_tokens.append(string[lastpos:])

tokens = []
state = "START"
current_symbol = ""

for token in bare_tokens:

# if the previous token ended a quoted string, we need to decide how to continue
if state == "QUOTE_END":
state = "START"
# if the token consists of only spaces, we know for sure this symbol is finished
if token == "" or ONLY_SPACES_RE.match(token):
tokens.append(current_symbol.strip())
current_symbol = ""
continue
# otherwise it might continue with more characters or even another quote

if token == "":
continue

# we either start a new symbol or continue after a finished quote
if state == "START":
# outside of quotes, whitespaces are ignored
if ONLY_SPACES_RE.match(token):
continue

current_symbol += token
# we use endswith here, to correctly handle strings like 'somekey="somevalue"'
# if there was a space before it, the quote will be alone, so that is not an issue
if token.endswith("\""):
state = "QUOTE"
else:
tokens.append(current_symbol.strip())
current_symbol = ""

# if we are inside a quoted string we consume and append tokens until the quoted string ends
elif state == "QUOTE":
current_symbol += token
if token.endswith("\""):
state = "QUOTE_END"

if state != "START":
raise TokenizerException("Unterminated quote")

return tokens


class PgHba(object):
"""
PgHba object to read/write entries to/from.
Expand Down Expand Up @@ -351,32 +478,80 @@ def read(self):
# read the pg_hbafile
try:
with open(self.pg_hba_file, 'r') as file:
for line in file:
# split into line and comment
line = line.strip()
comment = None
if '#' in line:
line, comment = line.split('#', 1)
if comment == '':
comment = None
line = line.rstrip()
# if there is just a comment, save it
if line == '':
if comment is not None:
self.comment.append('#' + comment)
else:
if comment is not None and not self.keep_comments_at_rules:
# save the comment independent of the line
self.comment.append('#' + comment)
comment = None
try:
self.add_rule(PgHbaRule(line=line, comment=comment))
except PgHbaRuleError:
pass
self.unchanged()
self.preexisting_rules = dict(self.rules)
hba_string = file.read()
except IOError:
pass
return

for line in parse_hba_file(hba_string):
if line["tokens"] == "COMMENT":
self.comment.append(line["comment"])
elif line["tokens"] != "EMPTY":
if not line["comment"]:
self._from_tokens(line["tokens"])
else:
if self.keep_comments_at_rules:
self._from_tokens(line["tokens"], line["comment"])
else:
self.comment.append(line["comment"])
self._from_tokens(line["tokens"])
self.unchanged()
self.preexisting_rules = dict(self.rules)

def _from_tokens(self, symbols, comment=None):
if len(symbols) < 4:
raise PgHbaRuleError("The rule has too few symbols")

contype = _strip_quotes(symbols[0])
if contype not in PG_HBA_TYPES:
raise PgHbaRuleValueError("Found an unknown connection-type {0}".format(symbols[0]))

# don't strip quotes from database or user, as they have a special meaning there [sic]
# > Quoting one of the keywords in a database, user, or address field (e.g., all or replication) makes the word
# > lose its special meaning, and just match a database, user, or host with that name.
database = handle_db_and_user_strings(symbols[1])
user = handle_db_and_user_strings(symbols[2])

mask = None
address = None
if contype == "local":
method_token = 3
else:
address, address_type, prefix_len = handle_address_field(symbols[3])
# it is an IP, but without a CIDR suffix, so we expect a netmask in the next token
if address_type.startswith("IP") and prefix_len == -1:
mask, mask_type, prefix_len = handle_netmask_field(symbols[4], raise_not_valid=False)
if mask_type == "invalid":
raise PgHbaRuleError("The rule either needs a hostname, full CIDR or an IP-address and a netmask")
if mask_type != address_type:
raise PgHbaRuleError("Can't mix IPv4 and IPv6 netmasks and addresses")
if len(symbols) < 6:
raise PgHbaRuleError("The rule has too few symbols")
method_token = 5 # the method should be after the netmask
# if it is anything but a bare IP address, we expect the method on index 4
else:
if len(symbols) < 5:
raise PgHbaRuleError("The rule has too few symbols")
method_token = 4
# convert address so the rule understands it, we will handle it better in the future
if address_type != "hostname":
address = str(address) + "/" + str(prefix_len)

auth_method = _strip_quotes(symbols[method_token])
if auth_method not in PG_HBA_METHODS:
raise PgHbaRuleValueError("Found an unknown method: {0}".format(symbols[method_token]))

auth_options = None
# if there is anything after the method, that must be options
if len(symbols) > method_token + 1:
# we will handle options in a smarter way in the future
# auth_options = parse_auth_options(symbols[method_token + 1:])
# now we run it just to validate the options
parse_auth_options(symbols[method_token + 1:])
auth_options = " ".join(symbols[method_token + 1:])

self.add_rule(
PgHbaRule(contype=contype, databases=database, users=user, source=address, netmask=mask, method=auth_method,
options=auth_options, comment=comment))

def write(self, backup_file=''):
'''
Expand Down Expand Up @@ -471,7 +646,10 @@ def render(self):
rule_lines = []
for rule in self.get_rules(with_lines=True):
if 'comment' in rule:
rule_lines.append(rule['line'] + '\t#' + rule['comment'])
if not rule['comment'].startswith('#'):
rule_lines.append(rule['line'] + '\t#' + rule['comment'])
else:
rule_lines.append(rule['line'] + '\t' + rule['comment'])
else:
rule_lines.append(rule['line'])
result = comment + '\n' + '\n'.join(rule_lines)
Expand Down Expand Up @@ -747,6 +925,82 @@ def user_weight(self):
return 1


def _strip_quotes(string):
if not string:
return string
return string[1:-1] if string.startswith("\"") else string


def parse_auth_options(options):
option_dict = {}
for option in options:
split_option = OPTION_RE.match(_strip_quotes(option))
if not split_option:
raise PgHbaRuleValueError(
"Found invalid option '{}'. Options need to be in the format 'key=value'".format(option))
if split_option.group(1) in option_dict.keys():
raise PgHbaRuleValueError(
"The rule contains two options with the same key ('{0}')".format(split_option.group(1)))
option_dict[split_option.group(1)] = split_option.group(2)

return option_dict


def handle_db_and_user_strings(string):
# if the string is quoted or a regex, we return it unaltered
if "\"" in string or string.startswith("/"):
return string
# we sort the dbs/users alphabetically
else:
return ",".join(sorted(string.split(",")))


def handle_address_field(address):
suffix = -1

try:
ret_addr = ipaddress.ip_network(address, strict=True)
ret_type = "IPv" + str(ret_addr.version)
if "/" in address:
suffix = ret_addr.prefixlen
ret_addr = str(ret_addr.network_address)
except ValueError as e:
# it is a network, but has host bits set
if "has host bits set" in e.args[0]:
raise PgHbaValueError("{0} has host bits set".format(address))
# it might be a quoted address or network
if address.startswith("\""):
ret_addr, ret_type, suffix = handle_address_field(_strip_quotes(address))
# if it was a quoted address, we return it without quotes
if ret_type != "hostname":
return ret_addr, ret_type, suffix
# not a valid network or address, may be a hostname or keyword
if re.search(r'[:\\/@+ ]', address) or IPV4_ADDR_RE.match(address):
raise PgHbaValueError(
"The string '{0}' is neither a valid IP address, network, hostname or keyword".format(address))
else:
ret_addr = address
ret_type = "hostname"

return ret_addr, ret_type, suffix


def handle_netmask_field(netmask, raise_not_valid=True):
mask = _strip_quotes(netmask)

try:
mask_as_ip = ipaddress.ip_address(u'{0}'.format(mask))
binvalue = "{0:b}".format(int(mask_as_ip))
if '01' in binvalue:
raise PgHbaValueError('IP mask {0} is invalid (binary value has 1 after 0)'.format(mask))
return mask, "IPv" + str(mask_as_ip.version), binvalue.count('1')
except ValueError:
if raise_not_valid:
raise PgHbaValueError("The string '{0}' is not a valid netmask".format(mask))
else:
return "", "invalid", -1


def main():
'''
This function is the main function of this module
Expand Down Expand Up @@ -854,6 +1108,10 @@ def main():
try:
for database in rule['databases'].split(','):
for user in rule['users'].split(','):
if len(tokenize(database)) != 1:
module.fail_json(msg="Invalid string for database: {0}".format(database))
if len(tokenize(user)) != 1:
module.fail_json(msg="Invalid string for users: {0}".format(user))
pg_hba_rule = PgHbaRule(rule['contype'], database, user, rule['address'], rule['netmask'],
rule['method'], rule['options'], comment=rule['comment'])
if rule['state'] == "present":
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/targets/postgresql_pg_hba/meta/main.yml
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
dependencies:
- setup_postgresql_db
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,22 @@
- assert:
that:
- '"#comment1\nhost\tall\tall\t2001:db8::1/128\tmd5\nhost\tall\tall\t2001:db8::2/128\tmd5\t#comment2\nhost\tall\tall\t2001:db8::3/128\tmd5\t#comment3" == content'

- community.postgresql.postgresql_pg_hba:
dest: pg_hba.conf
users: '{ "oh": "no" }'
state: present
contype: host
create: true
register: invalid_username
ignore_errors: true

- assert:
that: invalid_username is failed

- community.postgresql.postgresql_pg_hba:
dest: pg_hba.conf
users: oops
state: present
contype: host
create: true
Loading

0 comments on commit ea9d620

Please sign in to comment.