# geotecha - A software suite for geotechnical engineering
# Copyright (C) 2018 Rohan T. Walker (rtrwalker@gmail.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see http://www.gnu.org/licenses/gpl.html.
"""Input/output operations, attribute checking, and processing of input
files that have python syntax.
"""
from __future__ import division, print_function
try:
#py27
import __builtin__
except ImportError:
#py34
import builtins as __builtin__
import sys
import ast
import imp
import textwrap
import inspect
import numpy as np
from geotecha.piecewise.piecewise_linear_1d import PolyLine
from sympy.printing.fcode import FCodePrinter
import multiprocessing
import time
try:
#py27
from StringIO import StringIO
except ImportError:
#py34
from io import StringIO
import re
import os
import pandas as pd
import matplotlib.pyplot as plt
import time
import time
from datetime import timedelta
import fnmatch
import argparse
import logging
from contextlib import contextmanager
import pkg_resources
from geotecha.plotting.one_d import copy_dict
import pkgutil
import importlib
class SyntaxChecker(ast.NodeVisitor):
    """Check that a string contains only allowable python syntax.

    `SyntaxChecker` walks an ast tree and white-lists node types, functions
    and attributes.  A SyntaxError is raised as soon as something that has
    not been explicitly allowed is encountered.

    Parameters
    ----------
    allow : list of str, optional
        List of SyntaxChecker 'allow' methods to call.  e.g.
        allow=['ast', 'numpy'] will call SyntaxChecker.allow_ast() and
        SyntaxChecker.allow_numpy().  Default allow=None, i.e. nothing is
        allowed.

    Attributes
    ----------
    allowed_functions : dict
        A dictionary of allowable functions. e.g. allowed_functions['cos'] =
        math.cos would permit the use of 'cos' from the math module.
        allowed_functions['math'] = math would allow use of the math
        module (note that to allow things like 'math.sin' you would also
        have to add 'sin' to `allowed_attributes`).  Adding functions one by
        one can be cumbersome; see the 'allow_<some_name>' methods for bulk
        adding of common functionality.
    allowed_node_types : dict
        Dictionary of allowable ast node names.
        e.g. allowed_node_types['Name'] = ast.Name would allow ast.Name
        nodes.  Typically use SyntaxChecker.allow_ast to allow a reasonable
        set of ast nodes.
    allowed_attributes : set of str
        Set of allowable attribute names.  e.g. if the math module has been
        allowed with allowed_functions['math'] = math then
        allowed_attributes.add('tan') will allow use of math.tan(34) etc.
        Note that the attribute 'tan' could then be used as an attribute of
        anything in `allowed_functions` even though this may not make any
        logical sense; i.e. it will pass the SyntaxChecker but would fail
        when the code is executed.
    safe_names : dict
        Dictionary of safe names.
        Default safe_names={'True': True, 'False': False, 'None': None}.
    print_each_node : bool
        Print out each node when it is visited.  Default
        print_each_node=False.  Note that nodes may be printed twice, once
        for a generic visit and once for a specific visit such as
        visit_Name, visit_Attribute etc.

    Methods
    -------
    allow_ast
        Allow a subset of ast node types.
    allow_builtin
        Allow a subset of __builtins__ functions.
    allow_numpy
        Allow a subset of numpy functionality via np.function syntax.
    allow_PolyLine
        Allow PolyLine class from geotecha.piecewise.piecewise_linear_1d.

    See Also
    --------
    ast.NodeVisitor : Parent class. Descriptions of python syntax grammar.
    object_members : Easily print a string of an objects routines for use in
        an 'allow_<some_name>' method.

    Notes
    -----
    If subclassing, new 'allow_<some_name>' methods can be written to bulk
    add allowable functions and attributes.

    Examples
    --------
    >>> syntax_checker = SyntaxChecker(allow=['ast', 'builtin', 'numpy'])
    >>> tree = ast.parse('a=np.cos(0.5)', mode='exec')
    >>> syntax_checker.visit(tree)

    >>> syntax_checker = SyntaxChecker(allow=['ast', 'builtin', 'numpy'])
    >>> tree = ast.parse('a=cos(0.5)', mode='exec')
    >>> syntax_checker.visit(tree)
    Traceback (most recent call last):
        ...
    SyntaxError: cos is not an allowed function!

    """

    #resources that helped in making this:
    #http://stackoverflow.com/questions/1515357/simple-example-of-how-to-use-ast-nodevisitor
    #http://stackoverflow.com/questions/10661079/restricting-pythons-syntax-to-execute-user-code-safely-is-this-a-safe-approach
    #http://stackoverflow.com/questions/12523516/using-ast-and-whitelists-to-make-pythons-eval-safe
    #http://docs.python.org/2/library/ast.html#abstract-grammar
    #http://eli.thegreenplace.net/2009/11/28/python-internals-working-with-python-asts/

    def __init__(self, allow=None):
        """Initialize a SyntaxChecker object"""
        self.allowed_functions = dict()
        self.allowed_node_types = dict()
        self.allowed_attributes = set()
        self.safe_names = {'True': True, 'False': False, 'None': None}
        self.print_each_node = False

        # `allow=None` (rather than a shared mutable `[]` default) means
        # "allow nothing".
        for v in (allow if allow is not None else []):
            s = 'allow_{0}'.format(v)
            if hasattr(self, s):
                getattr(self, s)()
            else:
                raise AttributeError("'SyntaxChecker' object has no attribute "
                                     "'{0}'. i.e. '{1}' is not a valid member "
                                     "of the allow list".format(s, v))

    @staticmethod
    def _tokens(s):
        """Whitespace separated items of `s` that do not start with '#'"""
        return [a for a in s.split() if not a.startswith('#')]

    def visit_Call(self, node):
        """Custom visit a 'Call' node

        Raises SyntaxError if a function called directly by name is not in
        `allowed_functions`.
        """
        if self.print_each_node:
            print('CALL:', ast.dump(node))
        # Only calls of a bare name (e.g. `cos(x)`) have a `func.id`; calls
        # such as `np.cos(x)` are vetted when the Attribute/Name children
        # are visited.
        if (hasattr(node.func, 'id') and
                node.func.id not in self.allowed_functions):
            raise SyntaxError(
                "{} is not an allowed function!".format(node.func.id))
        ast.NodeVisitor.generic_visit(self, node)

    def visit_Name(self, node):
        """Custom visit a 'Name' node

        Names being assigned to are recorded as allowed; names being read
        must be a safe name, an allowed function or an allowed attribute,
        otherwise SyntaxError is raised.
        """
        if self.print_each_node:
            print('NAME: ', ast.dump(node))
        if isinstance(node.ctx, ast.Store):
            # NOTE(review): every assigned name becomes an allowed
            # *attribute* as well, so input such as `__dict__ = 0` followed
            # by `x.__dict__` passes visit_Attribute.  Consider tracking
            # assigned names separately if the input is untrusted.
            self.allowed_attributes.add(node.id)
        elif isinstance(node.ctx, ast.Load):
            if (node.id not in self.safe_names and
                    node.id not in self.allowed_functions and
                    node.id not in self.allowed_attributes):
                raise SyntaxError("cannot use {} as name, function, or "
                                  "attribute!".format(node.id))
        ast.NodeVisitor.generic_visit(self, node)

    def visit_Attribute(self, node):
        """Custom visit an 'Attribute' node

        Raises SyntaxError if the attribute name is neither an allowed
        function nor an allowed attribute.
        """
        if self.print_each_node:
            print('ATTRIBUTE:', ast.dump(node))
        if (node.attr not in self.allowed_functions and
                node.attr not in self.allowed_attributes):
            raise SyntaxError(
                "{} is not an allowed attribute!".format(node.attr))
        ast.NodeVisitor.generic_visit(self, node)

    def generic_visit(self, node):
        """Custom visit a generic node

        Raises SyntaxError if the node type is not in `allowed_node_types`.
        """
        if self.print_each_node:
            print('GENERIC: ', ast.dump(node))
        if type(node).__name__ not in self.allowed_node_types:
            raise SyntaxError(
                "{} is not allowed!".format(type(node).__name__))
        ast.NodeVisitor.generic_visit(self, node)

    def _split_string_allow(self, s, add_to='attributes', fn_module=None):
        """Split a string and add items to allowed lists

        Adds items to self.allowed_attributes or self.allowed_functions.
        Items starting with '#' are ignored.

        Parameters
        ----------
        s : str
            String containing whitespace separated items to allow.
        add_to : ['attributes', 'functions']
            If add_to='attributes' items will be added to
            self.allowed_attributes.  If add_to='functions' the items will be
            added to self.allowed_functions.
        fn_module : module
            Module where functions are stored.  Only used when
            add_to='functions'.  Items that `fn_module` does not have are
            silently skipped.

        Raises
        ------
        ValueError
            If `add_to` is not 'attributes' or 'functions'.

        """
        in_list = ['attributes', 'functions']
        if add_to not in in_list:
            raise ValueError('add_to cannot be {}. It must be one of '
                '[{}]'.format("'{}'".format(add_to),
                ", ".join(["'{}'".format(v) for v in in_list])))

        if add_to == 'functions':
            for v in self._tokens(s):
                if hasattr(fn_module, v):
                    self.allowed_functions[v] = getattr(fn_module, v)
            return

        self.allowed_attributes.update(self._tokens(s))
        return

    def allow_ast(self):
        """Allow subset of ast node types"""
        # Local import keeps this method self contained.
        import warnings

        #object_members(ast, 'class')
        s = textwrap.dedent("""\
            #py27
            #AST Add And Assert Assign Attribute AugAssign AugLoad
            AugStore BinOp BitAnd BitOr BitXor BoolOp Break Call
            #ClassDef Compare Continue Del Delete Dict DictComp Div
            Ellipsis Eq ExceptHandler #Exec Expr Expression ExtSlice
            FloorDiv For FunctionDef GeneratorExp Global Gt GtE If
            IfExp #Import #ImportFrom In Index Interactive Invert Is
            IsNot LShift Lambda List ListComp Load Lt LtE Mod Module
            Mult Name #NodeTransformer #NodeVisitor Not NotEq NotIn Num
            Or Param Pass Pow Print RShift Raise Repr Return Set
            SetComp Slice Store Str Sub Subscript Suite TryExcept
            TryFinally Tuple UAdd USub UnaryOp While With Yield #alias
            arguments boolop #cmpop comprehension #excepthandler #expr
            #expr_context keyword #mod #operator slice #stmt #unaryop
            #py34_addtional
            Bytes NameConstant #Nonlocal Starred Try YieldFrom arg
            withitem
            #py38_additional
            Constant
            """)
        # From python 3.8 all literals are parsed as ast.Constant (Num, Str,
        # Bytes, NameConstant are deprecated aliases); without 'Constant'
        # even `a = 1` would be rejected.  Looking up the deprecated names
        # can emit DeprecationWarning so those are silenced here.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', DeprecationWarning)
            for v in self._tokens(s):
                if hasattr(ast, v):
                    self.allowed_node_types[v] = getattr(ast, v)
        return

    def allow_builtin(self):
        """Allow subset of __builtins__ functions"""
        #object_members(__builtin__ , 'routine')
        s = textwrap.dedent("""\
            #py27
            #__import__ abs all any apply bin #callable chr cmp
            #coerce compile #debugfile #delattr #dir divmod #eval
            #evalsc #execfile filter format #getattr #globals hasattr
            #hash hex id input #intern isinstance issubclass iter
            len locals map max min next oct #open #open_in_spyder
            ord pow print range raw_input reduce #reload repr
            round #runfile #setattr sorted sum unichr #vars zip
            #py34_addtional
            #__build_class__ ascii #exec repr
            """)
        self._split_string_allow(s, add_to='functions', fn_module=__builtin__)

        #object_members(__builtin__ , 'class')
        s = textwrap.dedent("""\
            #py27
            basestring bool #buffer bytearray bytes #classmethod
            complex dict enumerate #file float frozenset int list
            long #memoryview #object #property reversed set slice
            #staticmethod str super tuple type unicode xrange
            #py34_addtional
            #__loader__ filter map range reversed zip
            """)
        self._split_string_allow(s, add_to='functions', fn_module=__builtin__)

        #object_members(complex , 'routine')
        s = textwrap.dedent("""\
            #py27
            conjugate
            #py34_addtional
            """)
        self._split_string_allow(s, add_to='attributes')

        #object_members(dict , 'routine')
        s = textwrap.dedent("""\
            #py27
            copy fromkeys get has_key items iteritems iterkeys
            itervalues keys pop popitem setdefault update values
            viewitems viewkeys viewvalues
            #py34_addtional
            clear
            """)
        self._split_string_allow(s, add_to='attributes')

        #object_members(list , 'routine')
        s = textwrap.dedent("""
            #py27
            append count extend index
            insert pop remove reverse sort
            #py34_addtional
            clear copy
            """)
        self._split_string_allow(s, add_to='attributes')

        #object_members(str , 'routine')
        s = textwrap.dedent("""
            #py27
            capitalize center count decode encode endswith
            expandtabs find format index isalnum isalpha isdigit
            islower isspace istitle isupper join ljust lower
            lstrip partition replace rfind rindex rjust
            rpartition rsplit rstrip split splitlines startswith
            strip swapcase title translate upper zfill
            #py34_addtional
            casefold format_map isdecimal isidentifier
            isnumeric isprintable isspace maketrans
            """)
        self._split_string_allow(s, add_to='attributes')

        #object_members(float , 'routine')
        s = textwrap.dedent("""
            #py27
            as_integer_ratio conjugate fromhex hex
            is_integer
            #py34_addtional
            """)
        self._split_string_allow(s, add_to='attributes')

        #object_members(set , 'routine')
        s = textwrap.dedent("""
            #py27
            add clear copy difference
            difference_update discard intersection
            intersection_update isdisjoint issubset issuperset
            pop remove symmetric_difference
            symmetric_difference_update union update
            #py34_addtional
            """)
        self._split_string_allow(s, add_to='attributes')

        #object_members(slice , 'routine')
        s = textwrap.dedent("""
            #py27
            indices
            #py34_addtional
            """)
        self._split_string_allow(s, add_to='attributes')
        return

    def allow_numpy(self):
        """Allow a subset of numpy functionality via np.attribute syntax"""
        self.allowed_functions['np'] = np
        self.allowed_functions['numpy'] = np

        #object_members(np , 'routine')
        s = textwrap.dedent("""
            #numpy_v_1_9_0
            add_docstring add_newdoc add_newdoc_ufunc alen all allclose
            alltrue alterdot amax amin angle any append apply_along_axis
            apply_over_axes arange argmax argmin argpartition argsort
            argwhere around array array2string array_equal array_equiv
            array_repr array_split array_str asanyarray asarray
            asarray_chkfinite ascontiguousarray asfarray asfortranarray
            asmatrix asscalar atleast_1d atleast_2d atleast_3d average
            bartlett base_repr bench binary_repr bincount blackman bmat
            broadcast_arrays busday_count busday_offset byte_bounds can_cast
            choose clip column_stack common_type compare_chararrays compress
            concatenate convolve copy copyto corrcoef correlate count_nonzero
            cov cross cumprod cumproduct cumsum datetime_as_string
            datetime_data delete deprecate deprecate_with_doc diag
            diag_indices diag_indices_from diagflat diagonal diff digitize
            disp dot dsplit dstack ediff1d einsum empty empty_like
            expand_dims extract eye fastCopyAndTranspose fill_diagonal
            find_common_type fix flatnonzero fliplr flipud frombuffer
            fromfile fromfunction fromiter frompyfunc fromregex fromstring
            full full_like fv genfromtxt get_array_wrap get_include
            get_printoptions getbuffer getbufsize geterr geterrcall geterrobj
            gradient hamming hanning histogram histogram2d histogramdd hsplit
            hstack i0 identity imag in1d indices info inner insert
            int_asbuffer interp intersect1d ipmt irr is_busday isclose
            iscomplex iscomplexobj isfortran isneginf isposinf isreal
            isrealobj isscalar issctype issubclass_ issubdtype issubsctype
            iterable ix_ kaiser kron lexsort linspace load loads loadtxt
            logspace lookfor mafromtxt mask_indices mat max maximum_sctype
            may_share_memory mean median meshgrid min min_scalar_type
            mintypecode mirr msort nan_to_num nanargmax nanargmin nanmax
            nanmean nanmedian nanmin nanpercentile nanstd nansum nanvar
            ndfromtxt ndim nested_iters newbuffer nonzero nper npv obj2sctype
            ones ones_like outer packbits pad partition percentile piecewise
            pkgload place pmt poly polyadd polyder polydiv polyfit polyint
            polymul polysub polyval ppmt prod product promote_types ptp put
            putmask pv rank rate ravel ravel_multi_index real real_if_close
            recfromcsv recfromtxt repeat require reshape resize restoredot
            result_type roll rollaxis roots rot90 round round_ row_stack
            safe_eval save savetxt savez savez_compressed sctype2char
            searchsorted select set_numeric_ops set_printoptions
            set_string_function setbufsize setdiff1d seterr seterrcall
            seterrobj setxor1d shape show_config sinc size sometrue sort
            sort_complex source split squeeze std sum swapaxes take tensordot
            test tile trace transpose trapz tri tril tril_indices
            tril_indices_from trim_zeros triu triu_indices triu_indices_from
            typename union1d unique unpackbits unravel_index unwrap vander
            var vdot vsplit vstack where who zeros zeros_like pi
            """)
        self._split_string_allow(s, add_to='attributes')

        #object_members(np.ndarray , 'routine')
        s = textwrap.dedent("""
            #numpy_v_1_9_0
            all any argmax argmin argpartition argsort astype byteswap choose
            clip compress conj conjugate copy cumprod cumsum diagonal dot
            dump dumps fill flatten getfield item itemset max mean min
            newbyteorder nonzero partition prod ptp put ravel repeat reshape
            resize round searchsorted setfield setflags sort squeeze std sum
            swapaxes take tobytes tofile tolist tostring trace transpose var
            view
            """)
        self._split_string_allow(s, add_to='attributes')

        self.allowed_attributes.add('math')
        #object_members(np.math,'routine')
        s = textwrap.dedent("""
            #numpy_v_1_9_0
            acos acosh asin asinh atan atan2 atanh ceil copysign cos cosh
            degrees erf erfc exp expm1 fabs factorial floor fmod frexp fsum
            gamma hypot isfinite isinf isnan ldexp lgamma log log10 log1p
            log2 modf pow radians sin sinh sqrt tan tanh trunc
            """)
        self._split_string_allow(s, add_to='attributes')

        #print(object_members(np, 'ufunc'))
        s = textwrap.dedent("""
            #numpy_v_1_9_0
            abs absolute add arccos arccosh arcsin arcsinh arctan arctan2
            arctanh bitwise_and bitwise_not bitwise_or bitwise_xor ceil conj
            conjugate copysign cos cosh deg2rad degrees divide equal exp exp2
            expm1 fabs floor floor_divide fmax fmin fmod frexp greater
            greater_equal hypot invert isfinite isinf isnan ldexp left_shift
            less less_equal log log10 log1p log2 logaddexp logaddexp2
            logical_and logical_not logical_or logical_xor maximum minimum
            mod modf multiply negative nextafter not_equal power rad2deg
            radians reciprocal remainder right_shift rint sign signbit sin
            sinh spacing sqrt square subtract tan tanh true_divide trunc
            """)
        self._split_string_allow(s, add_to='attributes')

        #to find more numpy functionality to add try:
        #object_members(np.polynomial,'routine')
        #mem=object_members(np.polynomial,'class')
        #object_members(np.polynomial.polynomial,'routine')

    def allow_PolyLine(self):
        """Allow PolyLine class from geotecha.piecewise.piecewise_linear_1d"""
        self.allowed_functions['PolyLine'] = PolyLine
        s = textwrap.dedent("""
            x y xy x1 x2 y1 y2 x1_x2_y1_y2
            """)
        self._split_string_allow(s, add_to='attributes')
        return
