注意:以下文档只适用于TOP接口,请谨慎使用!

文档中心 > 聚石塔

消息同步服务组件使用

更新时间:2019/07/31 访问次数:46535

开通消息同步服务

登录至聚石塔控制台https://console.cloud.tmall.com),切换到业务域“聚石塔”,在左上角进入“组件中心”,选择启用“消息同步”。

开通MQ服务

A.开发者启动消息同步服务,订阅消息后,平台会把开发者订阅的消息写到入MQ消息队列,初次启用,需要先开通MQ服务。

B.消息队列MQ为按量付费产品,立即购买不会产生任何费用,按实际使用量进行后付费。

订购消息类型

进入控制台“组件中心”下的“消息同步服务”,根据业务需要,选择需要订购消息的应用APPKEY。

 

订阅消息成功,系统将自动在消息队列MQ服务中创建系统级topic。开发者可登陆聚石塔控制台->应用列表->选择你的应用,在互联网中间件关联MQ实例。

注意:聚石塔控制台是打造以应用为中心的PAAS平台,应用作为开发者最基本的业务单元,因此需要先创建应用对云产品进行关联授权,聚石塔控制台应用介绍可详见文档

 

关联成功后,可以在消息队列RocketMQ中查看当前消息队列TOPIC。

 

消息同步服务类型及场景

 业务类型  业务场景  业务描述
 淘宝物流  物流详情跟踪  实现跟踪物流订单流转状态,实现客户关怀及异常订单监控
淘宝交易评价   负面印象评价消息  智能分析订单评价信息,提取用户对订单的负面印象关键字,基于核心关键字实现对商品、物流、客服等维度的优化解决方案.
 淘宝交易  订单数据管理  订单数据实时推送到消息队列,可实现按用户分组从队列中过滤消息,灵活实现在大促场景下大客户的资源保障方案.
淘宝商品   商品数据管理  实现商品生命周期管理,及时监控商品变更记录.
 淘宝分销  淘宝分销订单管理 实现品牌商全渠道订单统一管理,及时获取分销商店铺经营情况. 
 淘宝退款  退款退货数据管理  订单退款退货管理,实现发货订单退款快速拦截以及oa协同退款财务审批

消息同步服务使用指南

环境准备

A.下载API SDK,登陆聚石塔控制台->“应用管理”->“应用列表”,选择开通消息同步服务的应用进入应用证书,下载最新的SDK进行开发.

B.下载MQ SDK,访问阿里云帮助中心,下载MQ对应开发语言SDK。

访问地址:https://help.aliyun.com/document_detail/114448.html?spm=a2c4g.11186623.6.565.5eb9208dHmOurV

C.订阅消息同步服务成功后,将在MQ控制台发布管理中生成一个系统级topic.用户需要在订阅管理中订阅系统级topic获取消息 

注意事项

使用MQ服务的应用程序需要部署在ECS上,如果只是测试,可以创建一个公网测试环境下的topic,这个topic可以在外网收发消息,进行一些简单的测试.

开启用户消息推送

订阅开通消息同步服务后,系统将默认为您的应用增加消息同步服务权限包,可通过api为特定的用户开通消息实时推送.

添加消息同步用户

调用taobao.jushita.jms.user.add 添加MQ消息同步用户.此api调用需要传入sessionkey【即授权码】

删除消息同步用户

调用taobao.jushita.jms.user.delete 删除MQ消息同步用户,删除后用户的消息将不会推送到聚石塔的MQ中.入参必须输入user_nick来判断删除用户是否成功.

查询消息同步用户

调用taobao.jushita.jms.user.get 查询某个用户是否同步消息.入参必须输入user_nick来判断该用户是否开通消息推送.

 

JAVA示例代码

  1. TaobaoClient client=new DefaultTaobaoClient(url, appkey, secret);
  2. JushitaJmsUserAddRequest req=new JushitaJmsUserAddRequest();
  3. JushitaJmsUserAddResponse response = client.execute(req , sessionKey);

