本文整理了Java中org.apache.kafka.connect.data.Struct.schema()方法的一些代码示例,展示了Struct.schema()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Struct.schema()方法的具体详情如下:
包路径:org.apache.kafka.connect.data.Struct
类名称:Struct
方法名:schema
[英]Get the schema for this Struct.
[中]获取此Struct对应的schema(模式)。
代码示例来源:origin: confluentinc/ksql
/**
 * Writes the given {@link Struct} to the JSON generator.
 *
 * <p>The struct is validated first, then converted to bytes through
 * {@code jsonConverter} (with an empty topic name), parsed into a tree by
 * {@code objectMapper}, and finally written out as an object.
 *
 * @param struct the struct to serialize
 * @param jsonGenerator the generator that receives the parsed tree
 * @param serializerProvider not used by this implementation
 * @throws IOException if parsing the converted bytes or writing them fails
 */
@Override
public void serialize(
    final Struct struct,
    final JsonGenerator jsonGenerator,
    final SerializerProvider serializerProvider
) throws IOException {
    // Validate before converting so an invalid struct is rejected up front.
    struct.validate();
    final byte[] converted = jsonConverter.fromConnectData("", struct.schema(), struct);
    jsonGenerator.writeObject(objectMapper.readTree(converted));
}
}
代码示例来源:origin: confluentinc/ksql
/**
 * Serializes a row by translating it into a Connect {@link Struct} and
 * handing that struct, together with its own schema, to the converter.
 *
 * @param topic the topic the row is being written to
 * @param genericRow the row to serialize; may be null
 * @return the converted bytes, or null when {@code genericRow} is null
 * @throws SerializationException if the converter fails for any reason
 */
@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
    if (genericRow != null) {
        // Translation happens outside the try block: only converter failures are wrapped.
        final Struct connectRow = translator.toConnectRow(genericRow);
        try {
            return converter.fromConnectData(topic, connectRow.schema(), connectRow);
        } catch (final Exception cause) {
            throw new SerializationException(
                "Error serializing row to topic " + topic + " using Converter API", cause);
        }
    }
    return null;
}
代码示例来源:origin: debezium/debezium
/**
 * Collects the value of every field of the struct, in the order the fields
 * are listed by the struct's schema.
 *
 * @param struct the struct to read
 * @return a new array holding one value per schema field
 */
private static Object[] valuesFor(Struct struct) {
    return struct.schema()
            .fields()
            .stream()
            .map(field -> struct.get(field))
            .toArray();
}
代码示例来源:origin: debezium/debezium
/**
 * Rebuilds a value struct against a different schema.
 *
 * <p>Every field listed in the old value's schema is copied, by name, into a
 * fresh {@link Struct} created from {@code newValueSchema}.
 *
 * @param newValueSchema the schema for the returned struct
 * @param oldValue the struct whose field values are copied
 * @return the newly created struct
 */
private Struct updateValue(Schema newValueSchema, Struct oldValue) {
    final Struct rebuilt = new Struct(newValueSchema);
    oldValue.schema().fields().forEach(
            source -> rebuilt.put(source.name(), oldValue.get(source)));
    return rebuilt;
}
代码示例来源:origin: debezium/debezium
/**
 * Rebuilds a key struct against a different schema and adds a table identifier.
 *
 * <p>All fields of {@code oldKey} are copied by name into a new struct. The
 * field named {@code keyFieldName} is then set to an identifier derived from
 * {@code oldTopic}: when {@code keyFieldRegex} is configured and matches the
 * whole topic, the identifier is the regex replacement (remembered in
 * {@code keyRegexReplaceCache}); in every other case it is the topic itself.
 *
 * @param newKeySchema the schema for the returned struct
 * @param oldKey the struct whose field values are copied
 * @param oldTopic the topic name the identifier is derived from
 * @return the newly created key struct
 */
