我尝试使用多个Goroutine读取同一个文件,其中每个Goroutine都被分配了一个开始阅读的字节和一些读取lineLimit
的行。
通过将csv.ChunkSize
选项设置为chunkSize
变量,当文件适合内存时,我成功地做到了这一点。但是,当文件大于内存时,我需要减少csv.ChunkSize
选项。我也在尝试这样的事情
package main
import (
"io"
"log"
"os"
"sync"
"github.com/apache/arrow/go/v11/arrow"
"github.com/apache/arrow/go/v11/arrow/csv"
)
// A reader to read lines from the file starting from the byteOffset. The number
// of lines is specified by linesLimit.
func produce(
id int,
ch chan<- arrow.Record,
byteOffset int64,
linesLimit int64,
filename string,
wg *sync.WaitGroup,
) {
defer wg.Done()
fd, _ := os.Open(filename)
fd.Seek(byteOffset, io.SeekStart)
var remainder int64 = linesLimit % 10
limit := linesLimit - remainder
chunkSize := limit / 10
reader := csv.NewInferringReader(fd,
csv.WithChunk(int(chunkSize)),
csv.WithNullReader(true, ""),
csv.WithComma(','),
csv.WithHeader(true),
csv.WithColumnTypes(map[string]arrow.DataType{
"Start_Time": arrow.FixedWidthTypes.Timestamp_ns,
"End_Time": arrow.FixedWidthTypes.Timestamp_ns,
"Weather_Timestamp": arrow.FixedWidthTypes.Timestamp_ns,
}))
reader.Retain()
defer reader.Release()
var count int64
for reader.Next() {
rec := reader.Record()
rec.Retain() // released at the other end of the channel
ch <- rec
count += rec.NumRows()
if count == limit {
if remainder != 0 {
flush(id, ch, fd, remainder)
}
break
} else if count > limit {
log.Panicf("Reader %d read more than it should, expected=%d, read=%d", id, linesLimit, count)
}
}
if reader.Err() != nil {
log.Panicf("error: %s in line %d,%d", reader.Err().Error(), count, id)
}
}
func flush(id int,
ch chan<- arrow.Record,
fd *os.File,
limit int64,
) {
reader := csv.NewInferringReader(fd,
csv.WithChunk(int(limit)),
csv.WithNullReader(true, ""),
csv.WithComma(','),
csv.WithHeader(false),
)
reader.Retain()
defer reader.Release()
record := reader.Record()
record.Retain() // nil pointer dereference error here
ch <- record
}
我尝试了前面代码的多个版本,包括:
1.复制文件描述符
1.复制文件描述符的偏移量,打开相同的文件并查找该偏移量。
1.在调用flush
之前关闭第一个读取器或关闭第一个fd
。
无论我如何更改代码,错误似乎都是一样的。请注意,任何对flush
的读取器的调用都会引发错误。包括reader.Next
和reader.Err()
。
我是不是用错了csv阅读器?这是重复使用相同文件的问题吗?
编辑:我不知道这是否有帮助,但是在flush
中打开一个新的fd而不使用任何Seek
可以避免错误(不知何故,任何Seek
都会导致原始错误出现)。但是,如果没有Seek
(即删除Seek
会导致文件的一部分无法被任何Goroutine读取)。
1条答案
按热度按时间qv7cva1a1#
主要的问题是,csv读取器在下面使用了一个
bufio.Reader
,它的默认缓冲区大小为4096。这意味着reader.Next()
将读取比所需更多的字节,并缓存额外的字节。如果直接从reader.Next()
之后的文件中读取,将丢失缓存的字节。下面的演示展示了这种行为:
看起来第二个读取器的目的是防止它阅读入另一个csv数据块。如果你事先知道下一个csv数据块的偏移量,你可以将文件 Package 在一个
io.SectionReader
中,使其只读取当前的csv数据块。目前的问题没有提供足够的信息,也许我们应该把它留给另一个问题。备注:
fd, _ := os.Open(filename)
:永远不要忽略错误。至少把它们记录下来fd
大多数时候表示文件描述符。不要将它用于*os.File
类型的变量,特别是当*os.File
具有Fd
方法时。