dingtalk.client.isv 源代码

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

import logging

from dingtalk.core.utils import to_text, json_loads
from dingtalk.client import DingTalkClient
from dingtalk.client.base import BaseClient
from dingtalk.client.channel import ChannelClient
from dingtalk.core.constants import SuitePushType
from dingtalk.crypto import DingTalkCrypto
from dingtalk.storage.cache import ISVCache

logger = logging.getLogger(__name__)


class ISVDingTalkClient(DingTalkClient):
    def __init__(self, corp_id, isv_client):
        super(ISVDingTalkClient, self).__init__(corp_id, 'isv_auth:' + isv_client.suite_key,
                                                isv_client.storage, isv_client.timeout, isv_client.auto_retry)
        self.isv_client = isv_client

    def get_access_token(self):
        return self.isv_client.get_access_token_by_corpid(self.corp_id)


class ISVChannelClient(ChannelClient):
    def __init__(self, corp_id, isv_client):
        super(ISVChannelClient, self).__init__(corp_id, 'isv_channel:' + isv_client.suite_key,
                                               isv_client.storage, isv_client.timeout, isv_client.auto_retry)
        self.isv_client = isv_client

    def get_channel_token(self):
        return self.isv_client.get_channel_token_by_corpid(self.corp_id)


[文档]class ISVClient(BaseClient): def __init__(self, suite_key, suite_secret, token=None, aes_key=None, storage=None, timeout=None, auto_retry=True): super(ISVClient, self).__init__(storage, timeout, auto_retry) self.suite_key = suite_key self.suite_secret = suite_secret self.cache = ISVCache(self.storage, 'isv:' + self.suite_key) self.crypto = DingTalkCrypto(token, aes_key, suite_key) def _handle_pre_request(self, method, uri, kwargs): if 'suite_access_token=' in uri or 'suite_access_token' in kwargs.get('params', {}): raise ValueError("suite_access_token: " + uri) uri = '%s%ssuite_access_token=%s' % (uri, '&' if '?' in uri else '?', self.suite_access_token) return method, uri, kwargs def _handle_request_except(self, e, func, *args, **kwargs): if e.errcode in (33001, 40001, 42001, 40014): self.cache.suite_access_token.delete() if self.auto_retry: return func(*args, **kwargs) raise e def set_suite_ticket(self, suite_ticket): self.cache.suite_ticket.set(value=suite_ticket) @property def suite_access_token(self): self.cache.suite_access_token.get() token = self.cache.suite_access_token.get() if token is None: ret = self.get_suite_access_token() token = ret['suite_access_token'] expires_in = ret.get('expires_in', 7200) self.cache.suite_access_token.set(value=token, ttl=expires_in) return token def _handle_permanent_code(self, permanent_code_data): permanent_code = permanent_code_data.get('permanent_code', None) ch_permanent_code = permanent_code_data.get('ch_permanent_code', None) corp_id = permanent_code_data.get('auth_corp_info', {}).get('corpid', None) if corp_id is None: return if permanent_code is not None: self.cache.permanent_code.set(corp_id, permanent_code) if ch_permanent_code is not None: self.cache.ch_permanent_code.set(corp_id, ch_permanent_code) def get_dingtalk_client(self, corp_id): return ISVDingTalkClient(corp_id, self) def get_channel_client(self, corp_id): return ISVChannelClient(corp_id, self) def proc_message(self, message): if not isinstance(message, dict): return event_type = message.get('EventType', None) if event_type == SuitePushType.SUITE_TICKET.value: suite_ticket = message.get('SuiteTicket', None) if suite_ticket: self.set_suite_ticket(suite_ticket) return elif event_type == SuitePushType.TMP_AUTH_CODE.value: auth_code = message.get('AuthCode') permanent_code_data = self.get_permanent_code(auth_code) message['__permanent_code_data'] = permanent_code_data return elif event_type == SuitePushType.SUITE_RELIEVE.value: corp_id = message.get('AuthCorpId') self.cache.permanent_code.delete(corp_id) self.cache.ch_permanent_code.delete(corp_id) return else: return def parse_message(self, msg, signature, timestamp, nonce): message = self.crypto.decrypt_message(msg, signature, timestamp, nonce) try: message = json_loads(to_text(message)) self.proc_message(message) except Exception as e: logger.error("proc_message error %s %s", message, e) return message def get_ch_permanent_code_from_cache(self, corp_id): return self.cache.ch_permanent_code.get(corp_id) def get_permanent_code_from_cache(self, corp_id): return self.cache.permanent_code.get(corp_id)
[文档] def get_suite_access_token(self): """ 获取应用套件令牌Token :return: """ return self._request( 'post', '/service/get_suite_token', data={ "suite_key": self.suite_key, "suite_secret": self.suite_secret, "suite_ticket": self.cache.suite_ticket.get() } )
[文档] def get_permanent_code(self, tmp_auth_code): """ 获取企业授权的永久授权码 :param tmp_auth_code: 回调接口(tmp_auth_code)获取的临时授权码 :return: """ permanent_code_data = self.post( '/service/get_permanent_code', {'tmp_auth_code': tmp_auth_code} ) self._handle_permanent_code(permanent_code_data) return permanent_code_data
[文档] def activate_suite(self, corp_id): """ 激活套件 :param corp_id: 授权方corpid :return: """ return self.post( '/service/activate_suite', { 'suite_key': self.suite_key, 'auth_corpid': corp_id, 'permanent_code': self.cache.permanent_code.get(corp_id)} )
[文档] def get_access_token_by_corpid(self, corp_id): """ 获取企业授权的凭证 :param corp_id: 授权方corpid :return: """ return self.post( '/service/get_corp_token', {'auth_corpid': corp_id, 'permanent_code': self.cache.permanent_code.get(corp_id)} )
[文档] def get_auth_info(self, corp_id): """ 获取企业基本信息 :param corp_id: 授权方corpid :return: """ return self.post( '/service/get_auth_info', {'auth_corpid': corp_id, 'suite_key': self.suite_key} )
[文档] def get_agent(self, corp_id, agent_id): """ 获取企业的应用信息 :param corp_id: 授权方corpid :param agent_id: 授权方应用id :return: """ return self.post( '/service/get_agent', { 'suite_key': self.suite_key, 'auth_corpid': corp_id, 'agentid': agent_id, 'permanent_code': self.get_permanent_code_from_cache(corp_id) } )
[文档] def get_unactive_corp(self, app_id): """ 获取应用未激活的企业列表 :param app_id: 套件下的微应用ID :return: """ return self.post( '/service/get_unactive_corp', {'app_id': app_id} )
[文档] def reauth_corp(self, app_id, corpid_list): """ 重新授权未激活应用的企业 :param app_id: 套件下的微应用ID :param corpid_list: 未激活的corpid列表 :return: """ return self.post( '/service/reauth_corp', {'app_id': app_id, 'corpid_list': corpid_list} )
[文档] def set_corp_ipwhitelist(self, corp_id, ip_whitelist): """ ISV为授权方的企业单独设置IP白名单 :param corp_id: 授权方corpid :param ip_whitelist: 要为其设置的IP白名单,格式支持IP段,用星号表示,注意:仅支持后两段设置为星号 :return: """ return self.post( '/service/set_corp_ipwhitelist', {'auth_corpid': corp_id, 'ip_whitelist': ip_whitelist} )
[文档] def get_channel_token_by_corpid(self, corp_id): """ ISV获取企业服务窗接口调用TOKEN :param corp_id: 授权方corpid :return: """ return self.post( '/service/get_channel_corp_token', {'auth_corpid': corp_id, 'ch_permanent_code': self.get_ch_permanent_code_from_cache(corp_id)} )