接收消息

接收消息的前提条件

1.应用appkey订阅消息需要推送的消息.

2.调用taobao.jushita.jms.user.add添加需要推送的用户.

3.确保应用appkey拥有同步MQ消息权限包.

集群方式消费消息

Java代码:

 

  1. public class ConsumerTest {  
  2.   public static void main(String[] args) { 
  3.     Properties properties = new Properties();
  4.     properties.put(PropertyKeyConst.ConsumerId, "CID_001");// ons控制台订阅管理中获取ConsumerID
  5.     properties.put(PropertyKeyConst.AccessKey, "appkey");// 应用appkey,根据用户实际参数修改
  6.     properties.put(PropertyKeyConst.SecretKey, "secretkey");// 应用密钥,根据用户实际参数修改
  7.     properties.put(PropertyKeyConst.OnsChannel,  "CLOUD"); // cloud为聚石塔标识
  8.     properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);//集群消费,默认为集群消费模式
  9.    // properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.BROADCASTING);广播消费模式
  10.     Consumer consumer = ONSFactory.createConsumer(properties); 
  11.     consumer.subscribe("TopicTestONS", "*", new MessageListener() {// 消息队列名称,根据用户实际参数修改
  12.        public Action consume(Message message, ConsumeContext context) {
  13.            String msg_body=new String(message.getBody(),Charsets.UTF-8);
  14.            JSONObject messageobject=JSONObject.fromObject(msg_body);
  15.            String usernick=messageobject.getString("user_nick");
  16.            System.out.println("usernick"+ usernick);
  17.           return Action.CommitMessage;
  18.        }
  19.     });
  20.     consumer.start();
  21.     System.out.println("Consumer Started");
  22.   }
  23. } 
.NET代码:

 

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Runtime.InteropServices;
  6. using ons;
  7. using std;
  8. namespace ons {
  9.   // pushconsumer拉取到消息后,会主动调用该实例的consume函数
  10.   public class MyMsgListener : MessageListener {
  11.      public MyMsgListener(){}
  12.      ~MyMsgListener(){}
  13.      public override Action consume(ref Message value){
  14.        // 形参value是返回的消息实例,可以根据业务逻辑提取message的各个字段
  15.        Console.WriteLine("\nCallback topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}",value.getTopic(),value.getTag(),value.getKey(),value.getMsgID(),value.getBody());
  16.        return ons.Action.CommitMessage;
  17.      }
  18.   }
  19.  class onscsharp{
  20.    static void Main(string[] args){
  21.      // pushconsumer创建和工作需要的参数,必须输入
  22.      ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
  23.        factoryInfo.setFactoryProperty(factoryInfo.getConsumerIdName(), "CID_xxx");// ons控制台订阅管理中获取ConsumerID
  24.        factoryInfo.setFactoryProperty(factoryInfo.getPublishTopicsName(), "xxxxx");//cid_xxx,从xxx地方获取
  25.        factoryInfo.setFactoryProperty(factoryInfo.getAccessKeyName(),"xxx");// 应用appkey
  26.        factoryInfo.setFactoryProperty(factoryInfo.getSecretKeyName(), "xxxx");// 应用密钥
  27.        // create consumerONS
  28.     ONSFactory onsfactory = new ONSFactory();
  29.     PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo);
  30.     // register msg listener and subscribe msg topic
  31.     MessageListener msgListener = new MyMsgListener();
  32.     pConsumer.subscribe(factoryInfo.getPublishTopics(), "*", ref msgListener);
  33.     // start consumer
  34.     pConsumer.start();
  35.     // consumer启动后,会自动拉取消息,拉取到消息后,会自动调用mymsglistener实例的consume函数
  36.     // 确定消费完成后,调用shutdown函数,在应用退出前,必须销毁consumer对象,否则会导致内存泄露等问题
  37.     pConsumer.shutdown();
  38.    }
  39.  }
  40. } 

FAQ

关于此文档暂时还没有FAQ
返回
顶部