def object_members(obj, info='function', join=True):
    """Get a list of object members.

    Parameters
    ----------
    obj : object
        Object to get members of.
    info : string, optional
        Type of members to gather.  Members will be gathered according to
        inspect.is<member_type>. e.g. info='function' will check in object
        for inspect.isfunction.  Other values include 'method', 'function',
        'routine'.  Default info='function'.  NOTE that it is safer to use
        'routine' when getting functions.  There is a custom check for
        info='ufunc' to give numpy.ufunc members, and info='module' lists
        the modules found next to `obj.__file__`.
    join : bool, optional
        If join==True then list will be joined together into one large
        space separated string (wrapped at 65 characters).
        Default join=True.

    Returns
    -------
    members : list of str, or str
        List of member names of specified types, or space separated names
        in a single string.

    Raises
    ------
    AttributeError
        If `info` does not correspond to an inspect.is<info> function.

    """
    #useful resources
    #http://www.andreas-dewes.de/en/2012/static-code-analysis-with-python/
    #isroutine vs isfunction: http://stackoverflow.com/a/22428966
    #pkgutil : http://stackoverflow.com/a/1310912

    def isufunc(candidate):
        """Return true if the object is a numpy.ufunc"""
        return isinstance(candidate, np.ufunc)

    if info == "module":
        # for some reason getmembers with modules does not always return a
        # complete list, so look at the files on disk instead.
        pkgpath = os.path.dirname(obj.__file__)
        members = [name for _, name, _ in pkgutil.iter_modules([pkgpath])]
    else:
        # The ufunc predicate is kept local rather than being attached to
        # the `inspect` module, so the standard library is left untouched.
        if info == 'ufunc':
            predicate = isufunc
        else:
            predicate = getattr(inspect, 'is{}'.format(info))
        members = [name for name, _ in inspect.getmembers(obj, predicate)]

    if join:
        members = '\n'.join(textwrap.wrap(" ".join(members),
                                          break_long_words=False, width=65))
    return members
