# -*- coding: utf-8 # Copyright 2004-2006 by Vahur Rebas """This is the main module for Korpus.""" import Globals from Globals import Acquisition, Persistent from AccessControl import ClassSecurityInfo from OFS.Folder import Folder from Acquisition import aq_base, aq_inner, aq_parent, Explicit from Products.PageTemplates.PageTemplateFile import PageTemplateFile from OFS.OrderSupport import OrderSupport from zope.interface import implements from OFS.interfaces import IFolder from zExceptions import BadRequest from permissions import * from utils import _checkPermission from interfaces import IKorpus PG_CONN_ID = 'evkk_pg_conn' class Korpus(Folder, Persistent, Explicit, OrderSupport): """ Main class for Korpus """ meta_type = 'Korpus' security = ClassSecurityInfo() security.declareObjectPublic() implements(IKorpus) manage_options = Folder.manage_options security.declarePrivate('manage_afterAdd') def manage_afterAdd(self, item, container): self.setupFolders() self.setupCookieCrumbler() self.setupZCatalog() self.setupPermissions() self.setupErrors() self.setupWWW() self.setupSite() security.declareProtected(perm_manage, 'setupSite') def setupSite(self): """ setup localsite """ from zope.component import queryMultiAdapter from Products.Five.component import enableSite from zope.interface import Interface enableSite(self) components_view = queryMultiAdapter((self, self.REQUEST), Interface, 'components.html') components_view.makeSite() from interfaces import IErrors, IDocuments, IWWW, ICorpusManager sm = self.getSiteManager() sm.registerUtility(self, IKorpus) sm.registerUtility(getattr(self, 'Errors'), IErrors) sm.registerUtility(getattr(self, 'Documents'), IDocuments) sm.registerUtility(getattr(self, 'wwwdata'), IWWW) sm.registerUtility(getattr(self, 'corpora'), ICorpusManager) security.declareProtected(perm_manage, 'setupPermissions') def setupPermissions(self): """ setup permissions """ for role in korpus_roles: self._addRole(role) self.manage_permission(perm_view, ('Manager', 'Anonymous',), 1) self.manage_permission(perm_manage, ('Manager',), 0) self.manage_permission(perm_edit_page, ('Manager', 'Admin'), 0) self.manage_permission(perm_edit_document, ('Manager', 'Owner', 'Inserter', 'Marker', 'Admin'), 0) self.manage_permission(perm_edit_tree, ('Manager', 'Owner', 'Admin'), 0) self.manage_permission(perm_mark_document, ('Manager', 'Admin', 'Marker', 'Owner'), 0) self.manage_permission(perm_change_informant, ('Manager', 'Owner', 'Admin', 'Inserter', 'Marker'), 0) self.manage_permission(perm_view_document, ('Manager', 'Anonymous'), 0) self.manage_permission(perm_change_status, ('Manager', 'Owner', 'Marker', 'Admin'), 0) self.manage_permission(perm_list_documents, ('Manager', 'Owner', 'Inserter', 'Marker', 'Admin'), 0) self.manage_permission(perm_del_documents, ('Manager', 'Owner', 'Admin'), 0) self.manage_permission(perm_add_documents, ('Manager', 'Owner', 'Inserter', 'Marker', 'Admin'), 0) security.declareProtected(perm_manage, 'setupErrors') def setupErrors(self): """ setup errors container """ from Errors import Errors e = Errors() self._setObject(e.id, e) security.declareProtected(perm_manage, 'setupCookieCrumbler') def setupCookieCrumbler(self): """ setup cookiecrumbler """ from Products.PluggableAuthService.PluggableAuthService import addPluggableAuthService from Products.PluggableAuthService.Extensions.upgrade import _replaceUserFolder, replace_acl_users if hasattr(self, 'acl_users'): _replaceUserFolder(self) try: addPluggableAuthService(self) except BadRequest: pass # cookie auth from Products.PluggableAuthService.plugins.CookieAuthHelper import addCookieAuthHelper try: addCookieAuthHelper(self.acl_users, 'cookie_auth', '', '__ac') except BadRequest: pass self.acl_users.cookie_auth.manage_activateInterfaces(['IExtractionPlugin', 'IChallengePlugin', 'ICredentialsUpdatePlugin', 'ICredentialsResetPlugin']) security.declareProtected(perm_manage, 'setupFolders') def setupFolders(self): """ setup folder """ from Documents import Documents from Members import Members from MarksContainer import MarksContainer from Search import Search from WordTree import WordTree from Corpus import CorpusManager try: self._setObject('Documents', Documents()) except BadRequest: pass try: self._setObject('Members', Members()) except BadRequest: pass try: self._setObject('Marks', MarksContainer()) except BadRequest: pass try: self._setObject('Search', Search()) except BadRequest: pass try: self._setObject(WordTree.id, WordTree()) except BadRequest: pass try: self._setObject(CorpusManager.id, CorpusManager()) except BadRequest: pass security.declareProtected(perm_manage, 'setupWWW') def setupWWW(self): """ setup WWW data folder """ from WWW import WWW self._setObject('wwwdata', WWW()) security.declareProtected(perm_manage, 'setupZCatalog') def setupZCatalog(self): """ setup ZCatalog """ from Products.ZCatalog.ZCatalog import ZCatalog from Products.ZCatalog.Catalog import CatalogError from Products.ZCTextIndex import ZCTextIndex from Products.PluginIndexes.TextIndex.Vocabulary import Vocabulary class args: def __init__(self, **kw): self.__dict__.update(kw) cat = None if not hasattr(self, 'zcatalog'): cat = ZCatalog('zcatalog', 'Catalog for documents') else: cat = getattr(self, 'zcatalog') extra = {'default_encoding': 'utf-8', 'use_normalizer': 1, 'dedicated_storage': 1, 'splitter_casefolding': 1, 'index_unknown_languages': 1} try: cat.addIndex('getDocument', 'TextIndexNG3', extra=extra) except CatalogError: pass try: cat.addIndex('getMarkedWords', 'TextIndexNG3', extra=extra) except CatalogError: pass try: cat.addIndex('rawDate', 'DateIndex') except CatalogError: pass try: cat.addIndex('getUsedCodes', 'KeywordIndex') except CatalogError: pass try: cat.addIndex('is_deleted', 'FieldIndex') # these are considered yer except CatalogError: pass try: cat.addIndex('document_status', 'FieldIndex') # ... except CatalogError: pass # added for sorting currently try: cat.addIndex('get_n_of_words', 'FieldIndex') except CatalogError: pass try: cat.addIndex('get_n_of_errors', 'FieldIndex') except CatalogError: pass try: cat.addIndex('get_n_of_diff_errors', 'FieldIndex') except CatalogError: pass try: cat.addIndex('get_n_of_sentences', 'FieldIndex') except CatalogError: pass try: cat.addIndex('getTitle', 'TextIndexNG3', extra=extra) except CatalogError: pass # just in case try: cat.addIndex('getId', 'FieldIndex') except CatalogError: pass try: cat.addColumn('absolute_url') except CatalogError: pass try: cat.addColumn('get_n_of_errors') except CatalogError: pass try: cat.addColumn('get_n_of_diff_errors') except CatalogError: pass try: cat.addColumn('get_n_of_words') except CatalogError: pass try: cat.addColumn('get_n_of_sentences') except CatalogError: pass try: cat.addColumn('prettyDate') except CatalogError: pass try: cat.addColumn('document_status') # ... except CatalogError: pass try: cat.addColumn('rawDate') except CatalogError: pass try: cat.addColumn('getMarkedWords') except CatalogError: pass try: cat.addColumn('getTitle') except CatalogError: pass try: cat.addColumn('getDescription') except CatalogError: pass try: cat.addColumn('textdoc') except CatalogError: pass try: cat.addColumn('getId') except CatalogError: pass try: cat.addColumn('getSubmitter') except CatalogError: pass try: cat.addColumn('getLastModder') except CatalogError: pass if not hasattr(self, 'zcatalog'): self._setObject('zcatalog', cat) self.updateZCatalogsIndexes() #security.declareProtected(perm_view, 'checkRoles') security.declarePublic('checkRoles') def checkRoles(self, context, user, role='Authenticated'): """ check if user has some roles """ if role in user.getRolesInContext(context): return 1 return 0 def _setupPAS(self): """ Setup Pluggable Auth Service """ from Products.PluggableAuthService.PluggableAuthService import addPluggableAuthService # PAS try: addPluggableAuthService(self) except BadRequest: pass #cookie auth from Products.PluggableAuthService.plugins.CookieAuthHelper import addCookieAuthHelper try: addCookieAuthHelper(self.acl_users, 'cookie', '', '__ac') except BadRequest: pass self.acl_users.cookie.manage_activateInterfaces(['IExtractionPlugin', 'IChallengePlugin', 'ICredentialsUpdatePlugin', 'ICredentialsResetPlugin']) self.acl_users.cookie.login_path = '../../' # acl_users/ZODBUser source from Products.PluggableAuthService.plugins.ZODBUserManager import addZODBUserManager try: addZODBUserManager(self.acl_users, 'users', '') except BadRequest: pass self.acl_users.users.manage_activateInterfaces(['IAuthenticationPlugin', 'IUserEnumerationPlugin', 'IUserAdderPlugin']) # roles from Products.PluggableAuthService.plugins.ZODBRoleManager import addZODBRoleManager try: addZODBRoleManager(self.acl_users, 'roles', '') except BadRequest: pass self.acl_users.roles.manage_activateInterfaces(['IRolesPlugin', 'IRoleEnumerationPlugin', 'IRoleAssignerPlugin']) def checkPermission(self, permission): """ check if the current user has the permission on the object """ return _checkPermission(permission, self) security.declareProtected(perm_view, 'korpus_root') def korpus_root(self): return self # statistics related methods security.declareProtected(perm_view, 'get_n_of_total_words') def get_n_of_total_words(self): """ total words in corpus """ n = 0 for x in self.Documents.getDocuments({'stat':1}): n += x.get_n_of_words for x in self.Documents.getDocuments({'stat':0}): n += x.get_n_of_words return n security.declareProtected(perm_view, 'get_n_of_words_marked') def get_n_of_words_marked(self): """ total words marked """ n = 0 for x in self.Documents.getDocuments({'stat':1}): n += x.get_n_of_words return n security.declareProtected(perm_view, 'get_n_of_total_errors') def get_n_of_total_errors(self): """ return a number of errors marked """ n = 0 for x in self.Documents.getDocuments({'stat':1}): n += x.get_n_of_errors return n security.declareProtected(perm_view, 'compute_stats') def compute_stats(self, REQUEST): """ compute stats """ from schemas import all scs = all() variables = {} for s in scs: for f in s.fields: variables[f.getName()] = f.getKeys() n = {} for x in variables.keys(): for y in variables[x]: n[x+'_'+y+'_n'] = 0.0 n[x+'_'+y+'_word'] = 0 query = {'document_status':1, 'is_deleted':0} if REQUEST.get('base', None): base = REQUEST.get('base').split('_') key = base[0] value = base[1] if len(base)>2: key = '' for x in range(len(base)-1): if key: key += '_' key += base[x] value = base[-1] query[key] = value if REQUEST.get('vea_liik', None): query['getUsedCodes'] = REQUEST.get('vea_liik') docs = self.zcatalog(query) div = len(docs) if not div: return {} for x in docs: for y in variables.keys(): if not getattr(x, y): #print "oioi. ", y, " tyhi" continue try: n[y+'_'+getattr(x, y)+'_n'] += 1 n[y+'_'+getattr(x, y)+'_word'] += x.get_n_of_words except KeyError: print y, getattr(x, y) n[y+'_'+getattr(x, y)+'_n'] = 1 n[y+'_'+getattr(x, y)+'_word'] = x.get_n_of_words for x in variables.keys(): for y in variables[x]: n[x+'_'+y+'_perc'] = round(n[x+'_'+y+'_n']/div*100, 1) return n security.declareProtected(perm_list_documents, 'logged_in') def logged_in(self, REQUEST): """ user just logged in """ url = REQUEST.get('came_from', '') self.acl_users.cookie.login() return REQUEST.RESPONSE.redirect(self.korpus_root().absolute_url()) def getLanguage(self, REQUEST): """ sets/gets language """ if REQUEST.get('language', None): REQUEST.SESSION.set('language', REQUEST.get('language', None)) return REQUEST.SESSION.get('language', 'et') security.declareProtected(perm_manage, 'doMigration') def doMigration(self, REQUEST): """ do migration """ import migration k = dir(migration) m_names = [ m for m in k if m.startswith('step') ] m_names.sort() runonly = REQUEST.get('step', None) for mname in m_names: print "* -", mname if runonly is not None and mname != 'step'+runonly: print "... skipped" continue meth = getattr(migration, mname) r = meth(self) if not r: return 'failed:'+mname return "ok" Globals.InitializeClass(Korpus) def manage_addKorpus(self, REQUEST=None, id='korpus'): """ form handler """ korpus = Korpus(id) self._setObject(id,korpus) if REQUEST: return self.manage_main(self, REQUEST) manage_addKorpusForm = PageTemplateFile('addKorpusForm.pt', globals())