gpt4 book ai didi

activemq - 如何使用 .NET Core 中的 AMQP.Net Lite 库以及 clientId 和订阅者名称以及主题名称来创建持久的发布者/订阅者主题

转载 作者:行者123 更新时间:2023-12-04 16:35:55 26 4
gpt4 key购买 nike

我是 ActiveMQ 的新手,但我尝试并能够创建一个持久的发布者,但我无法设置客户端 ID,因为我没有找到任何具有客户端 ID 的属性,甚至无法在 Google 中找到。如果我能得到一些示例代码,那将是很大的帮助。

笔记:
不使用 NMS 协议(protocol)。我在 .NET Core Web API 中使用 AMQP.Net Lite 和 ActiveMQ用于使用 ClientId 创建持久的发布者/订阅者。

最佳答案

为了创建对 ActiveMQ 或 ActiveMQ Artemis 的持久订阅,您的客户端需要做几件事。

  • 使用 AMQP 'ContainerId' 属性为客户端设置一个唯一的“client-id”,可以在下面的代码中看到。客户端每次连接并恢复其持久订阅时都必须使用相同的容器 ID。
  • 创建一个新 session 。
  • 为您要订阅的地址(在本例中为主题)创建一个新的接收者。持久订阅的源需要将地址设置为主题地址(在 ActiveMQ 中为主题://名称)。 Source 还需要将 expiray 策略设置为 NEVER,Source 还必须将终端持久性状态设置为 UNSETTLED_STATE,并将分发模式设置为 COPY。
  • 创建 Receiver 后,您可以在 start 中设置 onMessage 处理程序或调用 receive 来使用消息(假设您已授予代理向您发送任何消息的信用)。

  • using System;
    using Amqp;
    using Amqp.Framing;
    using Amqp.Types;
    using Amqp.Sasl;
    using System.Threading;



    namespace aorg.apache.activemq.examples
    {
    class Program
    {
    private static string DEFAULT_BROKER_URI = "amqp://localhost:5672";
    private static string DEFAULT_CONTAINER_ID = "client-1";
    private static string DEFAULT_SUBSCRIPTION_NAME = "test-subscription";
    private static string DEFAULT_TOPIC_NAME = "test-topic";



    static void Main(string[] args)
    {
    Console.WriteLine("Starting AMQP durable consumer example.");

    Console.WriteLine("Creating a Durable Subscription");
    CreateDurableSubscription();

    Console.WriteLine("Attempting to recover a Durable Subscription");
    RecoverDurableSubscription();

    Console.WriteLine("Unsubscribe a durable subscription");
    UnsubscribeDurableSubscription();

    Console.WriteLine("Attempting to recover a non-existent durable subscription");
    try
    {
    RecoverDurableSubscription();
    throw new Exception("Subscription was not deleted.");
    }
    catch (AmqpException)
    {
    Console.WriteLine("Recover failed as expected");
    }

    Console.WriteLine("Example Complete.");
    }

    // Creating a durable subscription involves creating a Receiver with a Source that
    // has the address set to the Topic name where the client wants to subscribe along
    // with an expiry policy of 'never', Terminus Durability set to 'unsettled' and the
    // Distribution Mode set to 'Copy'. The link name of the Receiver represents the
    // desired name of the Subscription and of course the Connection must carry a container
    // ID uniqure to the client that is creating the subscription.
    private static void CreateDurableSubscription()
    {
    Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
    SaslProfile.Anonymous,
    new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);

    try
    {
    Session session = new Session(connection);

    Source source = CreateBasicSource();

    // Create a Durable Consumer Source.
    source.Address = DEFAULT_TOPIC_NAME;
    source.ExpiryPolicy = new Symbol("never");
    source.Durable = 2;
    source.DistributionMode = new Symbol("copy");

    ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, source, null);

    session.Close();
    }
    finally
    {
    connection.Close();
    }
    }

    // Recovering an existing subscription allows the client to ask the remote
    // peer if a subscription with the given name for the current 'Container ID'
    // exists. The process involves the client attaching a receiver with a null
    // Source on a link with the desired subscription name as the link name and
    // the broker will then return a Source instance if this current container
    // has a subscription registered with that subscription (link) name.
    private static void RecoverDurableSubscription()
    {
    Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
    SaslProfile.Anonymous,
    new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);

    try
    {
    Session session = new Session(connection);
    Source recoveredSource = null;
    ManualResetEvent attached = new ManualResetEvent(false);

    OnAttached onAttached = (link, attach) =>
    {
    recoveredSource = (Source) attach.Source;
    attached.Set();
    };

    ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached);

    attached.WaitOne(10000);
    if (recoveredSource == null)
    {
    // The remote had no subscription matching what we asked for.
    throw new AmqpException(new Error());
    }
    else
    {
    Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address);
    Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy);
    Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable);
    Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode);
    }

    session.Close();
    }
    finally
    {
    connection.Close();
    }
    }

    // Unsubscribing a durable subscription involves recovering an existing
    // subscription and then closing the receiver link explicitly or in AMQP
    // terms the close value of the Detach frame should be 'true'
    private static void UnsubscribeDurableSubscription()
    {
    Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
    SaslProfile.Anonymous,
    new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);

    try
    {
    Session session = new Session(connection);
    Source recoveredSource = null;
    ManualResetEvent attached = new ManualResetEvent(false);

    OnAttached onAttached = (link, attach) =>
    {
    recoveredSource = (Source) attach.Source;
    attached.Set();
    };

    ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached);

    attached.WaitOne(10000);
    if (recoveredSource == null)
    {
    // The remote had no subscription matching what we asked for.
    throw new AmqpException(new Error());
    }
    else
    {
    Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address);
    Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy);
    Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable);
    Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode);
    }

    // Closing the Receiver vs. detaching it will unsubscribe
    receiver.Close();

    session.Close();
    }
    finally
    {
    connection.Close();
    }
    }

    // Creates a basic Source type that contains common attributes needed
    // to describe to the remote peer the features and expectations of the
    // Source of the Receiver link.
    private static Source CreateBasicSource()
    {
    Source source = new Source();

    // These are the outcomes this link will accept.
    Symbol[] outcomes = new Symbol[] {new Symbol("amqp:accepted:list"),
    new Symbol("amqp:rejected:list"),
    new Symbol("amqp:released:list"),
    new Symbol("amqp:modified:list") };

    // Default Outcome for deliveries not settled on this link
    Modified defaultOutcome = new Modified();
    defaultOutcome.DeliveryFailed = true;
    defaultOutcome.UndeliverableHere = false;

    // Configure Source.
    source.DefaultOutcome = defaultOutcome;
    source.Outcomes = outcomes;

    return source;
    }
    }

    }

    关于activemq - 如何使用 .NET Core 中的 AMQP.Net Lite 库以及 clientId 和订阅者名称以及主题名称来创建持久的发布者/订阅者主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43816483/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com