gpt4 book ai didi

为 Postgres COPY 编写二进制格式的 Java 库?

转载 作者:搜寻专家 更新时间:2023-10-31 19:40:10 26 4
gpt4 key购买 nike

有没有人遇到过 Java 库(或一些代码)来编写 Postgres 的 COPY command 使用的 binary 格式? ?

它看起来很简单,但如果有人已经找到正确的元组数据格式,我也可以从这里开始。

实际上,即使只是描述所有数据类型的格式也会有所帮助。

谢谢。

最佳答案

你可以试试 PgBulkInsert ,它实现了 PostgreSQL 的二进制复制协议(protocol):

它也可以从 Maven 中央存储库中获得。

免责声明:我是项目作者。

PostgreSQL 二进制复制协议(protocol)

我不想简单地宣传我的项目,还要写协议(protocol)。

首先我写了一个类PgBinaryWriter ,它包装了一个 DataOutputStream并具有写入二进制协议(protocol) header 的方法,一种开始新行的方法(二进制复制协议(protocol)要求您为要插入的每一行写入列数)和一个 write方法,它需要一个 IValueHandler<TTargetType>用于编写给定的 Java 类型。

PgBinaryWriter实现一个 AutoClosable , 因为有必要写一个 -1在冲洗和关闭流之前添加到流中。

IValueHandler<TTargetType>需要 DataOutputStream和一个值。它负责使用 PostgreSQL 二进制协议(protocol)格式写入给定值。

PgBinaryWriter

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql;


import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;

public class PgBinaryWriter implements AutoCloseable {

/** The ByteBuffer to write the output. */
private transient DataOutputStream buffer;

public PgBinaryWriter() {
}

public void open(final OutputStream out) {
buffer = new DataOutputStream(new BufferedOutputStream(out));

writeHeader();
}

private void writeHeader() {
try {

// 11 bytes required header
buffer.writeBytes("PGCOPY\n\377\r\n\0");
// 32 bit integer indicating no OID
buffer.writeInt(0);
// 32 bit header extension area length
buffer.writeInt(0);

} catch(Exception e) {
throw new BinaryWriteFailedException(e);
}
}

public void startRow(int numColumns) {
try {
buffer.writeShort(numColumns);
} catch(Exception e) {
throw new BinaryWriteFailedException(e);
}
}

public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
handler.handle(buffer, value);
}

@Override
public void close() {
try {
buffer.writeShort(-1);

buffer.flush();
buffer.close();
} catch(Exception e) {
throw new BinaryWriteFailedException(e);
}
}
}

值处理器

IValueHandler是一个简单的界面,其中有一个 handle采取DataOutputStream的方法和一个值。

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public interface IValueHandler<TTargetType> extends ValueHandler {

void handle(DataOutputStream buffer, final TTargetType value);

Type getTargetType();

}

了解协议(protocol)很重要,您必须编写 -1当值为空时。为此,我编写了一个抽象基类来处理这种情况。

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;

import java.io.DataOutputStream;

public abstract class BaseValueHandler<T> implements IValueHandler<T> {

@Override
public void handle(DataOutputStream buffer, final T value) {
try {
if (value == null) {
buffer.writeInt(-1);
return;
}
internalHandle(buffer, value);
} catch (Exception e) {
throw new BinaryWriteFailedException(e);
}
}

protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception;
}

然后可以实现各种 Java 类型的处理程序。这是 long 的示例.你可以找到GitHub 存储库 ( handlers ) 中的其他实现。

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public class LongValueHandler extends BaseValueHandler<Long> {

@Override
protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception {
buffer.writeInt(8);
buffer.writeLong(value);
}

@Override
public Type getTargetType() {
return Long.class;
}
}

使用 PgBinaryWriter

现在终于要连接零件了。请注意,我已经抽象了更多部分。可能需要在代码中查找更多实现细节。

public abstract class PgBulkInsert<TEntity> {

// ...

public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {

CopyManager cpManager = connection.getCopyAPI();
CopyIn copyIn = cpManager.copyIn(getCopyCommand());

int columnCount = columns.size();

try (PgBinaryWriter bw = new PgBinaryWriter()) {

// Wrap the CopyOutputStream in our own Writer:
bw.open(new PGCopyOutputStream(copyIn));

// Insert all entities:
entities.forEach(entity -> {

// Start a New Row:
bw.startRow(columnCount);

// Insert the Column Data:
columns.forEach(column -> {
try {
column.getWrite().invoke(bw, entity);
} catch (Exception e) {
throw new SaveEntityFailedException(e);
}
});
});
}
}

private String getCopyCommand()
{
String commaSeparatedColumns = columns.stream()
.map(x -> x.columnName)
.collect(Collectors.joining(", "));

return String.format("COPY %1$s(%2$s) FROM STDIN BINARY",
table.GetFullQualifiedTableName(),
commaSeparatedColumns);
}
}

PgBulkInsert

PgBulkInsert 支持以下 PostgreSQL 数据类型。

基本用法

想象一下,应该将大量人员批量插入到 PostgreSQL 数据库中。每个Person有名字、姓氏和出生日期。

数据库表

PostgreSQL 数据库中的表可能如下所示:

CREATE TABLE sample.unit_test
(
first_name text,
last_name text,
birth_date date
);

领域模型

应用程序中的域模型可能如下所示:

private class Person {

private String firstName;

private String lastName;

private LocalDate birthDate;

public Person() {}

public String getFirstName() {
return firstName;
}

public void setFirstName(String firstName) {
this.firstName = firstName;
}

public String getLastName() {
return lastName;
}

public void setLastName(String lastName) {
this.lastName = lastName;
}

public LocalDate getBirthDate() {
return birthDate;
}

public void setBirthDate(LocalDate birthDate) {
this.birthDate = birthDate;
}

}

批量插入器

然后你必须实现 PgBulkInsert<Person> ,它定义了表和域模型之间的映射。

public class PersonBulkInserter extends PgBulkInsert<Person>
{
public PersonBulkInserter() {
super("sample", "unit_test");

MapString("first_name", Person::getFirstName);
MapString("last_name", Person::getLastName);
MapDate("birth_date", Person::getBirthDate);
}
}

使用批量插入器

最后我们可以编写单元测试来插入 100000人员入库。您可以在 GitHub 上找到整个单元测试:IntegrationTest.java .

@Test
public void bulkInsertPersonDataTest() throws SQLException {
// Create a large list of Persons:
List<Person> persons = getPersonList(100000);

// Create the BulkInserter:
PersonBulkInserter personBulkInserter = new PersonBulkInserter();

// Now save all entities of a given stream:
personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream());

// And assert all have been written to the database:
Assert.assertEquals(100000, getRowCount());
}

private List<Person> getPersonList(int numPersons) {
List<Person> persons = new ArrayList<>();

for (int pos = 0; pos < numPersons; pos++) {
Person p = new Person();

p.setFirstName("Philipp");
p.setLastName("Wagner");
p.setBirthDate(LocalDate.of(1986, 5, 12));

persons.add(p);
}

return persons;
}

关于为 Postgres COPY 编写二进制格式的 Java 库?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14242117/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com