
python - Apache Thrift: Multitask single Server and Client

Reposted · Author: 太空宇宙 · Updated: 2023-11-04 05:12:48

I have read this question. However, my case is different: I don't need multiplexed services on the server, and I don't need multiple connections to the server.

Background:
For my big-data project I need to compute a coreset of the given data. A coreset is a subset of the big data that preserves its most important mathematical relationships.

Workflow:

  • Split the big data into smaller blocks
  • The client parses a chunk and sends it to the server
  • The server computes the coreset and saves the result
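The splitting step can be sketched as follows; split_into_blocks is a hypothetical helper, not part of the question's code, and the data and block size are illustrative:

```python
# Minimal sketch (assumed helper, not from the question): split a large
# dataset into fixed-size blocks that can be parsed independently.
def split_into_blocks(data, block_size):
    """Return consecutive blocks of at most block_size items each."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

blocks = split_into_blocks(list(range(10)), 4)
print(blocks)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```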

My problem:
The whole process runs single-threaded. The client parses one block, then waits for the server to finish computing the coreset, then parses the next block, and so on.

Goal:
Exploit multiprocessing. The client parses several blocks at the same time, and for every compute-coreset request the server assigns a thread to handle it, with the number of threads bounded. Something like a pool.

I know I need to use a different server class than TSimpleServer and move to TThreadPoolServer or TThreadedServer. I just can't decide which one to pick, because neither seems to fit my case:

TThreadedServer spawns a new thread for each client connection, and each thread remains alive until the client connection is closed.


In TThreadedServer each client connection gets its own dedicated server thread. Server thread goes back to the thread pool after client closes the connection for reuse.

I don't need a thread per connection; I want a single connection, and the server handling multiple service requests at the same time. Visualization:

Client:
Thread1: parses(chunk1) --> Request compute coreset
Thread2: parses(chunk2) --> Request compute coreset
Thread3: parses(chunk3) --> Request compute coreset

Server: (pool of 2 threads)
Thread1: handles compute coreset
Thread2: handles compute coreset
.
.
Thread1 becomes available and handles another compute coreset
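The client-side fan-out pictured above can be sketched with a bounded thread pool; parse_chunk and compute_coreset below are illustrative stand-ins for the real parsing code and the Thrift call, not part of the question's API:

```python
# Sketch of the client-side fan-out with a bounded pool of worker threads.
# parse_chunk / compute_coreset are illustrative stand-ins.
from concurrent.futures import ThreadPoolExecutor

def parse_chunk(chunk):
    # stand-in for parsing a sliced file
    return [x * 2 for x in chunk]

def compute_coreset(points):
    # stand-in for the server-side coreset computation
    return sum(points)

chunks = [[1, 2], [3, 4], [5, 6]]

# Pool of 2 threads, mirroring the "pool of 2 threads" picture above.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(lambda c: compute_coreset(parse_chunk(c)), chunks))

print(results)  # [6, 14, 22]
```

With real Thrift calls, each worker thread would generally need its own transport and client object, since a single Thrift connection is not safe for concurrent use from several threads.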

Code:
api.thrift:

struct CoresetPoint {
    1: i32 row,
    2: i32 dim,
}

struct CoresetAlgorithm {
    1: string path,
}

struct CoresetWeightedPoint {
    1: CoresetPoint point,
    2: double weight,
}

struct CoresetPoints {
    1: list<CoresetWeightedPoint> points,
}

service CoresetService {

    void initialize(1: CoresetAlgorithm algorithm, 2: i32 coresetSize)

    oneway void compressPoints(1: CoresetPoints message)

    CoresetPoints getTotalCoreset()
}


Server (implementation removed for readability):

import logging

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

# Generated by: thrift --gen py api.thrift
from api import CoresetService


class CoresetHandler:
    def initialize(self, algorithm, coresetSize):
        pass  # implementation removed

    def _add(self, leveledSlice):
        pass  # implementation removed

    def compressPoints(self, message):
        pass  # implementation removed

    def getTotalCoreset(self):
        pass  # implementation removed


if __name__ == '__main__':
    logging.basicConfig()
    handler = CoresetHandler()
    processor = CoresetService.Processor(handler)
    transport = TSocket.TServerSocket(port=9090)
    tfactory = TTransport.TBufferedTransportFactory()
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()

    server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)

    # Alternative multithreaded server:
    # server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)

    print('Starting the server...')
    server.serve()
    print('done.')


Client:

import os

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from api import CoresetService
from api.ttypes import CoresetAlgorithm

try:
    # Make socket
    transport = TSocket.TSocket('localhost', 9090)

    # Buffering is critical. Raw sockets are very slow
    transport = TTransport.TBufferedTransport(transport)

    # Wrap in a protocol
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    # Create a client to use the protocol encoder
    client = CoresetService.Client(protocol)

    # Connect!
    transport.open()

    # Here the data is sliced; in a loop I go over all the files
    # saved in the directory I specified, parse each one, and
    # invoke client.compressPoints(data).

    SliceFile(...)
    p = CoresetAlgorithm(...)
    client.initialize(p, 200)
    for filename in os.listdir('/home/tony/DanLab/slicedFiles'):
        if filename.endswith(".txt"):
            data = _parse(filename)
            client.compressPoints(data)
    compressedData = client.getTotalCoreset()

    # Close!
    transport.close()

except Thrift.TException as tx:
    print(tx)

Question: Is this possible with Thrift? Which server class should I use? I solved part of the problem of the client waiting for the server to finish its computation by adding oneway to the function declaration, to indicate that the client only makes a request and does not wait for any response at all.

Best Answer

Essentially, this is more of an architectural question than a Thrift question. Given that the premise

I don't need a thread per connection, I want a single connection, and the server to handle multiple service requests at the same time. Visualization:

I solved the partial problem of the client waiting for the server to finish computation by adding oneway to the function declaration to indicate that the client only makes a request and does not wait for any response at all.

accurately describes the use case, then what you want is this:

+---------------------+
|       Client        |
+---------+-----------+
          |
          |
+---------v-----------+
|       Server        |
+---------+-----------+
          |
          |
+---------v-----------+          +---------------------+
|  Queue<WorkItems>   <----------+  Worker Thread Pool |
+---------------------+          +---------------------+

The server's only task is to take requests and insert them into the work-item queue as quickly as possible. The work items are processed by an independent pool of worker threads that is otherwise completely decoupled from the server part. The only shared piece is the work-item queue, which of course requires properly synchronized access.
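The queue-plus-worker-pool idea can be sketched like this; the squaring step is just a stand-in for the coreset computation, and none of these names come from the question's code:

```python
# Sketch of the answer's architecture: a producer (the server handler) only
# enqueues work items; a fixed pool of worker threads drains the queue.
import queue
import threading

work_items = queue.Queue()      # the shared, synchronized work-item queue
results = []
results_lock = threading.Lock()

def worker():
    while True:
        item = work_items.get()
        if item is None:        # poison pill: shut this worker down
            work_items.task_done()
            break
        value = item * item     # stand-in for the coreset computation
        with results_lock:
            results.append(value)
        work_items.task_done()

# Pool of 2 worker threads, independent of the (Thrift) server part.
pool = [threading.Thread(target=worker) for _ in range(2)]
for t in pool:
    t.start()

# What compressPoints() would do: enqueue and return immediately.
for item in [1, 2, 3, 4]:
    work_items.put(item)

for _ in pool:                  # one poison pill per worker
    work_items.put(None)
work_items.join()
for t in pool:
    t.join()

print(sorted(results))  # [1, 4, 9, 16]
```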

Regarding the server choice: if the server manages to hand the requests off quickly enough, even a TSimpleServer may do.

For python - Apache Thrift: Multitask single Server and Client, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/42574390/
