mysql - How to split data from Slick 3.1 (Scala) into 4 parts

Reposted. Author: 行者123. Updated: 2023-11-29 20:18:01
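To speed up Lucene (v6.1) indexing, I want to split the data from Slick 3.1 (Scala) into several parts (chunks), so that each thread receives a different subset of the data and the indexing runs in parallel. I wrote the following Scala code to fetch the data from MySQL: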

class NotesService(val databaseService: DatabaseService)(implicit executionContext: ExecutionContext) extends NoteEntityTable {
  import databaseService._
  import databaseService.driver.api._
  import com.github.t3hnar.bcrypt._

  def getNotes(): Future[Seq[NoteEntity]] = db.run(notes.result)
}

case class NoteEntity(id: Option[Long] = None, title: String, teaser: String, description: String)
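
One alternative to loading every row and splitting in memory is to fetch each chunk separately. Slick queries support `drop` and `take`, which Slick turns into a LIMIT/OFFSET clause, so a query along the lines of `notes.sortBy(_.id).drop(offset).take(limit).result` could load one chunk per thread. The sketch below only computes the (offset, limit) pairs with the Scala standard library; `pageBounds` is a hypothetical helper, and the Slick query appears only in a comment because it assumes the `notes` table has an `id` column.

```scala
object PageBounds {
  // Hypothetical helper: split `totalRows` rows into at most `parts` pages and
  // return one (offset, limit) pair per page. Each pair could drive a Slick query like
  //   notes.sortBy(_.id).drop(offset).take(limit).result
  // (assumes an `id` column; a fixed sort order keeps the pages from overlapping).
  def pageBounds(totalRows: Int, parts: Int): Seq[(Int, Int)] = {
    require(parts > 0, "parts must be positive")
    if (totalRows <= 0) Seq.empty
    else {
      // Ceiling division, so the remainder does not spill into an extra page
      val pageSize = (totalRows + parts - 1) / parts
      (0 until totalRows by pageSize).map { offset =>
        (offset, math.min(pageSize, totalRows - offset))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // 20,000 rows into 5 pages of 4,000 rows each
    pageBounds(20000, 5).foreach { case (offset, limit) =>
      println(s"offset=$offset limit=$limit")
    }
  }
}
```

Each pair maps to one database round trip, so no single thread has to hold all 20,000 rows before work starts.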

To get the data from NotesService, I use:

def setI = {
  val NUM_THREADS = Runtime.getRuntime().availableProcessors()
  val IndexStoreDir = Paths.get("/var/www/html/Index")
  val analyzer = new StandardAnalyzer()
  val writerConfig = new IndexWriterConfig(analyzer)
  writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND)
  writerConfig.setRAMBufferSizeMB(500)
    .setMaxBufferedDocs(2)
    .setMergeScheduler(new ConcurrentMergeScheduler())
  val directory = FSDirectory.open(IndexStoreDir)
  val writer = new IndexWriter(directory, writerConfig)

  val threads = Array.ofDim[IndexTh](NUM_THREADS)
  val notes = notesService.getNotes()

  for (i <- 0 until NUM_THREADS) {
    threads(i) = new IndexTh(notesService, writer)
    // here on this line I want to pass different sets of data to thread.
  }
  for (i <- 0 until NUM_THREADS) {
    threads(i).start()
    println("Thread " + i + " Started!")
  }
}

On this line:

threads(i) = new IndexTh(notesService, writer)

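How can I split the data from notesService so that it can be passed to the threads? How do I split the notes data into multiple chunks? I want the data split like this:

Suppose notesService.getNotes() retrieves 20,000 rows. I want to split these rows into 5 parts of 4,000 rows each, so that each 4,000-row part can be passed to a different thread.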
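
For the 20,000-row example, the split itself can be done in memory with the standard library's `grouped`, provided the chunk size is the ceiling of rows / parts; plain integer division leaves the remainder in an extra chunk. A minimal sketch, where `splitIntoParts` is a hypothetical name and a `Range` stands in for the `Seq[NoteEntity]` returned by `getNotes()`:

```scala
object SplitIntoParts {
  // Split `rows` into at most `parts` consecutive chunks using a
  // ceiling-divided chunk size: every chunk except possibly the last
  // has exactly `chunkSize` elements.
  def splitIntoParts[A](rows: Seq[A], parts: Int): List[Seq[A]] = {
    require(parts > 0, "parts must be positive")
    if (rows.isEmpty) Nil
    else {
      val chunkSize = (rows.length + parts - 1) / parts // ceiling division
      rows.grouped(chunkSize).toList
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = 1 to 20000 // stand-in for the 20,000 NoteEntity rows
    val chunks = splitIntoParts(rows, 5)
    println(chunks.map(_.length)) // List(4000, 4000, 4000, 4000, 4000)
  }
}
```

Each element of the returned list could then be passed to a thread's constructor in place of `notesService`. With small inputs there can be fewer chunks than `parts` (5 rows into 4 parts gives chunks of 2, 2 and 1), so the number of threads should follow the number of chunks rather than the number of processors.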

Best answer

After a lot of research, I finally found a solution:

Using threads:

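import java.nio.file.Paths
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.index.{ConcurrentMergeScheduler, IndexWriter, IndexWriterConfig}
import org.apache.lucene.index.IndexWriterConfig.OpenMode
import org.apache.lucene.store.FSDirectory

def setI = {
  val NUM_THREADS = Runtime.getRuntime().availableProcessors()
  // Fetch all rows once instead of awaiting the same Future twice
  val allNotes = Await.result(notesService.getNotes(), Duration.Inf)
  val totalRows = allNotes.length

  // Ceiling division gives at most NUM_THREADS chunks. Plain integer division
  // (totalRows / NUM_THREADS) leaves the remainder in an extra chunk, and
  // max(1, ...) avoids grouped(0) when there are fewer rows than threads.
  val chunkSize = math.max(1, (totalRows + NUM_THREADS - 1) / NUM_THREADS)
  val chunks = allNotes.grouped(chunkSize).toList

  val IndexStoreDir = Paths.get("/var/www/html/LuceneIndex")
  val analyzer = new StandardAnalyzer()
  val writerConfig = new IndexWriterConfig(analyzer)
  writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND)
  writerConfig.setRAMBufferSizeMB(500)
    .setMaxBufferedDocs(10)
    .setMergeScheduler(new ConcurrentMergeScheduler())
  val directory = FSDirectory.open(IndexStoreDir)
  // IndexWriter is thread-safe, so all threads can share this one instance
  val writer = new IndexWriter(directory, writerConfig)

  // One Index thread (the answer's own Thread subclass, not shown) per chunk,
  // so there is never a null slot when the threads are started
  val threads = chunks.zipWithIndex.map { case (chunk, i) => new Index(chunk, writer, i) }

  for ((thread, i) <- threads.zipWithIndex) {
    println("Thread :" + thread.getName + " => " + (i + 1) + " Started!")
    thread.start()
  }

  // Wait for every thread, then commit once and release the index lock
  // (assumes Index does not close the shared writer itself)
  threads.foreach(_.join())
  writer.commit()
  writer.close()
}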
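
The same thread-per-chunk pattern, reduced to a self-contained sketch that runs without Lucene or Slick. Assumptions: `Note` is a hypothetical stand-in for `NoteEntity`, and the `indexNote` callback stands in for building a `Document` and calling `writer.addDocument`; in the demo it only increments an `AtomicInteger`. Starting every thread first and then calling `join` on each is what makes a single commit afterwards safe.

```scala
import java.util.concurrent.atomic.AtomicInteger

object ThreadPerChunk {
  // Hypothetical stand-in for NoteEntity
  final case class Note(id: Long, title: String)

  // Splits `notes` into at most `numThreads` chunks, indexes each chunk on its
  // own thread with `indexNote`, and blocks until every thread has finished.
  // Returns the number of threads that were started.
  def indexInThreads(notes: Seq[Note], numThreads: Int)(indexNote: Note => Unit): Int = {
    require(numThreads > 0, "numThreads must be positive")
    // Ceiling division: at most numThreads chunks, and never grouped(0)
    val chunkSize = math.max(1, (notes.length + numThreads - 1) / numThreads)
    val chunks = notes.grouped(chunkSize).toList

    val threads = chunks.zipWithIndex.map { case (chunk, i) =>
      new Thread(new Runnable {
        def run(): Unit = chunk.foreach(indexNote) // each thread sees only its slice
      }, s"indexer-$i")
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // a single writer.commit() would be safe after this
    threads.length
  }

  def main(args: Array[String]): Unit = {
    val notes = (1L to 20000L).map(id => Note(id, s"title $id"))
    // Hypothetical indexing step: just counts notes in a thread-safe counter
    val indexed = new AtomicInteger(0)
    val started = indexInThreads(notes, 5)(_ => indexed.incrementAndGet())
    println(s"threads=$started indexed=${indexed.get}") // threads=5 indexed=20000
  }
}
```

Creating the threads from the list of chunks, rather than filling a fixed-size array, means the thread count always matches the number of chunks.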

Using Scala Futures:

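import java.nio.file.Paths
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, Field, TextField}
import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
import org.apache.lucene.index.IndexWriterConfig.OpenMode
import org.apache.lucene.store.FSDirectory

// Both methods need an implicit ExecutionContext in scope
// (for example scala.concurrent.ExecutionContext.Implicits.global).
def setFutureIndex = {
  val IndexStoreDir = Paths.get("/var/www/html/LuceneIndex")
  val analyzer = new StandardAnalyzer()
  val writerConfig = new IndexWriterConfig(analyzer)
  writerConfig.setOpenMode(OpenMode.CREATE)
  writerConfig.setRAMBufferSizeMB(500)
  val directory = FSDirectory.open(IndexStoreDir)
  val writer = new IndexWriter(directory, writerConfig)
  val notes = notesService.getNotes() // Gets all notes from Slick

  // flatMap + Future.sequence keeps every per-note Future, so awaiting this
  // waits for the indexing itself; the original notes.map(_.foreach(...))
  // discarded them and only waited for the database query.
  def indexingFuture: Future[Seq[String]] =
    notes.flatMap(ns => Future.sequence(ns.map(note => writeToDoc(note, writer))))

  Await.result(indexingFuture, Duration.Inf)
  // Commit once at the end instead of once per document, then release the index lock
  writer.commit()
  writer.close()

  /*indexingFuture.onComplete {
    case Success(value) => println(value)
    case Failure(e) => e.printStackTrace()
  }*/
}

def writeToDoc(note: NoteEntity, writer: IndexWriter): Future[String] = Future {
  println("*****Indexing: " + note.id.get)
  val doc = new Document()
  doc.add(new TextField("title", " {##" + note.id.get + "##} " + note.title, Field.Store.YES))
  doc.add(new TextField("teaser", note.teaser, Field.Store.YES))
  doc.add(new TextField("description", note.description, Field.Store.YES))

  // IndexWriter.addDocument is thread-safe, so concurrent Futures may share the writer
  writer.addDocument(doc)

  println("*****Completed: " + note.id.get)
  "*****Completed: " + note.id.get
}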
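
A standard-library-only sketch of the Future variant that creates one Future per chunk instead of one per note, and keeps every inner Future so that `Await.result` really waits for the indexing. It assumes the global `ExecutionContext`; `Note`, `indexNote` and `indexWithFutures` are hypothetical names standing in for `NoteEntity`, the Lucene `addDocument` call and the answer's `setFutureIndex`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object FuturePerChunk {
  // Hypothetical stand-in for NoteEntity
  final case class Note(id: Long, title: String)

  // Splits the fetched notes into at most `parts` chunks and runs one Future
  // per chunk. Each Future returns the ids it indexed; Future.sequence keeps
  // all of them, so the returned Future completes only after every chunk is done.
  def indexWithFutures(notes: Future[Seq[Note]], parts: Int)(indexNote: Note => Unit): Future[Seq[Long]] = {
    require(parts > 0, "parts must be positive")
    notes.flatMap { ns =>
      val chunkSize = math.max(1, (ns.length + parts - 1) / parts) // ceiling division
      val perChunk = ns.grouped(chunkSize).toList.map { chunk =>
        Future { chunk.map { note => indexNote(note); note.id } }
      }
      Future.sequence(perChunk).map(_.flatten)
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for notesService.getNotes()
    val notes = Future.successful((1L to 1000L).map(id => Note(id, s"title $id")))
    // Hypothetical indexing step: counts notes in a thread-safe counter
    val indexed = new AtomicInteger(0)
    val ids = Await.result(indexWithFutures(notes, 4)(_ => indexed.incrementAndGet()), Duration.Inf)
    // In the Lucene version, a single writer.commit() would go here
    println(s"${ids.length} ${indexed.get}") // 1000 1000
  }
}
```

Because `indexNote` may run on several pool threads at once, it has to be thread-safe; Lucene's `IndexWriter.addDocument` is, so the single shared writer from the answer could be used inside it.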

For more on mysql - How to split data from Slick 3.1 (Scala) into 4 parts, see the similar question on Stack Overflow: https://stackoverflow.com/questions/39634851/
