作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover 和 :active 上具有不同效果的 CSS 动画?
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
为了加快 Lucene(v6.1)建立索引的速度,我想把通过 Slick 3.1(Scala)取出的数据拆分为多个部分(块),以便把不同的数据集交给不同的线程并行处理,从而加快索引过程。我用 Scala 编写了以下代码从 MySQL 获取数据:
/**
 * Slick-backed read access to notes.
 *
 * The unused `com.github.t3hnar.bcrypt._` import was removed: nothing in this
 * class hashes or checks passwords.
 *
 * @param databaseService source of the Slick `db` handle and driver API
 *                        (both are imported from it below)
 * @param executionContext execution context made available to this class
 */
class NotesService(val databaseService: DatabaseService)(implicit executionContext: ExecutionContext) extends NoteEntityTable {
  import databaseService._
  import databaseService.driver.api._

  /**
   * Runs `notes.result` and returns every row it yields.
   *
   * `notes` presumably is the TableQuery declared by NoteEntityTable (TODO confirm).
   * The whole result set is materialised in memory as one Seq.
   */
  def getNotes(): Future[Seq[NoteEntity]] = db.run(notes.result)
}
/**
 * A single note as returned by `NotesService.getNotes()`.
 *
 * @param id          database key; defaults to `None` (presumably for notes not
 *                    yet inserted — confirm against the table mapping)
 * @param title       note title
 * @param teaser      short summary text
 * @param description full note body
 */
case class NoteEntity(
    id: Option[Long] = None,
    title: String,
    teaser: String,
    description: String
)
NotesService 代码
/**
 * Service that reads notes through Slick.
 *
 * The leftover `com.github.t3hnar.bcrypt._` import is dropped because no
 * member of this class uses it.
 *
 * @param databaseService provides `db` and the driver API imported below
 * @param executionContext execution context in implicit scope for this class
 */
class NotesService(val databaseService: DatabaseService)(implicit executionContext: ExecutionContext) extends NoteEntityTable {
  import databaseService._
  import databaseService.driver.api._

  /**
   * Executes `notes.result` and yields all resulting rows as a single Seq.
   *
   * NOTE(review): `notes` is assumed to be the TableQuery from NoteEntityTable.
   */
  def getNotes(): Future[Seq[NoteEntity]] = db.run(notes.result)
}
我使用下面的代码从 NotesService 获取数据:
/**
 * (Question version.) Opens an IndexWriter on /var/www/html/Index and starts
 * one `IndexTh` thread per available processor, all sharing that writer.
 *
 * Every thread is currently given the whole `notesService` instead of its own
 * subset of rows — this is what the question asks how to change.
 * NOTE(review): `notes` is fetched but not used yet, and the writer is never
 * closed in this method.
 */
def setI = {
  val NUM_THREADS = Runtime.getRuntime().availableProcessors()
  val indexDir = Paths.get("/var/www/html/Index")
  val writerConfig = new IndexWriterConfig(new StandardAnalyzer())
  writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND)
  // The setters return the config itself, so they can be chained.
  writerConfig
    .setRAMBufferSizeMB(500)
    .setMaxBufferedDocs(2)
    .setMergeScheduler(new ConcurrentMergeScheduler())
  val writer = new IndexWriter(FSDirectory.open(indexDir), writerConfig)
  val threads = Array.ofDim[IndexTh](NUM_THREADS)
  // Query is issued here but its result is not consumed yet; the per-thread
  // subsets would have to be derived from it.
  val notes = notesService.getNotes()
  threads.indices.foreach { slot =>
    threads(slot) = new IndexTh(notesService, writer)
  }
  threads.indices.foreach { slot =>
    threads(slot).start()
    println(s"Thread $slot Started!")
  }
}
在这一行:
threads(i) = new IndexTh(notesService, writer)
如何拆分 notesService 返回的数据,并分别传给各个线程?也就是说,如何把 notes 数据拆分成多个块?我想要的效果如下:
假设 notesService.getNotes() 返回 20000 行数据。我想把这些行分成 5 个部分,每部分 4000 行,以便把每 4000 行数据分别交给一个不同的线程处理。
最佳答案
经过长时间的研究终于找到了答案:
使用线程:
/**
 * Indexes all notes with one Lucene indexing thread per chunk of rows.
 *
 * The rows from `notesService.getNotes()` are split into at most
 * `availableProcessors()` contiguous chunks of `ceil(rows / threads)` rows
 * (the last chunk may be shorter). Each chunk goes to its own `Index` thread,
 * and all threads share one IndexWriter (Lucene documents IndexWriter as safe
 * for concurrent use).
 *
 * Fixes over the original answer:
 *  - chunk size used floor division (the `totalPages.toInt` check on an Int is
 *    always false), so when the row count was an exact multiple of the thread
 *    count, merging the last two groups left the final array slot null and
 *    `getName` threw a NullPointerException (e.g. 20000 rows / 5 threads);
 *  - fewer rows than threads gave `grouped(0)`, which throws;
 *  - the same future was awaited twice.
 *
 * NOTE(review): threads are started but not joined and the writer is not
 * closed here; presumably `Index` commits its own work — confirm, or join the
 * threads and call `writer.close()` once they finish.
 */
