source: publico/il.spdo/trunk/il/spdo/seguranca.py @ 5942

Última Alteração nesse arquivo desde 5942 foi 5942, incluída por fabianosantos, 7 anos atrás

Exigir ROLE_GESTOR ou ROLE_ADMIN para definir a área de lotação de uma pessoa

File size: 18.3 KB
Linha 
1# -*- coding: utf-8 -*-
2
3import bcrypt
4
5from zope.interface import Interface
6from zope.component import getUtility
7from zope.app.component.hooks import getSite
8from zope.globalrequest import getRequest
9from five import grok
10from AccessControl import Unauthorized
11from Products.CMFCore.utils import getToolByName
12
13from il.spdo.config import MessageFactory as _
14from il.spdo.config import Session
15from il.spdo import db
16from il.spdo.interfaces import ISPDOAPI, ISecurityChecker
17from il.spdo.log import logger
18
19ROLE_USUARIO = 'Usuario SPDO'
20ROLE_OPERADOR = 'Operador SPDO'
21ROLE_GESTOR = 'Gestor SPDO'
22ROLE_ADMIN = 'Manager'
23
24TRAMITANDO = ('protocolo_tramita_area', 'protocolo_situacao_final', 'protocolo_apensado')
25
26CONFIG = {
27    'acessar_add_anexo': ('privilegio_admin', TRAMITANDO),
28    'acessar_add_apenso': (('protocolo_tramita_area', 'protocolo_situacao_final'),),
29    'acessar_add_notificacao': ('privilegio_usuario',),
30    'acessar_add_observacao': ('privilegio_admin', TRAMITANDO),
31    'acessar_add_pessoadestino': (TRAMITANDO,),
32    'acessar_add_pessoaorigem': (TRAMITANDO,),
33    'acessar_add_pessoa': ('privilegio_admin', 'privilegio_operador'),
34    'acessar_add_protocolo': ('privilegio_operador',),
35    'acessar_add_referencia': (TRAMITANDO,),
36    'acessar_add_tramite': (TRAMITANDO,),
37    'acessar_edit_observacao': ('privilegio_admin', TRAMITANDO + ('privilegio_criador_observacao',)),
38    'acessar_edit_pessoadestino': (TRAMITANDO,),
39    'acessar_edit_pessoa': ('privilegio_admin', 'privilegio_operador'),
40    'acessar_edit_protocolo': (('protocolo_tramita_area', 'protocolo_apensado'),),
41    'acessar_envio_tramite': ('privilegio_operador',),
42    'acessar_list_notificacao': ('privilegio_usuario',),
43    'acessar_list_pessoa': ('privilegio_admin', 'privilegio_operador'),
44    'acessar_list_protocolo': ('privilegio_operador',),
45    'acessar_list_referencia': ('privilegio_admin', 'privilegio_usuario'),
46    'acessar_print_etiquetas': ('privilegio_operador',),
47    'acessar_recebimento_tramite_barra': ('privilegio_operador',),
48    'acessar_recebimento_tramite': ('privilegio_operador',),
49    'acessar_recuperacao_tramite': ('privilegio_operador',),
50    'acessar_remove_anexo': ('privilegio_admin', TRAMITANDO + ('privilegio_criador_anexo',)),   
51    'acessar_remove_notificacao': ('privilegio_usuario',),
52    'acessar_remove_observacao': ('privilegio_admin', TRAMITANDO + ('privilegio_criador_observacao',)),
53    'acessar_remove_pessoadestino': (TRAMITANDO,),
54    'acessar_remove_pessoaorigem': (TRAMITANDO,),
55    'acessar_remove_referencia': (TRAMITANDO,),
56    'acessar_search_protocolo_barra': ('privilegio_admin', 'privilegio_usuario'),
57    'acessar_search_protocolo': ('privilegio_admin', 'privilegio_operador'),
58    'acessar_show_pessoa': ('privilegio_admin', 'privilegio_operador'),
59    'acessar_show_protocolo': ('privilegio_admin', 'privilegio_usuario'),
60    'acessar_ws_add_protocolo': ('privilegio_operador',),
61    'ciclo_apenso': ('protocolo_apenso_ciclo',),
62    'fluxo_rigoroso': ('protocolo_fluxo_area_inicial',),
63    'lotacao_pessoa': ('privilegio_admin', ('privilegio_operador', 'privilegio_gestor')),
64    'momento_tramitacao': ('protocolo_apenso_momento',),
65    'tramitar_envio': (TRAMITANDO,),
66    'tramitar_recebimento': (('protocolo_enviado', 'protocolo_situacao_final', 'protocolo_apensado'),),
67    'tramitar_recuperacao': (('protocolo_nao_recebido', 'protocolo_situacao_final', 'protocolo_apensado'),),   
68    'visualizar_anexos': ('privilegio_admin', 'protocolo_tramita_area', ('privilegio_operador', 'privilegio_gestor')),
69    'visualizar_despachos': ('privilegio_admin', 'protocolo_tramita_area', ('privilegio_operador', 'privilegio_gestor')),
70    'visualizar_observacoes': ('privilegio_admin', 'protocolo_tramita_area', ('privilegio_operador', 'privilegio_gestor')),
71    }
72
73class SecurityCheckerUtility(grok.GlobalUtility):
74    """Verificador de Segurança.
75    """
76
77    grok.provides(ISecurityChecker)
78
79    def _run_verificacoes(self, acao, verificacoes, **kwargs):
80        """Esse método executa as verificações, de acordo com as
81        definições da estrutura de dados CONFIG, considerando que as
82        componentes da tupla representam um OR das diversas
83        possibilidades de verificação. Quando a componente for uma
84        tupla, a validação deve ser calculada como um AND das
85        componentes.
86        """
87
88        # o widget de autocomplete faz chamadas ajax no contexto da
89        # view e esses requests não devem ser bloqueados
90        if kwargs.get('ajax', None) is not None and acao.startswith('acessar_'):
91            r = getRequest()
92            url = r.URL
93            view = acao.replace('acessar_', '/@@').replace('_', '-')
94            if not url.endswith(view):
95                logger(_(u'Permitindo acesso direto a URL: ') + unicode(url, 'utf-8'))
96                return True
97
98        context = self
99        for v in verificacoes:
100            if type(v) == type(()):
101                result = True
102                for vi in v:
103                    func = getattr(context, "_valida_%s" % vi, None)
104                    if func is None:
105                        result = False
106                        break
107                    ret = func(**kwargs)
108                    if not ret:
109                        result = False
110                        break
111                if result:
112                    return True
113            else:
114                func = getattr(context, "_valida_%s" % v, None)
115                if func is not None:
116                    ret = func(**kwargs)
117                    if ret:
118                        return True
119        return False
120
121    def check(self, acao, **kwargs):
122        verificacoes = CONFIG.get(acao, None)
123        if verificacoes is None:
124            logger(_(u'Ação informada não existe na configuração do sistema. Ação: ') + unicode(acao, 'utf-8'))
125            raise Unauthorized
126        return self._run_verificacoes(acao, verificacoes, **kwargs)
127
128    def enforce(self, acao, **kwargs):
129        if not self.check(acao, **kwargs):
130            logger(kwargs.get('msg', _(u'Privilégios Insuficientes. Ação: ') + unicode(acao, 'utf-8')))
131            raise Unauthorized
132
133    # Métodos Auxiliares #
134
135    def _valida_role(self, role):
136        pm = getToolByName(getSite(), 'portal_membership')
137        return role in pm.getAuthenticatedMember().getRoles()
138
139    def _valida_permissao(self, permissao):
140        pu = getToolByName(getSite(), 'portal_url')
141        pm = getToolByName(getSite(), 'portal_membership')
142        portal = pu.getPortalObject()
143        return pm.checkPermission(permissao, portal)
144
145    def _get_area_usuario(self):
146        pm = getToolByName(getSite(), 'portal_membership')
147        user_id = str(pm.getAuthenticatedMember())
148        api = getUtility(ISPDOAPI)
149        pessoa = api.getPessoaByEmail(user_id)
150        if pessoa is not None:
151            return pessoa.area_id
152
153    def _valida_area_responsavel(self, area_id):
154        session = Session()
155        return bool(session.query(db.Responsavel).\
156                    filter_by(area_id=area_id).first())
157
158    # Verificações #
159
160    def _valida_privilegio_usuario(self, **kwargs):
161        """Estar autenticado com role ROLE_USUARIO e possuir um
162        registro pessoa que case com o e-mail de login.
163        """
164        pm = getToolByName(getSite(), 'portal_membership')
165        user_id = str(pm.getAuthenticatedMember())
166        api = getUtility(ISPDOAPI)
167        return self._valida_role(ROLE_USUARIO) and api.getPessoaByEmail(user_id) is not None
168
169    def _valida_privilegio_operador(self, **kwargs):
170        """Estar autenticado com role ROLE_OPERADOR e possuir um
171        registro pessoa lotado em uma área do organograma.
172        """
173        if not self._valida_role(ROLE_OPERADOR):
174            return False
175        area_id = self._get_area_usuario()
176        if area_id is None:
177            return False
178        return self._valida_area_responsavel(area_id)
179
180    def _valida_privilegio_gestor(self, **kwargs):
181        """Estar autenticado com role ROLE_GESTOR e possuir um
182        registro pessoa lotado em uma área do organograma.
183        """
184        if not self._valida_role(ROLE_GESTOR):
185            return False
186        area_id = self._get_area_usuario()
187        if area_id is None:
188            return False
189        return self._valida_area_responsavel(area_id)
190
191    def _valida_privilegio_admin(self, **kwargs):
192        """Estar autenticado com ROLE_ADMIN e ter a permissão “Manage
193        portal” no contexto da raiz do site.
194        """
195        return self._valida_role(ROLE_ADMIN) and self._valida_permissao('Manage portal')
196
197    def _valida_privilegio_criador_anexo(self, **kwargs):
198        """O anexo deve ter sido criado pelo usuário que quer
199        apagá-lo. Não deve existir um tramite com data de
200        disponibilização posterior a data do anexo.
201        """
202        anexo_id = kwargs.get('anexo_id', None)
203        protocolo_id = kwargs.get('protocolo_id', None)
204        if anexo_id is None or protocolo_id is None:
205            logger(_(u'O método _valida_privilegio_criador_anexo não recebeu os parâmetros anexo_id ou protocolo_id.'))
206            return False
207        session = Session()
208        api = getUtility(ISPDOAPI)
209        usuario = api.getAuthId()
210        anexo = session.query(db.Anexo).\
211                filter_by(id=anexo_id).\
212                filter_by(protocolo_id=protocolo_id).\
213                filter_by(usuario=usuario).first()
214        if anexo is None:
215            return False
216        tramite = session.query(db.Tramite).\
217                  filter_by(protocolo_id=anexo.protocolo_id).\
218                  filter(db.Tramite.data_disponibilizacao > anexo.data_anexo).first()
219        return not bool(tramite)
220
221    def _valida_privilegio_criador_observacao(self, **kwargs):
222        """A observação deve ter sido criada pelo usuário que quer
223        modificá-la ou apagá-la. Não deve existir um tramite com data
224        de disponibilização posterior a data da observação.
225        """
226        observacao_id = kwargs.get('observacao_id', None)
227        protocolo_id = kwargs.get('protocolo_id', None)
228        if observacao_id is None or protocolo_id is None:
229            logger(_(u'O método _valida_privilegio_criador_observacao não recebeu os parâmetros observacao_id ou protocolo_id.'))
230            return False
231        session = Session()
232        api = getUtility(ISPDOAPI)
233        usuario = api.getAuthId()
234        observacao = session.query(db.Observacao).\
235                     filter_by(id=observacao_id).\
236                     filter_by(protocolo_id=protocolo_id).\
237                     filter_by(usuario=usuario).first()
238        if observacao is None:
239            return False
240        tramite = session.query(db.Tramite).\
241                  filter_by(protocolo_id=observacao.protocolo_id).\
242                  filter(db.Tramite.data_disponibilizacao > observacao.data_observacao).first()
243        return not bool(tramite)
244
245    def _valida_protocolo_apensado(self, **kwargs):
246        """Protocolo não estar apensado em outro protocolo.
247        """
248        api = getUtility(ISPDOAPI)
249        protocolo_id = kwargs.get('protocolo_id', api.getProtocoloId())
250        if protocolo_id is None:
251            logger(_(u'O método _valida_protocolo_apensado não recebeu o parâmetro protocolo_id.'))
252            return False
253        protocolo = api.getProtocolo(protocolo_id)
254        if protocolo is None:
255            return False
256        return protocolo.apenso_id is None
257
258    def _procura_ciclo(self, protocolo, apenso_id, ids_visitados):
259        ret = False
260        for p in protocolo.apenso:
261            # ids_visitados evita uma recursão infinita
262            if p.id in ids_visitados:
263                continue
264            ids_visitados.append(p.id)
265            if p.id == apenso_id or self._procura_ciclo(p, apenso_id, ids_visitados):
266                ret = True
267                break
268        return ret
269
270    def _valida_protocolo_apenso_ciclo(self, **kwargs):
271        """Não podem existir ciclos nas definições de apensos.
272        """
273        protocolo_id = kwargs.get('protocolo_id', None)
274        apenso_id = kwargs.get('apenso_id', None)
275        if protocolo_id is None or apenso_id is None:
276            logger(_(u'O método _valida_protocolo_apenso_ciclo não recebeu os parâmetros protocolo_id ou apenso_id.'))
277            return False
278        api = getUtility(ISPDOAPI)
279        protocolo = api.getProtocolo(protocolo_id)
280        if protocolo is None:
281            return False
282        ids_visitados = []
283        return not self._procura_ciclo(protocolo, apenso_id, ids_visitados)
284
285    def _compara_protocolos(self, p1, p2):
286        if p1.tipoprotocolo != p2.tipoprotocolo or p1.tipodocumento_id != p2.tipodocumento_id:
287            return False
288        inbox1 = [i.area_id for i in p1.tramite_inbox]; inbox1.sort()
289        inbox2 = [i.area_id for i in p2.tramite_inbox]; inbox2.sort()
290        if inbox1 != inbox2: return False
291        outbox1 = [i.area_id for i in p1.tramite_outbox]; outbox1.sort()
292        outbox2 = [i.area_id for i in p2.tramite_outbox]; outbox2.sort()
293        if outbox1 != outbox2: return False
294        return True
295
296    def _valida_protocolo_apenso_momento(self, **kwargs):
297        """Para que um protocolo seja apensado em outro é necessário
298        que ambos compartilhem o mesmo momento na tramitação, ou seja,
299        estejam tramitando nas mesmas áreas e tenham o mesmo tipo de
300        documento e protocolo.
301        """
302        protocolo_id = kwargs.get('protocolo_id', None)
303        apenso_id = kwargs.get('apenso_id', None)
304        if protocolo_id is None or apenso_id is None:
305            logger(_(u'O método _valida_protocolo_apenso_momento não recebeu os parâmetros protocolo_id ou apenso_id.'))
306            return False
307        api = getUtility(ISPDOAPI)
308        protocolo = api.getProtocolo(protocolo_id)
309        if protocolo is None:
310            return False
311        apenso = api.getProtocolo(apenso_id)
312        if apenso is None:
313            return False
314        return self._compara_protocolos(protocolo, apenso)
315
316    def _valida_protocolo_enviado(self, **kwargs):
317        """Protocolo enviado para área de lotação do usuário mas ainda
318        não recebido.
319        """
320        api = getUtility(ISPDOAPI)
321        protocolo_id = kwargs.get('protocolo_id', api.getProtocoloId())
322        if protocolo_id is None:
323            logger(_(u'O método _valida_protocolo_enviado não recebeu o parâmetro protocolo_id.'))
324            return False
325        area_id_auth = self._get_area_usuario()
326        if area_id_auth is None:
327            return False
328        session = Session()
329        return bool(session.query(db.Tramite).\
330                    filter_by(area_id=area_id_auth).\
331                    filter_by(protocolo_id=protocolo_id).\
332                    filter_by(data_recebimento=None).first())
333
334    def _valida_protocolo_fluxo_area_inicial(self, **kwargs):
335        """O tipo de protocolo e o tipo de documento possuem uma
336        definição de fluxo rigoroso. A área onde o usuário está lotado
337        deve corresponder a uma área inicial de uma das transições
338        desse fluxo.
339        """
340        tipoprotocolo = kwargs.get('tipoprotocolo', None)
341        tipodocumento_id = kwargs.get('tipodocumento_id', None)
342        if tipoprotocolo is None or tipodocumento_id is None:
343            logger(_(u'O método _valida_protocolo_fluxo_area_inicial não recebeu os parâmetros tipoprotocolo e tipodocumento_id.'))
344            return False
345        area_id_auth = self._get_area_usuario()
346        if area_id_auth is None:
347            return False
348        session = Session()
349        fluxo = session.query(db.Fluxo).\
350                filter_by(tipoprotocolo=tipoprotocolo).\
351                filter_by(tipodocumento_id=tipodocumento_id).\
352                filter_by(flexivel=False).first()
353        if fluxo is None:
354            return True
355        return bool(session.query(db.Transicao).\
356                    filter_by(fluxo_id=fluxo.id).\
357                    filter_by(area_origem_id=area_id_auth).\
358                    filter_by(inicial=True).first())
359
360    def _valida_protocolo_nao_recebido(self, **kwargs):
361        """Protocolo enviado pela área de lotação do usuário mas ainda
362        não recebido pela área destino.
363        """
364        api = getUtility(ISPDOAPI)
365        protocolo_id = kwargs.get('protocolo_id', api.getProtocoloId())
366        if protocolo_id is None:
367            logger(_(u'O método _valida_protocolo_nao_recebido não recebeu o parâmetro protocolo_id.'))
368            return False
369        area_id_auth = self._get_area_usuario()
370        if area_id_auth is None:
371            return False
372        session = Session()
373        return bool(session.query(db.TramiteOutbox).\
374                    filter_by(area_id=area_id_auth).\
375                    filter_by(protocolo_id=protocolo_id).first())
376
377    def _valida_protocolo_situacao_final(self, **kwargs):
378        """Protocolo não pode estar em situação final.
379        """
380        api = getUtility(ISPDOAPI)
381        protocolo_id = kwargs.get('protocolo_id', api.getProtocoloId())
382        if protocolo_id is None:
383            logger(_(u'O método _valida_protocolo_situacao_final não recebeu o parâmetro protocolo_id.'))
384            return False
385        protocolo = api.getProtocolo(protocolo_id)
386        if protocolo is None:
387            return False
388        return not protocolo.situacao.final
389
390    def _valida_protocolo_tramita_area(self, **kwargs):
391        """Protocolo tramita na área onde o usuário autenticado está
392        lotado.
393        """
394        api = getUtility(ISPDOAPI)
395        protocolo_id = kwargs.get('protocolo_id', api.getProtocoloId())
396        if protocolo_id is None:
397            logger(_(u'O método _valida_protocolo_tramita_area não recebeu o parâmetro protocolo_id.'))
398            return False
399        area_id_auth = self._get_area_usuario()
400        if area_id_auth is None:
401            return False
402        session = Session()
403        return bool(session.query(db.TramiteInbox).\
404                    filter_by(protocolo_id=protocolo_id).\
405                    filter_by(area_id=area_id_auth).first())
406
407
408def verifica_senha(email, senha_plain):
409    """Verifica se a senha informada confere com o hash armazenado."""
410    api = getUtility(ISPDOAPI)
411    pessoa = api.getPessoaByEmail(email)
412    return bcrypt.hashpw(senha_plain, pessoa.senha) == pessoa.senha
413
414def modifica_senha(email, senha_plain):
415    """Altera a senha de uma pessoa, gerando um novo hash."""
416    api = getUtility(ISPDOAPI)
417    pessoa = api.getPessoaByEmail(email)
418    pessoa.senha = bcrypt.hashpw(senha_plain, bcrypt.gensalt())
Note: Veja TracBrowser para ajuda no uso do navegador do trac.
 

The contents and data of this website are published under license:
Creative Commons 4.0 Brasil - Atribuir Fonte - Compartilhar Igual.