org.apache.kafka.connect.data.Struct.schema()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(311)

本文整理了Java中org.apache.kafka.connect.data.Struct.schema()方法的一些代码示例,展示了Struct.schema()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Struct.schema()方法的具体详情如下:
包路径:org.apache.kafka.connect.data.Struct
类名称:Struct
方法名:schema

Struct.schema介绍

[英]Get the schema for this Struct.
[中]获取此结构的架构。

代码示例

代码示例来源:origin: confluentinc/ksql

@Override
 public void serialize(
   final Struct struct,
   final JsonGenerator jsonGenerator,
   final SerializerProvider serializerProvider
 ) throws IOException {
  struct.validate();
  jsonGenerator.writeObject(
    objectMapper.readTree(jsonConverter.fromConnectData("", struct.schema(), struct)));
 }
}

代码示例来源:origin: confluentinc/ksql

@Override
public byte[] serialize(final String topic, final GenericRow genericRow) {
 if (genericRow == null) {
  return null;
 }
 final Struct struct = translator.toConnectRow(genericRow);
 try {
  return converter.fromConnectData(topic, struct.schema(), struct);
 } catch (final Exception e) {
  throw new SerializationException(
    "Error serializing row to topic " + topic + " using Converter API", e);
 }
}

代码示例来源:origin: debezium/debezium

private static Object[] valuesFor(Struct struct) {
  Object[] array = new Object[struct.schema().fields().size()];
  int index = 0;
  for (Field field : struct.schema().fields()) {
    array[index] = struct.get(field);
    ++index;
  }
  return array;
}

代码示例来源:origin: debezium/debezium

private Struct updateValue(Schema newValueSchema, Struct oldValue) {
  final Struct newValue = new Struct(newValueSchema);
  for (org.apache.kafka.connect.data.Field field : oldValue.schema().fields()) {
    newValue.put(field.name(), oldValue.get(field));
  }
  return newValue;
}

代码示例来源:origin: debezium/debezium

private Struct updateKey(Schema newKeySchema, Struct oldKey, String oldTopic) {
  final Struct newKey = new Struct(newKeySchema);
  for (org.apache.kafka.connect.data.Field field : oldKey.schema().fields()) {
    newKey.put(field.name(), oldKey.get(field));
  }
  String physicalTableIdentifier = oldTopic;
  if (keyFieldRegex != null) {
    physicalTableIdentifier = keyRegexReplaceCache.get(oldTopic);
    if (physicalTableIdentifier == null) {
      final Matcher matcher = keyFieldRegex.matcher(oldTopic);
      if (matcher.matches()) {
        physicalTableIdentifier = matcher.replaceFirst(keyFieldReplacement);
        keyRegexReplaceCache.put(oldTopic, physicalTableIdentifier);
      }
      else {
        physicalTableIdentifier = oldTopic;
      }
    }
  }
  newKey.put(keyFieldName, physicalTableIdentifier);
  return newKey;
}

代码示例来源:origin: debezium/debezium

private Struct updateEnvelope(Schema newEnvelopeSchema, Struct oldEnvelope) {
  final Struct newEnvelope = new Struct(newEnvelopeSchema);
  final Schema newValueSchema = newEnvelopeSchema.field(Envelope.FieldName.BEFORE).schema();
  for (org.apache.kafka.connect.data.Field field : oldEnvelope.schema().fields()) {
    final String fieldName = field.name();
    Object fieldValue = oldEnvelope.get(field);
    if ((Objects.equals(fieldName, Envelope.FieldName.BEFORE) || Objects.equals(fieldName, Envelope.FieldName.AFTER))
        && fieldValue != null) {
      fieldValue = updateValue(newValueSchema, requireStruct(fieldValue, "Updating schema"));
    }
    newEnvelope.put(fieldName, fieldValue);
  }
  return newEnvelope;
}

代码示例来源:origin: debezium/debezium

