Changeset 2691


Ignore:
Timestamp:
01/16/12 14:59:45 (4 months ago)
Author:
jussi
Message:

very much unfinished business moving to memory based sqlite for workers

Location:
simo/branches/memory_worker
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • simo/branches/memory_worker/dev/cfg/buildout.cfg

    r2676 r2691  
    111111[geos] 
    112112recipe = hexagonit.recipe.cmmi 
    113 url = http://download.osgeo.org/geos/geos-3.3.1.tar.bz2 
     113url = http://download.osgeo.org/geos/geos-3.3.2.tar.bz2 
    114114 
    115115[proj4] 
  • simo/branches/memory_worker/src/runner2.py

    r2515 r2691  
    8787        db_type, host, port, user, pw, name, schema = self.db_info 
    8888        if db_type.upper() == 'POSTGRESQL': 
    89             db = runner.PostgreSQL(db_name='config', db_host=host,  
    90                                    db_port=int(port), user=user, pw=pw,  
    91                                    wipe=False, spatial=False,  
     89            db = runner.PostgreSQL(db_name='config', db_host=host, 
     90                                   db_port=int(port), user=user, pw=pw, 
     91                                   wipe=False, spatial=False, 
    9292                                   db_database=name, db_schema=schema) 
    9393        elif db_type.upper() == 'ORACLE': 
    94             db = runner.Oracle(db_name='config', db_host=host, user=user,  
     94            db = runner.Oracle(db_name='config', db_host=host, user=user, 
    9595                               pw=pw, wipe=False, spatial=False) 
    9696        else: 
     
    131131        self.cfg = cfg 
    132132        if db_type.upper() == 'POSTGRESQL': 
    133             db = runner.PostgreSQL(db_name='config', db_host=host,  
    134                                    db_port=int(port), user=user, pw=pw,  
    135                                    wipe=True, spatial=False,  
     133            db = runner.PostgreSQL(db_name='config', db_host=host, 
     134                                   db_port=int(port), user=user, pw=pw, 
     135                                   wipe=True, spatial=False, 
    136136                                   db_database=name, db_schema=schema) 
    137137        elif db_type.upper() == 'ORACLE': 
    138             db = runner.Oracle(db_name='config', db_host=host, user=user,  
     138            db = runner.Oracle(db_name='config', db_host=host, user=user, 
    139139                               pw=pw, wipe=True, spatial=False) 
    140140        else: 
     
    199199        """ 
    200200        max_errors = self.cfg['error_handling.max_error_count_total'] 
     201        import_date = self.cfg['import.import_date'] 
    201202        basedir = os.getcwd() 
    202203        sim_dir = self.cfg['program_execution.base_folder'] 
     
    208209                      % datamapping_def 
    209210                raise Exception(msg) 
    210             data_imp = DataImporter(self.input_db, dm, True, self.logger, 
    211                                     'data import', self.lx, max_errors) 
     211            data_imp = DataImporter(self.input_db, dm, import_date, 
     212                                    self.logger, 'data import', self.lx, 
     213                                    max_errors) 
    212214            return data_imp 
    213215        finally: 
    214216            os.chdir(basedir) 
     217 
     218    mappingdef = self.cfg['import.mapping_definition'] 
     219    format = self.cfg['import.format'] 
     220    folder = self.cfg['import.data_dir'] 
     221    datafiles = self.cfg['import.data_file'] 
     222    separator = self.cfg['import.data_delimiter'] 
     223    skipfirst = self.cfg['import.skip_first_line'] 
     224    forced_ops = self.cfg['import.operations.operation_import'] 
     225    if forced_ops: 
     226        # Import forced operations into input database 
     227        oformat = self.cfg['import.operations.operation_format'] 
     228        ofile = self.cfg['import.operations.operation_file'] 
     229        omap = self.cfg['import.operations.operation_modelchains'] 
     230        oconv = self.cfg['import.operations.operation_conversion'] 
     231    infomsg = 'Initializing SIMO importer!' 
     232    self.logger.log_message(log_name, 'info', infomsg) 
     233    simlevel = self.sim_level_name 
     234    mapping = self.struct_db.get_obj(sc.TEXT2DATA_DEF, mappingdef) 
     235    if mapping is None: 
     236        errormsg = "Data mapping definition '%s' couldn't be loaded" \ 
     237                   % mappingdef 
     238        self.logger.log_message(log_name, 'error', errormsg) 
     239        return False 
     240    imp = DataImporter(self.input_db, mapping, import_date, self.logger, 
     241                       log_name, self.lx, max_errors) 
     242 
     243    initial_date = self.cfg['import.data_date'] 
     244    month_first = self.cfg.get('import.month_first') 
     245    if month_first is None: month_first = False 
    215246 
    216247    def _load_op_importer(self, text2operation=None, 
     
    339370            pass 
    340371 
    341 def do_import(runner_obj): 
     372def do_import(runner_obj, ds, op_ds): 
    342373    """ 
    343     Run import on the database _if_ there is nothing imported yet. 
     374    Import data and forced operations into input dbs from data source 
     375 
     376    runner_obj -- executor of import 
     377    ds -- simulation data in string buffer 
     378    op_ds -- operaton data in string buffer 
    344379    """ 
    345380    basedir = os.getcwd() 
     
    360395    finally: 
    361396        os.chdir(basedir) 
     397 
     398    dataset = {'format': 'by_level', 
     399               'buffers': [buff], 
     400               'sim_level': 'simulation', 
     401               'level_ind': [level_ind], 
     402               'data_date': data_date, 
     403               'clear_old': True 
     404              } 
     405    opdata = {'format': 'text', 
     406              'opbuffer': buff, 
     407              'separator': None 
     408             } 
     409 
     410    buffers = [] 
     411    for file in datafiles: 
     412        fpath = os.path.join(folder, file) 
     413        buffers.append(open(fpath)) 
     414    imp.import_data(format, buffers, separator=separator, 
     415                    skip_first=skipfirst, sim_level=simlevel, 
     416                    data_date=initial_date, month_first=month_first) 
     417    cont = not imp.terminate_import 
     418    if not imp.terminate_import and forced_ops: 
     419        conv_def, op2mc = None, None 
     420        if oformat in ('text',): 
     421            conv_def = self.struct_db.get_obj(sc.TEXT2OPERATION_DEF, oconv) 
     422            if conv_def is None and oformat: 
     423                errormsg = "Operation mapping definition '%s' couldn't "\ 
     424                           "be loaded"  % oconv 
     425                self.logger.log_message(log_name, 'error', errormsg) 
     426                return False 
     427        elif oformat in ('xml', 'db'): 
     428            op2mc = self.struct_db.get_obj(sc.OPERATION2MODELCHAINS_DEF, 
     429                                           omap) 
     430        imp = OperationImporter(self.input_db, self.fops_db, oformat, 
     431                                conv_def, op2mc, self.logger, log_name, 
     432                                max_errors) 
     433        if oformat != 'db': 
     434            op_buffer = open(ofile) 
     435        else: 
     436            op_buffer = SQLiteDB(ofile, wipe=False, commit=False) 
     437        imp.import_operations(oformat, op_buffer, separator) 
     438        cont = not imp.terminate_import 
     439    return cont 
    362440 
    363441def get_runner(initial_setup, inifile, db_info, do_wipe=False, 
Note: See TracChangeset for help on using the changeset viewer.