def make_module_from_text(reader, syntax_checker=None):
    """Make a module from file, StringIO, text etc.

    Parameters
    ----------
    reader : str or file_like object
        Text to execute, or an object with a ``read()`` method that returns
        the text.
    syntax_checker : SyntaxChecker object, optional
        Specifies what syntax is allowed when executing the text
        within `reader`.  Default = None, which means that text will be
        executed with the all powerful all dangerous exec function.  Only
        use this option if you trust the input.

    Returns
    -------
    m : module
        Module named 'mymodule' whose namespace holds the result of
        executing the text.

    Raises
    ------
    TypeError
        If `syntax_checker` is neither None nor a SyntaxChecker instance.
    SyntaxError
        If `syntax_checker` finds syntax that is not allowed.

    See Also
    --------
    SyntaxChecker : allow certain syntax

    Notes
    -----
    I suspect it is best if `reader` is a string. i.e reader can be pickled.
    fileobjects may cause issues if used with multiprocessing.process

    """
    #useful resources:
    #for making module out of strings/files see http://stackoverflow.com/a/7548190/2530083
    #http://old-blog.ooz.ie/2011/03/python-exec-module-in-namespace.html
    #http://stackoverflow.com/questions/7969949/whats-the-difference-between-globals-locals-and-vars
    #http://lucumr.pocoo.org/2011/2/1/exec-in-python/
    # see down bottom for config stuff
    #http://stackoverflow.com/a/2372145/2530083

    # Local import: `imp.new_module` is unavailable on python >= 3.12 (the
    # `imp` module was removed); types.ModuleType is the equivalent.
    import types

    # exec/ast.parse need text under python 3, so read file-like objects.
    if hasattr(reader, 'read'):
        reader = reader.read()

    # limit namespace
    mymodule = types.ModuleType('mymodule') #may need to randomise the name; not sure

    if syntax_checker is None:
        # SECURITY: unrestricted exec of the input; trusted input only.
        exec(reader, mymodule.__dict__)
        return mymodule

    if not isinstance(syntax_checker, SyntaxChecker):
        raise TypeError('syntax_checker should be a SyntaxChecker instance.')

    tree = ast.parse(reader, mode='exec')
    syntax_checker.visit(tree)
    compiled = compile(tree, '<string>', "exec")
    # make the white-listed functions available to the executed text
    mymodule.__dict__.update(syntax_checker.allowed_functions)
    exec(compiled, mymodule.__dict__)
    return mymodule