private Struct updateKey(Schema newKeySchema, Struct oldKey, String oldTopic) {
    final Struct rebuilt = new Struct(newKeySchema);
    oldKey.schema().fields().forEach(
            source -> rebuilt.put(source.name(), oldKey.get(source)));

    // Default to the topic name; only a cached or freshly matched replacement overrides it.
    String tableIdentifier = oldTopic;
    if (keyFieldRegex != null) {
        final String cached = keyRegexReplaceCache.get(oldTopic);
        if (cached != null) {
            tableIdentifier = cached;
        }
        else {
            final Matcher topicMatcher = keyFieldRegex.matcher(oldTopic);
            if (topicMatcher.matches()) {
                tableIdentifier = topicMatcher.replaceFirst(keyFieldReplacement);
                // Only successful matches are cached; non-matching topics are re-evaluated each time.
                keyRegexReplaceCache.put(oldTopic, tableIdentifier);
            }
        }
    }

    rebuilt.put(keyFieldName, tableIdentifier);
    return rebuilt;
}
代码示例来源:origin: debezium/debezium
/**
 * Rebuilds an envelope struct against a different envelope schema.
 *
 * <p>Each field of the old envelope is copied by name. Non-null values of the
 * fields named {@code Envelope.FieldName.BEFORE} and
 * {@code Envelope.FieldName.AFTER} are first rebuilt with
 * {@code updateValue}, using the schema of the new envelope's
 * {@code BEFORE} field for both.
 *
 * @param newEnvelopeSchema the schema for the returned envelope
 * @param oldEnvelope the envelope whose fields are copied
 * @return the newly created envelope struct
 */
private Struct updateEnvelope(Schema newEnvelopeSchema, Struct oldEnvelope) {
    final Struct rebuilt = new Struct(newEnvelopeSchema);
    final Schema rowSchema = newEnvelopeSchema.field(Envelope.FieldName.BEFORE).schema();

    for (org.apache.kafka.connect.data.Field source : oldEnvelope.schema().fields()) {
        final String name = source.name();
        final Object original = oldEnvelope.get(source);

        final boolean holdsRow = Objects.equals(name, Envelope.FieldName.BEFORE)
                || Objects.equals(name, Envelope.FieldName.AFTER);

        // Null row images are copied through untouched.
        final Object copied = (holdsRow && original != null)
                ? updateValue(rowSchema, requireStruct(original, "Updating schema"))
                : original;

        rebuilt.put(name, copied);
    }
    return rebuilt;
}
代码示例来源:origin: debezium/debezium
/**
 * Obtain the operation for the given source record.
 *
 * <p>The operation code is read from the field named
 * {@code FieldName.OPERATION} of the record's value and mapped through
 * {@code Operation.forCode}.
 *
 * @param record the source record; may not be null
 * @return the operation, or null if no valid operation was found in the record
 *         (including when the record has no value at all)
 */
public static Operation operationFor(SourceRecord record) {
    Struct value = (Struct) record.value();
    if (value == null) {
        // A record without a value (e.g. a tombstone) carries no operation;
        // honour the documented "return null" contract instead of throwing an NPE.
        return null;
    }
    Field opField = value.schema().field(FieldName.OPERATION);
    if (opField != null) {
        return Operation.forCode(value.getString(opField.name()));
    }
    return null;
}
}
代码示例来源:origin: debezium/debezium
/**
 * Reads the database name from the value of a source record.
 *
 * @param record the source record to inspect
 * @return the string stored in the field named
 *         {@code HistoryRecord.Fields.DATABASE_NAME}, or null when the record
 *         has no value or the value's schema has no such field
 */
protected String getAffectedDatabase(SourceRecord record) {
    final Struct payload = (Struct) record.value();
    if (payload == null) {
        return null;
    }
    final Field databaseField = payload.schema().field(HistoryRecord.Fields.DATABASE_NAME);
    return databaseField == null ? null : payload.getString(databaseField.name());
}
代码示例来源:origin: debezium/debezium
/**
 * Verify a {@link Struct} against an expected {@link Schema}.
 *
 * <p>Three checks run in order: the struct must pass its own
 * {@link Struct#validate() validation}, its {@link Struct#schema() schema}
 * must equal {@code schema}, and finally {@code fieldsInSchema} is applied to
 * the pair.
 *
 * @param struct the {@link Struct} to validate; may not be null
 * @param schema the expected schema of the {@link Struct}; may not be null
 * @throws AssertionError if the struct fails its own validation
 */
public static void schemaMatchesStruct(Struct struct, Schema schema) {
    // Self-validation comes first; a DataException is surfaced as an assertion failure.
    try {
        struct.validate();
    }
    catch (DataException invalid) {
        throw new AssertionError("The struct '" + struct + "' failed to validate", invalid);
    }

    assertThat(struct.schema()).isEqualTo(schema);
    fieldsInSchema(struct, schema);
}
代码示例来源:origin: debezium/debezium
/**
 * Reads the database name from the value of a source record.
 *
 * @param record the source record to inspect
 * @return the string stored in the field named
 *         {@code HistoryRecord.Fields.DATABASE_NAME}, or null when the record
 *         has no value or the value's schema has no such field
 */