/**
   * Obtain the operation for the given source record.
   *
   * @param record the source record; may not be null
   * @return the operation, or null if no valid operation was found in the record
   */
  public static Operation operationFor(SourceRecord record) {
    Struct value = (Struct) record.value();
    Field opField = value.schema().field(FieldName.OPERATION);
    if (opField != null) {
      return Operation.forCode(value.getString(opField.name()));
    }
    return null;
  }
}

代码示例来源:origin: debezium/debezium

protected String getAffectedDatabase(SourceRecord record) {
  Struct value = (Struct) record.value();
  if (value != null) {
    Field dbField = value.schema().field(HistoryRecord.Fields.DATABASE_NAME);
    if (dbField != null) {
      return value.getString(dbField.name());
    }
  }
  return null;
}

代码示例来源:origin: debezium/debezium

/**
 * Assert that the supplied {@link Struct} is {@link Struct#validate() valid} and its {@link Struct#schema() schema}
 * matches that of the supplied {@code schema}.
 *
 * @param struct the {@link Struct} to validate; may not be null
 * @param schema the expected schema of the {@link Struct}; may not be null
 */
public static void schemaMatchesStruct(Struct struct, Schema schema) {
  // First validate the struct itself ...
  try {
    struct.validate();
  }
  catch (DataException e) {
    throw new AssertionError("The struct '" + struct + "' failed to validate", e);
  }
  Schema actualSchema = struct.schema();
  assertThat(actualSchema).isEqualTo(schema);
  fieldsInSchema(struct, schema);
}

代码示例来源:origin: debezium/debezium

protected String getAffectedDatabase(SourceRecord record) {
    Struct envelope = (Struct) record.value();
    Field dbField = envelope.schema().field(HistoryRecord.Fields.DATABASE_NAME);
    if (dbField != null) {
      return envelope.getString(dbField.name());
    }
    return null;
  }
}

代码示例来源:origin: debezium/debezium

protected static Struct valueFor(SourceRecord record) {
  Struct envelope = (Struct) record.value();
  Field afterField = envelope.schema().field(Envelope.FieldName.AFTER);
  if (afterField != null) {
    return envelope.getStruct(afterField.name());
  }
  return null;
}

代码示例来源:origin: debezium/debezium

protected static Struct sourceFor(SourceRecord record) {
    Struct envelope = (Struct) record.value();
    Field field = envelope.schema().field(Envelope.FieldName.SOURCE);
    if (field != null) {
      return envelope.getStruct(field.name());
    }
    return null;
  }
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shoudlReplacePrimitivesCorrectly() {
 final Schema schema = SchemaBuilder.struct()
   .field("COLUMN_NAME", Schema.OPTIONAL_INT64_SCHEMA)
   .optional()
   .build();
 final AvroDataTranslator dataTranslator = new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
 final GenericRow ksqlRow = new GenericRow(Collections.singletonList(123L));
 final Struct struct = dataTranslator.toConnectRow(ksqlRow);
 assertThat(struct.get("COLUMN_NAME"), equalTo(123L));
 final GenericRow translatedRow = dataTranslator.toKsqlRow(struct.schema(), struct);
 assertThat(translatedRow, equalTo(ksqlRow));
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shoudRenameSourceDereference() {
 final Schema schema = SchemaBuilder.struct()
   .field("STREAM_NAME.COLUMN_NAME", Schema.OPTIONAL_INT32_SCHEMA)
   .optional()
   .build();
 final AvroDataTranslator dataTranslator = new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
 final GenericRow ksqlRow = new GenericRow(ImmutableList.of(123));
 final Struct struct = dataTranslator.toConnectRow(ksqlRow);
 assertThat(
   struct.schema(),
   equalTo(
     SchemaBuilder.struct()
       .name(struct.schema().name())
       .field("STREAM_NAME_COLUMN_NAME", Schema.OPTIONAL_INT32_SCHEMA)
       .optional()
       .build()
   )
 );
 assertThat(struct.get("STREAM_NAME_COLUMN_NAME"), equalTo(123));
 final GenericRow translatedRow = dataTranslator.toKsqlRow(struct.schema(), struct);
 assertThat(translatedRow, equalTo(ksqlRow));
}

代码示例来源:origin: confluentinc/ksql

@Test
 public void shouldUseExplicitSchemaName() {
  final Schema schema = SchemaBuilder.struct()
      .field("COLUMN_NAME", Schema.OPTIONAL_INT64_SCHEMA)
      .optional()
      .build();

  String schemaFullName = "com.custom.schema";

  final AvroDataTranslator dataTranslator = new AvroDataTranslator(schema, schemaFullName);
  final GenericRow ksqlRow = new GenericRow(Collections.singletonList(123L));
  final Struct struct = dataTranslator.toConnectRow(ksqlRow);

  assertThat(struct.schema().name(), equalTo(schemaFullName));
 }
}

