gpt4 book ai didi

python-2.7 - 如何使用 python ctypes 访问另一个进程的 PEB

转载 作者:行者123 更新时间:2023-12-01 12:30:02 32 4
gpt4 key购买 nike

到现在为止,我有这段代码(我知道它很丑,但这不是现在的重点)

我不知道如何发布以下系统并构建正确的结构来访问另一个进程的 PEB。

我想做以下事情:

  • HANDLE pHandle = OpenProcess
  • NTSTATUS status = NtQueryInformationProcess(pHandle, 0, peb, peb_len, 0)
  • 走 PEB_LDR_DATA

  • 代码:
    from ctypes import *
    from ctypes.wintypes import *
    from _multiprocessing import win32
    import argparse


    class UNICODE_STRING(Structure):
    _fields_ = [
    ("Length", USHORT),
    ("MaximumLength", USHORT),
    ("Buffer", c_wchar_p)
    ]

    class RTL_USER_PROCESS_PARAMETERS(Structure):
    _fields_ = [
    ("Reserved1", BYTE*16),
    ("Reserved2", BYTE*10),
    ("ImagePathName", UNICODE_STRING),
    ("CommandLine", UNICODE_STRING)
    ]

    class PEB(Structure):
    _fields_ = [
    ("Reserved1", BYTE*2),
    ("BeingDebugged", BYTE),
    ("Reserved2", BYTE),
    ("Rserved3", LPVOID),
    ("Ldr", LPVOID),
    ("ProcessParameters", POINTER(RTL_USER_PROCESS_PARAMETERS)),
    ("Reserved4", BYTE*104),
    ("Reserved5", LPVOID*52),
    ("PostProcessInitRoutine", LPVOID),
    ("Reserved6", BYTE*128),
    ("Reserved7", LPVOID),
    ("SessionId", ULONG)
    ]

    class PROCESS_BASIC_INFORMATION(Structure):
    _fields_ = [
    ("Reserved1", LPVOID),
    ("PebBaseAddress", POINTER(PEB)),
    ("Reserved2", LPVOID*2),
    ("UniqueProcessId", POINTER(ULONG)),
    ("Reserved3", LPVOID)
    ]


    def main():
    # Command Line Arguments Parsing
    parser = argparse.ArgumentParser()
    parser.add_argument('pid', metavar='<process id>', type=int, help='shows basic info about the process')
    parser.add_argument('-dS', metavar='dump strings', help='dump all used strings to txt file')
    parser.add_argument('-dD', metavar='dump dll', help='dump all used strings to txt file')
    args = parser.parse_args()

    var_pid = args.pid

    # WinAPi Calls

    # Variables Definition
    pHandle = HANDLE()
    NTSTATUS = ULONG()
    pbi = PROCESS_BASIC_INFORMATION()
    pPEB = PEB()
    pRTL = RTL_USER_PROCESS_PARAMETERS()
    pCMD = UNICODE_STRING()
    ReturnValue = BOOL()
    bytesRead = ULONG()

    # OpenProcess
    pHandle = windll.kernel32.OpenProcess(win32.PROCESS_ALL_ACCESS, 0, var_pid)

    # NtQueryInformationProcess
    NTSTATUS = windll.ntdll.NtQueryInformationProcess(pHandle, 0, byref(pbi), sizeof(pbi), None)

    # ReadProcessMemory
    ReturnValue = windll.kernel32.ReadProcessMemory(pHandle, pbi.PebBaseAddress, byref(pPEB), sizeof(PEB), byref(bytesRead))
    ReturnValue = windll.kernel32.ReadProcessMemory(pHandle, pPEB.ProcessParameters, byref(pRTL), sizeof(RTL_USER_PROCESS_PARAMETERS), byref(bytesRead))

    #msvcrt = cdll.msvcrt
    #msvcrt.printf("%s", pRTL.CommandLine.Buffer)

    temp = pRTL.ImagePathName.Buffer # cant read the unicode from this buffer

    ReturnValue = windll.kernel32.CloseHandle(pHandle)
    exit(0)

    if __name__ == '__main__':
    main()

    **编辑:
    我设法得到了 PEB 并在其中遍历了结构,但我无法从它们的缓冲区中读取 UNICODE 字符串。
    例如我想读取命令行参数

    最佳答案

    值得重复 NtQueryInformationProcess是 native 系统调用,在 Windows 编程中不鼓励使用。 Microsoft 不提供 ntdll.dll 的导入库,因此调用其导出函数的唯一方法是通过 GetProcAddress 动态调用。 .这当然是 ctypes 的工作原理,因此从 Python 调用 native NTAPI 函数或多或少并不困难。问题在于缺乏官方支持和文档,并且 NT 数据结构、API 和可用信息类都可能发生变化。

    另请注意,查询 ProcessBasicInformation从 64 位进程调用时检索 64 位 PEB 的地址。因此,当从 64 位进程查询 WOW64 32 位进程时,您只会看到 native 64 位模块 ntdll.dll、wow64.dll、wow64win.dll 和 wow64cpu.dll。这是 answer它提供了一种通过使用从 64 位 TEB 到 32 位 TEB 的魔术偏移量来查找 32 位 PEB 的地址的技术,该 32 位 TEB 具有指向 32 位 PEB 的指针。但是当然这个实现细节可以随时改变,破坏依赖它的代码。

    以下示例具有查询和使用 ProcessBasicInformation 所需的 ctypes 定义。对于具有相同架构的给定进程(即 native 64 位或 WOW64 32 位)。它包括一个演示用法并提供进程 ID、 session ID、图像路径、命令行和已加载模块路径的属性的类。

    该示例使用 RemotePointer ctypes._Pointer 的子类,以及 RPOINTER工厂功能。此类覆盖 __getitem__方便在另一个进程的地址空间中取消引用指针值。索引键是 index, handle[, size] 形式的元组.可选 size参数(以字节为单位)对于大小字符串很有用,例如 NTAPI UNICODE_STRING ,例如ustr.Buffer[0, hProcess, usrt.Length] .不支持以空结尾的字符串,因为 ReadProcessMemory需要一个大小的缓冲区。

    遍历加载器数据的逻辑在私有(private) _modules_iter 中。方法,它使用内存顺序链表遍历加载的模块。请注意 InMemoryOrderModuleList链接到 InMemoryOrderLinks LDR_DATA_TABLE_ENTRY 的字段结构,等等列表中的每个链接。模块迭代器必须通过该字段的偏移量来调整每个条目的基地址。在 C API 中,这将使用 CONTAINING_RECORD宏。
    ProcessInformation如果没有提供进程 ID 或句柄,构造函数默认查询当前进程。如果调用状态是错误或警告(即负 NTSTATUS ),它调用 NtError获取 OSError 的实例, 或 WindowsError在 3.3 之前。

    我有一个更详细的 NtError 版本,但没有包括在内。调用 FormatMessage获取格式化的错误消息,使用 ntdll.dll 作为源模块。我可以根据要求更新答案以包含此版本。

    该示例在 Windows 7 和 10 中使用 32 位和 64 位版本的 Python 2.7 和 3.5 进行了测试。对于远程进程测试,子进程模块用于启动第二个 Python 实例。将事件句柄传递给子进程以进行同步。如果父进程没有等待子进程完成加载并设置事件,那么子进程的加载器数据在读取时可能没有完全初始化。

    import ctypes
    from ctypes import wintypes

    ntdll = ctypes.WinDLL('ntdll')
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    # WINAPI Definitions

    PROCESS_VM_READ = 0x0010
    PROCESS_QUERY_INFORMATION = 0x0400

    ERROR_INVALID_HANDLE = 0x0006
    ERROR_PARTIAL_COPY = 0x012B

    PULONG = ctypes.POINTER(wintypes.ULONG)
    ULONG_PTR = wintypes.LPVOID
    SIZE_T = ctypes.c_size_t

    def _check_bool(result, func, args):
    if not result:
    raise ctypes.WinError(ctypes.get_last_error())
    return args

    kernel32.ReadProcessMemory.errcheck = _check_bool
    kernel32.ReadProcessMemory.argtypes = (
    wintypes.HANDLE, # _In_ hProcess
    wintypes.LPCVOID, # _In_ lpBaseAddress
    wintypes.LPVOID, # _Out_ lpBuffer
    SIZE_T, # _In_ nSize
    ctypes.POINTER(SIZE_T)) # _Out_ lpNumberOfBytesRead

    kernel32.CloseHandle.errcheck = _check_bool
    kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)

    kernel32.GetCurrentProcess.restype = wintypes.HANDLE
    kernel32.GetCurrentProcess.argtypes = ()

    kernel32.OpenProcess.errcheck = _check_bool
    kernel32.OpenProcess.restype = wintypes.HANDLE
    kernel32.OpenProcess.argtypes = (
    wintypes.DWORD, # _In_ dwDesiredAccess
    wintypes.BOOL, # _In_ bInheritHandle
    wintypes.DWORD) # _In_ dwProcessId

    class RemotePointer(ctypes._Pointer):
    def __getitem__(self, key):
    # TODO: slicing
    size = None
    if not isinstance(key, tuple):
    raise KeyError('must be (index, handle[, size])')
    if len(key) > 2:
    index, handle, size = key
    else:
    index, handle = key
    if isinstance(index, slice):
    raise TypeError('slicing is not supported')
    dtype = self._type_
    offset = ctypes.sizeof(dtype) * index
    address = PVOID.from_buffer(self).value + offset
    simple = issubclass(dtype, ctypes._SimpleCData)
    if simple and size is not None:
    if dtype._type_ == wintypes.WCHAR._type_:
    buf = (wintypes.WCHAR * (size // 2))()
    else:
    buf = (ctypes.c_char * size)()
    else:
    buf = dtype()
    nread = SIZE_T()
    kernel32.ReadProcessMemory(handle,
    address,
    ctypes.byref(buf),
    ctypes.sizeof(buf),
    ctypes.byref(nread))
    if simple:
    return buf.value
    return buf

    def __setitem__(self, key, value):
    # TODO: kernel32.WriteProcessMemory
    raise TypeError('remote pointers are read only')

    @property
    def contents(self):
    # a handle is required
    raise NotImplementedError

    _remote_pointer_cache = {}
    def RPOINTER(dtype):
    if dtype in _remote_pointer_cache:
    return _remote_pointer_cache[dtype]
    name = 'RP_%s' % dtype.__name__
    ptype = type(name, (RemotePointer,), {'_type_': dtype})
    _remote_pointer_cache[dtype] = ptype
    return ptype

    # NTAPI Definitions

    NTSTATUS = wintypes.LONG
    PVOID = wintypes.LPVOID
    RPWSTR = RPOINTER(wintypes.WCHAR)
    PROCESSINFOCLASS = wintypes.ULONG

    ProcessBasicInformation = 0
    ProcessDebugPort = 7
    ProcessWow64Information = 26
    ProcessImageFileName = 27
    ProcessBreakOnTermination = 29

    STATUS_UNSUCCESSFUL = NTSTATUS(0xC0000001)
    STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value
    STATUS_INVALID_HANDLE = NTSTATUS(0xC0000008).value
    STATUS_OBJECT_TYPE_MISMATCH = NTSTATUS(0xC0000024).value

    class UNICODE_STRING(ctypes.Structure):
    _fields_ = (('Length', wintypes.USHORT),
    ('MaximumLength', wintypes.USHORT),
    ('Buffer', RPWSTR))

    class LIST_ENTRY(ctypes.Structure):
    pass

    RPLIST_ENTRY = RPOINTER(LIST_ENTRY)

    LIST_ENTRY._fields_ = (('Flink', RPLIST_ENTRY),
    ('Blink', RPLIST_ENTRY))

    class LDR_DATA_TABLE_ENTRY(ctypes.Structure):
    _fields_ = (('Reserved1', PVOID * 2),
    ('InMemoryOrderLinks', LIST_ENTRY),
    ('Reserved2', PVOID * 2),
    ('DllBase', PVOID),
    ('EntryPoint', PVOID),
    ('Reserved3', PVOID),
    ('FullDllName', UNICODE_STRING),
    ('Reserved4', wintypes.BYTE * 8),
    ('Reserved5', PVOID * 3),
    ('CheckSum', PVOID),
    ('TimeDateStamp', wintypes.ULONG))

    RPLDR_DATA_TABLE_ENTRY = RPOINTER(LDR_DATA_TABLE_ENTRY)

    class PEB_LDR_DATA(ctypes.Structure):
    _fields_ = (('Reserved1', wintypes.BYTE * 8),
    ('Reserved2', PVOID * 3),
    ('InMemoryOrderModuleList', LIST_ENTRY))

    RPPEB_LDR_DATA = RPOINTER(PEB_LDR_DATA)

    class RTL_USER_PROCESS_PARAMETERS(ctypes.Structure):
    _fields_ = (('Reserved1', wintypes.BYTE * 16),
    ('Reserved2', PVOID * 10),
    ('ImagePathName', UNICODE_STRING),
    ('CommandLine', UNICODE_STRING))

    RPRTL_USER_PROCESS_PARAMETERS = RPOINTER(RTL_USER_PROCESS_PARAMETERS)
    PPS_POST_PROCESS_INIT_ROUTINE = PVOID

    class PEB(ctypes.Structure):
    _fields_ = (('Reserved1', wintypes.BYTE * 2),
    ('BeingDebugged', wintypes.BYTE),
    ('Reserved2', wintypes.BYTE * 1),
    ('Reserved3', PVOID * 2),
    ('Ldr', RPPEB_LDR_DATA),
    ('ProcessParameters', RPRTL_USER_PROCESS_PARAMETERS),
    ('Reserved4', wintypes.BYTE * 104),
    ('Reserved5', PVOID * 52),
    ('PostProcessInitRoutine', PPS_POST_PROCESS_INIT_ROUTINE),
    ('Reserved6', wintypes.BYTE * 128),
    ('Reserved7', PVOID * 1),
    ('SessionId', wintypes.ULONG))

    RPPEB = RPOINTER(PEB)

    class PROCESS_BASIC_INFORMATION(ctypes.Structure):
    _fields_ = (('Reserved1', PVOID),
    ('PebBaseAddress', RPPEB),
    ('Reserved2', PVOID * 2),
    ('UniqueProcessId', ULONG_PTR),
    ('Reserved3', PVOID))

    def NtError(status):
    import sys
    descr = 'NTSTATUS(%#08x) ' % (status % 2**32,)
    if status & 0xC0000000 == 0xC0000000:
    descr += '[Error]'
    elif status & 0x80000000 == 0x80000000:
    descr += '[Warning]'
    elif status & 0x40000000 == 0x40000000:
    descr += '[Information]'
    else:
    descr += '[Success]'
    if sys.version_info[:2] < (3, 3):
    return WindowsError(status, descr)
    return OSError(None, descr, None, status)

    NtQueryInformationProcess = ntdll.NtQueryInformationProcess
    NtQueryInformationProcess.restype = NTSTATUS
    NtQueryInformationProcess.argtypes = (
    wintypes.HANDLE, # _In_ ProcessHandle
    PROCESSINFOCLASS, # _In_ ProcessInformationClass
    PVOID, # _Out_ ProcessInformation
    wintypes.ULONG, # _In_ ProcessInformationLength
    PULONG) # _Out_opt_ ReturnLength

    class ProcessInformation(object):
    _close_handle = False
    _closed = False
    _module_names = None

    def __init__(self, process_id=None, handle=None):
    if process_id is None and handle is None:
    handle = kernel32.GetCurrentProcess()
    elif handle is None:
    handle = kernel32.OpenProcess(PROCESS_VM_READ |
    PROCESS_QUERY_INFORMATION,
    False, process_id)
    self._close_handle = True
    self._handle = handle
    self._query_info()
    if process_id is not None and self._process_id != process_id:
    raise NtError(STATUS_UNSUCCESSFUL)

    def __del__(self, CloseHandle=kernel32.CloseHandle):
    if self._close_handle and not self._closed:
    try:
    CloseHandle(self._handle)
    except WindowsError as e:
    if e.winerror != ERROR_INVALID_HANDLE:
    raise
    self._closed = True

    def _query_info(self):
    info = PROCESS_BASIC_INFORMATION()
    handle = self._handle
    status = NtQueryInformationProcess(handle,
    ProcessBasicInformation,
    ctypes.byref(info),
    ctypes.sizeof(info),
    None)
    if status < 0:
    raise NtError(status)
    self._process_id = info.UniqueProcessId
    self._peb = peb = info.PebBaseAddress[0, handle]
    self._params = peb.ProcessParameters[0, handle]
    self._ldr = peb.Ldr[0, handle]

    def _modules_iter(self):
    headaddr = (PVOID.from_buffer(self._peb.Ldr).value +
    PEB_LDR_DATA.InMemoryOrderModuleList.offset)
    offset = LDR_DATA_TABLE_ENTRY.InMemoryOrderLinks.offset
    pentry = self._ldr.InMemoryOrderModuleList.Flink
    while pentry:
    pentry_void = PVOID.from_buffer_copy(pentry)
    if pentry_void.value == headaddr:
    break
    pentry_void.value -= offset
    pmod = RPLDR_DATA_TABLE_ENTRY.from_buffer(pentry_void)
    mod = pmod[0, self._handle]
    yield mod
    pentry = LIST_ENTRY.from_buffer(mod, offset).Flink

    def update_module_names(self):
    names = []
    for m in self._modules_iter():
    ustr = m.FullDllName
    name = ustr.Buffer[0, self._handle, ustr.Length]
    names.append(name)
    self._module_names = names

    @property
    def module_names(self):
    if self._module_names is None:
    self.update_module_names()
    return self._module_names

    @property
    def process_id(self):
    return self._process_id

    @property
    def session_id(self):
    return self._peb.SessionId

    @property
    def image_path(self):
    ustr = self._params.ImagePathName
    return ustr.Buffer[0, self._handle, ustr.Length]

    @property
    def command_line(self):
    ustr = self._params.CommandLine
    buf = ustr.Buffer[0, self._handle, ustr.Length]
    return buf

    示例:
    if __name__ == '__main__':
    import os
    import sys
    import subprocess
    import textwrap

    class SECURITY_ATTRIBUTES(ctypes.Structure):
    _fields_ = (('nLength', wintypes.DWORD),
    ('lpSecurityDescriptor', wintypes.LPVOID),
    ('bInheritHandle', wintypes.BOOL))
    def __init__(self, *args, **kwds):
    super(SECURITY_ATTRIBUTES, self).__init__(*args, **kwds)
    self.nLength = ctypes.sizeof(self)

    def test_remote(use_pid=True, show_modules=False):
    sa = SECURITY_ATTRIBUTES(bInheritHandle=True)
    hEvent = kernel32.CreateEventW(ctypes.byref(sa), 0, 0, None)
    try:
    script = textwrap.dedent(r"""
    import sys
    import ctypes
    kernel32 = ctypes.WinDLL('kernel32')
    kernel32.SetEvent(%d)
    sys.stdin.read()""").strip() % hEvent
    cmd = '"%s" -c "%s"' % (sys.executable, script)
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
    close_fds=False)
    try:
    kernel32.WaitForSingleObject(hEvent, 5000)
    if use_pid:
    pi = ProcessInformation(proc.pid)
    else:
    pi = ProcessInformation(handle=int(proc._handle))
    assert pi.process_id == proc.pid
    assert pi.image_path == sys.executable
    assert pi.command_line == cmd
    assert pi.module_names[0] == sys.executable
    if show_modules:
    print('\n'.join(pi.module_names))
    finally:
    proc.terminate()
    finally:
    kernel32.CloseHandle(hEvent)

    print('Test 1: current process')
    pi = ProcessInformation()
    assert os.getpid() == pi.process_id
    assert pi.image_path == pi.module_names[0]
    print('Test 2: remote process (Handle)')
    test_remote(use_pid=False)
    print('Test 3: remote process (PID)')
    test_remote(show_modules=True)

    在 Windows 10 中使用 64 位 Python 3.5 输出:

    Test 1: current process
    Test 2: remote process (Handle)
    Test 3: remote process (PID)
    C:\Program Files\Python35\python.exe
    C:\Windows\SYSTEM32\ntdll.dll
    C:\Windows\system32\KERNEL32.DLL
    C:\Windows\system32\KERNELBASE.dll
    C:\Program Files\Python35\python35.dll
    C:\Program Files\Python35\VCRUNTIME140.dll
    C:\Windows\SYSTEM32\ucrtbase.dll
    C:\Windows\system32\ADVAPI32.dll
    C:\Windows\system32\WS2_32.dll
    C:\Windows\system32\sechost.dll
    C:\Windows\system32\RPCRT4.dll
    C:\Windows\system32\NSI.dll
    C:\Windows\system32\msvcrt.dll
    C:\Windows\SYSTEM32\CRYPTBASE.DLL
    C:\Windows\SYSTEM32\bcryptPrimitives.dll
    C:\Windows\SYSTEM32\CRYPTSP.dll
    C:\Windows\SYSTEM32\bcrypt.dll
    C:\Windows\system32\rsaenh.dll
    C:\Program Files\Python35\python3.dll
    C:\Program Files\Python35\DLLs\_ctypes.pyd
    C:\Windows\system32\ole32.dll
    C:\Windows\system32\combase.dll
    C:\Windows\system32\GDI32.dll
    C:\Windows\system32\USER32.dll
    C:\Windows\system32\OLEAUT32.dll
    C:\Windows\system32\IMM32.DLL
    C:\Windows\system32\MSCTF.dll

    关于python-2.7 - 如何使用 python ctypes 访问另一个进程的 PEB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35106511/

    32 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com