集成智能客服到钉钉机器人
本文适合对极态有一定了解的开发者,通过对本文的学习,你将获得以下收获:
- 对JAAP(Jit Ai Application Protocol)有一定程度的理解,有能力遵循JAAP自定义元素族类。
- 将智能客服智能体集成到钉钉机器人。建议先完成《5分钟开发一个AI应用(智能客服)》的学习。
- 强烈建议准备本地开发环境,方便调试,详情参考本地开发与调试。
效果预览
前置准备
钉钉账号与企业创建
- 注册并登录钉钉账号。
- 创建属于自己的企业。
钉钉开发平台应用创建
- 登录钉钉开发者平台。
- 进入应用开发 →
企业内部应用
→钉钉应用
。 - 点击
创建应用
,设置应用名称、应用描述,创建完成后进入应用详情页。 - 进入
应用能力
→添加应用能力
,找到机器人
并添加。 - 机器人配置中的消息接收模式选择
Stream模式
。 - 发布应用。
- 创建一个企业内部群,并添加刚才创建的机器人。
- 进入
基础信息
→凭证与基础信息
→应用凭证
。你可以看到Client ID
和Client Secret
,这两个参数将被设计为Type元素的可配置参数。
开发环境准备
- 如您尚未完成桌面版安装与激活,请先安装并完成《5分钟开发一个AI应用(智能客服)》快速入门案例。
- 使用你喜欢的IDE打开JitNode目录进行本地开发与调试。
应用目录位于:
#路径格式
[JitNode安装目录]/home/environs/[EnvId]/[OrgId]/[AppId]/[Version]
#示例
/data/JitNode/home/environs/JED_c1tqsCN7Q5/whwy/myapp/0.0.0
应用目录规则属于JAAP的应用规范的一部分,请参考JAAP了解更多。
元素族类设计
极态开发框架为开发者提供大量开箱即用的元素族类,并提供了对应的可视化配置界面。当已有元素族类无法满足需求时,开发者可以遵循JAAP,定义自己的元素族类。
接下来,我们将定义一个元素族类,实现将智能客服集成到钉钉机器人的业务需求。
Meta元素
fullName:imRobots.meta
所有与三方IM机器人对接的Type元素,其type值都指向imRobots.meta
。
Meta元素还负责对Type元素的加载,当然也可以不在Meta元素中实现加载逻辑,平台会自动使用JitNode内置的默认加载器。
Type元素
fullName:imRobots.dingTalkStreamType
集成钉钉机器人SDK,封装消息收发、处理等技术实现,将可配置的参数开放出去(clientId
和clientSecret
)。
作为Type元素,还需要实现对实例元素的加载逻辑。
实例元素
fullName:imRobots.dingTalkDemo
在配置文件中配置Type元素要求的参数:clientId
和clientSecret
。
元素目录结构
├── imRobots/
│ ├── meta/
│ │ ├── e.json
│ │ └── __init__.py
│ ├── dingTalkStreamType/
│ │ ├── e.json
│ │ └── loader.py
│ │ └── handler.py
│ │ └── client_manager.py
│ │ └── __init__.py
│ └── dingTalkDemo/
│ ├── e.json
│ └── config.json
│ └── __init__.py
├── requirements.txt
└── ...
元素族类实现
本示例需要使用钉钉官方提供的dingtalk_stream包,请在App根目录下的requirements.txt
中添加dingtalk-stream
依赖。
dingtalk-stream==0.24.2
python-socks==2.7.1
imRobots.meta Meta元素
- meta/e.json
- meta/__init__.py
{
"backendBundleEntry": ".",
"description": "所有和三方IM机器人对接相关的Type元素,都归属于IM机器人族类",
"title": "IM机器人",
"type": ""
}
# ...
imRobots.dingTalkStreamType Type元素
- dingTalkStreamType/e.json
- dingTalkStreamType/handler.py
- dingTalkStreamType/client_manager.py
- dingTalkStreamType/loader.py
- dingTalkStreamType/__init__.py
{
"backendBundleEntry": ".",
"description": "封装钉钉机器人对接的细节,包括消息发送、接收、处理等,将配置参数开放",
"title": "钉钉机器人",
"type": "imRobots.meta"
}
import json
import time
import dingtalk_stream
from dingtalk_stream import AckMessage
from jit.commons.utils.logger import log as logger
class TextHandler(dingtalk_stream.ChatbotHandler):
def __init__(self, element, config):
super(dingtalk_stream.ChatbotHandler, self).__init__()
self.logger = logger
self.element = element
self.config = config
def _create_initial_card(self, question: str) -> dict:
"""创建初始思考中的卡片"""
return {
"config": {"autoLayout": True, "enableForward": True},
"header": {"title": {"type": "text", "text": question}},
"contents": [
{
"type": "markdown",
"text": "[思考]正在分析您的问题,请稍候...",
"id": f"thinking_{int(time.time() * 1000)}",
}
],
}
def _create_streaming_card(self, content: str, question: str) -> dict:
"""创建流式更新时的卡片"""
current_time = int(time.time() * 1000)
return {
"config": {"autoLayout": True, "enableForward": True},
"header": {"title": {"type": "text", "text": question}},
"contents": [{"type": "markdown", "text": content, "id": f"answer_{current_time}"}],
}
def _create_final_card(self, response: str, incoming_message: dingtalk_stream.ChatbotMessage, text: str) -> dict:
"""创建最终带按钮的卡片"""
return {
"config": {"autoLayout": True, "enableForward": True},
"header": {"title": {"type": "text", "text": text}},
"contents": [
{"type": "markdown", "text": response, "id": f"answer_{int(time.time() * 1000)}"},
{"type": "divider", "id": f"divider_{int(time.time() * 1000)}"},
{
"type": "action",
"actions": [
{
"type": "button",
"label": {
"type": "text",
"text": "😄 非常有帮助",
"id": f"text_helpful_{int(time.time() * 1000)}",
},
"actionType": "request",
"status": "normal",
"size": "small",
"id": f"button_helpful_{int(time.time() * 1000)}",
"value": json.dumps(
{
"action": "feedback",
"type": "helpful",
"message_id": incoming_message.message_id,
"original_text": text,
"response": response,
}
),
},
{
"type": "button",
"label": {
"type": "text",
"text": "😊 有帮助",
"id": f"text_helpful_{int(time.time() * 1000)}",
},
"actionType": "request",
"status": "normal",
"size": "small",
"id": f"button_helpful_{int(time.time() * 1000)}",
"value": json.dumps(
{
"action": "feedback",
"type": "helpful",
"message_id": incoming_message.message_id,
"original_text": text,
"response": response,
}
),
},
{
"type": "button",
"label": {
"type": "text",
"text": "🙂 帮助不大",
"id": f"text_unhelpful_{int(time.time() * 1000)}",
},
"actionType": "request",
"status": "normal",
"size": "small",
"id": f"button_unhelpful_{int(time.time() * 1000)}",
"value": json.dumps(
{
"action": "feedback",
"type": "unhelpful",
"message_id": incoming_message.message_id,
"original_text": text,
"response": response,
}
),
},
{
"type": "button",
"label": {
"type": "text",
"text": "😅 没帮助",
"id": f"text_unhelpful_{int(time.time() * 1000)}",
},
"actionType": "request",
"status": "normal",
"size": "small",
"id": f"button_unhelpful_{int(time.time() * 1000)}",
"value": json.dumps(
{
"action": "feedback",
"type": "unhelpful",
"message_id": incoming_message.message_id,
"original_text": text,
"response": response,
}
),
},
],
"id": f"action_{int(time.time() * 1000)}",
},
],
}
async def process(self, callback: dingtalk_stream.CallbackMessage):
"""
处理钉钉消息
"""
incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
senderStuffId = incoming_message.sender_staff_id
text = incoming_message.text.content.strip()
# 发送初始卡片
initial_card_data = self._create_initial_card(text)
card_biz_id = self.reply_card(card_data=initial_card_data, incoming_message=incoming_message, at_sender=True)
# 流式回调
def create_stream_callback(card_biz_id: str, question: str) -> callable:
full_response = []
update_count = 0
last_update_time = time.time()
pending_updates = 0
MAX_UPDATES = 20
def stream_callback(chunk):
nonlocal update_count, last_update_time, pending_updates, full_response
if chunk:
content = chunk.get("data", {}).get("content", None)
if content:
full_response.append(content)
pending_updates += 1
current_time = time.time()
# 检查是否需要更新卡片
should_update = (
# 更新次数限制
update_count < MAX_UPDATES - 1 # 预留最后一次更新
# 时间间隔或消息数量条件
and (current_time - last_update_time >= 2 or pending_updates >= 5)
)
if should_update:
updated_card_data = self._create_streaming_card("".join(full_response), question)
self.update_card(card_biz_id, updated_card_data)
update_count += 1
last_update_time = current_time
pending_updates = 0
return stream_callback
response = ""
with self.element.env.getReqContext(self.element.appId):
CorpMember = app.getElement("corps.models.CorpMember")
app.currentUser = CorpMember()
agent = app.getElement(self.config.get("agent"))
response = agent.run(
params={"input_data": text},
chatId=senderStuffId,
stream_callback=create_stream_callback(card_biz_id, text),
)
# 更新最终卡片
final_card_data = self._create_final_card(response, incoming_message, text)
self.update_card(card_biz_id, final_card_data)
return AckMessage.STATUS_OK, "OK"
import asyncio
import threading
from typing import Optional
import dingtalk_stream
from jit.commons.utils.logger import log as logger
class ClientManager:
def __init__(self, client_id: str, client_secret: str):
"""
初始化 ClientManager
Args:
client_id: 钉钉应用的 Client ID
client_secret: 钉钉应用的 Client Secret
"""
self.client_id = client_id
self.client_secret = client_secret
self.client: Optional[dingtalk_stream.DingTalkStreamClient] = None
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._loop: Optional[asyncio.AbstractEventLoop] = None
def start(self, message_handler):
"""
启动客户端并在单独的线程中运行
Args:
message_handler: 消息处理器实例
"""
if self._thread and self._thread.is_alive():
logger.warning("Client is already running")
return
# 创建凭证和客户端
credential = dingtalk_stream.Credential(self.client_id, self.client_secret)
self.client = dingtalk_stream.DingTalkStreamClient(credential)
# 注册消息处理器
self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, message_handler)
# 创建并启动线程
self._thread = threading.Thread(target=self._run_client, daemon=True)
self._thread.start()
logger.info("Client started in background thread")
def _run_client(self):
"""在线程中运行客户端"""
try:
# 尝试获取当前事件循环
try:
loop = asyncio.get_running_loop()
logger.info("Found existing event loop")
except RuntimeError:
# 如果没有运行中的事件循环,创建一个新的
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info("Created new event loop for client thread")
self._loop = loop
# 设置超时时间(秒)
timeout = 300 # 5分钟超时
# 如果当前事件循环正在运行,使用asyncio.run_coroutine_threadsafe
if loop.is_running():
logger.info("Using run_coroutine_threadsafe for running event loop")
future = asyncio.run_coroutine_threadsafe(self.client.start_forever(), loop)
try:
future.result(timeout=timeout)
except asyncio.TimeoutError:
logger.error(f"Client startup timed out after {timeout} seconds")
return
except Exception as e:
logger.exception(f"Error occurred while running client: {str(e)}")
return
else:
# 如果事件循环没有运行,使用run_until_complete
logger.info("Using run_until_complete for new event loop")
try:
loop.run_until_complete(asyncio.wait_for(self.client.start_forever(), timeout=timeout))
except asyncio.TimeoutError:
logger.error(f"Client startup timed out after {timeout} seconds")
return
except Exception as e:
logger.exception(f"Error occurred while running client: {str(e)}")
return
except Exception as e:
logger.exception(f"Error in client thread: {str(e)}")
finally:
try:
# 清理事件循环
if self._loop and self._loop.is_running():
logger.info("Stopping event loop")
self._loop.stop()
if self._loop and not self._loop.is_closed():
logger.info("Closing event loop")
self._loop.close()
except Exception as e:
logger.error(f"Error cleaning up event loop: {e}")
finally:
self._stop_event.set()
logger.info("Client thread finished")
def stop(self):
"""停止客户端"""
if not self._thread or not self._thread.is_alive():
logger.warning("Client is not running")
return
try:
if self.client:
# 在事件循环中停止客户端
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self.client.stop)
else:
self.client.stop()
self._stop_event.wait(timeout=5) # 等待线程结束,最多等待5秒
logger.info("Client stopped")
except Exception as e:
logger.error(f"Error stopping client: {e}")
def is_running(self) -> bool:
"""检查客户端是否正在运行"""
return self._thread is not None and self._thread.is_alive()
import json
import re
from .client_manager import ClientManager
from .handler import TextHandler
class Loader(object):
def __init__(self, nodes):
self.nodes = nodes
def renderTemplateString(self, source, **context):
pattern = r"\{\{(\w+)\}\}"
def replaceVar(match):
var_name = match.group(1)
return str(context.get(var_name, ""))
rendered = re.sub(pattern, replaceVar, source)
return rendered
def load(self):
element = self.nodes[0]
file = element.getFile("config.json")
config = self.renderTemplateString(file, **app.envVars)
config = json.loads(config)
clientId = config.get("clientId")
clientSecret = config.get("clientSecret")
return self.start_client(clientId, clientSecret, config)
def start_client(self, client_id: str, client_secret: str, config: dict):
"""c
启动钉钉流式客户端
Args:
client_id: 钉钉应用的 Client ID
client_secret: 钉钉应用的 Client Secret
logger: 日志记录器
"""
# 创建消息处理器
message_handler = TextHandler(self.nodes[0], config)
# 创建并启动客户端管理器
client_manager = ClientManager(client_id, client_secret)
client_manager.start(message_handler)
return client_manager
from .loader import Loader
__all__ = ["Loader"]
imRobots.dingTalkDemo 实例元素
- dingTalkDemo/e.json
- dingTalkDemo/config.json
{
"backendBundleEntry": ".",
"backendLoadTime": "afterAppInit",
"type": "imRobots.dingTalkStreamType",
"title": "钉钉智能客服",
"description": "JitAi智能客服钉钉机器人实例,配置具体参数"
}
{
"agent":"<agent fullName>",
"clientId": "<clientId>",
"clientSecret": "<clientSecret>"
}
clientId
和clientSecret
需要从钉钉开发者平台获取,在这里填写实际值。
agent
需要填写智能客服AIAgent实例元素的fullName。
例如:元素目录在App下的相对路径是aiagents/ragTest
,则fullName为aiagents.ragTest
,可以访问应用的 开发区门户->源码视图查看元素路径。
打包并查看效果
- 删除应用目录中的
dist
目录。 - 重启JitNode桌面端。
- 访问应用(路径示例:
http://127.0.0.1:8000/whwy/myapp
),触发重新打包。 - 到钉钉群中发送消息并@机器人,查看效果。