您的位置:首页 > 脚本大全 > > 正文

java集成钉钉发送消息(Python实现钉钉发送报警消息的方法)

更多 时间:2022-01-26 01:56:49 类别:脚本大全 浏览量:1342

java集成钉钉发送消息

Python实现钉钉发送报警消息的方法

钉钉开放平台传送门:https://open.dingtalk.com

我司使用钉钉作为内部通讯工具,基本上大家在电脑和手机上都开着,消息可以第一时间查看,报警消息的即时性要求比较高,所以适合用钉钉通知。

下面介绍如何用Python实现钉钉发送报警消息。

获取access token

要使用钉钉发送消息,首先需要获取access token,代码如下:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • def get_access_token():
  •  url = 'https://oapi.dingtalk.com/gettoken?corpid=%s&corpsecret=%s' % (corp_id, corp_secret)
  •  request = urllib2.Request(url)
  •  response = urllib2.urlopen(request)
  •  response_str = response.read()
  •  response_dict = json.loads(response_str)
  •  error_code_key = "errcode"
  •  access_token_key = "access_token"
  •  if response_dict.has_key(error_code_key) and response_dict[error_code_key] == 0 and response_dict.has_key(access_token_key):
  •   return response_dict[access_token_key]
  •  else:
  •   return ''
  • access token在2小时内有效,有效期内重复获取返回相同结果,有效期会自动延长。corp_id和corp_secret是企业的id和secret,在钉钉的管理后台可以找到。另外,上面用到了urllib2和json,需要import:

  • ?
  • 1
  • 2
  • import urllib2
  • import json
  • 消息类型

    钉钉的消息类型分为:text,image,voice,file,link和OA,具体消息格式参见:https://open-doc.dingtalk.com/docs/doc.htm?treeId=172&articleId=104972&docType=1 。

    下面以发送文本,链接和文件消息为例进行说明。

    给用户发送消息

    发送文本

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • def send_text_to_users(access_token, users, text):
  •  msg_type, msg = _gen_text_msg(text)
  •  return _send_msg_to_users(access_token, users, msg_type, msg)
  •  
  • def _gen_text_msg(text):
  •  msg_type = 'text'
  •  msg = { "content": text }
  •  return msg_type, msg
  •  
  • def _send_msg_to_users(access_token, users, msg_type, msg):
  •  to_users = '|'.join(users)
  •  body_dict = {
  •   "touser": to_users,
  •   "agentid": agent_id,
  •   "msgtype": msg_type
  •  }
  •  body_dict[msg_type] = msg
  •  body = json.dumps(body_dict)
  •  return _send_msg("https://oapi.dingtalk.com/message/send?access_token=", access_token, body)
  • 其中agent_id是一个钉钉应用的id,以钉钉应用的名义给用户发送消息。users是用户id列表,每个用户id是一个字符串。

    发送链接

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • def send_link_to_users(access_token, users, url, title, text):
  •  msg_type, msg = _gen_link_msg(url, title, text)
  •  return _send_msg_to_users(access_token, users, msg_type, msg)
  •  
  • def _gen_link_msg(url, title, text):
  •  msg_type = 'link'
  •  msg = {
  •   "messageUrl": url,
  •   "picUrl": "https://gw.alicdn.com/tps/TB1FN16LFXXXXXJXpXXXXXXXXXX-256-130.png",
  •   "title": title,
  •   "text": text
  •  }
  •  return msg_type, msg
  • 其中_send_msg_to_users方法参见前面的代码,picUrl字段设置的是钉钉官方的图片,这里用于测试。

    发送文件

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • def send_file_to_users(access_token, users, file_name):
  •  media_id = upload_file(access_token, file_name)
  •  if media_id == '':
  •   return
  •  msg_type, msg = _gen_file_msg(media_id)
  •  return _send_msg_to_users(access_token, users, msg_type, msg)
  •  
  • def upload_file(access_token, file_name):
  •  register_openers()
  •  datagen, headers = multipart_encode({'media': open(file_name, 'rb')})
  •  requst_url = 'https://oapi.dingtalk.com/media/upload?access_token=' + access_token + '&type=file'
  •  request = urllib2.Request(requst_url, datagen, headers)
  •  response = urllib2.urlopen(request)
  •  response_str = response.read()
  •  response_dict = json.loads(response_str)
  •  media_id_key = 'media_id'
  •  error_code_key = 'errcode'
  •  if response_dict.has_key(error_code_key) and response_dict[error_code_key] == 0 and response_dict.has_key(media_id_key):
  •   return response_dict[media_id_key]
  •  else:
  •   return ''
  • 需要先上传文件获得media_id,然后使用media_id将文件发送给用户。另外,这里用到了poster,可使用pip安装:

  • ?
  • 1
  • pip install poster
  • 之后引入multipart_encode和register_openers函数:

  • ?
  • 1
  • 2
  • from poster.encode import multipart_encode
  • from poster.streaminghttp import register_openers
  • 给群会话发送消息

    与给用户发送信息类似,区别是需要群会话id,而不是用户列表,以发送文本消息为例,代码如下:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • def send_text_to_chat(access_token, chat_id, text):
  •  msg_type, msg = _gen_text_msg(text)
  •  return _send_msg_to_chat(access_token, chat_id, msg_type, msg)
  •  
  • def _send_msg_to_chat(access_token, chat_id, msg_type, msg):
  •  body_dict = {
  •   "chatid": chat_id,
  •   "msgtype": msg_type
  •  }
  •  body_dict[msg_type] = msg
  •  body = json.dumps(body_dict)
  •  return _send_msg("https://oapi.dingtalk.com/chat/send?access_token=", access_token, body)
  • 其中_gen_text_msg方法参见前面的代码。

    群会话可以自行创建,参见https://open-doc.dingtalk.com/docs/doc.htm?treeId=172&articleId=104977&docType=1 。

    以上这篇Python实现钉钉发送报警消息的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持开心学习网。

    原文链接:https://blog.csdn.net/anxiaoyuthss/article/details/71908522

    标签:Python 消息 钉钉
    您可能感兴趣