Splayed table upsert leading to error: `cast
Splayed table upsert leading to error: `cast
我构建了一个数据加载器原型,可将 CSV 保存到展开的 table 中。工作流程如下:
第一次创建架构,例如volatilitysurface
table:
volatilitysurface::([date:`datetime$(); ccypair:`symbol$()] atm_convention:`symbol$(); premium_included:`boolean$(); smile_type:`symbol$(); vs_type:`symbol$(); delta_ratio:`float$(); delta_setting:`float$(); wing_extrapolation:`float$(); spread_type:`symbol$());
对于原始数据文件夹中的每个文件导入它:
myfiles:@[system;"dir /b /o:gn ",string `$getenv[`KDBRAWDATA],"*.volatilitysurface.csv 2> nul";()];
if[myfiles~();.lg.o[`load;"no volatilitysurface files found!"];:0N];
.lg.o[`load;"loading data files ..."];
/ load each file
{
mypath:"" sv (string `$getenv[`KDBRAWDATA];x);
.lg.o[`load;"loading file name '",mypath,"' ..."];
myfile:hsym`$mypath;
tmp1:select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from ("ZSSSSSFFFS";enlist ",")0:myfile;
`volatilitysurface upsert tmp1;
} @/: myfiles;
delete tmp1 from `.;
.Q.gc[];
.lg.o[`done;"loading volatilitysurface data done"];
.lg.o[`save;"saving volatilitysurface schema to ",string afolder];
volatilitysurface::0!volatilitysurface;
.Q.dpft[afolder;`;`ccypair;`volatilitysurface];
.lg.o[`cleanup;"removing volatilitysurface from memory"];
delete volatilitysurface from `.;
.Q.gc[];
.lg.o[`done;"saving volatilitysurface schema done"];
这非常有效。我经常使用 .Q.gc[];
以避免碰到 wsfull
。当新的 CSV 文件可用时,我打开现有的模式,将其插入并再次保存,有效地覆盖现有的 HDB 文件系统。
打开架构:
.lg.o[`open;"tables already exists, opening the schema ..."];
@[system;"l ",(string afolder) _ 0;{.lg.e[`open;"failed to load hdb directory: ", x]; 'x}];
/ Re-create table index
volatilitysurface::`date`ccypair xkey select from volatilitysurface;
Re-运行 步骤 #2 将新的 CSV 文件附加到现有的 volatilitysurface
table,它完美地更新了第一个 CSV 但第二个 CSV 失败与:
error: `cast
我调试到错误点并仔细检查我发现 tmp1
和 volatilitysurface
的元数据完全相同。任何想法为什么会这样?我对任何其他 table 也有同样的问题。我尝试在每次更新后从 table 中清除密钥,但没有帮助,即
volatilitysurface::0!volatilitysurface;
volatilitysurface::`date`ccypair xkey volatilitysurface;
以及转换错误点的元数据比较:
meta tmp1
c | t f a
------------------| -----
date | z
ccypair | s
atm_convention | s
premium_included | b
smile_type | s
vs_type | s
delta_ratio | f
delta_setting | f
wing_extrapolation| f
spread_type | s
meta volatilitysurface
c | t f a
------------------| -----
date | z
ccypair | s p
atm_convention | s
premium_included | b
smile_type | s
vs_type | s
delta_ratio | f
delta_setting | f
wing_extrapolation| f
spread_type | s
UPDATE 使用下面的答案输入我尝试使用 Torq 的 .loader.loadallfiles
函数(它没有失败但也没有任何反应,table 没有在内存中创建,数据没有写入数据库):
.loader.loadallfiles[`headers`types`separator`tablename`dbdir`dataprocessfunc!(`x`ccypair`atm_convention`premium_included`smile_type`vs_type`delta_ratio`delta_setting`wing_extrapolation`spread_type;"ZSSSSSFFFS";enlist ",";`volatilitysurface;`:hdb; {[p;t] select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from t}); `:rawdata]
UDPATE2 这是我从 TorQ 得到的输出:
2017.11.20D08:46:12.550618000|wsp18497wn|dataloader|dataloader1|INF|dataloader|**** LOADING :rawdata/20171102_113420.disccurve.csv ****
2017.11.20D08:46:12.550618000|wsp18497wn|dataloader|dataloader1|INF|dataloader|reading in data chunk
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|Read 10000 rows
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|processing data
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|Enumerating
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 4525 rows to :hdb/2017.09.12/volatilitysurface/
2017.11.20D08:46:12.581819000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 4744 rows to :hdb/2017.09.13/volatilitysurface/
2017.11.20D08:46:12.659823000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 731 rows to :hdb/2017.09.14/volatilitysurface/
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|init|retrieving sort settings from :C:/Dev/torq//config/sort.csv
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sort|sorting the volatilitysurface table
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sorttab|No sort parameters have been specified for : volatilitysurface. Using default parameters
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sortfunction|sorting :hdb/2017.09.05/volatilitysurface/ by these columns : sym, time
2017.11.20D08:46:12.753428000|wsp18497wn|dataloader|dataloader1|ERR|sortfunction|failed to sort :hdb/2017.09.05/volatilitysurface/ by these columns : sym, time. The error was: hdb/2017.09.
我收到以下错误 sorttab|No sort parameters have been specified for : volatilitysurface. Using default parameters
这个 sorttab 记录在哪里?它默认使用 table PK 吗?
UPDATE3 好的,通过在我的 config
文件夹下提供非默认 sort.csv
修复了 UPDATE2:
tabname,att,column,sort
default,p,sym,1
default,,time,1
volatilitysurface,,date,1
volatilitysurface,,ccypair,1
但现在我发现,如果我对同一个文件多次调用该函数,它只会附加重复的数据,而不是 upsert
ing 它。
UPDATE4 还没有...假设我可以检查以确保没有使用重复文件。当我加载然后启动数据库时,我得到了一些类似于某种字典的结构,而不是 table.
2017.10.31| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.01| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.02| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.03| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
sym | `AUDNOK`AUDCNH`AUDJPY`AUDHKD`AUDCHF`AUDSGD`AUDCAD`AUDDKK`CADSGD`C..
请注意,日期实际上是日期时间 Z 而不仅仅是日期。我的函数调用的完整和最新版本是:
target:hsym `$("" sv ("./";getenv[`KDBHDB];"/volatilitysurface"));
rawdatadir:hsym `$getenv[`KDBRAWDATA];
.loader.loadallfiles[`headers`types`separator`tablename`dbdir`partitioncol`dataprocessfunc!(`x`ccypair`atm_convention`premium_included`smile_type`vs_type`delta_ratio`delta_setting`wing_extrapolation`spread_type;"ZSSSSSFFFS";enlist ",";`volatilitysurface;target;`date;{[p;t] select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from t}); rawdatadir];
`转换错误是指未被枚举的值
我在这里看不到任何枚举,磁盘上的展开表需要枚举符号列。例如,这可以在调用 .Q.dpft
之前使用以下行来完成
volatilitysurface:.Q.en[afolder;volatilitysurface];
您可以考虑使用示例 CSV 加载程序来加载数据。一个这样的例子包含在 TorQ 中,这是由 AquaQ Analytics 开发的 KDB 框架(作为免责声明,我在 AquaQ 工作)
框架可在此处(免费)获得:https://github.com/AquaQAnalytics/TorQ
您可能感兴趣的特定组件是 dataloader.q 并在此处记录:http://aquaqanalytics.github.io/TorQ/utilities/#dataloaderq
此脚本将处理所有必要的事情,加载所有文件、枚举、在磁盘上排序、应用属性等,以及使用 .Q.fsn 防止 运行 内存不足
我将在此处添加第二个答案以尝试解决有关使用 TorQ 的数据加载器的问题。
我想澄清一下您在 运行 执行此函数后得到的输出是什么?应该有一些日志消息输出,你能post这些吗?例如,当我 运行 函数时:
jmcmurray@homer ~/deploy/TorQ (master) $ q torq.q -procname loader -proctype loader -debug
<torq startup messages removed>
q).loader.loadallfiles[`headers`types`separator`tablename`dbdir`partitioncol`dataprocessfunc!(c;"TSSFJFFJJBS";enlist",";`quotes;`:testdb;`date;{[p;t] select date:.z.d,time:TIME,sym:INSTRUMENT,BID,ASK from t});`:csvtest]
2017.11.17D15:03:20.312336000|homer.aquaq.co.uk|loader|loader|INF|dataloader|**** LOADING :csvtest/tradesandquotes20140421.csv ****
2017.11.17D15:03:20.319110000|homer.aquaq.co.uk|loader|loader|INF|dataloader|reading in data chunk
2017.11.17D15:03:20.339414000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Read 11000 rows
2017.11.17D15:03:20.339463000|homer.aquaq.co.uk|loader|loader|INF|dataloader|processing data
2017.11.17D15:03:20.339519000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Enumerating
2017.11.17D15:03:20.340061000|homer.aquaq.co.uk|loader|loader|INF|dataloader|writing 11000 rows to :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.341669000|homer.aquaq.co.uk|loader|loader|INF|dataloader|**** LOADING :csvtest/tradesandquotes20140422.csv ****
2017.11.17D15:03:20.349606000|homer.aquaq.co.uk|loader|loader|INF|dataloader|reading in data chunk
2017.11.17D15:03:20.370793000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Read 11000 rows
2017.11.17D15:03:20.370858000|homer.aquaq.co.uk|loader|loader|INF|dataloader|processing data
2017.11.17D15:03:20.370911000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Enumerating
2017.11.17D15:03:20.371441000|homer.aquaq.co.uk|loader|loader|INF|dataloader|writing 11000 rows to :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.460118000|homer.aquaq.co.uk|loader|loader|INF|init|retrieving sort settings from :/home/jmcmurray/deploy/TorQ/config/sort.csv
2017.11.17D15:03:20.466690000|homer.aquaq.co.uk|loader|loader|INF|sort|sorting the quotes table
2017.11.17D15:03:20.466763000|homer.aquaq.co.uk|loader|loader|INF|sorttab|No sort parameters have been specified for : quotes. Using default parameters
2017.11.17D15:03:20.466820000|homer.aquaq.co.uk|loader|loader|INF|sortfunction|sorting :testdb/2017.11.17/quotes/ by these columns : sym, time
2017.11.17D15:03:20.527216000|homer.aquaq.co.uk|loader|loader|INF|applyattr|applying p attr to the sym column in :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.535095000|homer.aquaq.co.uk|loader|loader|INF|sort|finished sorting the quotes table
完成这一切后,我可以 运行 \l testdb
并且有一个名为 "quotes" 的 table 包含我加载的数据
如果您可以 post 记录这样的消息,可能有助于了解发生了什么。
更新
"But now I see that if I call the function multiple times on the same files, it simply appends duplicated data instead of upserting it."
如果我对问题的理解正确,听起来您可能不应该对同一个文件多次调用该函数。 TorQ 中的另一个进程在这里可能会有用,“file alerter”。此进程将监视新文件和更新文件的目录,并可以对出现的任何文件调用一个函数(因此您可以让它自动为每个新文件调用加载程序函数)。它有许多选项,例如处理后移动文件(这样你就可以 "archive" 加载 CSV)
请注意,文件警报器要求函数恰好采用两个参数 - 目录和文件名。这实际上意味着您将需要围绕加载程序函数的 "wrapper" 函数,它需要一个字典和一个目录。我认为 TorQ 不包含类似于 .loader.loadallfiles 的单个文件功能,因此可能需要将目标文件复制到临时目录, 运行 在该目录上加载所有文件,然后删除在加载下一个文件之前从那里下载文件。
我构建了一个数据加载器原型,可将 CSV 保存到展开的 table 中。工作流程如下:
第一次创建架构,例如
volatilitysurface
table:volatilitysurface::([date:`datetime$(); ccypair:`symbol$()] atm_convention:`symbol$(); premium_included:`boolean$(); smile_type:`symbol$(); vs_type:`symbol$(); delta_ratio:`float$(); delta_setting:`float$(); wing_extrapolation:`float$(); spread_type:`symbol$());
对于原始数据文件夹中的每个文件导入它:
myfiles:@[system;"dir /b /o:gn ",string `$getenv[`KDBRAWDATA],"*.volatilitysurface.csv 2> nul";()]; if[myfiles~();.lg.o[`load;"no volatilitysurface files found!"];:0N]; .lg.o[`load;"loading data files ..."]; / load each file { mypath:"" sv (string `$getenv[`KDBRAWDATA];x); .lg.o[`load;"loading file name '",mypath,"' ..."]; myfile:hsym`$mypath; tmp1:select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from ("ZSSSSSFFFS";enlist ",")0:myfile; `volatilitysurface upsert tmp1; } @/: myfiles; delete tmp1 from `.; .Q.gc[]; .lg.o[`done;"loading volatilitysurface data done"]; .lg.o[`save;"saving volatilitysurface schema to ",string afolder]; volatilitysurface::0!volatilitysurface; .Q.dpft[afolder;`;`ccypair;`volatilitysurface]; .lg.o[`cleanup;"removing volatilitysurface from memory"]; delete volatilitysurface from `.; .Q.gc[]; .lg.o[`done;"saving volatilitysurface schema done"];
这非常有效。我经常使用 .Q.gc[];
以避免碰到 wsfull
。当新的 CSV 文件可用时,我打开现有的模式,将其插入并再次保存,有效地覆盖现有的 HDB 文件系统。
打开架构:
.lg.o[`open;"tables already exists, opening the schema ..."]; @[system;"l ",(string afolder) _ 0;{.lg.e[`open;"failed to load hdb directory: ", x]; 'x}]; / Re-create table index volatilitysurface::`date`ccypair xkey select from volatilitysurface;
Re-运行 步骤 #2 将新的 CSV 文件附加到现有的
volatilitysurface
table,它完美地更新了第一个 CSV 但第二个 CSV 失败与:error: `cast
我调试到错误点并仔细检查我发现 tmp1
和 volatilitysurface
的元数据完全相同。任何想法为什么会这样?我对任何其他 table 也有同样的问题。我尝试在每次更新后从 table 中清除密钥,但没有帮助,即
volatilitysurface::0!volatilitysurface;
volatilitysurface::`date`ccypair xkey volatilitysurface;
以及转换错误点的元数据比较:
meta tmp1
c | t f a
------------------| -----
date | z
ccypair | s
atm_convention | s
premium_included | b
smile_type | s
vs_type | s
delta_ratio | f
delta_setting | f
wing_extrapolation| f
spread_type | s
meta volatilitysurface
c | t f a
------------------| -----
date | z
ccypair | s p
atm_convention | s
premium_included | b
smile_type | s
vs_type | s
delta_ratio | f
delta_setting | f
wing_extrapolation| f
spread_type | s
UPDATE 使用下面的答案输入我尝试使用 Torq 的 .loader.loadallfiles
函数(它没有失败但也没有任何反应,table 没有在内存中创建,数据没有写入数据库):
.loader.loadallfiles[`headers`types`separator`tablename`dbdir`dataprocessfunc!(`x`ccypair`atm_convention`premium_included`smile_type`vs_type`delta_ratio`delta_setting`wing_extrapolation`spread_type;"ZSSSSSFFFS";enlist ",";`volatilitysurface;`:hdb; {[p;t] select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from t}); `:rawdata]
UDPATE2 这是我从 TorQ 得到的输出:
2017.11.20D08:46:12.550618000|wsp18497wn|dataloader|dataloader1|INF|dataloader|**** LOADING :rawdata/20171102_113420.disccurve.csv ****
2017.11.20D08:46:12.550618000|wsp18497wn|dataloader|dataloader1|INF|dataloader|reading in data chunk
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|Read 10000 rows
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|processing data
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|Enumerating
2017.11.20D08:46:12.566218000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 4525 rows to :hdb/2017.09.12/volatilitysurface/
2017.11.20D08:46:12.581819000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 4744 rows to :hdb/2017.09.13/volatilitysurface/
2017.11.20D08:46:12.659823000|wsp18497wn|dataloader|dataloader1|INF|dataloader|writing 731 rows to :hdb/2017.09.14/volatilitysurface/
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|init|retrieving sort settings from :C:/Dev/torq//config/sort.csv
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sort|sorting the volatilitysurface table
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sorttab|No sort parameters have been specified for : volatilitysurface. Using default parameters
2017.11.20D08:46:12.737827000|wsp18497wn|dataloader|dataloader1|INF|sortfunction|sorting :hdb/2017.09.05/volatilitysurface/ by these columns : sym, time
2017.11.20D08:46:12.753428000|wsp18497wn|dataloader|dataloader1|ERR|sortfunction|failed to sort :hdb/2017.09.05/volatilitysurface/ by these columns : sym, time. The error was: hdb/2017.09.
我收到以下错误 sorttab|No sort parameters have been specified for : volatilitysurface. Using default parameters
这个 sorttab 记录在哪里?它默认使用 table PK 吗?
UPDATE3 好的,通过在我的 config
文件夹下提供非默认 sort.csv
修复了 UPDATE2:
tabname,att,column,sort
default,p,sym,1
default,,time,1
volatilitysurface,,date,1
volatilitysurface,,ccypair,1
但现在我发现,如果我对同一个文件多次调用该函数,它只会附加重复的数据,而不是 upsert
ing 它。
UPDATE4 还没有...假设我可以检查以确保没有使用重复文件。当我加载然后启动数据库时,我得到了一些类似于某种字典的结构,而不是 table.
2017.10.31| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.01| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.02| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
2017.11.03| (,`volatilitysurface)!,+`date`ccypair`atm_convention`premium_incl..
sym | `AUDNOK`AUDCNH`AUDJPY`AUDHKD`AUDCHF`AUDSGD`AUDCAD`AUDDKK`CADSGD`C..
请注意,日期实际上是日期时间 Z 而不仅仅是日期。我的函数调用的完整和最新版本是:
target:hsym `$("" sv ("./";getenv[`KDBHDB];"/volatilitysurface"));
rawdatadir:hsym `$getenv[`KDBRAWDATA];
.loader.loadallfiles[`headers`types`separator`tablename`dbdir`partitioncol`dataprocessfunc!(`x`ccypair`atm_convention`premium_included`smile_type`vs_type`delta_ratio`delta_setting`wing_extrapolation`spread_type;"ZSSSSSFFFS";enlist ",";`volatilitysurface;target;`date;{[p;t] select date,ccypair,atm_convention,premium_included,smile_type,vs_type,delta_ratio,delta_setting,wing_extrapolation,spread_type from update date:x, premium_included:?[premium_included = `$"true";1b;0b] from t}); rawdatadir];
`转换错误是指未被枚举的值
我在这里看不到任何枚举,磁盘上的展开表需要枚举符号列。例如,这可以在调用 .Q.dpft
之前使用以下行来完成volatilitysurface:.Q.en[afolder;volatilitysurface];
您可以考虑使用示例 CSV 加载程序来加载数据。一个这样的例子包含在 TorQ 中,这是由 AquaQ Analytics 开发的 KDB 框架(作为免责声明,我在 AquaQ 工作)
框架可在此处(免费)获得:https://github.com/AquaQAnalytics/TorQ
您可能感兴趣的特定组件是 dataloader.q 并在此处记录:http://aquaqanalytics.github.io/TorQ/utilities/#dataloaderq
此脚本将处理所有必要的事情,加载所有文件、枚举、在磁盘上排序、应用属性等,以及使用 .Q.fsn 防止 运行 内存不足
我将在此处添加第二个答案以尝试解决有关使用 TorQ 的数据加载器的问题。
我想澄清一下您在 运行 执行此函数后得到的输出是什么?应该有一些日志消息输出,你能post这些吗?例如,当我 运行 函数时:
jmcmurray@homer ~/deploy/TorQ (master) $ q torq.q -procname loader -proctype loader -debug
<torq startup messages removed>
q).loader.loadallfiles[`headers`types`separator`tablename`dbdir`partitioncol`dataprocessfunc!(c;"TSSFJFFJJBS";enlist",";`quotes;`:testdb;`date;{[p;t] select date:.z.d,time:TIME,sym:INSTRUMENT,BID,ASK from t});`:csvtest]
2017.11.17D15:03:20.312336000|homer.aquaq.co.uk|loader|loader|INF|dataloader|**** LOADING :csvtest/tradesandquotes20140421.csv ****
2017.11.17D15:03:20.319110000|homer.aquaq.co.uk|loader|loader|INF|dataloader|reading in data chunk
2017.11.17D15:03:20.339414000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Read 11000 rows
2017.11.17D15:03:20.339463000|homer.aquaq.co.uk|loader|loader|INF|dataloader|processing data
2017.11.17D15:03:20.339519000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Enumerating
2017.11.17D15:03:20.340061000|homer.aquaq.co.uk|loader|loader|INF|dataloader|writing 11000 rows to :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.341669000|homer.aquaq.co.uk|loader|loader|INF|dataloader|**** LOADING :csvtest/tradesandquotes20140422.csv ****
2017.11.17D15:03:20.349606000|homer.aquaq.co.uk|loader|loader|INF|dataloader|reading in data chunk
2017.11.17D15:03:20.370793000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Read 11000 rows
2017.11.17D15:03:20.370858000|homer.aquaq.co.uk|loader|loader|INF|dataloader|processing data
2017.11.17D15:03:20.370911000|homer.aquaq.co.uk|loader|loader|INF|dataloader|Enumerating
2017.11.17D15:03:20.371441000|homer.aquaq.co.uk|loader|loader|INF|dataloader|writing 11000 rows to :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.460118000|homer.aquaq.co.uk|loader|loader|INF|init|retrieving sort settings from :/home/jmcmurray/deploy/TorQ/config/sort.csv
2017.11.17D15:03:20.466690000|homer.aquaq.co.uk|loader|loader|INF|sort|sorting the quotes table
2017.11.17D15:03:20.466763000|homer.aquaq.co.uk|loader|loader|INF|sorttab|No sort parameters have been specified for : quotes. Using default parameters
2017.11.17D15:03:20.466820000|homer.aquaq.co.uk|loader|loader|INF|sortfunction|sorting :testdb/2017.11.17/quotes/ by these columns : sym, time
2017.11.17D15:03:20.527216000|homer.aquaq.co.uk|loader|loader|INF|applyattr|applying p attr to the sym column in :testdb/2017.11.17/quotes/
2017.11.17D15:03:20.535095000|homer.aquaq.co.uk|loader|loader|INF|sort|finished sorting the quotes table
完成这一切后,我可以 运行 \l testdb
并且有一个名为 "quotes" 的 table 包含我加载的数据
如果您可以 post 记录这样的消息,可能有助于了解发生了什么。
更新
"But now I see that if I call the function multiple times on the same files, it simply appends duplicated data instead of upserting it."
如果我对问题的理解正确,听起来您可能不应该对同一个文件多次调用该函数。 TorQ 中的另一个进程在这里可能会有用,“file alerter”。此进程将监视新文件和更新文件的目录,并可以对出现的任何文件调用一个函数(因此您可以让它自动为每个新文件调用加载程序函数)。它有许多选项,例如处理后移动文件(这样你就可以 "archive" 加载 CSV)
请注意,文件警报器要求函数恰好采用两个参数 - 目录和文件名。这实际上意味着您将需要围绕加载程序函数的 "wrapper" 函数,它需要一个字典和一个目录。我认为 TorQ 不包含类似于 .loader.loadallfiles 的单个文件功能,因此可能需要将目标文件复制到临时目录, 运行 在该目录上加载所有文件,然后删除在加载下一个文件之前从那里下载文件。