class print_all_nodes(ast.NodeVisitor):#http://stackoverflow.com/a/1515403/2530083
    """Simple ast.NodeVisitor sub class that prints each node when visited

    Every node of the tree is printed on its own line as the node's type
    name, a colon, and the ``ast.dump`` of the node.  The exact text of the
    dump depends on the python version in use.

    Examples
    --------
    >>> text="a=[3,2]*2"
    >>> x=print_all_nodes()
    >>> tree=ast.parse(text)
    >>> x.visit(tree)  # doctest: +SKIP

    """

    #useful resources
    #http://stackoverflow.com/a/1515403/2530083

    def generic_visit(self, node):
        """Print the node then carry on to its children"""
        type_name = type(node).__name__
        dumped = ast.dump(node)
        print(" "*4, type_name, ':', dumped)
        super(print_all_nodes, self).generic_visit(node)
def fcode_one_large_expr(expr, prepend=None, **settings):
    """Fortran friendly printing of sympy expression ignoring any loops/indexed

    The normal FCodePrinter.doprint method will try to figure out what
    fortran loops you need by looking for indexed expressions.  Sometimes you
    just want to change square brackets [] to parentheses () and wrap/indent
    the code for Fortran with appropriate line continuations.
    `fcode_one_large_expr` can do that for you.  You will have to write your
    own fortran do loops.

    Parameters
    ----------
    expr : sympy expression
        A single large sympy expression.
    prepend : string, optional
        A string to prepend to your fortran code.  Assuming you are
        going to cut and paste into a fortran routine `prepend` should be
        correct fortran format (note you do not need an initial indent
        for your prepend; it will be put in for you). e.g.
        prepend = 'a(i, i) = a(i, i) + '.  Default prepend=None i.e. nothing
        prepended.
    settings : keyword arguments
        See `sympy.printing.fcode` docs, specifically FCodePrinter.__init__ .
        Note that 'assign_to' will not work.

    Returns
    -------
    out : str
        Fortran ready code that can be copy and pasted into a Fortran
        routine.

    See Also
    --------
    sympy.printing.fcode : contains all the functionality.

    """

    # FCodePrinter.indent_code uses ''.join to combine lines. Should it be
    # '\n'.join ? This is my work around:
    class FCodePrinter2(FCodePrinter):
        @property
        def _lead(self):
            source_format = self._settings['source_format']
            if source_format == 'fixed':
                # return {'code': " ", 'cont': " @ ", 'comment': "C "} orig
                return {'code': " ", 'cont': "&\n ", 'comment': "C "}
            if source_format == 'free':
                return {'code': "", 'cont': " ", 'comment': "! "}
            # any other source format: no lead information
            return None

    fortran_printer = FCodePrinter2(settings)
    fortran_printer._lead_cont = '&\n ' # this should account for earlier version of sympy

    code = fortran_printer.parenthesize(expr, 50)
    if prepend is not None:
        code = prepend + code
    return fortran_printer.indent_code(code)
