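I have to write a few one-off Beam/Dataflow pipelines that read from BigQuery, extract two fields, and write them somewhere else. Rather than trying to set up auto-generated Avro code from the BigQuery schema, I plan to index directly into the GenericRecord using BigQueryIO.read(SerializableFunction<SchemaAndRecord, T>) and then cast the fields I care about to their types.
Unfortunately, I can't find any documentation on how BigQuery schema types map to Java types. After some digging, the mapping appears to be: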
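- INTEGER -> Integer
- STRING -> org.apache.avro.util.Utf8
- BYTES -> java.nio.ByteBuffer
- TIMESTAMP -> ?
- RECORD -> ?

Is there any documentation on how BQ types map to Java types in Beam? Does anyone know the full mapping, or is there a better way to work this out than trial and error?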
Best answer
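As Elliott pointed out in the comments, what you're looking for appears to be this class from the Apache Software Foundation itself: BigQueryAvroUtils, in Beam's org.apache.beam.sdk.io.gcp.bigquery package. Its BIG_QUERY_TO_AVRO_TYPES map and the type checks in convertRequiredField spell out how each BigQuery type is represented: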
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
 * A set of utilities for working with Avro files.
 *
 * <p>These utilities are based on the <a href="https://avro.apache.org/docs/1.8.1/spec.html">Avro
 * 1.8.1</a> specification.
 */
class BigQueryAvroUtils {

  public static final ImmutableMap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
      ImmutableMap.<String, Type>builder()
          .put("STRING", Type.STRING)
          .put("BYTES", Type.BYTES)
          .put("INTEGER", Type.LONG)
          .put("FLOAT", Type.DOUBLE)
          .put("NUMERIC", Type.BYTES)
          .put("BOOLEAN", Type.BOOLEAN)
          .put("TIMESTAMP", Type.LONG)
          .put("RECORD", Type.RECORD)
          .put("DATE", Type.STRING)
          .put("DATETIME", Type.STRING)
          .put("TIME", Type.STRING)
          .build();

  /**
   * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
   * immutable.
   */
  private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();

  // Package private for BigQueryTableRowIterator to use.
  static String formatTimestamp(String timestamp) {
    // timestamp is in "seconds since epoch" format, with scientific notation.
    // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
    // Separate into seconds and microseconds.
    double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
    long timestampMicros = (long) timestampDoubleMicros;
    long seconds = timestampMicros / 1000000;
    int micros = (int) (timestampMicros % 1000000);
    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);

    // No sub-second component.
    if (micros == 0) {
      return String.format("%s UTC", dayAndTime);
    }

    // Sub-second component.
    int digits = 6;
    int subsecond = micros;
    while (subsecond % 10 == 0) {
      digits--;
      subsecond /= 10;
    }
    String formatString = String.format("%%0%dd", digits);
    String fractionalSeconds = String.format(formatString, subsecond);
    return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
  }

  /**
   * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
   *
   * <p>See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">"Avro
   * format"</a> for more information.
   */
  static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
    return convertGenericRecordToTableRow(record, schema.getFields());
  }

  private static TableRow convertGenericRecordToTableRow(
      GenericRecord record, List<TableFieldSchema> fields) {
    TableRow row = new TableRow();
    for (TableFieldSchema subSchema : fields) {
      // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
      // is required, so it may not be null.
      Field field = record.getSchema().getField(subSchema.getName());
      Object convertedValue =
          getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
      if (convertedValue != null) {
        // To match the JSON files exported by BigQuery, do not include null values in the output.
        row.set(field.name(), convertedValue);
      }
    }
    return row;
  }

  @Nullable
  private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
    // is optional (and so it may be null), but defaults to "NULLABLE".
    String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
    switch (mode) {
      case "REQUIRED":
        return convertRequiredField(schema.getType(), schema.getLogicalType(), fieldSchema, v);
      case "REPEATED":
        return convertRepeatedField(schema, fieldSchema, v);
      case "NULLABLE":
        return convertNullableField(schema, fieldSchema, v);
      default:
        throw new UnsupportedOperationException(
            "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
    }
  }

  private static List<Object> convertRepeatedField(
      Schema schema, TableFieldSchema fieldSchema, Object v) {
    Type arrayType = schema.getType();
    verify(
        arrayType == Type.ARRAY,
        "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
        fieldSchema.getName(),
        arrayType);
    // REPEATED fields are represented as Avro arrays.
    if (v == null) {
      // Handle the case of an empty repeated field.
      return new ArrayList<>();
    }
    @SuppressWarnings("unchecked")
    List<Object> elements = (List<Object>) v;
    ArrayList<Object> values = new ArrayList<>();
    Type elementType = schema.getElementType().getType();
    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
    for (Object element : elements) {
      values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
    }
    return values;
  }

  private static Object convertRequiredField(
      Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
    // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
    // INTEGER type maps to an Avro LONG type.
    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
    // is required, so it may not be null.
    String bqType = fieldSchema.getType();
    Type expectedAvroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
    verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType);
    verify(
        avroType == expectedAvroType,
        "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
        expectedAvroType,
        avroType,
        bqType,
        fieldSchema.getName());
    // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
    // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
    switch (fieldSchema.getType()) {
      case "STRING":
      case "DATE":
      case "DATETIME":
      case "TIME":
        // Avro will use a CharSequence to represent String objects, but it may not always use
        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
        return v.toString();
      case "INTEGER":
        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
        return ((Long) v).toString();
      case "FLOAT":
        verify(v instanceof Double, "Expected Double, got %s", v.getClass());
        return v;
      case "NUMERIC":
        // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are
        // converted back to Strings with precision and scale determined by the logical type.
        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
        verifyNotNull(avroLogicalType, "Expected Decimal logical type");
        verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type");
        BigDecimal numericValue =
            new Conversions.DecimalConversion()
                .fromBytes((ByteBuffer) v, Schema.create(avroType), avroLogicalType);
        return numericValue.toString();
      case "BOOLEAN":
        verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
        return v;
      case "TIMESTAMP":
        // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
        // Strings with variable-precision (up to six digits) to match the JSON files export
        // by BigQuery.
        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
        Double doubleValue = ((Long) v) / 1000000.0;
        return formatTimestamp(doubleValue.toString());
      case "RECORD":
        verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
        return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
      case "BYTES":
        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
        ByteBuffer byteBuffer = (ByteBuffer) v;
        byte[] bytes = new byte[byteBuffer.limit()];
        byteBuffer.get(bytes);
        return BaseEncoding.base64().encode(bytes);
      default:
        throw new UnsupportedOperationException(
            String.format(
                "Unexpected BigQuery field schema type %s for field named %s",
                fieldSchema.getType(), fieldSchema.getName()));
    }
  }

  @Nullable
  private static Object convertNullableField(
      Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
    // NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
    verify(
        avroSchema.getType() == Type.UNION,
        "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
        avroSchema.getType(),
        fieldSchema.getName());
    List<Schema> unionTypes = avroSchema.getTypes();
    verify(
        unionTypes.size() == 2,
        "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
        fieldSchema.getName(),
        unionTypes);

    if (v == null) {
      return null;
    }

    Type firstType = unionTypes.get(0).getType();
    if (!firstType.equals(Type.NULL)) {
      return convertRequiredField(firstType, unionTypes.get(0).getLogicalType(), fieldSchema, v);
    }
    return convertRequiredField(
        unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(), fieldSchema, v);
  }

  static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> fieldSchemas) {
    List<Field> avroFields = new ArrayList<>();
    for (TableFieldSchema bigQueryField : fieldSchemas) {
      avroFields.add(convertField(bigQueryField));
    }
    return Schema.createRecord(
        schemaName,
        "org.apache.beam.sdk.io.gcp.bigquery",
        "Translated Avro Schema for " + schemaName,
        false,
        avroFields);
  }

  private static Field convertField(TableFieldSchema bigQueryField) {
    Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
    Schema elementSchema;
    if (avroType == Type.RECORD) {
      elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields());
    } else {
      elementSchema = Schema.create(avroType);
    }
    Schema fieldSchema;
    if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) {
      fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
    } else if ("REQUIRED".equals(bigQueryField.getMode())) {
      fieldSchema = elementSchema;
    } else if ("REPEATED".equals(bigQueryField.getMode())) {
      fieldSchema = Schema.createArray(elementSchema);
    } else {
      throw new IllegalArgumentException(
          String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode()));
    }
    return new Field(
        bigQueryField.getName(),
        fieldSchema,
        bigQueryField.getDescription(),
        (Object) null /* Cast to avoid deprecated JsonNode constructor. */);
  }
}
```
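Reading BIG_QUERY_TO_AVRO_TYPES together with the verify(v instanceof ...) checks in convertRequiredField, the raw value you get from GenericRecord.get(...) should be, at least in the Beam version quoted here: STRING, DATE, DATETIME and TIME as a CharSequence (often org.apache.avro.util.Utf8); INTEGER as Long (Avro LONG, so not Integer); FLOAT as Double; BOOLEAN as Boolean; TIMESTAMP as a Long holding microseconds since the epoch; BYTES as a ByteBuffer; NUMERIC as a ByteBuffer with an Avro decimal logical type; RECORD as a nested GenericRecord. A NULLABLE field is either null or one of those values, and a REPEATED field is a java.util.List of them. The sketch below is a hypothetical helper, BigQueryAvroValues (not part of Beam or Avro), that mirrors those checks using only JDK types. It assumes the caller passes the NUMERIC scale read from the field's Avro decimal logical type, and it leaves RECORD values alone.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical helper, not part of Beam or Avro: converts raw values returned by
 * GenericRecord.get(...) into plain JDK types, mirroring the instanceof checks in
 * BigQueryAvroUtils.convertRequiredField. RECORD (GenericRecord) values are not handled.
 */
final class BigQueryAvroValues {

  /** Applies the BigQuery field mode: NULLABLE (the default when unset), REQUIRED or REPEATED. */
  static Object convert(String mode, String bqType, Object v, int numericScale) {
    switch (mode == null ? "NULLABLE" : mode) {
      case "NULLABLE":
        return v == null ? null : toJava(bqType, v, numericScale);
      case "REQUIRED":
        return toJava(bqType, Objects.requireNonNull(v, "REQUIRED field is null"), numericScale);
      case "REPEATED":
        // REPEATED fields arrive as a java.util.List; a missing value means an empty list.
        List<Object> values = new ArrayList<>();
        if (v != null) {
          for (Object element : (List<?>) v) {
            values.add(toJava(bqType, element, numericScale));
          }
        }
        return values;
      default:
        throw new UnsupportedOperationException("Unknown BigQuery mode " + mode);
    }
  }

  /** Converts one non-null value; numericScale is only read for NUMERIC. */
  static Object toJava(String bqType, Object v, int numericScale) {
    switch (bqType) {
      case "STRING":
      case "DATE":
      case "DATETIME":
      case "TIME":
        // A CharSequence, often org.apache.avro.util.Utf8 rather than java.lang.String.
        return ((CharSequence) v).toString();
      case "INTEGER": // Avro LONG, so Long rather than Integer
        return (Long) v;
      case "FLOAT":
        return (Double) v;
      case "BOOLEAN":
        return (Boolean) v;
      case "TIMESTAMP":
        // Avro LONG holding microseconds since the Unix epoch; floorDiv/floorMod keep
        // pre-1970 (negative) values correct.
        long micros = (Long) v;
        return Instant.ofEpochSecond(
            Math.floorDiv(micros, 1_000_000L), Math.floorMod(micros, 1_000_000L) * 1_000L);
      case "BYTES":
        return copyBytes((ByteBuffer) v);
      case "NUMERIC":
        // Avro decimal: big-endian two's-complement unscaled value plus a scale taken
        // from the field's decimal logical type (passed in by the caller).
        return new BigDecimal(new BigInteger(copyBytes((ByteBuffer) v)), numericScale);
      default:
        throw new UnsupportedOperationException("Unhandled BigQuery type " + bqType);
    }
  }

  /** Copies the readable bytes without moving the caller's buffer position. */
  private static byte[] copyBytes(ByteBuffer buffer) {
    ByteBuffer view = buffer.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    System.out.println(convert("NULLABLE", "TIMESTAMP", 1452062291123456L, 0));
    System.out.println(
        convert("REPEATED", "STRING", Arrays.<Object>asList(new StringBuilder("a"), "b"), 0));
  }
}
```

Running main should print 2016-01-06T06:38:11.123456Z followed by [a, b]. The sketch turns TIMESTAMP into a java.time.Instant with integer arithmetic instead of going through a double and a formatted string as formatTimestamp does, and it reads ByteBuffer contents through duplicate() so the record's buffer is left untouched.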
A similar question, "java - Avro GenericRecords, BigQuery and Beam", can be found on Stack Overflow: https://stackoverflow.com/questions/51459902/