def setI = {
  val NUM_THREADS = Runtime.getRuntime().availableProcessors()

  // Fetch the rows once.
  val allNotes = Await.result(notesService.getNotes(), Duration.Inf)

  // Ceiling division guarantees at most NUM_THREADS chunks; max(1, _) keeps
  // grouped() valid for an empty result (which then yields no chunks at all).
  val chunkSize = math.max(1, (allNotes.length + NUM_THREADS - 1) / NUM_THREADS)
  val chunks = allNotes.grouped(chunkSize).toVector

  val IndexStoreDir = Paths.get("/var/www/html/LuceneIndex")
  val analyzer = new StandardAnalyzer()
  val writerConfig = new IndexWriterConfig(analyzer)
  writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND)
  writerConfig.setRAMBufferSizeMB(500)
    .setMaxBufferedDocs(10)
    .setMergeScheduler(new ConcurrentMergeScheduler())
  val directory = FSDirectory.open(IndexStoreDir)
  val writer = new IndexWriter(directory, writerConfig)

  // Exactly one thread per chunk, so no slot can be left null.
  val threads = chunks.zipWithIndex.map { case (chunk, i) => new Index(chunk, writer, i) }

  for ((thread, i) <- threads.zipWithIndex) {
    println(s"Thread :${thread.getName} => ${i + 1} Started!")
    thread.start()
  }
}
使用 Scala Future:
/**
 * Rebuilds the Lucene index at /var/www/html/LuceneIndex from all notes,
 * indexing each note in its own Future via `writeToDoc`, and blocks until
 * every note has been written.
 *
 * Fixes over the original:
 *  - the original called `writeToDoc` inside `foreach`, discarding the
 *    per-note futures, so `Await.result` only waited for the fetch and
 *    returned while indexing was still in flight;
 *  - the IndexWriter, analyzer and directory were never closed;
 *  - an unused `Document` local was removed.
 *
 * @return one `()` per indexed note, as collected by `Future.sequence`
 */
def setFutureIndex = {
  val IndexStoreDir = Paths.get("/var/www/html/LuceneIndex")
  val analyzer = new StandardAnalyzer()
  val writerConfig = new IndexWriterConfig(analyzer)
  writerConfig.setOpenMode(OpenMode.CREATE)
  writerConfig.setRAMBufferSizeMB(500)
  val directory = FSDirectory.open(IndexStoreDir)
  val writer = new IndexWriter(directory, writerConfig)
  try {
    // flatMap + sequence so the awaited future completes only after every
    // writeToDoc future has completed (or one of them has failed).
    val indexing = notesService.getNotes().flatMap { notes =>
      Future.sequence(notes.map(note => writeToDoc(note, writer)))
    }
    Await.result(indexing, Duration.Inf)
  } finally {
    // NOTE(review): if a note fails, Future.sequence fails fast and other
    // writeToDoc futures may still be running when the writer is closed.
    writer.close()
    analyzer.close()
    directory.close()
  }
}
/**
 * Asynchronously adds one note to the index and commits it.
 *
 * The document has three stored TextFields: "title" (prefixed with
 * " {##<id>##} "), "teaser" and "description".
 *
 * @param note   the note to index; must have an `id`
 * @param writer IndexWriter shared by all concurrently running calls
 * @return a Future that completes after `addDocument` and `commit` return;
 *         it fails with IllegalArgumentException if `note.id` is empty
 *         (previously an opaque NoSuchElementException from `Option.get`)
 */
def writeToDoc(note: NoteEntity, writer: IndexWriter) = Future {
  // Resolve the id once instead of calling `.get` three times.
  val id = note.id.getOrElse(
    throw new IllegalArgumentException("cannot index a note without an id: " + note))
  println("*****Indexing: " + id)
  val doc = new Document()
  doc.add(new TextField("title", " {##" + id + "##} " + note.title, Field.Store.YES))
  doc.add(new TextField("teaser", note.teaser, Field.Store.YES))
  doc.add(new TextField("description", note.description, Field.Store.YES))
  // Lucene writes/commits are blocking disk I/O; marking them lets the
  // execution context compensate instead of starving other futures.
  scala.concurrent.blocking {
    writer.addDocument(doc)
    // NOTE(review): a commit per document is very expensive; it is kept
    // because callers here never commit themselves.
    writer.commit()
  }
  println("*****Completed: " + id)
}
关于 mysql - 如何将 Slick 3.1(Scala)中的数据分成 4 部分,我们在 Stack Overflow 上找到一个类似的问题:https://stackoverflow.com/questions/39634851/
我是一名优秀的程序员,十分优秀!