
python - Mocking a DNS server with Twisted

Reposted. Author: 行者123. Updated: 2023-12-04 21:01:22

I am trying to write a mock DNS server based on Twisted to use in some tests.

Taking inspiration from this guide, I wrote a very simple server that just resolves everything to 127.0.0.1:

from twisted.internet import defer, reactor
from twisted.names import dns, error, server


class MockDNSResolver:

    def _doDynamicResponse(self, query):
        name = query.name.name
        record = dns.Record_A(address=b"127.0.0.1")
        answer = dns.RRHeader(name=name, payload=record)
        authority = []
        additional = []
        return [answer], authority, additional

    def query(self, query, timeout=None):
        print("Incoming query for:", query.name)
        if query.type == dns.A:
            return defer.succeed(self._doDynamicResponse(query))
        else:
            return defer.fail(error.DomainError())


if __name__ == "__main__":
    clients = [MockDNSResolver()]
    factory = server.DNSServerFactory(clients=clients)
    protocol = dns.DNSDatagramProtocol(controller=factory)
    reactor.listenUDP(10053, protocol)
    reactor.listenTCP(10053, factory)
    reactor.run()


The above works fine with dig and nslookup (run from a different terminal):
$ dig -p 10053 @localhost something.example.org A +short
127.0.0.1

$ nslookup something.else.example.org 127.0.0.1 -port=10053
Server: 127.0.0.1
Address: 127.0.0.1#10053

Non-authoritative answer:
Name: something.else.example.org
Address: 127.0.0.1

I also see the corresponding hits in the terminal running the server:
Incoming query for: something.example.org
Incoming query for: something.else.example.org
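
The dig check above can also be scripted. The following is a minimal standard-library sketch, not part of the original question: the helper names (`encode_name`, `build_query`, `skip_name`, `first_a_record`, `resolve`) are hypothetical, and the default address 127.0.0.1:10053 is simply the port used by the mock server above. It builds a bare DNS query by hand, sends it over UDP, and extracts the first A record from the reply. Query type 255 (ANY) is accepted too, in case you want to probe how the server reacts to non-A queries.

```python
import socket
import struct


def encode_name(name):
    """Encode a hostname as length-prefixed DNS labels ending in a zero byte."""
    out = b""
    for label in name.rstrip(".").split("."):
        raw = label.encode("ascii")
        out += bytes([len(raw)]) + raw
    return out + b"\x00"


def build_query(name, qtype=1, query_id=0x1234):
    """Build a minimal DNS query packet (qtype 1 = A, 255 = ANY)."""
    # Header: id, flags (recursion desired), 1 question, no other sections.
    header = struct.pack("!HHHHHH", query_id, 0x0100, 1, 0, 0, 0)
    question = encode_name(name) + struct.pack("!HH", qtype, 1)  # class IN
    return header + question


def skip_name(data, offset):
    """Return the offset just past the (possibly compressed) name at offset."""
    while True:
        length = data[offset]
        if length == 0:
            return offset + 1
        if length & 0xC0 == 0xC0:  # compression pointer, always two bytes
            return offset + 2
        offset += 1 + length


def first_a_record(response):
    """Return the first A record in a DNS response as a dotted quad, or None."""
    qdcount, ancount = struct.unpack("!HH", response[4:8])
    offset = 12  # the fixed-size header is 12 bytes
    for _ in range(qdcount):
        offset = skip_name(response, offset) + 4  # skip qtype and qclass
    for _ in range(ancount):
        offset = skip_name(response, offset)
        rtype, _rclass, _ttl, rdlength = struct.unpack(
            "!HHIH", response[offset:offset + 10]
        )
        offset += 10
        if rtype == 1 and rdlength == 4:
            return socket.inet_ntoa(response[offset:offset + 4])
        offset += rdlength
    return None


def resolve(name, server=("127.0.0.1", 10053), qtype=1, timeout=2.0):
    """Send one UDP query to the given server and return the first A record."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(build_query(name, qtype), server)
        response, _addr = sock.recvfrom(512)
    return first_a_record(response)
```

With the mock server running, calling `resolve("something.example.org")` would be expected to return the address the server hands out; with nothing listening on that port, the call fails with a socket error or a timeout instead.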

Then, based on this section about making requests and this section about installing a custom resolver, I wrote the following snippet:

from twisted.internet import reactor
from twisted.names.client import createResolver
from twisted.web.client import Agent


d = Agent(reactor).request(b'GET', b'http://does.not.exist')
reactor.installResolver(createResolver(servers=[('127.0.0.1', 10053)]))


def callback(result):
    print('Result:', result)


d.addBoth(callback)
d.addBoth(lambda _: reactor.stop())

reactor.run()

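But this fails, and nothing is printed in the server terminal. It looks as if the queries are not sent to the mock server but to the system-defined servers instead: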
Result: [Failure instance: Traceback: <class 'twisted.internet.error.DNSLookupError'>: DNS lookup failed: no results for hostname lookup: does.not.exist.
/.../venv/lib/python3.6/site-packages/twisted/internet/_resolver.py:137:deliverResults
/.../venv/lib/python3.6/site-packages/twisted/internet/endpoints.py:921:resolutionComplete
/.../venv/lib/python3.6/site-packages/twisted/internet/defer.py:460:callback
/.../venv/lib/python3.6/site-packages/twisted/internet/defer.py:568:_startRunCallbacks
--- <exception caught here> ---
/.../venv/lib/python3.6/site-packages/twisted/internet/defer.py:654:_runCallbacks
/.../venv/lib/python3.6/site-packages/twisted/internet/endpoints.py:975:startConnectionAttempts
]

I am using:
  • macOS 10.14.6, Python 3.6.6, Twisted 18.9.0
  • Linux Mint 19.1, Python 3.6.9, Twisted 19.7.0

Any help is appreciated. Let me know if more information is needed.

Thanks!

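Best answer

The solution was:

  • (Client) As suggested by @Hadus, swap the order of the two lines so that the resolver is installed before the deferred for the request is created. I had assumed the order would not matter because the reactor was not running yet.
  • (Server) Implement lookupAllRecords, reusing the existing _doDynamicResponse method.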
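
A likely reason the order matters even before the reactor runs, under the assumption (not confirmed in the original post for these Twisted versions) that the connection endpoint created by `request()` picks up the reactor's current resolver at the moment it is constructed. The toy model below uses only hypothetical stand-in classes (`ToyReactor`, `ToyEndpoint`), not Twisted's real ones, to show how a reference captured at construction time ignores a resolver installed afterwards.

```python
class ToyReactor:
    """Hypothetical stand-in for a reactor that holds the current resolver."""

    def __init__(self):
        self.resolver = "system resolver"

    def install_resolver(self, resolver):
        self.resolver = resolver


class ToyEndpoint:
    """Hypothetical endpoint that remembers the resolver it saw when created."""

    def __init__(self, reactor):
        # Captured once, at construction; later installs are not seen.
        self.resolver = reactor.resolver

    def connect(self):
        return "resolving with " + self.resolver


def request_then_install():
    """Mirror the failing client: create the request, then install."""
    reactor = ToyReactor()
    endpoint = ToyEndpoint(reactor)
    reactor.install_resolver("mock resolver")  # too late for this endpoint
    return endpoint.connect()


def install_then_request():
    """Mirror the fixed client: install first, then create the request."""
    reactor = ToyReactor()
    reactor.install_resolver("mock resolver")
    endpoint = ToyEndpoint(reactor)
    return endpoint.connect()
```

In this model `request_then_install()` still uses the system resolver while `install_then_request()` uses the mock one, which matches the behavior seen with the two client versions.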

# server
from twisted.internet import defer, reactor
from twisted.names import dns, error, server


class MockDNSResolver:
    """
    Implements twisted.internet.interfaces.IResolver partially
    """

    def _doDynamicResponse(self, name):
        print("Resolving name:", name)
        record = dns.Record_A(address=b"127.0.0.1")
        answer = dns.RRHeader(name=name, payload=record)
        return [answer], [], []

    def query(self, query, timeout=None):
        if query.type == dns.A:
            return defer.succeed(self._doDynamicResponse(query.name.name))
        return defer.fail(error.DomainError())

    def lookupAllRecords(self, name, timeout=None):
        return defer.succeed(self._doDynamicResponse(name))


if __name__ == "__main__":
    clients = [MockDNSResolver()]
    factory = server.DNSServerFactory(clients=clients)
    protocol = dns.DNSDatagramProtocol(controller=factory)
    reactor.listenUDP(10053, protocol)
    reactor.listenTCP(10053, factory)
    reactor.run()

# client
from twisted.internet import reactor
from twisted.names.client import createResolver
from twisted.web.client import Agent


reactor.installResolver(createResolver(servers=[('127.0.0.1', 10053)]))
d = Agent(reactor).request(b'GET', b'http://does.not.exist:8000')


def callback(result):
    print('Result:', result)


d.addBoth(callback)
d.addBoth(lambda _: reactor.stop())

reactor.run()

$ python client.py
Result: <twisted.web._newclient.Response object at 0x101077f98>

(I am running a simple web server with python3 -m http.server in another terminal; otherwise I get a reasonable twisted.internet.error.ConnectionRefusedError exception.)

Original question on Stack Overflow: https://stackoverflow.com/questions/59902042/