protected String getAffectedDatabase(SourceRecord record) {
    Struct envelope = (Struct) record.value();
    if (envelope == null) {
        // Records without a value have no database to report; avoid an NPE on schema().
        return null;
    }
    Field dbField = envelope.schema().field(HistoryRecord.Fields.DATABASE_NAME);
    if (dbField != null) {
        return envelope.getString(dbField.name());
    }
    return null;
}
}
代码示例来源:origin: debezium/debezium
/**
 * Extracts the nested struct stored under {@code Envelope.FieldName.AFTER}
 * from the value of a source record.
 *
 * @param record the source record whose value is a {@link Struct}
 * @return the nested struct, or null when the value's schema has no such field
 */
protected static Struct valueFor(SourceRecord record) {
    final Struct payload = (Struct) record.value();
    final Field after = payload.schema().field(Envelope.FieldName.AFTER);
    return after == null ? null : payload.getStruct(after.name());
}
代码示例来源:origin: debezium/debezium
/**
 * Extracts the nested struct stored under {@code Envelope.FieldName.SOURCE}
 * from the value of a source record.
 *
 * @param record the source record whose value is a {@link Struct}
 * @return the nested struct, or null when the value's schema has no such field
 */
protected static Struct sourceFor(SourceRecord record) {
    final Struct payload = (Struct) record.value();
    final Field source = payload.schema().field(Envelope.FieldName.SOURCE);
    return source == null ? null : payload.getStruct(source.name());
}
}
代码示例来源:origin: confluentinc/ksql
/**
 * A row holding a single long value is translated to a Connect struct and
 * back; the struct must expose the value under {@code COLUMN_NAME} and the
 * round-tripped row must equal the original.
 */
@Test
public void shoudlReplacePrimitivesCorrectly() {
    final AvroDataTranslator translator = new AvroDataTranslator(
            SchemaBuilder.struct()
                    .field("COLUMN_NAME", Schema.OPTIONAL_INT64_SCHEMA)
                    .optional()
                    .build(),
            KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
    final GenericRow originalRow = new GenericRow(Collections.singletonList(123L));

    final Struct connectRow = translator.toConnectRow(originalRow);
    assertThat(connectRow.get("COLUMN_NAME"), equalTo(123L));

    final GenericRow roundTripped = translator.toKsqlRow(connectRow.schema(), connectRow);
    assertThat(roundTripped, equalTo(originalRow));
}
代码示例来源:origin: confluentinc/ksql
/**
 * A column declared as {@code STREAM_NAME.COLUMN_NAME} must appear in the
 * translated struct as {@code STREAM_NAME_COLUMN_NAME}, both in the struct's
 * schema and when reading the value, and the row must survive a round trip.
 */
@Test
public void shoudRenameSourceDereference() {
    final AvroDataTranslator translator = new AvroDataTranslator(
            SchemaBuilder.struct()
                    .field("STREAM_NAME.COLUMN_NAME", Schema.OPTIONAL_INT32_SCHEMA)
                    .optional()
                    .build(),
            KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
    final GenericRow originalRow = new GenericRow(ImmutableList.of(123));

    final Struct connectRow = translator.toConnectRow(originalRow);

    // The expected schema reuses whatever name the translator assigned.
    final Schema expectedSchema = SchemaBuilder.struct()
            .name(connectRow.schema().name())
            .field("STREAM_NAME_COLUMN_NAME", Schema.OPTIONAL_INT32_SCHEMA)
            .optional()
            .build();
    assertThat(connectRow.schema(), equalTo(expectedSchema));
    assertThat(connectRow.get("STREAM_NAME_COLUMN_NAME"), equalTo(123));

    final GenericRow roundTripped = translator.toKsqlRow(connectRow.schema(), connectRow);
    assertThat(roundTripped, equalTo(originalRow));
}
代码示例来源:origin: confluentinc/ksql
/**
 * The schema name passed to the translator's constructor must be the name of
 * the schema of every struct it produces.
 */
@Test
public void shouldUseExplicitSchemaName() {
    final String explicitName = "com.custom.schema";
    final AvroDataTranslator translator = new AvroDataTranslator(
            SchemaBuilder.struct()
                    .field("COLUMN_NAME", Schema.OPTIONAL_INT64_SCHEMA)
                    .optional()
                    .build(),
            explicitName);

    final Struct connectRow =
            translator.toConnectRow(new GenericRow(Collections.singletonList(123L)));

    assertThat(connectRow.schema().name(), equalTo(explicitName));
}
}
代码示例来源:origin: debezium/debezium
/**
 * Asserts that, inside the {@code after} field of the given struct's schema,
 * each of the columns {@code cola}, {@code colb}, {@code colc} and
 * {@code cold} has the expected schema.
 *
 * @param valueA the struct whose schema is inspected
 * @param expected the schema every listed column must equal
 */
private void assertSchema(Struct valueA, Schema expected) {
    for (String column : new String[] { "cola", "colb", "colc", "cold" }) {
        Assertions.assertThat(valueA.schema().field("after").schema().field(column).schema())
                .isEqualTo(expected);
    }
}
}
代码示例来源:origin: debezium/debezium
/**
 * Compares two structs field by field, driven by the expected struct's schema.
 *
 * <p>For each expected field: a null expected value requires a null actual
 * value; otherwise the actual value must be non-null, of the same runtime
 * class, and equal. Byte arrays are compared by content and nested structs
 * are compared recursively.
 *
 * @param expectedStruct the struct providing the fields and expected values
 * @param actualStruct the struct under test, read by field name
 */
