日期:2014-05-17  浏览次数:21007 次

[分享] .Net实现 WebSphere MQ与Oracle数据库的XA事务管理
WebSphere MQ以下简称为WMQ.
在通讯项目中,有这样的一个应用场景,简单描述如下:
1. 程序A需定时从MQ中取出消息(XML)
2. 将XML还原为DataSet
3. 将DataSet持久化到数据库

该场景在总线型的消息传输框架中较为常见, 在一切正常的情况下,程序工作正常,数据不会发生错误或丢失. 但程序A介于WMQ, 与数据库之间, 程序两端的网络因素, 或者任意一端服务停止,均有可能会导致消息丢失. 因此比较稳妥的做法是将以上步骤采用XA事务进行全局托管.

实现一个MQGet类:
C# code

using System;
using System.Collections.Generic;
using System.Text;

using System.Collections;
using System.Transactions;
using IBM.WMQ;


namespace WMQClient_WithXA
{
    public class MqGet
    {
        private String _host = "127.0.0.1";
        private int _port;
        private String _channelName = "SYSTEM.DEF.SVRCONN";
        private String _queueManagerName = null;
        private String _queueName = null;
        private int _charSet;
        private WMQTransactionType _transactionType;
        private bool isTopic = false;
        private String _transportMode = "managed";
        private bool commit = true;

        private MQQueueManager queueManager;
        private MQQueue queue;

        private MyMqObject myMqObj;
        private Hashtable properties;
        private MQMessage message;
        private MQGetMessageOptions getMessageOptions;

        public MqGet(string sMqQmgrName, string sQueueName, string sChannelName, string sHost, int iPort, int iCharacterSet, WMQTransactionType TransactionType, string sTransportMode = "managed")
        {
            getMessageOptions = new MQGetMessageOptions();
            _host = sHost;
            _port = iPort;
            _channelName = sChannelName;
            _queueManagerName = sMqQmgrName;
            _queueName = sQueueName;
            _transportMode = sTransportMode;
            _charSet = iCharacterSet;
            _transactionType = TransactionType;

            getMessageOptions.Options += MQC.MQGMO_WAIT;
            getMessageOptions.WaitInterval = 20000;  // 20 seconds wait

            properties = new Hashtable();
            properties.Add(MQC.HOST_NAME_PROPERTY, _host);
            properties.Add(MQC.PORT_PROPERTY, _port);
            properties.Add(MQC.CHANNEL_PROPERTY, _channelName);

            switch (TransactionType)
            {
                case WMQTransactionType.NORMAL_TRANSACTION:
                    getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;
                    break;
                case WMQTransactionType.XA_TRANSACTION:
                    if (_transportMode == "managed")
                    {
                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);                                         // for managed mode                        
                    }
                    else
                    {
                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_XACLIENT);
                    }
                    getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;
                    break;
            }
        }

        private MQQueueManager getMqManager(MQQueueManager qmg)
        {
            if (qmg == null)
            {
                try
                {
                    qmg = new MQQueueManager(_queueManagerName, properties);
                    return qmg;
                }
                catch(Exception err)
                {
                    Console.WriteLine(err.Message);
                    return null;
                }
            }
            else
                return qmg;
        }
        private MQQueue getMqQ(MQQueue q)
        {
            if (q == null)
            {
                try
                {
                    q = myMqObj._qMg.AccessQueue(_queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
                    return q;
                }
                catch (Exception err)