你的位置:首页 > 数据库

[数据库]Service Broker应用(1):简介、同server不同DB间的数据传输

简介:SQL Server Service Broker,以下简称SSB,是一种完全基于MSSQL数据库的数据处理技术,为短时间内处理大量数据提供了一种可靠、稳定、高效的解决方案。一次同步的数据最大可达2G,采用二进制传输,多线程处理数据。可以理解为数据库中的消息中间件。

  根据负载类型分,SSB有Windows负载类型和证书类型,由于证书类型不支持跨集群的数据传输,故不讨论,用Windows负载类型。后续脚本都是Windows负载类型的脚本。

  SQL Server版本在2008及以上。

应用场景

  • 外部数据进入DB(如
  • DB之间的数据同步:包括同server不同DB,不同server之间,集群之间的数据同步

优点:安全、稳定可靠、解耦合、多线程、处理数据很快、对网络的实时连接要求不高

缺点:跨集群的解决方案很复杂,实施起来不易;查找数据处理失败的异常原因比较困难;

 

实施方案--同server不同DB:新建数据库的步骤省略,跨server的实施方案见下一个篇

消息发送方配置脚本:

  --第一步:用户数据库开启service broker服务
  USE master
  GO
    select is_broker_enabled,service_broker_guid,* from sys.databases where name IN ('QEC_Interface')--is_broker_enabled是1表示开启,0反之
    ALTER DATABASE QEC_Interface SET NEW_BROKER WITH ROLLBACK IMMEDIATE --遇到无法执行下面的语句时,先执行这一条语句:回滚事务

    ALTER DATABASE QEC_Interface SET ENABLE_BROKER
    ALTER DATABASE QEC_Interface SET TRUSTWORTHY ON

  GO

  --第二步:发送方用户数据库上创建消息类型:消息类型的名称要与接收方的名称完全相同(区分大小写)
  use QEC_Interface
  go
    create message type [//qciscm05/QEC_Interface/NoneCustomsSend]
    validation = well_formed_
    --For Receiving MessageType
    create message type [//qciscm05/QEC_Interface/NoneCustomsReceived]
    validation = well_formed_
  --第三步:用户数据库上创建约定:约定的名称要与接收方的名称完全相同(区分大小写)
  create contract [//qciscm05/QEC_Interface/NoneCustomsContract]
  ( [//qciscm05/QEC_Interface/NoneCustomsSend] SENT BY INITIATOR,
  [//qciscm05/QEC_Interface/NoneCustomsReceived] SENT BY TARGET
  )
  --第四步:用户数据库上创建队列
  create queue InterfaceNoneCustomsQueue
  with status=on
  --第五步:用户数据库上创建服务
  create service [//qciscm05/QEC_Interface/NoneCustomsService]
  on queue InterfaceNoneCustomsQueue
  ([//qciscm05/QEC_Interface/NoneCustomsContract])

  -------------------发送方配置完成----------------------------------
  --目标队列绑定SP,自动处理反馈消息
  alter queue InterfaceNoneCustomsQueue
  with status = on,
  activation(
  status = ON,
  procedure_name= [dbo].[SP_SSB_HandleReceiveNoneCustoms],
  max_queue_readers=50,
  execute as self
  );

  ---=手动发送消息测试

  exec SP_SSB_SendNoneCustoms 'TEHP1301996QN','<
  <RESPONSECONTENT>
  <RESPONSE_MESID>C4EE2C5C758A4656800B6617BAC5B541</RESPONSE_MESID>
  <APPL_TYPE>A</APPL_TYPE>
  <DECLARE_TYPE>GJN</DECLARE_TYPE>
  <STEP_ID />
  <SEQ_NO>TEHP1301996QN</SEQ_NO>
  <JOB_NO>GJAtestOK</JOB_NO>
  <RESULT_CODE>OK01</RESULT_CODE>
  <RESULT_INFO />
  </RESPONSECONTENT>
  </

  --==基本查询操作
  select conversation_handle,state_desc,* from sys.conversation_endpoints--查看当前数据库中开启的会话
  select conversation_handle,cast(message_body as
  select transmission_status,cast(message_body as

消息接受方配置脚本:

  --第一步:用户数据库开启service broker服务
  use master
  go
    select is_broker_enabled,service_broker_guid,* from sys.databases where name IN ('QEC')
    ALTER DATABASE QEC SET NEW_BROKER WITH ROLLBACK IMMEDIATE --遇到无法执行下面的语句时,先执行这一条语句

    ALTER DATABASE QEC set ENABLE_BROKER
    ALTER DATABASE QEC SET TRUSTWORTHY ON

  go

  --第二步:发送方用户数据库上创建消息类型:消息类型的名称要与发送方的名称完全相同(区分大小写)
  use QEC
  go
    create message type [//qciscm05/QEC_Interface/NoneCustomsSend]
    validation = well_formed_
    --For Receiving MessageType
    create message type [//qciscm05/QEC_Interface/NoneCustomsReceived]
    validation = well_formed_
  --第三步:用户数据库上创建约定:约定的名称要月发送方的名称完全相同(区分大小写)
   create contract [//qciscm05/QEC_Interface/NoneCustomsContract]
    ( [//qciscm05/QEC_Interface/NoneCustomsSend] SENT BY INITIATOR,
      [//qciscm05/QEC_Interface/NoneCustomsReceived] SENT BY TARGET
    )
  --第四步:用户数据库上创建队列
  create queue QECNoneCustomsQueue
  with status=on
  --第五步:用户数据库上创建服务
  create service [//qciscm05/QEC/NoneCustomsService]
  on queue QECNoneCustomsQueue
  ([//qciscm05/QEC_Interface/NoneCustomsContract])

  ------------------接收方配置完成-----------------------------------

  --==队列绑定SP,自动处理队列中接收到的消息

  alter queue QECNoneCustomsQueue
  with status = on, --on设置队列为启用,off相反
  activation(
  status = on, --on设置绑定的SP为激活状态,off相反
  procedure_name=SP_SSB_HandleMessageForNoneCustoms, --指定要绑定的SP
  max_queue_readers=50, --设置并发数:SP同时处理消息的进程数,多线程(最大值32767)
  execute as self --指定SP以当前用户身份执行
  );
  --==
  select conversation_handle,conversation_id,state_desc,* from sys.conversation_endpoints--查看当前数据库中开启的会话
  select conversation_handle,cast(message_body as
  select transmission_status,cast(message_body as