def copy_attributes_between_objects(from_object,
                                    to_object,
                                    attributes=None,
                                    defaults=None,
                                    not_found_value=None):
    """Copy attributes from one object to another

    Looks up `attributes` in `from_object` and copies them to `to_object`.
    If not present in `from_object` the corresponding attribute in
    `to_object` will be set to the value in `defaults`, or to
    `not_found_value` if not in `defaults`.

    Parameters
    ----------
    from_object : object or module
        Object to copy attributes from.  Only names present in
        ``from_object.__dict__`` count as found.
    to_object : object or module
        Object to copy attributes to.
    attributes : list of strings, optional
        A list of attribute names to copy from `from_object` to `to_object`.
        Default attributes=None, i.e. nothing is copied.
    defaults : dict of string-value pairs, optional
        Dictionary specifying the default value of each attribute that is not
        found in `from_object`.  Default defaults=None, i.e. no defaults.
    not_found_value : optional
        Default value to set attribute to if not found in `from_object` or in
        `defaults`.  Default not_found_value=None.

    Returns
    -------
    None

    """
    # None sentinels avoid sharing mutable default arguments between calls.
    if attributes is None:
        attributes = []
    if defaults is None:
        defaults = {}

    for name in attributes:
        fallback = defaults.get(name, not_found_value)
        setattr(to_object, name, from_object.__dict__.get(name, fallback))
    return
def copy_attributes_from_text_to_object(reader, *args, **kwargs):
    """Wrapper for `copy_attributes_between_objects` where `from_object`
    is fileobject, StringIO, text

    The text in `reader` is first turned into a module with
    `make_module_from_text`; that module is then used as the `from_object`.

    Parameters
    ----------
    reader : fileobject, StringIO, text
        Text to turn into a module and pass as from_object.
    *args :
        Positional arguments that will be passed to
        `copy_attributes_between_objects`.
    **kwargs :
        Keyword arguments that will be passed to
        `copy_attributes_between_objects`.  The keyword argument
        'syntax_checker'=<some SyntaxChecker object> is removed from
        `kwargs` and used to interpret the `reader`.

    See Also
    --------
    copy_attributes_between_objects : See for args and kwargs input.
    SyntaxChecker : Restrict the allowable syntax in the `reader`.

    Notes
    -----
    Put 'syntax_checker= ' in the kwargs to add a SyntaxChecker

    """
    # 'syntax_checker' is not understood by copy_attributes_between_objects
    # so it is taken out of kwargs before they are passed on.
    checker = kwargs.pop('syntax_checker', None)
    source_module = make_module_from_text(reader, checker)
    copy_attributes_between_objects(source_module, *args, **kwargs)
    return
def check_attribute_is_list(obj, attributes=[], force_list=False):
    """Check if attributes of an object are lists

    Attributes that are None are ignored.  A non-list attribute is either
    wrapped in a list (`force_list`=True) or causes an error
    (`force_list`=False).  e.g. say obj.a = 6; with `force_list`=True
    obj.a will be changed to obj.a = [6].

    Parameters
    ----------
    obj : object
        Object with attributes to check.
    attributes : list of strings
        A list of attribute names to check.
    force_list : bool, optional
        If True not-list attributes will be put in a list.  If False then
        non-list attributes will raise an error.  Default force_list=False.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If an attribute is not a list and `force_list` is False.

    """
    fetch = obj.__getattribute__
    for name in attributes:
        current = fetch(name)
        # nothing to do for unset attributes or for real lists
        if current is None or isinstance(current, list):
            continue
        if not force_list:
            raise ValueError('{0} should be a list. It is {1}'.format(name, type(current)))
        obj.__setattr__(name, [current])
    return
def check_attribute_PolyLines_have_same_x_limits(obj, attributes=None):
    """Check if attributes of object that are PolyLine have the same x limits

    Each attribute can be a single instance of PolyLine or a list of
    PolyLines.  Within each sublist of attribute names, the first and last
    x value of every PolyLine must match those of the first PolyLine
    found.  Attributes that are None, or that are empty lists, are ignored.

    Parameters
    ----------
    obj : object
        Object with attributes to check.
    attributes : list of list of strings, optional
        Each sublist is a list of attribute names to check.
        Default attributes=None, i.e. nothing is checked.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the x limits of any PolyLine differ from the reference limits.

    See Also
    --------
    geotecha.piecewise.piecewise_linear_1d.PolyLine : PolyLine class

    """
    if attributes is None:
        attributes = []

    g = obj.__getattribute__

    for sublist in attributes:
        # reference limits come from the first PolyLine encountered
        xcheck = None
        xstr = None
        for v in sublist:
            value = g(v)
            if value is None:
                continue
            # treat a single PolyLine like a one element list
            lines = value if isinstance(value, list) else [value]
            for line in lines:
                limits = [line.x[0], line.x[-1]]
                if xcheck is None:
                    xcheck = np.array(limits)
                    xstr = v
                elif not np.allclose(limits, xcheck):
                    raise ValueError('All upper and lower x limits must be the same. Check ' + v + ' and ' + xstr + '.')
    return
def check_attribute_pairs_have_equal_length(obj, attributes=[]):
    """Check if attribute pairs have the same length

    A pair is only compared when neither of its members is None; a pair
    with a None member is skipped without raising.

    Parameters
    ----------
    obj : object
        Object with attributes to check.
    attributes : list of list of two string
        List of attribute names to check.  Each sub-list should have two
        elements.

    Raises
    ------
    ValueError
        If a sub-list has more than two names, or if the two attributes
        have a different number of elements.
    TypeError
        If either attribute of a pair has no ``__len__``.

    """
    lookup = obj.__getattribute__

    # for iterating in chunks see http://stackoverflow.com/a/434328/2530083
    for pair in attributes:
        if len(pair) > 2:
            raise ValueError('Can only compare two items. you have {}'.format(str(pair)))

        first_name, second_name = pair
        first, second = lookup(first_name), lookup(second_name)

        if first is None or second is None:
            continue

        if not (hasattr(first, '__len__') and hasattr(second, '__len__')):
            raise TypeError("either {0} and {1} have no len attribute and so cannot be compared".format(first_name, second_name))

        n_first = len(first)
        n_second = len(second)
        if n_first != n_second:
            raise ValueError("{} has {:d} elements, {} has {:d} "
                "elements. They should have the same number of "
                "elements.".format(first_name, n_first, second_name, n_second))
    return
def check_attribute_combinations(obj, zero_or_all=[], at_least_one=[], one_implies_others=[]):
    """Validate that groups of attributes are defined in a consistent way

    An attribute counts as "defined" when it is not None.  A ValueError is
    raised as soon as one group breaks its rule.

    Parameters
    ----------
    obj : object
        Object whose attributes are inspected.
    zero_or_all : list of list of string
        For each group of attribute names, either none or all of the
        attributes must be defined.
    at_least_one : list of list of string
        For each group of attribute names, at least one of the attributes
        must be defined.
    one_implies_others : list of list of string
        For each group of attribute names, if the first attribute is
        defined then every other attribute in the group must be defined
        too.  Each group needs at least two names.

    Returns
    -------
    None

    """
    lookup = obj.__getattribute__

    def undefined_flags(names):
        # Deliberately a list, not a generator: every name is looked up.
        return [lookup(name) is None for name in names]

    for group in zero_or_all:
        n_undefined = undefined_flags(group).count(True)
        if 0 < n_undefined < len(group):
            raise ValueError('Either zero or all of the following variables must be defined: ' + ', '.join(group))

    for group in at_least_one:
        if all(undefined_flags(group)):
            if len(group) == 1:
                raise ValueError('Need the following variable: ' + ', '.join(group))
            raise ValueError('Need at least one of the following variables: ' + ', '.join(group))

    for index, group in enumerate(one_implies_others):
        if len(group) < 2:
            raise ValueError("each member of 'one_implies_others' must be a list with at least two elements. Member {0} is {1}".format(index,', '.join(group)))
        if lookup(group[0]) is None:
            continue
        dependants = group[1:]
        if any(undefined_flags(dependants)):
            raise ValueError('If {0} is defined then the following variables must also be defined: '.format(group[0]) + ', '.join(dependants))
    return
