gpt4 book ai didi

go - 使用 QPid 和 golang 包装器 Electron 连接到 AMQP 1.0 Azure EventHub

转载 作者:数据小太阳 更新时间:2023-10-29 03:12:43 33 4
gpt4 key购买 nike

我想使用 Qpid 质子-c 库的 Electron golang 包装器连接到 Azure EventHub。

我将以下 SASL 详细信息设置为构建连接字符串所需的主机/端口/命名空间/路径,但由于某种原因,我不断收到错误消息:connection reset by peer .

package main

import (
"fmt"
"os"
"strings"
"qpid.apache.org/amqp"
"qpid.apache.org/electron"
)

var (
eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
eventHubName = "<MY_CUSTOM_NAME>"
eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)

func main() {

sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))

urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
fmt.Printf("The URL connection string: '%v'\n", urlStr)

// parse URL
url, err := amqp.ParseURL(urlStr)
if err != nil {
panic(err)
}
fmt.Printf("The AMQP parsed URL: %v\n", url)

// TCP dial
amqpHost := url.Host
fmt.Printf("The AMQP host used in the connection is: '%v'\n", amqpHost)
c, err := container.Dial(
"tcp", amqpHost,
electron.SASLEnable(),
electron.Password([]byte(eventHubSasKey)),
electron.User(eventHubSasKeyName),
)
if err != nil {
panic(err)
}
defer c.Close(nil)

// AMQP send
addr := strings.TrimPrefix(url.Path, "/")
s, err := c.Sender(electron.Target(addr))
if err != nil {
panic(err)
}
m := amqp.NewMessage()
body := fmt.Sprintf("bla bla bla %v", 42)
m.Marshal(body)
fmt.Printf("The AMQP message body: '%v'\n", m.Body())

go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan

// AMQP ACK receive
fmt.Printf("Waiting for ACKs...\n")
for {
fmt.Printf("Waiting for an ACK coming out of the channel...\n")
out := <-sentChan // Outcome of async sends.
fmt.Printf("Received something: '%v'\n", out)
}
}

编译,然后运行代码,这是输出:

The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...

对我来说,收到的消息是 connection reset by peer看起来不像是有效的 ACK,我不确定连接尝试有什么问题?

  • proton-c的编译版本是0.18.0,我用的是go1.7.4 linux/amd64 .
  • 如果我添加 electron.SASLAllowedMechs("EXTERNAL")到连接选项,然后我收到相同的错误消息。
  • 如果我将端口更改为 5672 ,然后我得到一个 connection refused尝试通过 TCP 拨号后出现紧急错误。
  • 如果我用 base64.StdEncoding.DecodeString(eventHubSasKey) 解码 base64 密码字段并将字节传递给连接选项我不断收到相同的错误 connection reset by peer .
  • 如果我添加这个连接选项 electron.SASLAllowedMechs("ANONYMOUS") , 然后我仍然收到相同的错误消息 connection reset by peer .这样做的原因是我没有使用任何 SSL 证书,并且 Microsoft 提供的 AMQP 的 Java 包装器似乎使用这个“匿名”的东西而不是证书(实际上不需要证书来连接到 EventHub 使用Java 连接器)。

我不确定如何继续这里,因为我卡在连接部分,我相信 SASL 详细信息根据此处的文档以正确的方式传递:https://godoc.org/qpid.apache.org/electron#ConnectionOption

我仍然不确定失败的原因不是因为 SSL 证书,如果是这样的话,我正在努力了解如何将它们包含在流程中。

编辑:

我后来发现我必须通过 TCP 建立 TLS 连接,即使我没有提供任何私钥/公钥对,也指定了一个“虚拟主机”(否则 AMQP 会提示无法识别主机):

    // TLS connection details
tlsConfig := &tls.Config{}
eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
if err != nil {
panic(err)
}

// AMPQ container connection on top of TLS via TCP
eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
amqpConn, err := container.Connection(
tlsConn,
electron.SASLEnable(),
electron.User(eventHubSasKeyName),
electron.Password([]byte(eventHubSasKey)),
electron.VirtualHost(eventHubDomain),
// electron.SASLAllowedMechs(<SOME_MECHANISM>),
)
if err != nil {
panic(err)
}
defer amqpConn.Close(nil)

// AMQP sender (a AMQP link with target the name defined on the Azure portal)
s, err := amqpConn.Sender(electron.Target(eventHubName))
if err != nil {
panic(err)
}

然而,当使用环境变量 PN_TRACE_FRM=true 运行应用程序时(这在 proton-c 级别给了我一些详细的日志记录)现在错误是:

[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]

这个 afaik 意味着 SASL 详细信息(用户名/密码)必须是“发件人”类型,因为我正在尝试将某些内容发送到事件中心。我在 Azure 门户上仔细检查了这些详细信息(单击“共享访问策略”> 然后使用将“声明”指定为“发送”的策略),它们是正确的。所以我不确定为什么会收到此错误。

我实际上尝试了在不同级别的 Azure 门户上定义的这些 SASL 策略,<MY_CUSTOM_NAMESPACE><MY_CUSTOM_NAME> ,但始终显示相同的错误消息。

我还尝试包括各种 SASL 机制,例如使用 electron.SASLAllowedMechs("PLAIN") 时然后我得到这个错误:no mechanism available: No worthy mechs found (Authentication failed [mech=none]) .

最佳答案

在端口 5671 的 urlStr 中使用“amqps”方案。事件中心不允许纯 tcp 连接。您还需要启用 SASL PLAIN 以发送在命名空间或事件中心实体上配置的 SAS key (用户名= key 名称,密码= key )(看起来您已经在这样做了)。我不确定 golang,但使用 Python 绑定(bind)可以将所有内容放入 Uri,如“amqps://sas-key-name:url-encoded-key@your-namespace.servicebus.windows.net:5671”。端口号是可选的。

如果底层 proton-c 引擎看到不同的受支持 SASL 机制,则它可能不会使用 SASL PLAIN。要强制执行 PLAIN,您可以在容器上设置允许的机制。在 go 中,SASLAllowedMechs 函数似乎为您提供了一个连接选项,您可以在创建连接时提供该选项。

这是 Python code这与事件中心配合得很好。

关于go - 使用 QPid 和 golang 包装器 Electron 连接到 AMQP 1.0 Azure EventHub,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46910778/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com