private void assertStruct(final Struct expectedStruct, final Struct actualStruct) {
    expectedStruct.schema().fields().forEach(field -> {
        final Object expected = expectedStruct.get(field);
        final Object actual = actualStruct.get(field.name());

        if (expected == null) {
            assertNull(fieldName + " is present in the actual content", actual);
            return;
        }

        assertNotNull("No value found for " + fieldName, actual);
        assertEquals("Incorrect value type for " + fieldName, expected.getClass(), actual.getClass());

        if (actual instanceof Struct) {
            assertStruct((Struct) expected, (Struct) actual);
        } else if (actual instanceof byte[]) {
            // equals() on arrays is identity-based, so compare contents explicitly.
            assertArrayEquals("Values don't match for " + fieldName, (byte[]) expected, (byte[]) actual);
        } else {
            assertEquals("Values don't match for " + fieldName, expected, actual);
        }
    });
}
代码示例来源:origin: debezium/debezium
/**
 * Checks that the content's schema contains the field named {@code fieldName}
 * and that this field's schema equals the expected schema held by this object.
 * Nothing is checked when no expected schema is set.
 *
 * @param content the struct whose schema is inspected
 */
private void assertSchema(Struct content) {
    // this.schema is the expected schema; it is distinct from the content's own schema below.
    if (this.schema != null) {
        final Schema contentSchema = content.schema();
        final Field field = contentSchema.field(fieldName);
        assertNotNull(fieldName + " not found in schema " + SchemaUtil.asString(contentSchema), field);
        VerifyRecord.assertConnectSchemasAreEqual(field.name(), field.schema(), this.schema);
    }
}
}
代码示例来源:origin: confluentinc/ksql
/**
 * A row whose only column (an optional array of optional longs) is null must
 * translate to a struct with a null {@code COLUMN_NAME} and round-trip back
 * to an equal row.
 */
@Test
public void shouldReplaceNullWithNull() {
    final Schema arrayColumn =
            SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build();
    final AvroDataTranslator translator = new AvroDataTranslator(
            SchemaBuilder.struct()
                    .field("COLUMN_NAME", arrayColumn)
                    .optional()
                    .build(),
            KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
    final GenericRow originalRow = new GenericRow(Collections.singletonList(null));

    final Struct connectRow = translator.toConnectRow(originalRow);
    assertThat(connectRow.get("COLUMN_NAME"), nullValue());

    final GenericRow roundTripped = translator.toKsqlRow(connectRow.schema(), connectRow);
    assertThat(roundTripped, equalTo(originalRow));
}
代码示例来源:origin: debezium/debezium
/**
 * Checks that the content's schema contains the field named {@code fieldName}
 * and that this field's schema equals the expected schema held by this object.
 * Nothing is checked when no expected schema is set.
 *
 * @param content the struct whose schema is inspected
 */
private void assertSchema(Struct content) {
    // this.schema is the expected schema; it is distinct from the content's own schema below.
    if (this.schema != null) {
        final Schema contentSchema = content.schema();
        final Field field = contentSchema.field(fieldName);
        Assertions.assertThat(field).as(fieldName + " not found in schema " + contentSchema).isNotNull();
        VerifyRecord.assertConnectSchemasAreEqual(field.name(), field.schema(), this.schema);
    }
}
}
内容来源于网络,如有侵权,请联系作者删除!