我已经创建了转换json->avro的php代码。
#!/usr/bin/php
<?php
require_once 'vendor/autoload.php';
// Report everything, but send diagnostics to stderr: stdout carries the binary
// Avro stream, so any notice or warning printed there would corrupt the output.
error_reporting(E_ALL);
ini_set('display_errors', 'stderr');

//$outputFile = __DIR__ . '/test_avro_out.avr';

// Load the writer's schema; stop early with a clear message if it cannot be read.
$schemaPath = __DIR__ . '/HttpRequestEvent.avsc';
$avroJsonSchema = file_get_contents($schemaPath);
if ($avroJsonSchema === false) {
    fprintf(STDERR, "Unable to read Avro schema: %s\n", $schemaPath);
    exit(1);
}

// Open stdout for writing, using the schema above as the writer's schema.
// NOTE(review): the output is binary; if this runs under a line-oriented
// streaming framework the bytes may be altered downstream -- confirm.
$avroWriter = AvroDataIO::open_file('php://stdout', 'w', $avroJsonSchema);

// 1-based number of the input line being processed, used in error reports.
$counter = 1;
while (($buf = fgets(STDIN)) !== false) {
    try {
        // Textual clean-up applied before decoding:
        //  - null:    -> "null":  so that map keys are always strings
        //  - userIp   -> userIP   (map_request_event() reads the 'userIP' key)
        //  - [null]   -> []
        $original = array("null:", "userIp", "[null]");
        $replaceWith = array("\"null\":", "userIP", "[]");
        $data = json_decode(str_replace($original, $replaceWith, $buf), true);
        // The loose comparison also rejects empty results such as an empty array.
        if ($data === false || $data == null) {
            throw new InvalidArgumentException("Unable to parse JSON line");
        }
        $mapped = map_request_event($data);
        $avroWriter->append($mapped);
    } catch (Exception $ex) {
        // A bad line is reported on stderr and skipped; processing continues.
        fprintf(STDERR, "Caught exception: %s\n", $ex->getMessage());
        fprintf(STDERR, "Line num: %s\n", $counter);
        fprintf(STDERR, "buf: %s\n", $buf);
    }
    $counter++;
}
$avroWriter->close();
/**
 * Flatten one decoded request-event document into the record that is appended
 * to the Avro writer.
 *
 * The timestamp is read from data.__props, the request fields from
 * data.__props.eventData.__props.generalInformation.__props, and the
 * activities from data.__props.eventData.__props.logEventArray.
 *
 * @param array $data Decoded JSON document for a single event.
 * @return array Associative array holding the mapped request fields.
 * @throws InvalidArgumentException When the event envelope or its
 *                                  generalInformation properties are missing.
 * @throws Exception When a required key is absent (raised by safe_read_key()).
 */
function map_request_event($data)
{
    if (empty($data['data']['__props']) || empty($data['data']['__props']['eventData']) || !isset($data['data']['__props']['eventData']['__props'])) {
        throw new InvalidArgumentException("Got invalid event object");
    }
    $tsData = $data['data']['__props'];
    $eventProps = $tsData['eventData']['__props'];

    // Validate the request section up front: without this guard a missing node
    // becomes null and is only noticed deep inside the key look-ups.
    if (!isset($eventProps['generalInformation']['__props']) || !is_array($eventProps['generalInformation']['__props'])) {
        throw new InvalidArgumentException("Event object has no generalInformation properties");
    }
    $reqData = $eventProps['generalInformation']['__props'];

    $mapped = array(
        'request_id' => safe_read_key($reqData, 'requestId'),
        'timestamp' => (double)safe_read_key($tsData, 'timeStamp'),
        'account_id' => intval(safe_read_key($reqData, 'accountId')),
        'site_id' => intval(safe_read_key($reqData, 'siteId')),
        'user_id' => (string)safe_read_key($reqData, 'userId'),
        'http_return_code' => (string)safe_read_key($reqData, 'httpReturnCode'),
        'request_headers' => safe_read_key($reqData, 'requestHeaders'), // FIXME: Make sure header values do not contain NULL
        'session_id' => (string)safe_read_key($reqData, 'sessionId'),
        'request_method' => safe_read_key($reqData, 'httpRequestMethod'),
        'request_time' => (double)safe_read_key($reqData, 'requestTime'),
        'user_agent' => (string)safe_read_key($reqData, 'userAgent'),
        'referrer_uri' => (string)safe_read_key($reqData, 'referrerURI'),
        'request_uri' => safe_read_key($reqData, 'requestURI'),
        // Currently disabled fields:
        //'url_params_get' => safe_read_key($reqData,'urlParamsGet'),
        //'url_params_post' => safe_read_key($reqData,'urlParamsPost'),
        'server' => safe_read_key($reqData, 'server'),
        'user_ip' => safe_read_key($reqData, 'userIP'),
        'charset' => safe_read_key($reqData, 'charset'),
        'language' => safe_read_key($reqData, 'language'),
        //'full_url' => safe_read_key($reqData,'fullUrl'),
        // Every logEventArray entry is converted by map_activity().
        'activities' => array_map('map_activity', safe_read_key($eventProps, 'logEventArray')),
    );
    return $mapped;
}
/**
 * Convert a single activity entry into its flat output record.
 *
 * The values are taken from the entry's '__props' element: the date/time is
 * passed through untouched, the id and count are cast to integers and the
 * scenario is cast to a string.
 *
 * @param array $data Activity entry carrying a '__props' element.
 * @return array Record with activity_time, activity_id, scenario and count.
 */
function map_activity($data)
{
    $props = $data['__props'];

    return array(
        'activity_time' => $props['dateTime'],
        'activity_id' => (int) $props['activityId'],
        'scenario' => strval($props['scenario']),
        'count' => (int) $props['count'],
    );
}
/**
 * Read $key from $data, failing loudly when the key is absent.
 *
 * A key that exists with a null value counts as present and yields null.
 * Input that is not an array (for example null left behind by a missing parent
 * node) is treated as "key absent" rather than being handed to
 * array_key_exists(), which raises a TypeError on PHP 8 that a
 * `catch (Exception ...)` block does not catch.
 *
 * @param mixed      $data    Container to read from, normally an array.
 * @param string|int $key     Key to look up.
 * @param mixed      $default Value returned when the key is absent; null means
 *                            "no default".
 * @return mixed The stored value, or $default when the key is absent.
 * @throws Exception When the key is absent and no non-null default was given.
 */
function safe_read_key($data, $key, $default = null)
{
    // Common case: the key is present and holds a non-null value.
    if (isset($data[$key])) {
        return $data[$key];
    }
    // The key is present but holds null; only real arrays can be probed here.
    if (is_array($data) && array_key_exists($key, $data)) {
        return $data[$key];
    }
    if ($default !== null) {
        return $default;
    }
    throw new Exception('Key Does not exist');
}
我正在使用 EMR Streaming,并以 S3 作为输入/输出。此代码被设置为 mapper,reducer 设置为 none。
在同一输入文件上运行此代码时:
在本地机器上 -> 我得到一个正常的 Avro 文件。
在 EMR 上 -> 文件已损坏,无法读取。
以下是压缩的两个结果文件的链接:http://speedy.sh/p9fhd/test.7z
part-00000 is the EMR Avro result file.
test.avro is the local one.
我试着比较这两个文件。能看出某种规律,但不足以弄清哪里出了问题。
我使用 mapper 的方式有什么问题吗?通过比较这两个二进制文件可以推断出什么?
暂无答案!
目前还没有任何答案,快来回答吧!