op_robot_tests/tests_files/service_keywords.py
# -*- coding: utf-8 -*-
from .local_time import get_now, TZ
from copy import deepcopy
from datetime import timedelta
from dateutil.parser import parse
from dpath.util import new as xpathnew
from haversine import haversine
from iso8601 import parse_date
from json import load
from jsonpath_rw import parse as parse_path
from munch import fromYAML, Munch, munchify
from robot.errors import ExecutionFailed
from robot.libraries.BuiltIn import BuiltIn
from robot.output import LOGGER
from robot.output.loggerhelper import Message
# These imports are not pointless. Robot's resource and testsuite files
# can access them by simply importing library "service_keywords".
# Please ignore the warnings given by Flake8 or other linters.
from .initial_data import (
    create_fake_doc,
    create_fake_sentence,
    fake,
    test_bid_data,
    test_bid_value,
    test_claim_answer_data,
    test_claim_data,
    test_complaint_data,
    test_complaint_reply_data,
    test_confirm_data,
    test_feature_data,
    test_invalid_features_data,
    test_item_data,
    test_lot_data,
    test_lot_document_data,
    test_related_question,
    test_question_answer_data,
    test_question_data,
    test_supplier_data,
    test_tender_data,
    test_tender_data_limited,
    test_tender_data_openeu,
    test_tender_data_openua,
)
from barbecue import chef
from restkit import request
# End of non-pointless imports
import os
import re


NUM_TYPES = (int, long, float)


def get_current_tzdate():
    return get_now().strftime('%Y-%m-%d %H:%M:%S.%f')


def add_minutes_to_date(date, minutes):
    return (parse(date) + timedelta(minutes=float(minutes))).isoformat()


def get_file_contents(path):
    with open(path, 'r') as f:
        return unicode(f.read()) or u''


def compare_date(left, right, accuracy="minute", absolute_delta=True):
    '''Compares dates with specified accuracy

    Before comparison dates are parsed into datetime.datetime format
    and localized.

    :param left:            First date
    :param right:           Second date
    :param accuracy:        Max difference between dates to consider them equal
                            Default value   - "minute"
                            Possible values - "day", "hour", "minute" or float value
                            of seconds
    :param absolute_delta:  Type of comparison. If set to True, the order of the
                            dates does not matter. If set to False, ``left`` must not
                            be later than ``right`` by more than the accuracy value.
                            Default value   - True
                            Possible values - True and False, or anything that can
                            be cast to a boolean
    :returns:               Boolean value

    :error:                 ValueError when the accuracy cannot be converted into a
                            float value. The error is caught, a warning is logged,
                            and the accuracy falls back to 60 seconds.

    '''
    left = parse(left)
    right = parse(right)

    if left.tzinfo is None:
        left = TZ.localize(left)
    if right.tzinfo is None:
        right = TZ.localize(right)

    delta = (left - right).total_seconds()

    if accuracy == "day":
        accuracy = 24 * 60 * 60 - 1
    elif accuracy == "hour":
        accuracy = 60 * 60 - 1
    elif accuracy == "minute":
        accuracy = 60 - 1
    else:
        try:
            accuracy = float(accuracy)
        except ValueError:
            LOGGER.log_message(Message("Could not convert from {} to float. Accuracy is set to 60 seconds.".format(accuracy), "WARN"))
            accuracy = 60
    if absolute_delta:
        delta = abs(delta)
    if delta > accuracy:
        return False
    return True
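# Illustrative calls (not part of the original module; the dates are made up):
#     compare_date('2018-01-01 12:00:10', '2018-01-01 12:00:40')
#     -> True: the 30-second difference fits the default "minute" accuracy (59 s)
#     compare_date('2018-01-01 12:00:00', '2018-01-01 14:00:00', accuracy="hour")
#     -> False: the 7200-second difference exceeds the "hour" accuracy (3599 s)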


def compare_coordinates(left_lat, left_lon, right_lat, right_lon, accuracy=0.1):
    '''Compares coordinates with specified accuracy

    :param left_lat:        First coordinate latitude
    :param left_lon:        First coordinate longitude
    :param right_lat:       Second coordinate latitude
    :param right_lon:       Second coordinate longitude
    :param accuracy:        Max difference between coordinates to consider them equal
                            Default value   - 0.1
                            Possible values - float or integer value of kilometers

    :returns:               Boolean value

    :error:                 TypeError when any of the given coordinates is not
                            a numeric value.
    '''
    for key, value in {'left_lat': left_lat, 'left_lon': left_lon, 'right_lat': right_lat, 'right_lon': right_lon}.iteritems():
        if not isinstance(value, NUM_TYPES):
            raise TypeError("Invalid type for coordinate '{0}'. "
                            "Expected one of {1}, got {2}".format(
                                key, str(NUM_TYPES), str(type(value))))
    distance = haversine((left_lat, left_lon), (right_lat, right_lon))
    if distance > accuracy:
        return False
    return True
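# Illustrative calls (not part of the original module; the coordinates are made up):
#     compare_coordinates(50.45, 30.52, 50.4501, 30.5201)
#     -> True: the points are roughly 0.013 km apart, within the default 0.1 km
#     compare_coordinates(50.45, 30.52, 50.46, 30.52)
#     -> False: 0.01 degrees of latitude is roughly 1.1 km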


def log_object_data(data, file_name=None, format="yaml", update=False):
    """Log object data in pretty format (JSON or YAML)

    Two output formats are supported: "yaml" and "json".

    If a file name is specified, the output is written into that file.

    If you would like to get similar output everywhere,
    use the following snippet somewhere in your code
    before actually using Munch. For instance,
    put it into your __init__.py, or, if you use zc.buildout,
    specify it in "initialization" setting of zc.recipe.egg.

    from munch import Munch
    Munch.__str__ = lambda self: Munch.toYAML(self, allow_unicode=True,
                                              default_flow_style=False)
    Munch.__repr__ = Munch.__str__
    """
    if not isinstance(data, Munch):
        data = munchify(data)
    if file_name:
        output_dir = BuiltIn().get_variable_value("${OUTPUT_DIR}")
        file_path = os.path.join(output_dir, file_name + '.' + format)
        if update:
            try:
                with open(file_path, "r+") as file_obj:
                    new_data = data.copy()
                    data = munch_from_object(file_obj.read(), format)
                    data.update(new_data)
                    file_obj.seek(0)
                    file_obj.truncate()
            except IOError as e:
                LOGGER.log_message(Message(e, "INFO"))
                LOGGER.log_message(Message("Nothing to update, "
                                           "creating new file.", "INFO"))
        data_obj = munch_to_object(data, format)
        with open(file_path, "w") as file_obj:
            file_obj.write(data_obj)
    data_obj = munch_to_object(data, format)
    LOGGER.log_message(Message(data_obj.decode('utf-8'), "INFO"))
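# Illustrative call (not part of the original module; the data and file name are made up):
#     log_object_data({'tender': {'id': 'some_id'}}, file_name='tender_info', format='json')
#     -> logs the object as pretty-printed JSON and also writes it to
#        "tender_info.json" inside the Robot Framework ${OUTPUT_DIR} directory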


def munch_from_object(data, format="yaml"):
    if format.lower() == 'json':
        return Munch.fromJSON(data)
    else:
        return Munch.fromYAML(data)


def munch_to_object(data, format="yaml"):
    if format.lower() == 'json':
        return data.toJSON(indent=2)
    else:
        return data.toYAML(allow_unicode=True, default_flow_style=False)
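# Illustrative round trip (not part of the original module):
#     munch_to_object(munchify({'title': 'test'}), 'json')
#     -> the JSON string '{\n  "title": "test"\n}'
#     munch_from_object('{"title": "test"}', 'json').title
#     -> 'test'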


def load_data_from(file_name, mode=None):
    if not os.path.exists(file_name):
        file_name = os.path.join(os.path.dirname(__file__), 'data', file_name)
    with open(file_name) as file_obj:
        if file_name.endswith(".json"):
            file_data = Munch.fromDict(load(file_obj))
        elif file_name.endswith(".yaml"):
            file_data = fromYAML(file_obj)
    if mode == "brokers":
        default = file_data.pop('Default')
        brokers = {}
        for k, v in file_data.iteritems():
            brokers[k] = merge_dicts(default, v)
        return brokers
    else:
        return file_data
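# Illustrative usage (not part of the original module; the file and keys are made up).
# Given a "brokers.yaml" such as:
#     Default:
#         timeout: 30
#     Broker1:
#         api_key: "..."
# load_data_from('brokers.yaml', mode='brokers') drops the 'Default' entry and
# returns {'Broker1': {'timeout': 30, 'api_key': '...'}}, i.e. every broker
# inherits the 'Default' values via merge_dicts().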


def compute_intrs(brokers_data, used_brokers):
    """Compute optimal values for period intervals.

    Note: this function is most effective when ``brokers_data``
    does not contain a ``Default`` entry.
    Using `load_data_from` with ``mode='brokers'`` is recommended.
    """
    def recur(l, r):
        l, r = deepcopy(l), deepcopy(r)
        if isinstance(l, list) and isinstance(r, list) and len(l) == len(r):
            lst = []
            for ll, rr in zip(l, r):
                lst.append(recur(ll, rr))
            return lst
        elif isinstance(l, NUM_TYPES) and isinstance(r, NUM_TYPES):
            if l == r:
                return l
            if l > r:
                return l
            if l < r:
                return r
        elif isinstance(l, dict) and isinstance(r, dict):
            for k, v in r.iteritems():
                if k not in l.keys():
                    l[k] = v
                else:
                    l[k] = recur(l[k], v)
            return l
        else:
            raise TypeError("Couldn't recur({0}, {1})".format(
                str(type(l)), str(type(r))))

    intrs = []
    for i in used_brokers:
        intrs.append(brokers_data[i]['intervals'])
    result = intrs.pop(0)
    for i in intrs:
        result = recur(result, i)
    return result
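# Illustrative usage (not part of the original module; the structures are made up).
# Numeric leaves resolve to the larger value, equal-length lists are merged
# element by element, and missing dict keys are copied over:
#     compute_intrs(
#         {'Broker1': {'intervals': {'enquiry': [1, 2]}},
#          'Broker2': {'intervals': {'enquiry': [2, 1], 'accelerator': 720}}},
#         ['Broker1', 'Broker2'])
#     -> {'enquiry': [2, 2], 'accelerator': 720}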


def prepare_test_tender_data(procedure_intervals, tender_parameters):
    # Get actual intervals by mode name
    mode = tender_parameters['mode']
    if mode in procedure_intervals:
        intervals = procedure_intervals[mode]
    else:
        intervals = procedure_intervals['default']
    LOGGER.log_message(Message(intervals))
    tender_parameters['intervals'] = intervals

    # Set acceleration value for certain modes
    if mode in ['openua', 'openeu']:
        assert isinstance(intervals['accelerator'], int), \
            "Accelerator should be an 'int', " \
            "not '{}'".format(type(intervals['accelerator']).__name__)
        assert intervals['accelerator'] >= 0, \
            "Accelerator should not be less than 0"
    else:
        assert 'accelerator' not in intervals.keys(), \
               "Accelerator is not available for mode '{0}'".format(mode)

    if mode == 'negotiation':
        return munchify({'data': test_tender_data_limited(tender_parameters)})
    elif mode == 'negotiation.quick':
        return munchify({'data': test_tender_data_limited(tender_parameters)})
    elif mode == 'openeu':
        return munchify({'data': test_tender_data_openeu(tender_parameters)})
    elif mode == 'openua':
        return munchify({'data': test_tender_data_openua(tender_parameters)})
    elif mode == 'reporting':
        return munchify({'data': test_tender_data_limited(tender_parameters)})
    elif mode == 'belowThreshold':
        return munchify({'data': test_tender_data(tender_parameters)})
    raise ValueError("Invalid mode for prepare_test_tender_data")


def run_keyword_and_ignore_keyword_definitions(name, *args, **kwargs):
    """This keyword is pretty similar to `Run Keyword And Ignore Error`,
    which, unfortunately, does not suppress the error when you try
    to use it to run a keyword which is not defined.
    As a result, the execution of its parent keyword / test case is aborted.

    How this works:

    This is a simple wrapper for `Run Keyword And Ignore Error`.
    It handles the error mentioned above and additionally provides
    a meaningful error message.
    """
    try:
        status, _ = BuiltIn().run_keyword_and_ignore_error(name, *args, **kwargs)
    except ExecutionFailed as e:
        status, _ = "FAIL", e.message
    return status, _
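# Illustrative usage (not part of the original module; the keyword name is made up):
#     status, message = run_keyword_and_ignore_keyword_definitions('No Such Keyword')
#     -> status == 'FAIL' instead of aborting the parent keyword / test case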


def set_access_key(tender, access_token):
    tender.access = munchify({"token": access_token})
    return tender


def get_from_object(obj, attribute):
    """Gets data from a dictionary using a dotted accessor-string"""
    jsonpath_expr = parse_path(attribute)
    return_list = [i.value for i in jsonpath_expr.find(obj)]
    if return_list:
        return return_list[0]
    else:
        raise AttributeError('Attribute not found: {0}'.format(attribute))
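# Illustrative usage (not part of the original module; the object is made up):
#     get_from_object({'data': {'items': [{'id': 'item_id'}]}}, 'data.items[0].id')
#     -> 'item_id'
#     A path that matches nothing raises AttributeError instead of returning None.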


def set_to_object(obj, attribute, value):
    # Search for a list index in the path to the value
    list_index = re.search(r'\d+', attribute)
    if list_index:
        list_index = list_index.group(0)
        # Split the attribute into the path to the list (parent)
        # and the path to the value inside the list element (child)
        parent, child = attribute.split('[' + list_index + '].')[:2]
        try:
            # Get list from parent
            listing = get_from_object(obj, parent)
            # Append an element at list_index if it does not exist yet
            if len(listing) < int(list_index) + 1:
                listing.append({})
        except AttributeError:
            # Create the list if it does not exist
            listing = [{}]
        # Update list in parent
        xpathnew(obj, parent, listing, separator='.')
        # Set value in obj
        xpathnew(obj, '.'.join([parent, list_index, child]), value, separator='.')
    else:
        xpathnew(obj, attribute, value, separator='.')
    return munchify(obj)
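# Illustrative usage (not part of the original module; the paths and values are made up):
#     set_to_object({'data': {}}, 'data.title', u'title')
#     -> {'data': {'title': u'title'}}
#     set_to_object({'data': {}}, 'data.items[0].description', u'description')
#     -> {'data': {'items': [{'description': u'description'}]}}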


def wait_to_date(date_stamp):
    date = parse(date_stamp)
    LOGGER.log_message(Message("date: {}".format(date.isoformat()), "INFO"))
    now = get_now()
    LOGGER.log_message(Message("now: {}".format(now.isoformat()), "INFO"))
    wait_seconds = (date - now).total_seconds()
    wait_seconds += 2
    if wait_seconds < 0:
        return 0
    return wait_seconds


def merge_dicts(a, b):
    """Merge dicts recursively.

    Origin: https://www.xormedia.com/recursively-merge-dictionaries-in-python/
    """
    if not isinstance(b, dict):
        return b
    result = deepcopy(a)
    for k, v in b.iteritems():
        if k in result and isinstance(result[k], dict):
            result[k] = merge_dicts(result[k], v)
        else:
            result[k] = deepcopy(v)
    return munchify(result)
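# Illustrative usage (not part of the original module; the dicts are made up):
#     merge_dicts({'value': {'amount': 100}, 'title': u'old'},
#                 {'value': {'currency': u'UAH'}})
#     -> {'value': {'amount': 100, 'currency': u'UAH'}, 'title': u'old'}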


def create_data_dict(path_to_value=None, value=None):
    data_dict = munchify({'data': {}})
    if isinstance(path_to_value, basestring) and value:
        list_items = re.search(r'\d+', path_to_value)
        if list_items:
            list_items = list_items.group(0)
            path_to_value = path_to_value.split('[' + list_items + ']')
            path_to_value.insert(1, '.' + list_items)
            set_to_object(data_dict, path_to_value[0], [])
            set_to_object(data_dict, ''.join(path_to_value[:2]), {})
            set_to_object(data_dict, ''.join(path_to_value), value)
        else:
            data_dict = set_to_object(data_dict, path_to_value, value)
    return data_dict
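# Illustrative usage (not part of the original module; the path and value are made up):
#     create_data_dict()
#     -> {'data': {}}
#     create_data_dict('data.description', u'description text')
#     -> {'data': {'description': u'description text'}}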


def munch_dict(arg=None, data=False):
    if arg is None:
        arg = {}
    if data:
        arg['data'] = {}
    return munchify(arg)


def get_id_from_object(obj):
    obj_id = re.match(r'(^[filq]-[0-9a-fA-F]{8}): ', obj.get('title', ''))
    if not obj_id:
        obj_id = re.match(r'(^[filq]-[0-9a-fA-F]{8}): ', obj.get('description', ''))
    return obj_id.group(1)


def get_object_type_by_id(object_id):
    prefixes = {'q': 'questions', 'f': 'features', 'i': 'items', 'l': 'lots'}
    return prefixes.get(object_id[0])
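# Illustrative usage (not part of the original module; the identifiers are made up):
#     get_id_from_object({'title': u'q-1a2b3c4d: Some question'})  -> 'q-1a2b3c4d'
#     get_object_type_by_id('q-1a2b3c4d')                          -> 'questions'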


def get_object_index_by_id(data, object_id):
    if not data:
        return 0
    for index, element in enumerate(data):
        element_id = get_id_from_object(element)
        if element_id == object_id:
            break
    else:
        index += 1
    return index


def get_complaint_index_by_complaintID(data, complaintID):
    for index, element in enumerate(data):
        if element['complaintID'] == complaintID:
            return index
    raise IndexError


def get_document_index_by_id(data, document_id):
    for index, element in enumerate(data):
        if element['title'] == document_id:
            return index
    raise IndexError


def generate_test_bid_data(tender_data):
    bid = test_bid_data()
    if 'aboveThreshold' in tender_data['procurementMethodType']:
        bid.data.selfEligible = True
        bid.data.selfQualified = True
    if 'lots' in tender_data:
        bid.data.lotValues = []
        for lot in tender_data['lots']:
            value = test_bid_value(lot['value']['amount'])
            value['relatedLot'] = lot.get('id', '')
            bid.data.lotValues.append(value)
    else:
        bid.data.update(test_bid_value(tender_data['value']['amount']))
    if 'features' in tender_data:
        bid.data.parameters = []
        for feature in tender_data['features']:
            parameter = {"value": fake.random_element(elements=(0.05, 0.01, 0)), "code": feature.get('code', '')}
            bid.data.parameters.append(parameter)
    return bid


# GUI Frontends common
def add_data_for_gui_frontends(tender_data):
    now = get_now()
    # tender_data.data.enquiryPeriod['startDate'] = (now + timedelta(minutes=2)).isoformat()
    tender_data.data.enquiryPeriod['endDate'] = (now + timedelta(minutes=6)).isoformat()
    tender_data.data.tenderPeriod['startDate'] = (now + timedelta(minutes=7)).isoformat()
    tender_data.data.tenderPeriod['endDate'] = (now + timedelta(minutes=11)).isoformat()
    return tender_data

def convert_date_to_slash_format(isodate):
    iso_dt = parse_date(isodate)
    date_string = iso_dt.strftime("%d/%m/%Y")
    return date_string


def convert_datetime_to_dot_format(isodate):
    iso_dt = parse_date(isodate)
    day_string = iso_dt.strftime("%d.%m.%Y %H:%M")
    return day_string

def local_path_to_file(file_name):
    return os.path.join(os.path.dirname(__file__), 'documents', file_name)