Merge pull request #494 from andrewbogott/flake8

Flake8
This commit is contained in:
Anders Ingemann 2019-03-05 21:39:04 +01:00 committed by GitHub
commit 24ac85802d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 75 additions and 75 deletions

View file

@ -69,9 +69,9 @@ class AbstractPartitionMap(FSMProxy):
# Ask kpartx how the partitions will be mapped before actually attaching them.
mappings = log_check_call(['kpartx', '-l', volume.device_path])
import re
regexp = re.compile('^(?P<name>.+[^\d](?P<p_idx>\d+)) : '
'(?P<start_blk>\d) (?P<num_blks>\d+) '
'{device_path} (?P<blk_offset>\d+)$'
regexp = re.compile(r'^(?P<name>.+[^\d](?P<p_idx>\d+)) : '
r'(?P<start_blk>\d) (?P<num_blks>\d+) '
r'{device_path} (?P<blk_offset>\d+)$'
.format(device_path=volume.device_path))
log_check_call(['kpartx', '-as', volume.device_path])

View file

@ -56,11 +56,11 @@ class Source(object):
# The format is taken from `man sources.list`
# or: http://manpages.debian.org/cgi-bin/man.cgi?sektion=5&query=sources.list&apropos=0&manpath=sid&locale=en
import re
regexp = re.compile('^(?P<type>deb|deb-src)\s+'
'(\[\s*(?P<options>.+\S)?\s*\]\s+)?'
'(?P<uri>\S+)\s+'
'(?P<distribution>\S+)'
'(\s+(?P<components>.+\S))?\s*$')
regexp = re.compile(r'^(?P<type>deb|deb-src)\s+'
r'(\[\s*(?P<options>.+\S)?\s*\]\s+)?'
r'(?P<uri>\S+)\s+'
r'(?P<distribution>\S+)'
r'(\s+(?P<components>.+\S))?\s*$')
match = regexp.match(line).groupdict()
if match is None:
from .exceptions import SourceError

View file

@ -189,7 +189,7 @@ def get_all_classes(path=None, prefix='', excludes=[]):
for class_name, obj in classes:
# We only want classes that are defined in the module, and not imported ones
if obj.__module__ == module_name:
yield obj
yield obj
def check_ordering(task):

View file

@ -29,7 +29,7 @@ class Bytes(object):
@staticmethod
def parse(qty_str):
import re
regex = re.compile('^(?P<qty>\d+)(?P<unit>[KMGT]i?B|B)$')
regex = re.compile(r'^(?P<qty>\d+)(?P<unit>[KMGT]i?B|B)$')
parsed = regex.match(qty_str)
if parsed is None:
raise UnitError('Unable to parse ' + qty_str)

View file

@ -3,7 +3,7 @@ from contextlib import contextmanager
def get_partitions():
import re
regexp = re.compile('^ *(?P<major>\d+) *(?P<minor>\d+) *(?P<num_blks>\d+) (?P<dev_name>\S+)$')
regexp = re.compile(r'^ *(?P<major>\d+) *(?P<minor>\d+) *(?P<num_blks>\d+) (?P<dev_name>\S+)$')
matches = {}
path = '/proc/partitions'
with open(path) as partitions:

View file

@ -100,7 +100,7 @@ class AddManifestPreferences(Task):
@classmethod
def run(cls, info):
for name, preferences in info.manifest.packages['preferences'].iteritems():
info.preference_lists.add(name, preferences)
info.preference_lists.add(name, preferences)
class InstallTrustedKeys(Task):

View file

@ -21,7 +21,7 @@ class CheckExternalCommands(Task):
log.debug('Checking availability of ' + command)
path = find_executable(command)
if path is None or not os.access(path, os.X_OK):
if re.match('^https?:\/\/', package):
if re.match(r'^https?:\/\/', package):
msg = ('The command `{command}\' is not available, '
'you can download the software at `{package}\'.'
.format(command=command, package=package))

View file

@ -51,8 +51,8 @@ class SetGroups(Task):
from bootstrapvz.common.tools import sed_i
cloud_cfg = os.path.join(info.root, 'etc/cloud/cloud.cfg')
groups = info.manifest.plugins['cloud_init']['groups']
search = ('^ groups: \[adm, audio, cdrom, dialout, floppy, video,'
' plugdev, dip\]$')
search = (r'^ groups: \[adm, audio, cdrom, dialout, floppy, video,'
r' plugdev, dip\]$')
replace = (' groups: [adm, audio, cdrom, dialout, floppy, video,'
' plugdev, dip, {groups}]').format(groups=', '.join(groups))
sed_i(cloud_cfg, search, replace)
@ -92,7 +92,7 @@ class DisableModules(Task):
if patterns != "":
patterns = patterns + "|" + pattern
else:
patterns = "^\s+-\s+(" + pattern
patterns = r"^\s+-\s+(" + pattern
patterns = patterns + ")$"
regex = re.compile(patterns)

View file

@ -44,7 +44,7 @@ class CreateBootstrapFilterScripts(Task):
# The pattern matching when excluding is needed in order to filter
# everything below e.g. /usr/share/locale but not the folder itself
filter_lists = info._minimize_size['bootstrap_filter']
exclude_list = '\|'.join(map(lambda p: '.' + p + '.\+', filter_lists['exclude']))
exclude_list = r'\|'.join(map(lambda p: '.' + p + r'.\+', filter_lists['exclude']))
include_list = '\n'.join(map(lambda p: '.' + p, filter_lists['include']))
sed_i(filter_script, r'EXCLUDE_PATTERN', exclude_list)
sed_i(filter_script, r'INCLUDE_PATHS', include_list)

View file

@ -24,7 +24,7 @@ class SetNtpServers(Task):
import re
ntp_path = os.path.join(info.root, 'etc/ntp.conf')
servers = list(info.manifest.plugins['ntp']['servers'])
debian_ntp_server = re.compile('.*[0-9]\.debian\.pool\.ntp\.org.*')
debian_ntp_server = re.compile(r'.*[0-9]\.debian\.pool\.ntp\.org.*')
for line in fileinput.input(files=ntp_path, inplace=True):
# Will write all the specified servers on the first match, then supress all other default servers
if re.match(debian_ntp_server, line):

View file

@ -104,20 +104,20 @@ class CreateFromFolder(Task):
def set_fs_states(vol):
vol.fsm.current = 'detached'
vol.fsm.current = 'detached'
p_map = vol.partition_map
from bootstrapvz.base.fs.partitionmaps.none import NoPartitions
if not isinstance(p_map, NoPartitions):
p_map.fsm.current = 'unmapped'
p_map = vol.partition_map
from bootstrapvz.base.fs.partitionmaps.none import NoPartitions
if not isinstance(p_map, NoPartitions):
p_map.fsm.current = 'unmapped'
from bootstrapvz.base.fs.partitions.unformatted import UnformattedPartition
from bootstrapvz.base.fs.partitions.single import SinglePartition
for partition in p_map.partitions:
if isinstance(partition, UnformattedPartition):
partition.fsm.current = 'unmapped'
continue
if isinstance(partition, SinglePartition):
partition.fsm.current = 'formatted'
continue
partition.fsm.current = 'unmapped_fmt'
from bootstrapvz.base.fs.partitions.unformatted import UnformattedPartition
from bootstrapvz.base.fs.partitions.single import SinglePartition
for partition in p_map.partitions:
if isinstance(partition, UnformattedPartition):
partition.fsm.current = 'unmapped'
continue
if isinstance(partition, SinglePartition):
partition.fsm.current = 'formatted'
continue
partition.fsm.current = 'unmapped_fmt'

View file

@ -147,7 +147,7 @@ class ApplyPuppetManifest(Task):
log_check_call(['chroot', info.root, 'puppet', 'apply', '--verbose', '--debug', manifest_path])
os.remove(manifest_dst)
hosts_path = os.path.join(info.root, 'etc/hosts')
sed_i(hosts_path, '127.0.0.1\s*{hostname}\n?'.format(hostname=hostname), '')
sed_i(hosts_path, '127.0.0.1\\s*{hostname}\n?'.format(hostname=hostname), '')
class EnableAgent(Task):

View file

@ -35,19 +35,19 @@ def validate_manifest(data, validator, error):
error('Paravirtualized AMIs only support pvgrub as a bootloader', ['system', 'bootloader'])
if backing != 'ebs' and virtualization == 'hvm':
error('HVM AMIs currently only work when they are EBS backed', ['volume', 'backing'])
error('HVM AMIs currently only work when they are EBS backed', ['volume', 'backing'])
if backing == 's3' and partition_type != 'none':
error('S3 backed AMIs currently only work with unpartitioned volumes', ['system', 'bootloader'])
error('S3 backed AMIs currently only work with unpartitioned volumes', ['system', 'bootloader'])
if backing != 'ebs' and encrypted:
error('Encryption is supported only on EBS volumes')
error('Encryption is supported only on EBS volumes')
if encrypted is False and kms_key_id is not None:
error('KMS Key Id can be set only when encryption is enabled')
error('KMS Key Id can be set only when encryption is enabled')
if enhanced_networking == 'simple' and virtualization != 'hvm':
error('Enhanced networking only works with HVM virtualization', ['provider', 'virtualization'])
error('Enhanced networking only works with HVM virtualization', ['provider', 'virtualization'])
def resolve_tasks(taskset, manifest):

View file

@ -53,7 +53,7 @@ class CreatePVGrubCustomRule(Task):
grub_device = 'GRUB_DEVICE=/dev/xvda' + str(root_idx)
sed_i(script_dst, '^GRUB_DEVICE=/dev/xvda$', grub_device)
grub_root = '\troot (hd0,{idx})'.format(idx=root_idx - 1)
sed_i(script_dst, '^\troot \(hd0\)$', grub_root)
sed_i(script_dst, '^\troot \\(hd0\\)$', grub_root)
if info.manifest.volume['backing'] == 's3':
from bootstrapvz.common.tools import sed_i

View file

@ -23,7 +23,7 @@ def prepare_bootstrap(manifest, build_server):
bucket.delete_key(item.key)
s3_connection.delete_bucket(manifest.image['bucket'])
else:
yield
yield
@contextmanager

View file

@ -46,42 +46,42 @@ def waituntil(predicate, timeout=5, interval=0.05):
def read_from_socket(socket_path, termination_string, timeout, read_timeout=0.5):
import socket
import select
import errno
console = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
console.connect(socket_path)
console.setblocking(0)
import socket
import select
import errno
console = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
console.connect(socket_path)
console.setblocking(0)
from timeit import default_timer
start = default_timer()
from timeit import default_timer
start = default_timer()
output = ''
ptr = 0
continue_select = True
while continue_select:
read_ready, _, _ = select.select([console], [], [], read_timeout)
if console in read_ready:
while True:
try:
output += console.recv(1024)
if termination_string in output[ptr:]:
continue_select = False
else:
ptr = len(output) - len(termination_string)
break
except socket.error, e:
if e.errno != errno.EWOULDBLOCK:
raise Exception(e)
output = ''
ptr = 0
continue_select = True
while continue_select:
read_ready, _, _ = select.select([console], [], [], read_timeout)
if console in read_ready:
while True:
try:
output += console.recv(1024)
if termination_string in output[ptr:]:
continue_select = False
if default_timer() - start > timeout:
from .exceptions import SocketReadTimeout
msg = ('Reading from socket `{path}\' timed out after {seconds} seconds.\n'
'Here is the output so far:\n{output}'
.format(path=socket_path, seconds=timeout, output=output))
raise SocketReadTimeout(msg)
console.close()
return output
else:
ptr = len(output) - len(termination_string)
break
except socket.error, e:
if e.errno != errno.EWOULDBLOCK:
raise Exception(e)
continue_select = False
if default_timer() - start > timeout:
from .exceptions import SocketReadTimeout
msg = ('Reading from socket `{path}\' timed out after {seconds} seconds.\n'
'Here is the output so far:\n{output}'
.format(path=socket_path, seconds=timeout, output=output))
raise SocketReadTimeout(msg)
console.close()
return output
@contextmanager

View file

@ -2,7 +2,7 @@
envlist = flake8, pylint, yamllint, unit, integration, docs
[flake8]
ignore = E221,E241,E501
ignore = E221,E241,E501,W504
max-line-length = 110
[testenv]