def force_attribute_same_len_if_none(obj, same_len_attributes=[], value=None):
    """Make list of None with len the same as an existing attribute

    If attributes after the first are None then make those attributes a list
    of len(first_attribute) filled with value.  This is useful when you want
    to zip each member of two attributes.  If the second attribute is None,
    then any zipping is impossible.  However, if the second attribute
    is set to a list of None then zipping is possible.

    Parameters
    ----------
    obj : object
        Object that has the attributes.
    same_len_attributes : list of list of string
        For each group of attribute names, if attributes after the first are
        None then those attributes will be made a list of len(first_attribute)
        filled with `value`.  Groups whose first attribute is None are left
        untouched.
    value : obj, optional
        Value to fill each list.  Default value=None.

    """
    for group in same_len_attributes:
        first = getattr(obj, group[0])
        # Identity test: `!= None` is evaluated elementwise on numpy arrays
        # and cannot be used as a truth value.
        if first is None:
            continue
        for name in group[1:]:
            if getattr(obj, name) is None:
                setattr(obj, name, [value] * len(first))
def initialize_objects_attributes(obj,
                                  attributes=[],
                                  defaults=dict(),
                                  not_found_value=None):
    """Set a batch of attributes on an object from a dict of defaults

    Every name in `attributes` is assigned on `obj`.  The assigned value is
    looked up in `defaults`; names that have no entry there receive
    `not_found_value`.

    Parameters
    ----------
    obj : object
        Object to set attributes in.
    attributes : list of strings
        Names of the attributes to set.
    defaults : dict of string-value pairs, optional
        Default value for each attribute name.  Default defaults=dict()
        i.e. no explicit default values.
    not_found_value : optional
        Value assigned when an attribute name is not a key of `defaults`.
        Default not_found_value=None.

    Notes
    -----
    Attributes created this way only exist once an instance has been
    created, so editors cannot offer them for auto-complete while the class
    itself is being written.  A temporary explicit assignment such as
    'self.a = 6' works around that; comment it out when the class is done.

    See Also
    --------
    code_for_explicit_attribute_initialization : Use for temporary explicit
        attribute initialization to facilitate auto-complete, then comment
        out when done.

    """
    for name in attributes:
        initial_value = defaults.get(name, not_found_value)
        setattr(obj, name, initial_value)
    return
def code_for_explicit_attribute_initialization(attributes=[],
                                               defaults={},
                                               defaults_name='_attribute_defaults',
                                               object_name='self',
                                               not_found_value=None):
    """Generate code to initialize an objects attributes

    Parameters
    ----------
    attributes : list of strings
        A list of attribute names to set.
    defaults : dict of string-value pairs
        Dictionary specifying the default value of each attribute.
    defaults_name : string, optional
        If ``None`` then the actual values in `defaults` will be used
        to set attributes.  If not ``None`` then those attributes with
        defaults will be initialized by pointing to a dictionary called
        `defaults_name`.  Default defaults_name='_attribute_defaults'.
    object_name : string, optional
        Name of object to set attributes. Default object_name='self'.
    not_found_value : optional
        Default value to set attribute to if not found in `defaults`.
        Default not_found_value=``None``.

    Returns
    -------
    out : string
        Code that can be pasted into an object (usually in the __init__
        method) to explicitly initialize attributes, so the attributes
        appear in autocomplete.  One line per attribute, each terminated
        by a newline.

    See Also
    --------
    initialize_objects_attributes : Similar functionality with no copy paste.

    """
    def as_source(value):
        # Strings go through repr so that embedded quotes, backslashes and
        # newlines still yield a valid Python literal in the output.
        if isinstance(value, str):
            return repr(value)
        return '{0}'.format(value)

    lines = []
    for name in attributes:
        target = '.'.join([object_name, name])
        if defaults_name is None:
            # Write the actual default value into the generated code.
            rhs = as_source(defaults.get(name, not_found_value))
        elif name in defaults:
            # Point at the named defaults dictionary on the object.
            rhs = '{0}.get({1}, {2})'.format(
                '.'.join([object_name, defaults_name]),
                "'{}'".format(name),
                as_source(not_found_value))
        else:
            rhs = as_source(not_found_value)
        lines.append('{0} = {1}\n'.format(target, rhs))
    return ''.join(lines)
def string_of_object_attributes(obj, attributes=[], none_at_bottom=True,
                                numpy_array_prefix = "np."):
    """Put the string form of an objects attributes into one string

    Each attribute is written on its own line as ``name = value``.  String
    values are wrapped in single quotes.  If the attribute is not found
    then None will be given.

    Parameters
    ----------
    obj : object
        Object containing attributes.
    attributes : list of string, optional
        List of attribute names.
    none_at_bottom : True/False
        If True then any attribute that is None will be placed at the end
        of the output string, separated from the other attributes by blank
        lines.
    numpy_array_prefix : string, optional
        Prefix handed to `PrefixNumpyArrayString` while the attributes are
        formatted, e.g. 'np.'.  Default numpy_array_prefix='np.'.  Use
        `None` for no prefix.

    Returns
    -------
    out : str
        One line per attribute, terminated by a newline.

    """
    def as_line(name, value):
        quote = "'" if isinstance(value, str) else ""
        return '{0} = {1}{2}{1}'.format(name, quote, value)

    prefix = '' if numpy_array_prefix is None else numpy_array_prefix
    printer = PrefixNumpyArrayString(prefix)

    printer.turn_on()
    try:
        # Look each attribute up once; missing attributes count as None.
        pairs = [(name, getattr(obj, name, None)) for name in attributes]

        if none_at_bottom:
            defined = [as_line(n, v) for n, v in pairs if v is not None]
            undefined = [as_line(n, v) for n, v in pairs if v is None]
            writer = '\n'.join(defined) + '\n\n\n' + '\n'.join(undefined)
        else:
            writer = '\n'.join([as_line(n, v) for n, v in pairs])
        writer += '\n'
    finally:
        # Always restore the numpy print behaviour, even if formatting an
        # attribute raised.
        printer.turn_off()
    return writer