代码示例来源:origin: debezium/debezium

private void assertSchema(Struct valueA, Schema expected) {
    Assertions.assertThat(valueA.schema().field("after").schema().field("cola").schema()).isEqualTo(expected);
    Assertions.assertThat(valueA.schema().field("after").schema().field("colb").schema()).isEqualTo(expected);
    Assertions.assertThat(valueA.schema().field("after").schema().field("colc").schema()).isEqualTo(expected);
    Assertions.assertThat(valueA.schema().field("after").schema().field("cold").schema()).isEqualTo(expected);
  }
}

代码示例来源:origin: debezium/debezium

private void assertStruct(final Struct expectedStruct, final Struct actualStruct) {
  expectedStruct.schema().fields().stream().forEach(field -> {
    final Object expectedValue = expectedStruct.get(field);
    if (expectedValue == null) {
      assertNull(fieldName + " is present in the actual content", actualStruct.get(field.name()));
      return;
    }
    final Object actualValue = actualStruct.get(field.name());
    assertNotNull("No value found for " + fieldName, actualValue);
    assertEquals("Incorrect value type for " + fieldName, expectedValue.getClass(), actualValue.getClass());
    if (actualValue instanceof byte[]) {
      assertArrayEquals("Values don't match for " + fieldName, (byte[]) expectedValue, (byte[]) actualValue);
    } else if (actualValue instanceof Struct) {
      assertStruct((Struct)expectedValue, (Struct)actualValue);
    } else {
      assertEquals("Values don't match for " + fieldName, expectedValue, actualValue);
    }
  });
}

代码示例来源:origin: debezium/debezium

private void assertSchema(Struct content) {
    if (schema == null) {
      return;
    }
    Schema schema = content.schema();
    Field field = schema.field(fieldName);
    assertNotNull(fieldName + " not found in schema " + SchemaUtil.asString(schema), field);
    VerifyRecord.assertConnectSchemasAreEqual(field.name(), field.schema(), this.schema);
  }
}

代码示例来源:origin: confluentinc/ksql

@Test
public void shouldReplaceNullWithNull() {
 final Schema schema = SchemaBuilder.struct()
   .field(
     "COLUMN_NAME",
     SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build())
   .optional()
   .build();
 final AvroDataTranslator dataTranslator = new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME);
 final GenericRow ksqlRow = new GenericRow(Collections.singletonList(null));
 final Struct struct = dataTranslator.toConnectRow(ksqlRow);
 assertThat(struct.get("COLUMN_NAME"), nullValue());
 final GenericRow translatedRow = dataTranslator.toKsqlRow(struct.schema(), struct);
 assertThat(translatedRow, equalTo(ksqlRow));
}

代码示例来源:origin: debezium/debezium

private void assertSchema(Struct content) {
    if (schema == null) {
      return;
    }
    Schema schema = content.schema();
    Field field = schema.field(fieldName);
    Assertions.assertThat(field).as(fieldName + " not found in schema " + schema).isNotNull();

    VerifyRecord.assertConnectSchemasAreEqual(field.name(), field.schema(), this.schema);
  }
}

相关文章