本文整理了Java中org.apache.kafka.connect.data.Struct
类的一些代码示例,展示了Struct
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Struct
类的具体详情如下:
包路径:org.apache.kafka.connect.data.Struct
类名称:Struct
[英]A structured record containing a set of named fields with values, each field using an independent Schema. Struct objects must specify a complete Schema up front, and only fields specified in the Schema may be set.
The Struct's #put(String,Object) method returns the Struct itself to provide a fluent API for constructing complete objects:
Schema schema = SchemaBuilder.struct().name("com.example.Person")
.field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build()
Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21)
[中]一种结构化记录,包含一组带值的命名字段,每个字段使用独立的模式。Struct对象必须预先指定完整的模式,并且只能设置模式中指定的字段。
Struct的#put(String,Object)方法返回Struct本身,为构建完整对象提供流畅的API:
Schema schema = SchemaBuilder.struct().name("com.example.Person")
.field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build()
Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21)
代码示例来源:origin: debezium/debezium
protected Struct schemaChangeRecordKey(String databaseName) {
Struct result = new Struct(schemaChangeKeySchema);
result.put(Fields.DATABASE_NAME, databaseName);
return result;
}
代码示例来源:origin: debezium/debezium
private Struct updateValue(Schema newValueSchema, Struct oldValue) {
final Struct newValue = new Struct(newValueSchema);
for (org.apache.kafka.connect.data.Field field : oldValue.schema().fields()) {
newValue.put(field.name(), oldValue.get(field));
}
return newValue;
}
代码示例来源:origin: debezium/debezium
/**
* Obtain the operation for the given source record.
*
* @param record the source record; may not be null
* @return the operation, or null if no valid operation was found in the record
*/
public static Operation operationFor(SourceRecord record) {
Struct value = (Struct) record.value();
Field opField = value.schema().field(FieldName.OPERATION);
if (opField != null) {
return Operation.forCode(value.getString(opField.name()));
}
return null;
}
}
代码示例来源:origin: debezium/debezium
private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
final Struct envelope = (Struct)currentRecord.value();
final Struct source = (Struct)envelope.get("source");
if (source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) {
source.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
}
}
代码示例来源:origin: debezium/debezium
@Test
public void shouldGenerateRecordForInsertEvent() throws InterruptedException {
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
BsonTimestamp ts = new BsonTimestamp(1000, 1);
ObjectId objId = new ObjectId();
Document obj = new Document().append("_id", objId).append("name", "Sally");
Document event = new Document().append("o", obj)
.append("ns", "dbA.c1")
.append("ts", ts)
.append("h", Long.valueOf(12345678))
.append("op", "i");
RecordsForCollection records = recordMakers.forCollection(collectionId);
records.recordEvent(event, 1002);
assertThat(produced.size()).isEqualTo(1);
SourceRecord record = produced.get(0);
Struct key = (Struct) record.key();
Struct value = (Struct) record.value();
assertThat(key.schema()).isSameAs(record.keySchema());
assertThat(key.get("id")).isEqualTo("{ \"$oid\" : \"" + objId + "\"}");
assertThat(value.schema()).isSameAs(record.valueSchema());
// assertThat(value.getString(FieldName.BEFORE)).isNull();
assertThat(value.getString(FieldName.AFTER)).isEqualTo(obj.toJson(WRITER_SETTINGS));
assertThat(value.getString(FieldName.OPERATION)).isEqualTo(Operation.CREATE.code());
assertThat(value.getInt64(FieldName.TIMESTAMP)).isEqualTo(1002L);
Struct actualSource = value.getStruct(FieldName.SOURCE);
Struct expectedSource = source.lastOffsetStruct("rs0", collectionId);
assertThat(actualSource).isEqualTo(expectedSource);
}
代码示例来源:origin: debezium/debezium
private static Object[] valuesFor(Struct struct) {
Object[] array = new Object[struct.schema().fields().size()];
int index = 0;
for (Field field : struct.schema().fields()) {
array[index] = struct.get(field);
++index;
}
return array;
}
代码示例来源:origin: debezium/debezium
private void assertFieldAbsent(SourceRecord record, String fieldName) {
Struct value = (Struct) ((Struct) record.value()).get(Envelope.FieldName.AFTER);
try {
value.get(fieldName);
fail("field should not be present");
} catch (DataException e) {
//expected
}
}
代码示例来源:origin: debezium/debezium
private void assertBigintUnsignedPrecise(Struct value) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Integer i = after.getInt32("id");
assertThat(i).isNotNull();
//Validate the schema first, we are expecting org.apache.kafka.connect.data.Decimal:Byte since we are dealing with unsignd-bigint
//So Unsigned BIGINY would be an int32 type
assertThat(after.schema().field("c1").schema()).isEqualTo(Decimal.builder(0).schema());
assertThat(after.schema().field("c2").schema()).isEqualTo(Decimal.builder(0).schema());
//Validate the schema first, we are expecting int-64 since we are dealing with signed-bigint.
//So Signed BIGINT would be an INT64 type
assertThat(after.schema().field("c3").schema()).isEqualTo(Schema.INT64_SCHEMA);
//Validate candidates values
switch (i) {
case 1:
assertThat(after.get("c1")).isEqualTo(new BigDecimal("18446744073709551615"));
assertThat(after.get("c2")).isEqualTo(new BigDecimal("18446744073709551615"));
assertThat(after.getInt64("c3")).isEqualTo(9223372036854775807L);
break;
case 2:
assertThat(after.get("c1")).isEqualTo(new BigDecimal("14446744073709551615"));
assertThat(after.get("c2")).isEqualTo(new BigDecimal("14446744073709551615"));
assertThat(after.getInt64("c3")).isEqualTo(-1223372036854775807L);
break;
case 3:
assertThat(after.get("c1")).isEqualTo(new BigDecimal("0"));
assertThat(after.get("c2")).isEqualTo(new BigDecimal("0"));
assertThat(after.getInt64("c3")).isEqualTo(-9223372036854775808L);
}
}
代码示例来源:origin: debezium/debezium
/**
* Verify that the given {@link SourceRecord} is a {@link Operation#READ READ} record.
*
* @param record the source record; may not be null
*/
public static void isValidRead(SourceRecord record) {
assertThat(record.key()).isNotNull();
assertThat(record.keySchema()).isNotNull();
assertThat(record.valueSchema()).isNotNull();
Struct value = (Struct) record.value();
assertThat(value).isNotNull();
assertThat(value.getString(FieldName.OPERATION)).isEqualTo(Operation.READ.code());
assertThat(value.get(FieldName.AFTER)).isNotNull();
assertThat(value.get(FieldName.BEFORE)).isNull();
}
代码示例来源:origin: debezium/debezium
private void assertSmallUnsigned(Struct value) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Integer i = after.getInt32("id");
assertThat(i).isNotNull();
//Validate the schema first, we are expecting int-32 since we are dealing with unsignd-smallint
//So Unsigned SMALLINT would be an int32 type
assertThat(after.schema().field("c1").schema()).isEqualTo(Schema.INT32_SCHEMA);
assertThat(after.schema().field("c2").schema()).isEqualTo(Schema.INT32_SCHEMA);
//Validate the schema first, we are expecting int-16 since we are dealing with signed-smallint.
//So Signed SMALLINT would be an INT16 type
assertThat(after.schema().field("c3").schema()).isEqualTo(Schema.INT16_SCHEMA);
//Validate candidates values
switch (i) {
case 1:
assertThat(after.getInt32("c1")).isEqualTo(65535);
assertThat(after.getInt32("c2")).isEqualTo(65535);
assertThat(after.getInt16("c3")).isEqualTo((short)32767);
break;
case 2:
assertThat(after.getInt32("c1")).isEqualTo(45535);
assertThat(after.getInt32("c2")).isEqualTo(45535);
assertThat(after.getInt16("c3")).isEqualTo((short)-12767);
break;
case 3:
assertThat(after.getInt32("c1")).isEqualTo(0);
assertThat(after.getInt32("c2")).isEqualTo(0);
assertThat(after.getInt16("c3")).isEqualTo((short)-32768);
}
}
代码示例来源:origin: confluentinc/ksql
@Test
public void shouldSetNullRecordToNull() {
// When:
final SchemaAndValue msg = ProcessingLogMessageFactory.deserializationErrorMsg(
error,
Optional.empty()
).get();
// Then:
final Struct struct = (Struct) msg.value();
final Struct deserializationError = struct.getStruct(DESERIALIZATION_ERROR);
assertThat(deserializationError.get(DESERIALIZATION_ERROR_FIELD_RECORD), is(nullValue()));
}
代码示例来源:origin: debezium/debezium
private void assertPoint(Struct value) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Integer i = after.getInt32("id");
Testing.debug(after);
assertThat(i).isNotNull();
Double expectedX = after.getFloat64("expected_x");
Double expectedY = after.getFloat64("expected_y");
Integer expectedSrid = after.getInt32("expected_srid");
if (after.getStruct("point") != null) {
Double actualX = after.getStruct("point").getFloat64("x");
Double actualY = after.getStruct("point").getFloat64("y");
Integer actualSrid = after.getStruct("point").getInt32("srid");
//Validate the values
databaseDifferences.geometryAssertPoints(expectedX, expectedY, actualX, actualY);
assertThat(actualSrid).isEqualTo(expectedSrid);
//Test WKB
Point point = (Point) WkbGeometryReader.readGeometry(new ByteReader((byte[]) after.getStruct("point")
.get("wkb")));
databaseDifferences.geometryAssertPoints(expectedX, expectedY, point.getX(), point.getY());
} else if (expectedX != null) {
Assert.fail("Got a null geometry but didn't expect to");
}
}
代码示例来源:origin: confluentinc/ksql
@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
if (genericRow == null) {
return null;
}
final Struct struct = translator.toConnectRow(genericRow);
try {
return converter.fromConnectData(topic, struct.schema(), struct);
} catch (final Exception e) {
throw new SerializationException(
"Error serializing row to topic " + topic + " using Converter API", e);
}
}
代码示例来源:origin: debezium/debezium
private Struct offsetStructFor(String replicaSetName, String namespace, Position position, boolean isInitialSync) {
if (position == null) position = INITIAL_POSITION;
Struct result = super.struct();
result.put(SERVER_NAME, serverName);
result.put(REPLICA_SET_NAME, replicaSetName);
result.put(NAMESPACE, namespace);
result.put(TIMESTAMP, position.getTime());
result.put(ORDER, position.getInc());
result.put(OPERATION_ID, position.getOperationId());
if (isInitialSync) {
result.put(INITIAL_SYNC, true);
}
return result;
}
代码示例来源:origin: debezium/debezium
protected static Struct valueFor(SourceRecord record) {
Struct envelope = (Struct) record.value();
Field afterField = envelope.schema().field(Envelope.FieldName.AFTER);
if (afterField != null) {
return envelope.getStruct(afterField.name());
}
return null;
}
代码示例来源:origin: debezium/debezium
private void assertMediumUnsigned(Struct value) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Integer i = after.getInt32("id");
assertThat(i).isNotNull();
//Validate the schema first, we are expecting int-32 since we are dealing with unsignd-mediumint
//So Unsigned MEDIUMINT would be an int32 type
assertThat(after.schema().field("c1").schema()).isEqualTo(Schema.INT32_SCHEMA);
assertThat(after.schema().field("c2").schema()).isEqualTo(Schema.INT32_SCHEMA);
//Validate the schema first, we are expecting int-32 since we are dealing with signed-mediumint.
//So Signed MEDIUMINT would be an INT32 type
assertThat(after.schema().field("c3").schema()).isEqualTo(Schema.INT32_SCHEMA);
//Validate candidates values
switch (i) {
case 1:
assertThat(after.getInt32("c1")).isEqualTo(16777215);
assertThat(after.getInt32("c2")).isEqualTo(16777215);
assertThat(after.getInt32("c3")).isEqualTo(8388607);
break;
case 2:
assertThat(after.getInt32("c1")).isEqualTo(10777215);
assertThat(after.getInt32("c2")).isEqualTo(10777215);
assertThat(after.getInt32("c3")).isEqualTo(-6388607);
break;
case 3:
assertThat(after.getInt32("c1")).isEqualTo(0);
assertThat(after.getInt32("c2")).isEqualTo(0);
assertThat(after.getInt32("c3")).isEqualTo(-8388608);
}
}
代码示例来源:origin: debezium/debezium
protected void assertSourceInfo(SourceRecord record) {
assertTrue(record.value() instanceof Struct);
Struct source = ((Struct) record.value()).getStruct("source");
assertNotNull(source.getString("db"));
assertNotNull(source.getString("schema"));
assertNotNull(source.getString("table"));
}
代码示例来源:origin: confluentinc/ksql
@Override
public void serialize(
final Struct struct,
final JsonGenerator jsonGenerator,
final SerializerProvider serializerProvider
) throws IOException {
struct.validate();
jsonGenerator.writeObject(
objectMapper.readTree(jsonConverter.fromConnectData("", struct.schema(), struct)));
}
}
代码示例来源:origin: confluentinc/ksql
@Test
public void shouldTranslateNullValueCorrectly() {
final Schema rowSchema = SchemaBuilder.struct()
.field("INT", SchemaBuilder.OPTIONAL_INT32_SCHEMA)
.optional()
.build();
final Struct connectStruct = new Struct(rowSchema);
final ConnectDataTranslator connectToKsqlTranslator = new ConnectDataTranslator(rowSchema);
final GenericRow row = connectToKsqlTranslator.toKsqlRow(rowSchema, connectStruct);
assertThat(row.getColumns().size(), equalTo(1));
assertThat(row.getColumnValue(0), is(nullValue()));
}
代码示例来源:origin: debezium/debezium
protected void verifyOperation(SourceRecord record, Operation expected) {
Struct value = (Struct) record.value();
assertThat(value.getString(Envelope.FieldName.OPERATION)).isEqualTo(expected.code());
}
内容来源于网络,如有侵权,请联系作者删除!