class PrefixNumpyArrayString(object):
    """Switchable prefix for the string form of numpy arrays

    The `repr` of a numpy array reads 'array([...])'.  That text can only be
    pasted back into Python when `array` itself has been imported; with the
    usual 'import numpy as np' a 'np.' has to be typed in front by hand.
    While a PrefixNumpyArrayString is turned on, the ``__str__`` of numpy
    arrays becomes the prefix followed by the array's `repr`.  ``__repr__``
    itself is left unchanged.

    NOTE(review): relies on ``np.set_string_function``, which recent NumPy
    releases (2.0 onwards) no longer provide - confirm the supported NumPy
    versions.

    Attributes
    ----------
    prefix : string, optional
        string to prefix with.  Default prefix='np.'.

    Examples
    --------
    >>> print(np.arange(3))
    [0 1 2]
    >>> a=PrefixNumpyArrayString('numpy.')
    >>> a.turn_on()
    >>> print(np.arange(3))
    numpy.array([0, 1, 2])
    >>> a.turn_off()
    >>> print(np.arange(3))
    [0 1 2]

    """

    def __init__(self, prefix = 'np.'):
        self.prefix = prefix

    def _pprint(self, arr):
        """Formatter handed to np.set_string_function: prefix + repr(arr)"""
        array_text = repr(arr)
        return '{}{}'.format(self.prefix, array_text)

    def turn_on(self):
        """Turn on printing numpy arrays with the prefix"""
        # repr=False: only __str__ is replaced, __repr__ stays as it is.
        np.set_string_function(self._pprint, repr=False)

    def turn_off(self):
        """Turn off printing numpy arrays with the prefix"""
        # Passing None resets __str__ to numpy's default.
        np.set_string_function(None, repr=False)
def next_output_stem(prefix, path=None, start=1, inc=1, zfill=3,
                     overwrite=False):
    """Find next unique prefix-number in sequence of numbered files/folders

    Looks in folder of `path` for files/folders with prefix-number combos
    of the form 'prefixdddxyz.ext'.  If found then the highest ddd is
    incremented by `inc` and the new 'prefixddd' is returned.
    E.g. if file 'prefix004_exact.out' exists you will get 'prefix005' as
    the next output stem.

    Parameters
    ----------
    prefix : string
        Start of file/folder name to search for.  It is matched literally,
        so characters such as '.', '+' or '(' have no special meaning.
    path : string, optional
        Folder to search in.  Default path=None i.e. search in current
        working directory.
    start : int, optional
        If no existing files/folders match the numbering will start at
        `start`.  Default start=1.
    inc : int, optional
        If matching files/folders are found then the number will increment
        by inc.  Default inc=1.
    zfill : int, optional
        Fill number with zeros to the left. eg. if `zfill` is 3 and the
        number is 8, then the number will be output as '008'.
        Default zfill=3.
    overwrite : True/False
        When True the prefix-number combo will not be incremented and the
        highest existing prefix-number combo will be returned.

    Returns
    -------
    stem : string
        Next output stem.

    Raises
    ------
    ValueError
        If `path` is given but is not an existing folder.

    """
    if path is None:
        folder = os.curdir
    else:
        if not os.path.isdir(path):
            raise ValueError('folder does not exist: ' + path)
        folder = path

    # Escape the prefix so it is matched as plain text; anchor at the start
    # of the name (re.match) and capture the run of digits that follows.
    pattern = re.compile(re.escape(prefix) + r"(?P<x>\d+)")

    nums = []
    for entry in os.listdir(folder):
        match = pattern.match(entry)
        if match:
            nums.append(int(match.group('x')))

    if nums:
        num = max(nums)
        if not overwrite:
            num += inc
    else:
        num = start

    return prefix + str(num).zfill(zfill)
def make_array_into_dataframe(data, column_labels=None, row_labels=None,
                              row_labels_label='item'):
    """Make an array into a pandas dataframe

    The row labels, if given, are stored as an ordinary first column rather
    than as the dataframe index.  Use df.set_index() afterwards if an index
    is wanted.

    Parameters
    ----------
    data : 2d array
        Data to be put into the dataframe.  If `data` already is a pandas
        dataframe it is returned unchanged and the label arguments are
        ignored.
    column_labels : list or 1d array, optional
        Column labels for data.  Default column_labels=None which will give
        column numbers 0,1,2, etc.
    row_labels : list or 1d array, optional
        Row labels, inserted as the first column.  Default row_labels=None
        which adds no row label column.
    row_labels_label : string, optional
        Column label for the row_labels column.  Default
        row_labels_label='item'.

    Returns
    -------
    df : pandas dataframe
        Dataframe of data with column and row labels added.

    """
    if isinstance(data, pd.DataFrame):
        return data

    if column_labels is None:
        df = pd.DataFrame(data)
    else:
        df = pd.DataFrame(data=data, columns=column_labels)

    if row_labels is not None:
        df.insert(loc=0, column=row_labels_label, value=row_labels)
    return df
def save_grid_data_to_file(data_dicts, directory=None, file_stem='out_000',
                           create_directory=True, ext='.csv'):
    """Save grid data to files using a common filename stem

    Saves grid data in `directory`.  Each data filename will begin with
    `file_stem`.  Grid data is basically 2d data with rows and columns.
    The dictionaries passed in are only read; they are not modified.

    Parameters
    ----------
    data_dicts : list of dict or single dict
        Each data_dict contains info for outputing a piece of data to a
        file.

        ==================  ============================================
        data_dict key       description
        ==================  ============================================
        name                String to be added to end of stem to create
                            file_name.
        data                array of data to save to file. If data
                            is a pandas dataframe then row_labels and
                            column labels will be ignored (i.e. it will
                            be assumed that the dataframe already has
                            row labels and column labels).  Entries
                            whose data is None are skipped.
        row_labels          List of row labels for data. default = None
                            which will give no row labels.
        row_labels_label    If there are row_labels then the column
                            label for the row_label column.
                            Default = 'item'.
        column_labels       Column labels for data.
                            Default = None which will give nothing.
        header              String to appear before data.
                            Default = None.
        df_kwargs           dict of kwargs to pass to
                            pandas.DataFrame.to_csv().  Default ={}.
                            'index_label' is always overridden.
        ==================  ============================================

    directory : string, optional
        Path of directory to save data in.  Default directory=None which
        will save to the current working directory.  The directory is
        created if it does not exist.
    file_stem : string, optional
        beginning of file name, Default file_stem='out_000'.  Filename will
        be made from `file_stem` + [`data_dict`['name']] + `ext`.
    create_directory : True/False, optional
        If True, all data files will be placed in a directory named
        file_stem.  So you might get files
        path_to_directory\\out_000\\out_000_cow.csv etc.
        If False, all data files will be placed in `directory`. So you
        would get files path_to_directory\\out_000_cow.csv etc.  Default
        create_directory=True.
    ext : string, optional
        File extension.  Default ext=".csv".

    Raises
    ------
    ValueError
        If any element of `data_dicts` is not a dict.

    """
    if not isinstance(data_dicts, list):
        data_dicts = [data_dicts]
    if len(data_dicts) == 0:
        return
    if not all(isinstance(v, dict) for v in data_dicts):
        raise ValueError('all elements of data_dicts must be a dict.')

    if directory is None:
        directory = os.path.curdir
    if not os.path.isdir(directory):
        os.mkdir(directory)

    if create_directory:
        dirname = os.path.join(directory, file_stem)
        if not os.path.isdir(dirname):
            os.mkdir(dirname)
    else:
        dirname = directory

    for d in data_dicts:
        # Read with .get (not .pop) so the caller's dict survives the call
        # and can be reused.
        data = d.get('data', None)
        if data is None:
            continue
        name = d.get('name', "")
        row_labels = d.get('row_labels', None)
        row_labels_label = d.get('row_labels_label', 'item')
        column_labels = d.get('column_labels', None)
        header = d.get('header', None)
        # Work on a copy: 'index_label' is added below and must not leak
        # back into the caller's kwargs.
        df_kwargs = dict(d.get('df_kwargs', {}))

        df = make_array_into_dataframe(data=data,
                                       column_labels=column_labels,
                                       row_labels=row_labels,
                                       row_labels_label=row_labels_label)

        df_kwargs['index_label'] = "idex" # ensure all columns have labels incl. index

        filename = os.path.join(dirname, str(file_stem) + str(name) + ext)
        with open(filename, 'w') as myfile:
            if header is not None:
                myfile.write(header + '\n')
            df.to_csv(myfile, **df_kwargs)
def hms_string(sec_elapsed):
    """Format a number of seconds as 'h:mm:ss.ss'

    Parameters
    ----------
    sec_elapsed : float
        Elapsed seconds

    Returns
    -------
    out : string
        Hours, zero padded minutes and zero padded seconds (two decimal
        places) separated by colons.

    References
    ----------
    Based on http://arcpy.wordpress.com/2012/04/20/146/

    """
    seconds_per_minute = 60
    seconds_per_hour = 60 * seconds_per_minute

    hours = int(sec_elapsed / seconds_per_hour)
    into_hour = sec_elapsed % seconds_per_hour
    minutes = int(into_hour / seconds_per_minute)
    seconds = sec_elapsed % 60.

    template = "{}:{:>02}:{:>05.2f}"
    return template.format(hours, minutes, seconds)
@contextmanager
def working_directory(path):
    """Change working directory for duration of with statement

    The working directory in effect when the with statement is entered is
    restored on exit, including when the body raises.

    Parameters
    ----------
    path : str
        Path to new current working directory

    Notes
    -----
    e.g. ``with working_directory('/data/stuff'):``

    References
    ----------
    Taken straight from http://stackoverflow.com/a/3012921/2530083

    """
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions alike.
        os.chdir(previous_dir)
def get_filepaths(wildcard=""):
    """Dialog box to select files

    Opens a dialog box from which the user can select files.  PyQt5 is
    tried first, then PyQt4, then wx.

    Parameters
    ----------
    wildcard : string, optional
        Only show file types of this kind.  Default wildcard="" which means
        no file type filtering.
        e.g. "BMP files (*.bmp)|*.bmp|GIF files (*.gif)|*.gif"
        The string is passed unchanged to whichever toolkit is used.

    Returns
    -------
    filepaths: list of paths
        List of user selected file paths.  Will return None if the cancel
        button was selected.

    Raises
    ------
    ImportError
        If none of PyQt5, PyQt4 or wx can be imported.

    """
    try:
        try:
            # PyQt5 keeps QApplication/QFileDialog in QtWidgets, not QtGui.
            from PyQt5 import QtWidgets as widgets
            returns_tuple = True
        except ImportError:
            from PyQt4 import QtGui as widgets
            returns_tuple = False
    except ImportError:
        widgets = None

    if widgets is not None:
        # pyqt idea from https://www.daniweb.com/software-development/python/code/452624/get-multiple-file-names-pyqtpyside
        # Hold a reference to the application while the dialog is shown;
        # reuse an already running application if there is one.
        app = widgets.QApplication.instance() or widgets.QApplication([])
        caption = 'Select file(s)'
        # use current/working directory
        directory = './'
        filter_mask = wildcard
        filepaths = widgets.QFileDialog.getOpenFileNames(None,
            caption, directory, filter_mask)
        if returns_tuple:
            # PyQt5 returns (file names, selected filter).
            filepaths = filepaths[0]
        if len(filepaths) == 0:
            filepaths = None
        return filepaths

    import wx
    # wx idea from http://stackoverflow.com/a/9319832/2530083
    app = wx.App(None)
    style = wx.FD_OPEN | wx.FD_FILE_MUST_EXIST | wx.FD_MULTIPLE
    dialog = wx.FileDialog(None, 'Open', wildcard=wildcard, style=style)
    if dialog.ShowModal() == wx.ID_OK:
        filepaths = dialog.GetPaths()
    else:
        filepaths = None
    dialog.Destroy()
    return filepaths
def modules_in_package(package_name, exclude=['test']):
    """Names of the modules found directly inside a package

    The package is imported and the folder holding its ``__file__`` is
    scanned with `pkgutil.iter_modules`.

    Parameters
    ----------
    package_name : string
        Name of package to import and get modules from.
    exclude : list of string, optional
        Module names to exclude.  Default exclude=['test']

    Returns
    -------
    module_list : list of string
        List of modules in package_name omitting any names in the
        exclude list.

    """
    package = importlib.import_module(package_name)
    package_dir = os.path.dirname(package.__file__)

    module_list = []
    for _finder, module_name, _is_package in pkgutil.iter_modules([package_dir]):
        if module_name in exclude:
            continue
        module_list.append(module_name)
    return module_list
class SimpleTimer(object):
    """Wall clock timer that announces the start and finish of timed levels

    A message is printed whenever `start` or `finish` is called; the
    finish message includes the elapsed time.

    Attributes
    ----------
    start_times : dict
        Start times corresponding to each timing level.
    messages : dict
        Message/title corresponding to each timing level.
    end_times : dict
        End times corresponding to each timing level.

    """

    def __init__(self):
        self.start_times = {}
        self.messages = {}
        self.end_times = {}

    def start(self, i, msg=None):
        """Record the start time of level i and print a 'Started' message

        Parameters
        ----------
        i : int
            Index of timed process.  The printed message is indented by
            i spaces.  If i==0 a row of asterisks is printed before the
            message.
        msg : string, optional
            Title of timing process.  Printed message will be
            "Started " + msg.  Default msg=None which will make
            "Started Level i" appear.

        """
        self.start_times[i] = time.time()
        title = "Level {}".format(i) if msg is None else msg
        self.messages[i] = title

        if i == 0:
            print("*"*40)
        indent = " "*i
        print(indent + "Started " + self.messages[i])

    def finish(self, i):
        """Record the end time of level i and print the elapsed time

        Parameters
        ----------
        i : int
            Index of timed process, as previously passed to `start`.  The
            printed message is indented by i spaces.  If i==0 a row of
            asterisks is printed after the finish message.

        """
        self.end_times[i] = time.time()
        elapsed_time = self.end_times[i] - self.start_times[i]
        run_time = str(timedelta(seconds=elapsed_time))
        indent = " "*i
        print(indent + "Finished {}, run time = {}".format(self.messages[i],
                                                            run_time))
        if i == 0:
            print("*"*40)
if __name__ == '__main__':
    # Run the tests collected by nose for this module, doctests included.
    import nose
    nose_argv = ['nose', '--verbosity=3', '--with-doctest']
    nose.runmodule(argv=nose_argv)
#    nose.runmodule(argv=['nose', '--verbosity=3'])